經常使用Python連接mysql數據庫,我們需要設計一個sqlHelper來方便我們訪問mysql數據庫,我們使用DBUtils.PooledDB
來創建一個數據庫連接池,每次請求從連接池中獲取一個數據庫連接。
首先安裝pymysql
和DBUtils
pip install pymysql==1.0.2
pip install DBUtils==3.0.2
import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的鏈接,0表示不創建
blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='192.168.171.45',
port=3306,
user='root',
password='123456',
database='vincent',
charset='utf8'
)
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn,cursor
def close(self,cursor,conn):
cursor.close()
conn.close()
def fetchall(self,sql, *args):
""" 獲取所有數據 """
conn,cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn,cursor)
return result
def fetchone(self,sql, *args):
""" 獲取所有數據 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
def __enter__(self):
return self.open()[1]
def __exit__(self):
pass
db = SqlHelper()
print(db)
測試數據庫連接池:
from SqlHelper import db
def task(num):
# 去連接池中獲取一個連接
conn,cursor = db.open()
cursor.execute('select sleep(3)') # 在數據庫執行3秒鐘
result = cursor.fetchall()
cursor.close()
# 將連接放會到連接池
conn.close()
print(num,'------------>',result)
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()
我們執行了數據庫停頓3秒的sql請求,判斷連接池的最大連接數。運行程序輸出時,每隔6個會輸出一次,可以說明連接池的最大連接數是6個。
我們可以結合with
上下文,來進行優化一下:
def task(num):
with db as cursor:
cursor.excute('select sleep(3)')
result = cursor.fetchall()
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()
但是需要注意的是,雖然使用了with
,自動執行了__enter__
賦值給了cursor
,但是在多線程中,如何自動執行close()
,可以使得線程釋放連接。因為在多線程下,不加干涉的話,每個線程都會干涉彼此(因為我們上面是單例模式,只有一個對象,彼此會干涉),因此需要一個地方可以標識出當前的線程。
因此需要引入threading.local()
class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的鏈接,0表示不創建
blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='192.168.171.45',
port=3306,
user='root',
password='123456',
database='vincent',
charset='utf8'
)
self.local = threading.local()
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn,cursor
def close(self,cursor,conn):
cursor.close()
conn.close()
def fetchall(self,sql, *args):
""" 獲取所有數據 """
conn,cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn,cursor)
return result
def fetchone(self,sql, *args):
""" 獲取所有數據 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
def __enter__(self):
conn, cursor = self.open()
rv = getattr(self.local, 'stack', None)
if not rv:
self.local.stack = [(conn, cursor)]
else:
rv.append((conn, cursor))
self.local.stack = rv
return cursor
def __exit__(self): # 根據不同的線程關閉對應的conn和cursor
rv = getattr(self.local, 'stack', None)
if not rv:
# del self.local.stack
return
conn, cursor = self.local.stack.pop()
cursor.close()
conn.close()
db = SqlHelper()
測試:
def task(num):
with db as cursor:
cursor.excute('select sleep(3)')
result = cursor.fetchall()
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()