這篇文章主要介紹了在Python中編寫數據庫模塊的教程,本文代碼基於Python2.x版本,需要的朋友可以參考下
在一個Web App中,所有數據,包括用戶信息、發布的日志、評論等,都存儲在數據庫中。在awesome-python-app中,我們選擇MySQL作為數據庫。
Web App裡面有很多地方都要訪問數據庫。訪問數據庫需要創建數據庫連接、游標對象,然後執行SQL語句,最後處理異常,清理資源。這些訪問數據庫的代碼如果分散到各個函數中,勢必無法維護,也不利於代碼復用。
此外,在一個Web App中,有多個用戶會同時訪問,系統以多進程或多線程模式來處理每個用戶的請求。假設以多線程為例,每個線程在訪問數據庫時,都必須創建僅屬於自身的連接,對別的線程不可見,否則,就會造成數據庫操作混亂。
所以,我們還要創建一個簡單可靠的數據庫訪問模型,在一個線程中,能既安全又簡單地操作數據庫。
為什麼不選擇SQLAlchemy?SQLAlchemy太龐大,過度地面向對象設計導致API太復雜。
所以我們決定自己設計一個封裝基本的SELECT、INSERT、UPDATE和DELETE操作的db模塊:transwarp.db。
設計db接口
設計底層模塊的原則是,根據上層調用者設計簡單易用的API接口,然後,實現模塊內部代碼。
假設transwarp.db模塊已經編寫完畢,我們希望以這樣的方式來調用它:
首先,初始化數據庫連接信息,通過create_engine()函數:
?
1 2 from transwarp import db db.create_engine(user='root', password='password', database='test', host='127.0.0.1', port=3306)然後,就可以直接操作SQL了。
如果需要做一個查詢,可以直接調用select()方法,返回的是list,每一個元素是用dict表示的對應的行:
?
1 2 3 4 5 6 7 users = db.select('select * from user') # users => # [ # { "id": 1, "name": "Michael"}, # { "id": 2, "name": "Bob"}, # { "id": 3, "name": "Adam"} # ]如果要執行INSERT、UPDATE或DELETE操作,執行update()方法,返回受影響的行數:
?
1 n = db.update('insert into user(id, name) values(?, ?)', 4, 'Jack')update()函數簽名為:
?
1 update(sql, *args)統一用?作為占位符,並傳入可變參數來綁定,從根本上避免SQL注入攻擊。
每個select()或update()調用,都隱含地自動打開並關閉了數據庫連接,這樣,上層調用者就完全不必關心數據庫底層連接。
但是,如果要在一個數據庫連接裡執行多個SQL語句怎麼辦?我們用一個with語句實現:
?
1 2 3 4 with db.connection(): db.select('...') db.update('...') db.update('...')如果要在一個數據庫事務中執行多個SQL語句怎麼辦?我們還是用一個with語句實現:
?
1 2 3 4 with db.transaction(): db.select('...') db.update('...') db.update('...')實現db模塊
由於模塊是全局對象,模塊變量是全局唯一變量,所以,有兩個重要的模塊變量:
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 # db.py # 數據庫引擎對象: class _Engine(object): def __init__(self, connect): self._connect = connect def connect(self): return self._connect() engine = None # 持有數據庫連接的上下文對象: class _DbCtx(threading.local): def __init__(self): self.connection = None self.transactions = 0 def is_init(self): return not self.connection is None def init(self): self.connection = _LasyConnection() self.transactions = 0 def cleanup(self): self.connection.cleanup() self.connection = None def cursor(self): return self.connection.cursor() _db_ctx = _DbCtx()由於_db_ctx是threadlocal對象,所以,它持有的數據庫連接對於每個線程看到的都是不一樣的。任何一個線程都無法訪問到其他線程持有的數據庫連接。
有了這兩個全局變量,我們繼續實現數據庫連接的上下文,目的是自動獲取和釋放連接:
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class _ConnectionCtx(object): def __enter__(self): global _db_ctx self.should_cleanup = False if not _db_ctx.is_init(): _db_ctx.init() self.should_cleanup = True return self def __exit__(self, exctype, excvalue, traceback): global _db_ctx if self.should_cleanup: _db_ctx.cleanup() def connection(): return _ConnectionCtx()定義了__enter__()和__exit__()的對象可以用於with語句,確保任何情況下__exit__()方法可以被調用。
把_ConnectionCtx的作用域作用到一個函數調用上,可以這麼寫:
?
1 2 with connection(): do_some_db_operation()但是更簡單的寫法是寫個@decorator:
?
1 2 3 @with_connection def do_some_db_operation(): pass這樣,我們實現select()、update()方法就更簡單了:
?
1 2 3 4 5 6 7 @with_connection def select(sql, *args): pass @with_connection def update(sql, *args): pass注意到Connection對象是存儲在_DbCtx這個threadlocal對象裡的,因此,嵌套使用with connection()也沒有問題。_DbCtx永遠檢測當前是否已存在Connection,如果存在,直接使用,如果不存在,則打開一個新的Connection。
對於transaction也是類似的,with transaction()定義了一個數據庫事務:
?
1 2 3 4 with db.transaction(): db.select('...') db.update('...') db.update('...')函數作用域的事務也有一個簡化的@decorator:
?
1 2 3 @with_transaction def do_in_transaction(): pass事務也可以嵌套,內層事務會自動合並到外層事務中,這種事務模型足夠滿足99%的需求。
事務嵌套比Connection嵌套復雜一點,因為事務嵌套需要計數,每遇到一層嵌套就+1,離開一層嵌套就-1,最後到0時提交事務:
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class _TransactionCtx(object): def __enter__(self): global _db_ctx self.should_close_conn = False if not _db_ctx.is_init(): _db_ctx.init() self.should_close_conn = True _db_ctx.transactions = _db_ctx.transactions + 1 return self def __exit__(self, exctype, excvalue, traceback): global _db_ctx _db_ctx.transactions = _db_ctx.transactions - 1 try: if _db_ctx.transactions==0: if exctype is None: self.commit() else: self.rollback() finally: if self.should_close_conn: _db_ctx.cleanup() def commit(self): global _db_ctx try: _db_ctx.connection.commit() except: _db_ctx.connection.rollback() raise def rollback(self): global _db_ctx _db_ctx.connection.rollback()最後,把select()和update()方法實現了,db模塊就完成了。