The name of the database is WawaDB, Yes, it is python Realized . thus it can be seen python Ash is always powerful !
The requirements for logging are generally as follows :
Just add , Don't modify , Write writes in chronological order ;
Mass write , Little reading , Query generally queries the data of a time period ;
MongoDB The fixed set of meets this requirement very well , however MongoDB It takes up a lot of memory , A little fire pierces mosquitoes , The feeling of making a mountain out of a molehill .
WawaDB The idea is that every write 1000 Logs , Record the current time and the offset of the log file in an index file .
Then query the log by time , First load the index into memory , Find out the offset of time point by dichotomy , Then open the log file seek To specified location , In this way, the data required by the user can be quickly located and read , Instead of traversing the entire log file .
Core 2 P8400,2.26GHZ,2G Memory ,32 bit win7
Write test :
simulation 1 Minutes to write 10000 Data , Co write 5 Hours of data , Insert 300 Ten thousand data , Every piece of data 54 Characters , when 2 branch 51 second
Read test : Read the log containing a substring within the specified time period
Data range Traversal data volume Number of results when ( second )
5 Hours 300 ten thousand 604 6.6
2 Hours 120 ten thousand 225 2.7
1 Hours 60 ten thousand 96 1.3
30 minute 30 ten thousand 44 0.6
Only the time of logging is indexed , The introduction outlines the implementation of the index , Two point search is definitely not B Tree Efficient , But in general, there is no difference of an order of magnitude , And the implementation is particularly simple .
Because it's a sparse index , Not every log has an index to record its offset , So read more data before reading data , Prevent missing reading , When you read the data you really need, you can return the data to the user .
Here's the picture , For example, the user needs to read 25 To 43 Log , Use dichotomy to find 25, What we found was 30 The point where it is ,
Cable lead :0 10 20 30 40 50 journal :|.........|.........|.........|.........|.........|>>>a = [0, 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>>>bisect.bisect_left(a, 43)>>>5>>>a[5]>>>50
So we're going to move forward , from 20(30 The previous scale of ) Start reading logs ,21,22,23,24 After reading, because the ratio is 25 Small , So throw it away , Read 25,26,27,... Then return to the user
Read 40(50 The previous scale of ) Then it is necessary to determine whether the current data is greater than 43 了 , If it is greater than 43( Return the data of the fully open section ), It's time to stop reading .
On the whole, we only operate a small part of the large file to get the data users want .
To reduce a large number of disk writes when writing logs , Index in append When the log , hold buffer Set up a 10k, The system default should be 4k.
Empathy , To improve the efficiency of reading logs , Read the buffer Also set up 10k, It also needs to be adjusted according to the size of your log .
The read / write of the index is set to row buffer, Every full line must flush To disk , Prevent reading incomplete index rows ( In fact, practice has proved that , Set the row buffer, I can still read half a la ).
what ? To support SQL, Stop that now the ,100 How does line code support SQL ah .
Now the query is passed in directly lambada expression , When the system traverses a data row within a specified time range , Meet the needs of users lambada The condition will be returned to the user .
Of course, this will read more data that users do not need , And every line has to be lambda The operation of expressions , But there's no way , Simplicity is beauty .
In the past, I put a query condition and log time , Log file offsets are recorded in the index , In this way, we can find the qualified offset from the index , Then each piece of data is stored in the log file seek once ,read once . There is only one benefit , The amount of data read is small , But there are two disadvantages :
The index file is very large , Inconvenient to load into memory
Every time you read, you have to seek, It seems that the buffer is not used , Particularly slow , It is better to read one segment of data continuously , And use lambda Filtration is fourorfive times slower
As I said before , only append, Do not modify data , And the time stamp is at the beginning of each log line .
Multithreading
Query data , You can query multiple threads at the same time , Each query will open a new log file descriptor , So parallel multiple reads won't fight .
If you write it , Although just append operation , However, it is not confirmed that the file is processed by multiple threads append Whether the operation is safe , So it is recommended to use a queue , A dedicated thread writes .
There are no locks .
By default, the queried data is arranged in positive time order , For additional sorting , Take it to the memory and use it python Of sorted Function order , Line up as you like .
# -- coding:utf-8 -- import os import time import bisect import itertools from datetime import datetime import logging default_data_dir = './data/' default_write_buffer_size = 102410 default_read_buffer_size = 102410 default_index_interval = 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def init(self, index_name): self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1) self.indexes, self.offsets, self.index_count = [], [], 0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for line in self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError: # If there is no index flush Words , You may read half a row of data pass def append_index(self, key, offset): self.index_count += 1 if self.index_count % default_index_interval == 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left = bisect.bisect_left(self.indexes, str(begin_key)) right = bisect.bisect_left(self.indexes, str(end_key)) left, right = left - 1, right - 1 if left < 0: left = 0 if right < 0: right = 0 if right > len(self.indexes) - 1: right = len(self.indexes) - 1 logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left], self.offsets[right] class WawaDB: def init(self, db_name): self.db_name = db_name self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size) self.index = WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line = fp_data.readline() find_real_begin_offset = False will_read_len, read_len = int(end_offset) - int(begin_offset), 0 while line: read_len += len(line) if (not find_real_begin_offset) and (line < str(begin_key)): line = fp_data.readline() continue find_real_begin_offset = True if (read_len >= will_read_len) and (line > str(end_key)): break yield line.rstrip('\r\n') line = fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data, basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time = time.mktime(record_time.timetuple()) data = '%s %s %s' % (record_time, data, os.linesep) offset = self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset = self.index.get_offsets(begin_time, end_time) for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetime import datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args, **kargs): start_time = datetime.now() result = f(*args, **kargs) print '%s take time:%s' % (test_name, (datetime.now() - start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now = datetime.now() begin_time = now - timedelta(hours=5) while begin_time < now: print begin_time for i in range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time += timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time = datetime.now() - timedelta(hours=3) end_time = begin_time + timedelta(minutes=120) results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db = get_db() gen_test_data(db) #db.index.fp_index.flush() db = get_db() test_get_data(db) init() if name == 'main': test()</pre>