
Writing a database in a little over 100 lines of Python code


The database is called WawaDB, and yes, it is implemented in Python. It goes to show just how powerful Python really is!

Introduction

The requirements for logging are generally as follows:

Append only, never modify; writes arrive in chronological order;

Heavy writes, light reads; queries usually ask for the data within a time range.

MongoDB's capped collections meet these requirements very well, but MongoDB takes up a lot of memory; using it for this feels like swatting a mosquito with a cannon, making a mountain out of a molehill.

WawaDB's idea is this: for every 1000 logs written, record the current time and the current log file offset in an index file.
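
A minimal sketch of that idea (illustrative only; the function and variable names here are made up, the real implementation is at the end of the article):

import time

def append_log(log_fp, index_fp, line, line_no):
    # line_no is the running count of log lines written so far
    offset = log_fp.tell()                    # byte offset where this line will start
    log_fp.write(line + '\n')
    if line_no % 1000 == 0:                   # sparse index: one entry per 1000 lines
        index_fp.write('%s %s\n' % (time.time(), offset))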

When querying logs by time, the index is first loaded into memory, a binary search finds the offset for the requested time point, and the log file is then opened and seek()ed to that position. This way the data the user needs can be located and read quickly, instead of traversing the entire log file.
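
Roughly, the lookup side works like this (again a sketch with made-up names; the real logic lives in get_offsets and __get_data_by_offsets in the full code below):

import bisect

def find_start_offset(index_path, begin_time):
    times, offsets = [], []
    for entry in open(index_path):            # index lines look like "timestamp offset"
        t, off = entry.split()
        times.append(float(t))
        offsets.append(int(off))
    # binary-search for begin_time, then step back one entry so nothing is missed
    pos = max(bisect.bisect_left(times, begin_time) - 1, 0)
    return offsets[pos]

# the caller then does log_fp.seek(find_start_offset(...)) and reads forward from there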

Performance

Core 2 P8400, 2.26 GHz, 2 GB RAM, 32-bit Windows 7

Write test:

Simulated writing 10,000 records per minute for 5 hours' worth of data: 3 million records inserted, 54 characters each, in 2 minutes 51 seconds.

Read test: read the log lines containing a given substring within a specified time range.

Time range    Records traversed    Results    Time (s)
5 hours       3,000,000            604        6.6
2 hours       1,200,000            225        2.7
1 hour          600,000             96        1.3
30 minutes      300,000             44        0.6

Indexes

Only the log time is indexed. The introduction sketched roughly how the index is implemented: binary search is certainly not as efficient as a B-tree, but in general the difference is not an order of magnitude, and the implementation is much simpler.

Because the index is sparse, not every log line has an index entry recording its offset, so reading starts a little earlier than the requested point to avoid missing data; only once the data the user actually asked for is reached are entries returned to the user.

For example, as illustrated below, suppose the user wants to read logs 25 through 43. A binary search for 25 lands on the index entry for 30:

Index: 0         10        20        30        40        50
Log:   |.........|.........|.........|.........|.........|

>>> a = [0, 10, 20, 30, 40, 50]
>>> bisect.bisect_left(a, 25)
3
>>> a[3]
30
>>> bisect.bisect_left(a, 43)
5
>>> a[5]
50

So we step back one entry and start reading the log from 20 (the tick before 30). Entries 21, 22, 23 and 24 are read and discarded because they are smaller than 25; from 25, 26, 27, ... onward the entries are returned to the user.

After reading past 40 (the tick before 50), every entry has to be checked against 43; as soon as an entry greater than 43 appears (so the data returned covers the whole requested range), reading stops.

Overall, only a small portion of a large file is touched in order to return the data the user wants.

Buffering

To cut down on the many disk writes caused by logging, the buffer used when appending log lines is set to 10 KB (the system default should be 4 KB).

By the same token, to improve read efficiency the read buffer is also set to 10 KB; both values should be tuned to the size of your logs.

Index reads and writes use line buffering, so every complete line is flushed to disk, preventing partial index lines from being read (although in practice it turns out that even with line buffering set, half-written lines can still be read).
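
In terms of the open() calls, the three buffer settings look like this in isolation (the file names here are placeholders; the real paths are built in the code below):

log_writer = open('test.db', 'a', 1024 * 10)    # 10 KB write buffer for log appends
log_reader = open('test.db', 'r', 1024 * 10)    # 10 KB read buffer for queries
index_fp   = open('test.index', 'a+', 1)        # line buffered: flush after each full line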

Querying

What? You want SQL support? Come on, how could 100 lines of code support SQL?

Queries are now expressed by passing in a lambda directly: as the system traverses the data rows within the specified time range, any row that satisfies the user's lambda condition is returned to the user.

Of course, this reads more data than the user needs, and every line has to go through the lambda expression, but there is no way around it: simplicity is beauty.
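
For example, a query over the last hour for lines containing a substring (the same '1024' substring the test code below uses) would look like this with the get_data API from the full listing:

from datetime import datetime, timedelta

db = WawaDB('test')
end = datetime.now()
begin = end - timedelta(hours=1)
# only lines that satisfy the lambda are handed back to the caller
for line in db.get_data(begin, end, lambda row: '1024' in row):
    print line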

In an earlier design I recorded the query condition, the log time and the log file offset together in the index. That way the qualifying offsets could be found from the index, and each matching record then cost one seek and one read in the log file. The only benefit is that the amount of data read is small, but there are two drawbacks:

the index file is huge and inconvenient to load into memory;

every read requires a seek, which apparently defeats the buffer and is particularly slow; it is four or five times slower than reading a contiguous chunk and filtering it with a lambda.

Writing

As mentioned before, data is append-only and never modified, and the timestamp sits at the beginning of each log line.
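
Concretely, each stored line is formatted the way append_data in the listing below builds it: the Unix timestamp produced by time.mktime() comes first, then the user's data, then the line separator.

import os, time
from datetime import datetime

record_time = time.mktime(datetime.now().timetuple())
line = '%s %s %s' % (record_time, 'some log payload', os.linesep)
# because the timestamp leads the line, the read path can compare whole lines
# as strings against str(begin_time) / str(end_time) to bound the time range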

Multithreading

Data can be queried from multiple threads at the same time: each query opens a new log file descriptor, so parallel reads don't interfere with each other.

For writes, although they are pure appends, I have not confirmed whether concurrent append operations on the same file from multiple threads are safe, so the recommendation is to use a queue with a single dedicated writer thread, as sketched below.
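
A minimal sketch of that single-writer pattern, assuming the WawaDB class and init() from the listing at the end of the article (the queue and thread here are not part of WawaDB itself):

import threading
import Queue   # the module is named queue in Python 3

init()                      # make sure the data directory exists
db = WawaDB('test')
write_queue = Queue.Queue()

def writer_loop():
    # the only thread that ever calls db.append_data
    while True:
        item = write_queue.get()
        if item is None:    # sentinel: stop the writer
            break
        db.append_data(item)

writer = threading.Thread(target=writer_loop)
writer.daemon = True
writer.start()

# any other thread just enqueues its log lines
write_queue.put('something happened')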

Locking

There are no locks.

Sorting

By default, query results come back in ascending time order. If you want a different order, pull the results into memory and sort them with Python's sorted() function however you like.
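
For instance, to get newest-first ordering you can sort on the leading timestamp (a small sketch reusing db, begin and end from the query example above; not part of the library):

results = list(db.get_data(begin, end))
newest_first = sorted(results, key=lambda line: float(line.split(' ', 1)[0]), reverse=True)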

The 100-odd lines of database code

# -*- coding: utf-8 -*-
import os
import time
import bisect
import itertools
from datetime import datetime
import logging

default_data_dir = './data/'
default_write_buffer_size = 1024 * 10
default_read_buffer_size = 1024 * 10
default_index_interval = 1000

def ensure_data_dir():
    if not os.path.exists(default_data_dir):
        os.makedirs(default_data_dir)

def init():
    ensure_data_dir()

class WawaIndex:
    def __init__(self, index_name):
        # line-buffered index file: every complete line is flushed to disk
        self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1)
        self.indexes, self.offsets, self.index_count = [], [], 0
        self.__load_index()

    def __update_index(self, key, offset):
        self.indexes.append(key)
        self.offsets.append(offset)

    def __load_index(self):
        self.fp_index.seek(0)
        for line in self.fp_index:
            try:
                key, offset = line.split()
                self.__update_index(key, offset)
            except ValueError:  # a half-written line may be read if the index was not flushed
                pass

    def append_index(self, key, offset):
        self.index_count += 1
        if self.index_count % default_index_interval == 0:
            # sparse index: one entry per default_index_interval appends
            self.__update_index(key, offset)
            self.fp_index.write('%s %s %s' % (key, offset, os.linesep))

    def get_offsets(self, begin_key, end_key):
        # binary search, then step back one entry so nothing before begin_key is missed
        left = bisect.bisect_left(self.indexes, str(begin_key))
        right = bisect.bisect_left(self.indexes, str(end_key))
        left, right = left - 1, right - 1
        if left < 0: left = 0
        if right < 0: right = 0
        if right > len(self.indexes) - 1: right = len(self.indexes) - 1
        logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right)
        return self.offsets[left], self.offsets[right]

class WawaDB:
    def __init__(self, db_name):
        self.db_name = db_name
        self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size)
        self.index = WawaIndex(db_name)

    def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset):
        fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size)
        fp_data.seek(int(begin_offset))
        line = fp_data.readline()
        find_real_begin_offset = False
        will_read_len, read_len = int(end_offset) - int(begin_offset), 0
        while line:
            read_len += len(line)
            # skip entries earlier than begin_key, stop once past end_key
            if (not find_real_begin_offset) and (line < str(begin_key)):
                line = fp_data.readline()
                continue
            find_real_begin_offset = True
            if (read_len >= will_read_len) and (line > str(end_key)): break
            yield line.rstrip('\r\n')
            line = fp_data.readline()

    def append_data(self, data, record_time=None):
        def check_args():
            if not data:
                raise ValueError('data is null')
            if not isinstance(data, basestring):
                raise ValueError('data is not string')
            if data.find('\r') != -1 or data.find('\n') != -1:
                raise ValueError('data contains linesep')

        check_args()
        if record_time is None:  # default evaluated per call, not once at import time
            record_time = datetime.now()
        record_time = time.mktime(record_time.timetuple())
        data = '%s %s %s' % (record_time, data, os.linesep)
        offset = self.fp_data_for_append.tell()
        self.fp_data_for_append.write(data)
        self.index.append_index(record_time, offset)

    def get_data(self, begin_time, end_time, data_filter=None):
        def check_args():
            if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)):
                raise ValueError('begin_time or end_time is not datetime')

        check_args()
        begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple())
        begin_offset, end_offset = self.index.get_offsets(begin_time, end_time)
        for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset):
            if data_filter:
                if data_filter(data):
                    yield data
            else:
                yield data

def test():
    from datetime import datetime, timedelta
    import uuid, random
    logging.getLogger().setLevel(logging.NOTSET)

    def time_test(test_name):
        def inner(f):
            def inner2(*args, **kargs):
                start_time = datetime.now()
                result = f(*args, **kargs)
                print '%s take time:%s' % (test_name, (datetime.now() - start_time))
                return result
            return inner2
        return inner

    @time_test('gen_test_data')
    def gen_test_data(db):
        now = datetime.now()
        begin_time = now - timedelta(hours=5)
        while begin_time < now:
            print begin_time
            for i in range(10000):
                db.append_data(str(random.randint(1, 10000)) + ' ' + str(uuid.uuid1()), begin_time)
            begin_time += timedelta(minutes=1)

    @time_test('test_get_data')
    def test_get_data(db):
        begin_time = datetime.now() - timedelta(hours=3)
        end_time = begin_time + timedelta(minutes=120)
        results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1))
        print 'test_get_data get %s results' % len(results)

    @time_test('get_db')
    def get_db():
        return WawaDB('test')

    if not os.path.exists('./data/test.db'):
        db = get_db()
        gen_test_data(db)
        #db.index.fp_index.flush()

    db = get_db()
    test_get_data(db)

init()

if __name__ == '__main__':
    test()
