The first section introduces ef-python
, In the second section, the whole collection class Collection
It's done , In this section, we will complete ORM At the heart of , Mapping objects
Section 1 :ORM Use Python Handwriting ORM- One of my database access tools ( One )
In the second quarter :Collection Class construction Python Handwriting ORM- One of my database access tools ( Two )
Because for ORM object , Its added objects will be more rigorous , So disable __iadd__
and add
function
class Record:
def __init__(self, data, mainkey, sign=None):
self.data = data
self.sign = sign
self.mainkey = mainkey
For maintained data , For better searching, we will use DBRecord Object to store
Internal functions in this file
def findkey(source, mainkey):
for i in source:
if i.mainkey == mainkey:
return i
def findsign(source):
data = []
for i in source:
if i.sign is not None:
data.append(i)
return data
def objcmp(object1, object2):
for i, j in zip(object1.__dict__, object2.__dict__):
if object1.__dict__[i] != object2.__dict__[j]:
return False
return True
findkey
function
Return value Indexes int
findsign
Return value list<Record> Traceable Record
objcmp
Return value bool Whether it is equal or not
DBSet The object is ORM Mapping of data tables in
def __init__(self, context, table, init=None):
self._table = table
if init is None:
init = context.create()
self._cursor = init[1]
self._conn = init[0]
self._view = Collection()
self._mainkey = None
self._lines = []
# Find all field names , Generate
sqlstring = f'select COLUMN_NAME from information_schema.columns where table_name=\'{
self._table}\''
self._cursor.execute(sqlstring)
result = list(self._cursor.fetchall())
self._keys = []
for key in result:
self._keys.append(key[0])
self._model = type(self._table, (object,), {
})
sqlstring = f'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=\'{
self._table}\''
self._cursor.execute(sqlstring)
self._mainkey = self._cursor.fetchall()[0][0]
sqlstring = f'Select * from {
self._table}'
self._cursor.execute(sqlstring)
result = self._cursor.fetchall()
# Convert data type
for datas in result:
obj = self._model()
for data, key in zip(datas, self._keys):
obj.__dict__[key] = data
record = Record(obj, obj.__dict__[self._mainkey])
self._view.add(record)
Code reading
self._table = table
self._cursor = init[1]
self._conn = init[0]
self._view = Collection()
self._mainkey = None
self._lines = []
self._keys = []
self._model = type(self._table, (object,), {
})
self._table Used to store table names ,self._cursor and self._conn Used to store Cursor object
and Connection object
, and DBContext There is an internal method .create()
Function is used to return these two objects for initialization ,self._view Used to store query objects ,self._mainkey Save primary key ,self._lines Used to store Procedure statement
,self._key Used to store all fields ,self._model Used to store Mapping table objects
This function can get the primary key and fields from the table name to initialize , And automatically structure
Map the table object and load the object
def flush(self):
self._lines.clear()
self._view.clear()
sqlstring = f'Select * from {
self._table}'
self._cursor.execute(sqlstring)
result = self._cursor.fetchall()
# Convert data type
for datas in result:
obj = self._model()
for data, key in zip(datas, self._keys):
obj.__dict__[key] = data
record = Record(obj, obj.__dict__[self._mainkey])
self._view.add(record)
Clear records and refresh queries
def savechanges(self):
# Determine whether the removed object has changed
changes = findsign(self._view)
for i in changes:
# Modify all values
if objcmp(i.data, i.sign):
continue
else:
# Upload the modified value
data = ''
for key, value in zip(i.data.__dict__.keys(), i.data.__dict__.values()):
data += f'{
key}=' + f'\'{
value}\'' + ','
data = str(data).removesuffix(',')
sqlstring = f'update {
self._table} set {
data} where {
self._mainkey}=\'{
i.sign.__dict__[self._mainkey]}\''
self._lines.append(sqlstring)
# Upload each modification statement
for i in self._lines:
print(i)
self._cursor.execute(i)
# Clear all execution statements
self.flush()
self._conn.commit()
Save changes and refresh , By comparing the current value with the previous value of the tracked object , If they are different, they will update , If the same, skip
def where(self, expression):
data = Collection()
for item in self._view:
if expression(item.data):
item.sign = copy.copy(item.data)
data.add(item.data)
return data
def find(self, key):
result = self._view.where(lambda x: x.data.__dict__[self._mainkey] == key).firstordefault()
if result is None:
return None
result.sign = copy.copy(result.data)
return result.data
def add(self, item):
if type(item) != self._model:
raise TypeError()
else:
self._view.add(Record(item, item.__dict__[self._mainkey]))
data = str(tuple(item.__dict__.values())).replace('None', 'Null')
sqlstring = f'insert into {
self._table} values {
data}'
self._lines.append(sqlstring)
class DBContext(object):
def __init__(self, con):
self._conn = con
self._cursor = self._conn.cursor()
def create(self):
return self._conn, self._cursor
def add(self, item):
# Self assign add object
for dbset in self.__dict__.values():
if type(dbset) is DBSet:
if dbset.__dict__['_model'] is type(item):
dbset.add(item)
return
raise Exception('Error Could Not Find DBSet')
def savechanges(self):
for dbset in self.__dict__.values():
if type(dbset) is DBSet:
dbset.savechanges()
def execute(self, sqlstring):
self._cursor.execute(sqlstring)
result = self._cursor.fetchall()
self._conn.commit()
return result
def close(self):
self._cursor.close()
self._conn.close()
del self
The base DBContext object , To write your own context object, you need to inherit it
Click to download the full-text code