Python 標准數據庫接口為 Python DB-API,Python DB-API為開發人員提供了數據庫應用編程接口。Python 數據庫接口支持非常多的數據庫,你可以選擇適合你項目的數據庫:
你可以訪問Python數據庫接口及API查看詳細的支持數據庫列表。
不同的數據庫你需要下載不同的DB API模塊,例如你需要訪問Oracle數據庫和Mysql數據,你需要下載Oracle和MySQL數據庫模塊。
DB-API 是一個規范. 它定義了一系列必需的對象和數據庫存取方式, 以便為各種各樣的底層數據庫系統和多種多樣的數據庫接口程序提供一致的訪問接口 。
Python的DB-API,為大多數的數據庫實現了接口,使用它連接各數據庫後,就可以用相同的方式操作各數據庫。
Python DB-API使用流程:
引入 API 模塊。
獲取與數據庫的連接。
執行SQL語句和存儲過程
關閉數據庫連接。
Python操作MySQL主要使用兩種方式:
DB模塊(原生SQL)
ORM框架
2.1PyMySQL模塊
本文主要介紹PyMySQL模塊,MySQLdb使用方式類似
2.1.1 安裝PyMySQL
PyMySQL是一個Python編寫的MySQL驅動程序,讓我們可以用Python語言操作MySQL數據庫。
pip install PyMySQL
2.2 基本使用
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import pymysql # 創建連接 conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4') # 創建游標(查詢數據返回為元組格式) # cursor = conn.cursor() # 創建游標(查詢數據返回為字典格式) cursor = conn.cursor(pymysql.cursors.DictCursor) # 1. 執行SQL,返回受影響的行數 effect_row1 = cursor.execute("select * from USER") # 2. 執行SQL,返回受影響的行數,一次插入多行數據 effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [("jack"), ("boom"), ("lucy")]) # 3 # 查詢所有數據,返回數據為元組格式 result = cursor.fetchall() # 增/刪/改均需要進行commit提交,進行保存 conn.commit() # 關閉游標 cursor.close() # 關閉連接 conn.close() print(result) """ [{'id': 6, 'name': 'boom'}, {'id': 5, 'name': 'jack'}, {'id': 7, 'name': 'lucy'}, {'id': 4, 'name': 'tome'}, {'id': 3, 'name': 'zff'}, {'id': 1, 'name': 'zhaofengfeng'}, {'id': 2, 'name': 'zhaofengfeng02'}] """
2.3 獲取最新創建的數據自增ID
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import pymysql # 創建連接 conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4') # 創建游標(查詢數據返回為元組格式) cursor = conn.cursor() # 獲取新創建數據自增ID effect_row = cursor.executemany("insert into USER (NAME)values(%s)", [("eric")]) # 增刪改均需要進行commit提交 conn.commit() # 關閉游標 cursor.close() # 關閉連接 conn.close() new_id = cursor.lastrowid print(new_id) """ 8 """
2.4 查詢操作
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import pymysql # 創建連接 conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4') # 創建游標 cursor = conn.cursor() cursor.execute("select * from USER") # 獲取第一行數據 row_1 = cursor.fetchone() # 獲取前n行數據 row_2 = cursor.fetchmany(3) # # # 獲取所有數據 row_3 = cursor.fetchall() # 關閉游標 cursor.close() # 關閉連接 conn.close() print(row_1) print(row_2) print(row_3)
:warning: 在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:
2.5 防止SQL注入
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import pymysql # 創建連接 conn = pymysql.connect(host="127.0.0.1", port=3306, user='zff', passwd='zff123', db='zff', charset='utf8mb4') # 創建游標 cursor = conn.cursor() # 存在sql注入情況(不要用格式化字符串的方式拼接SQL) sql = "insert into USER (NAME) values('%s')" % ('zhangsan',) effect_row = cursor.execute(sql) # 正確方式一 # execute函數接受一個元組/列表作為SQL參數,元素個數只能有1個 sql = "insert into USER (NAME) values(%s)" effect_row1 = cursor.execute(sql, ['wang6']) effect_row2 = cursor.execute(sql, ('wang7',)) # 正確方式二 sql = "insert into USER (NAME) values(%(name)s)" effect_row1 = cursor.execute(sql, {'name': 'wudalang'}) # 寫入插入多行數據 effect_row2 = cursor.executemany("insert into USER (NAME) values(%s)", [('ermazi'), ('dianxiaoer')]) # 提交 conn.commit() # 關閉游標 cursor.close() # 關閉連接 conn.close()
這樣,SQL操作就更安全了。如果需要更詳細的文檔參考PyMySQL文檔吧。不過好像這些SQL數據庫的實現還不太一樣,PyMySQL的參數占位符使用%s這樣的C格式化符,而Python自帶的sqlite3模塊的占位符好像是問號(?)。因此在使用其他數據庫的時候還是仔細閱讀文檔吧。Welcome to PyMySQL’s documentation
上文中的方式存在一個問題,單線程情況下可以滿足,程序需要頻繁的創建釋放連接來完成對數據庫的操作,那麼,我們的程序/腳本在多線程情況下會引發什麼問題呢?此時,我們就需要使用數據庫連接池來解決這個問題!
3.1 DBUtils模塊
DBUtils是Python的一個用於實現數據庫連接池的模塊。
此連接池有兩種連接模式:
3.2 模式一
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 from DBUtils.PersistentDB import PersistentDB import pymysql POOL = PersistentDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always closeable=False, # 如果為False時, conn.close() 實際上被忽略,供下次使用,在線程關閉時,才會自動關閉鏈接。如果為True時, conn.close()則關閉鏈接,那麼再次調用pool.connection時就會報錯,因為已經真的關閉了連接(pool.steady_connection()可以獲取一個新的鏈接) threadlocal=None, # 本線程獨享值得對象,用於保存鏈接對象,如果鏈接對象被重置 host='127.0.0.1', port=3306, user='zff', password='zff123', database='zff', charset='utf8', ) def func(): conn = POOL.connection(shareable=False) cursor = conn.cursor() cursor.execute('select * from USER') result = cursor.fetchall() cursor.close() conn.close() return result result = func() print(result)
3.3 模式二
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import time import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閒的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閒置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='zff', password='zff123', database='zff', charset='utf8' ) def func(): # 檢測當前正在運行連接數的是否小於最大鏈接數,如果不小於則:等待或報raise TooManyConnections異常 # 否則 # 則優先去初始化時創建的鏈接中獲取鏈接 SteadyDBConnection。 # 然後將SteadyDBConnection對象封裝到PooledDedicatedDBConnection中並返回。 # 如果最開始創建的鏈接沒有鏈接,則去創建一個SteadyDBConnection對象,再封裝到PooledDedicatedDBConnection中並返回。 # 一旦關閉鏈接後,連接就返回到連接池讓後續線程繼續使用。 conn = POOL.connection() # print('連接被拿走了', conn._con) # print('池子裡目前有', POOL._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from USER') result = cursor.fetchall() conn.close() return result result = func() print(result)
:warning: 由於pymysql、MySQLdb等threadsafety值為1,所以該模式連接池中的線程會被所有線程共享,因此是線程安全的。如果沒有連接池,使用pymysql來連接數據庫時,單線程應用完全沒有問題,但如果涉及到多線程應用那麼就需要加鎖,一旦加鎖那麼連接勢必就會排隊等待,當請求比較多時,性能就會降低了。
3.4 加鎖
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import pymysql import threading from threading import RLock LOCK = RLock() CONN = pymysql.connect(host='127.0.0.1', port=3306, user='zff', password='zff123', database='zff', charset='utf8') def task(arg): with LOCK: cursor = CONN.cursor() cursor.execute('select * from USER ') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
3.5 無鎖(報錯)
#! /usr/bin/env python # -*- coding: utf-8 -*- # __author__ = "shuke" # Date: 2018/5/13 import pymysql import threading CONN = pymysql.connect(host='127.0.0.1', port=3306, user='zff', password='zff123', database='zff', charset='utf8') def task(arg): cursor = CONN.cursor() cursor.execute('select * from USER ') # cursor.execute('select sleep(10)') result = cursor.fetchall() cursor.close() print(result) for i in range(10): t = threading.Thread(target=task, args=(i,)) t.start()
此時可以在數據庫中查看連接情況: show status like 'Threads%';
# cat sql_helper.py import pymysql import threading from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=20, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閒的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閒置的鏈接,0和None不限制 #maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='192.168.11.38', port=3306, user='root', passwd='apNXgF6RDitFtDQx', db='m2day03db', charset='utf8' ) def connect(): # 創建連接 # conn = pymysql.connect(host='192.168.11.38', port=3306, user='root', passwd='apNXgF6RDitFtDQx', db='m2day03db') conn = POOL.connection() # 創建游標 cursor = conn.cursor(pymysql.cursors.DictCursor) return conn,cursor def close(conn,cursor): # 關閉游標 cursor.close() # 關閉連接 conn.close() def fetch_one(sql,args): conn,cursor = connect() # 執行SQL,並返回收影響行數 effect_row = cursor.execute(sql,args) result = cursor.fetchone() close(conn,cursor) return result def fetch_all(sql,args): conn, cursor = connect() # 執行SQL,並返回收影響行數 cursor.execute(sql,args) result = cursor.fetchall() close(conn, cursor) return result def insert(sql,args): """ 創建數據 :param sql: 含有占位符的SQL :return: """ conn, cursor = connect() # 執行SQL,並返回收影響行數 effect_row = cursor.execute(sql,args) conn.commit() close(conn, cursor) def delete(sql,args): """ 創建數據 :param sql: 含有占位符的SQL :return: """ conn, cursor = connect() # 執行SQL,並返回收影響行數 effect_row = cursor.execute(sql,args) conn.commit() close(conn, cursor) return effect_row def update(sql,args): conn, cursor = connect() # 執行SQL,並返回收影響行數 effect_row = cursor.execute(sql, args) conn.commit() close(conn, cursor) return effect_row
PS: 可以利用靜態方法封裝到一個類中,方便使用。