程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python 實現簡單的sqlhelper

編輯:Python

經常使用Python連接mysql數據庫,我們需要設計一個sqlHelper來方便我們訪問mysql數據庫,我們使用DBUtils.PooledDB來創建一個數據庫連接池,每次請求從連接池中獲取一個數據庫連接。

首先安裝pymysqlDBUtils

pip install pymysql==1.0.2
pip install DBUtils==3.0.2

構建SqlHelper

import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的鏈接,0表示不創建
blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='192.168.171.45',
port=3306,
user='root',
password='123456',
database='vincent',
charset='utf8'
)
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn,cursor
def close(self,cursor,conn):
cursor.close()
conn.close()
def fetchall(self,sql, *args):
""" 獲取所有數據 """
conn,cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn,cursor)
return result
def fetchone(self,sql, *args):
""" 獲取所有數據 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
def __enter__(self):
return self.open()[1]
def __exit__(self):
pass
db = SqlHelper()
print(db)

測試數據庫連接池:

from SqlHelper import db
def task(num):
# 去連接池中獲取一個連接
conn,cursor = db.open()
cursor.execute('select sleep(3)') # 在數據庫執行3秒鐘
result = cursor.fetchall()
cursor.close()
# 將連接放會到連接池
conn.close()
print(num,'------------>',result)
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()

我們執行了數據庫停頓3秒的sql請求,判斷連接池的最大連接數。運行程序輸出時,每隔6個會輸出一次,可以說明連接池的最大連接數是6個。

with 上下文

我們可以結合with上下文,來進行優化一下:

def task(num):
with db as cursor:
cursor.excute('select sleep(3)')
result = cursor.fetchall()
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()

但是需要注意的是,雖然使用了with,自動執行了__enter__賦值給了cursor,但是在多線程中,如何自動執行close(),可以使得線程釋放連接。因為在多線程下,不加干涉的話,每個線程都會干涉彼此(因為我們上面是單例模式,只有一個對象,彼此會干涉),因此需要一個地方可以標識出當前的線程。

因此需要引入threading.local()

threading.local()

class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的鏈接,0表示不創建
blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='192.168.171.45',
port=3306,
user='root',
password='123456',
database='vincent',
charset='utf8'
)
self.local = threading.local()
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn,cursor
def close(self,cursor,conn):
cursor.close()
conn.close()
def fetchall(self,sql, *args):
""" 獲取所有數據 """
conn,cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn,cursor)
return result
def fetchone(self,sql, *args):
""" 獲取所有數據 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
def __enter__(self):
conn, cursor = self.open()
rv = getattr(self.local, 'stack', None)
if not rv:
self.local.stack = [(conn, cursor)]
else:
rv.append((conn, cursor))
self.local.stack = rv
return cursor
def __exit__(self): # 根據不同的線程關閉對應的conn和cursor
rv = getattr(self.local, 'stack', None)
if not rv:
# del self.local.stack
return
conn, cursor = self.local.stack.pop()
cursor.close()
conn.close()
db = SqlHelper()

測試:

def task(num):
with db as cursor:
cursor.excute('select sleep(3)')
result = cursor.fetchall()
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved