第一節整體介紹了ef-python
,第二節把整體集合類Collection
完成了,而本節我們來完成ORM的核心,對象映射
第一節:ORM的使用Python 手寫ORM-我的一個數據庫訪問工具(一)
第二節:Collection類的構造Python 手寫ORM-我的一個數據庫訪問工具(二)
因為對於ORM對象,其添加對象會更加嚴謹,所以要禁用__iadd__
和add
函數
class Record:
def __init__(self, data, mainkey, sign=None):
self.data = data
self.sign = sign
self.mainkey = mainkey
對於維護的數據,為了更好的查找會使用DBRecord對象去儲存
該文件中的內部函數
def findkey(source, mainkey):
for i in source:
if i.mainkey == mainkey:
return i
def findsign(source):
data = []
for i in source:
if i.sign is not None:
data.append(i)
return data
def objcmp(object1, object2):
for i, j in zip(object1.__dict__, object2.__dict__):
if object1.__dict__[i] != object2.__dict__[j]:
return False
return True
findkey
函數
返回值索引 int
findsign
返回值list<Record> 可追蹤的Record
objcmp
返回值bool 是否相等
DBSet對象是ORM中數據表的映射
def __init__(self, context, table, init=None):
self._table = table
if init is None:
init = context.create()
self._cursor = init[1]
self._conn = init[0]
self._view = Collection()
self._mainkey = None
self._lines = []
# 查找所有字段名,生成
sqlstring = f'select COLUMN_NAME from information_schema.columns where table_name=\'{
self._table}\''
self._cursor.execute(sqlstring)
result = list(self._cursor.fetchall())
self._keys = []
for key in result:
self._keys.append(key[0])
self._model = type(self._table, (object,), {
})
sqlstring = f'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME=\'{
self._table}\''
self._cursor.execute(sqlstring)
self._mainkey = self._cursor.fetchall()[0][0]
sqlstring = f'Select * from {
self._table}'
self._cursor.execute(sqlstring)
result = self._cursor.fetchall()
# 轉換數據類型
for datas in result:
obj = self._model()
for data, key in zip(datas, self._keys):
obj.__dict__[key] = data
record = Record(obj, obj.__dict__[self._mainkey])
self._view.add(record)
代碼解讀
self._table = table
self._cursor = init[1]
self._conn = init[0]
self._view = Collection()
self._mainkey = None
self._lines = []
self._keys = []
self._model = type(self._table, (object,), {
})
self._table用於儲存表名,self._cursor和self._conn用於儲存游標對象
和連接對象
,而DBContext有一個內部方法.create()
函數用於返回這兩個對象進行初始化,self._view用於儲存查詢對象,self._mainkey儲存主鍵,self._lines用於儲存過程語句
,self._key用於儲存所有的字段,self._model用於儲存映射表對象
該函數可以通過表名獲取主鍵和字段來初始化,並且自動構造
映射表對象然後載入對象
def flush(self):
self._lines.clear()
self._view.clear()
sqlstring = f'Select * from {
self._table}'
self._cursor.execute(sqlstring)
result = self._cursor.fetchall()
# 轉換數據類型
for datas in result:
obj = self._model()
for data, key in zip(datas, self._keys):
obj.__dict__[key] = data
record = Record(obj, obj.__dict__[self._mainkey])
self._view.add(record)
清空記錄並且刷新查詢
def savechanges(self):
# 判斷移出對象是否改變
changes = findsign(self._view)
for i in changes:
# 修改所有的值
if objcmp(i.data, i.sign):
continue
else:
# 上傳修改了的值
data = ''
for key, value in zip(i.data.__dict__.keys(), i.data.__dict__.values()):
data += f'{
key}=' + f'\'{
value}\'' + ','
data = str(data).removesuffix(',')
sqlstring = f'update {
self._table} set {
data} where {
self._mainkey}=\'{
i.sign.__dict__[self._mainkey]}\''
self._lines.append(sqlstring)
# 上傳每一條修改語句
for i in self._lines:
print(i)
self._cursor.execute(i)
# 清除所有執行語句
self.flush()
self._conn.commit()
保存修改並刷新,通過對追蹤對象比較現在的值和之前的值是否相同,不相同則更新,相同則跳過
def where(self, expression):
data = Collection()
for item in self._view:
if expression(item.data):
item.sign = copy.copy(item.data)
data.add(item.data)
return data
def find(self, key):
result = self._view.where(lambda x: x.data.__dict__[self._mainkey] == key).firstordefault()
if result is None:
return None
result.sign = copy.copy(result.data)
return result.data
def add(self, item):
if type(item) != self._model:
raise TypeError()
else:
self._view.add(Record(item, item.__dict__[self._mainkey]))
data = str(tuple(item.__dict__.values())).replace('None', 'Null')
sqlstring = f'insert into {
self._table} values {
data}'
self._lines.append(sqlstring)
class DBContext(object):
def __init__(self, con):
self._conn = con
self._cursor = self._conn.cursor()
def create(self):
return self._conn, self._cursor
def add(self, item):
# 自分配增加對象
for dbset in self.__dict__.values():
if type(dbset) is DBSet:
if dbset.__dict__['_model'] is type(item):
dbset.add(item)
return
raise Exception('Error Could Not Find DBSet')
def savechanges(self):
for dbset in self.__dict__.values():
if type(dbset) is DBSet:
dbset.savechanges()
def execute(self, sqlstring):
self._cursor.execute(sqlstring)
result = self._cursor.fetchall()
self._conn.commit()
return result
def close(self):
self._cursor.close()
self._conn.close()
del self
基DBContext對象,編寫自己的上下文對象是需繼承該對象
點擊下載全文代碼
author : Empty bad uncle Blog
成功解決TypeError: only size-1 arr