When you regularly connect to a MySQL database from Python, it is convenient to write a SqlHelper class that simplifies database access. We use DBUtils.PooledDB
to create a database connection pool, and take a connection from that pool each time one is needed.
First, install pymysql and DBUtils:
pip install pymysql==1.0.2
pip install DBUtils==3.0.2
import threading

import pymysql
from dbutils.pooled_db import PooledDB
class SqlHelper(object):
    """Helper that runs MySQL queries on connections taken from a PooledDB pool.

    ``fetchall`` / ``fetchone`` borrow a connection, run one statement and
    always give the connection back.  ``open`` / ``close`` expose the same
    steps for callers that need to manage the connection themselves.
    """

    def __init__(self):
        """Create the connection pool with the connection settings below."""
        self.pool = PooledDB(
            creator=pymysql,  # DB-API module used to create the connections
            maxconnections=6,  # maximum connections allowed by the pool; 0 and None mean no limit
            mincached=2,  # connections created in the pool on initialization; 0 means none
            blocking=True,  # when no connection is available: True waits, False raises an error
            # When to ping the MySQL server to check that it is available:
            # 0 = None = never, 1 = default = whenever it is requested,
            # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            ping=0,
            host='192.168.171.45',
            port=3306,
            user='root',
            password='123456',
            database='vincent',
            charset='utf8'
        )

    def open(self):
        """Take a connection from the pool and open a cursor on it.

        Returns:
            A ``(conn, cursor)`` tuple.  Hand both to ``close`` when done.
        """
        conn = self.pool.connection()
        try:
            cursor = conn.cursor()
        except Exception:
            # Do not keep the connection checked out if no cursor could be made.
            conn.close()
            raise
        return conn, cursor

    def close(self, cursor, conn):
        """Close ``cursor`` and then ``conn``.

        The connection is closed even when closing the cursor raises.
        """
        try:
            cursor.close()
        finally:
            conn.close()

    def fetchall(self, sql, *args):
        """Execute ``sql`` with ``args`` as parameters and return all rows."""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchall()
        finally:
            # Release the connection whether or not the query succeeded.
            self.close(cursor, conn)

    def fetchone(self, sql, *args):
        """Execute ``sql`` with ``args`` as parameters and return the first row."""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchone()
        finally:
            self.close(cursor, conn)

    def __enter__(self):
        """Open a connection and return only its cursor."""
        return self.open()[1]

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Accept the three arguments ``with`` passes on exit; release nothing.

        NOTE: the cursor and connection opened by ``__enter__`` are not closed
        here, because this object keeps no record of which pair belongs to the
        calling thread.  Exceptions raised inside the ``with`` body propagate.
        """
        pass
# Module-level helper instance; other modules obtain it with
# ``from SqlHelper import db``.
db = SqlHelper()
print(db)  # shows the helper object's default repr
Testing the database connection pool:
from SqlHelper import db
def task(num):
    """Run a 3-second query on a pooled connection and print the result.

    Args:
        num: Label printed in front of the result to identify the caller.
    """
    # Take a connection (and a cursor on it) from the connection pool.
    conn, cursor = db.open()
    try:
        cursor.execute('select sleep(3)')  # make the database wait for 3 seconds
        result = cursor.fetchall()
    finally:
        # Always give the connection back to the pool, even if the query fails.
        db.close(cursor, conn)
    print(num, '------------>', result)
from threading import Thread
# Start 57 threads, each running task() with its own index as the argument.
for i in range(57):
    Thread(target=task, args=(i,)).start()
We issue a SQL request that makes the database pause for 3 seconds in order to confirm the maximum number of connections in the pool. When the program runs, the output appears in batches of 6, which shows that the maximum number of connections in the connection pool is 6.
We can combine this with a context manager (the with statement) to optimize the code:
def task(num):
    """Run a 3-second query through ``db`` used as a context manager.

    Args:
        num: Index of the calling worker (not used by the query itself).

    Returns:
        The rows fetched for the query.
    """
    with db as cursor:
        cursor.execute('select sleep(3)')  # make the database wait for 3 seconds
        return cursor.fetchall()
from threading import Thread
# Start 57 threads, each running task() with its own index as the argument.
for i in range(57):
    Thread(target=task, args=(i,)).start()
But here is the problem: although the with statement automatically calls __enter__ and assigns its return value to cursor, how do we automatically call close() in a multithreaded program so that each thread releases its own connection? The db object above is effectively a singleton: there is only one object, shared by every thread, so anything stored directly on it would make the threads interfere with each other. We therefore need somewhere to keep data that is tied to the current thread.
So we need to introduce threading.local():
class SqlHelper(object):
    """Pooled MySQL helper that can also be used as a per-thread context manager.

    ``with helper as cursor:`` opens a connection for the calling thread and
    closes that same connection when the block ends.  The open
    ``(conn, cursor)`` pairs are kept on a ``threading.local`` stack so that
    threads sharing one helper object never see each other's connections.
    """

    def __init__(self):
        """Create the connection pool and the thread-local storage."""
        self.pool = PooledDB(
            creator=pymysql,  # DB-API module used to create the connections
            maxconnections=6,  # maximum connections allowed by the pool; 0 and None mean no limit
            mincached=2,  # connections created in the pool on initialization; 0 means none
            blocking=True,  # when no connection is available: True waits, False raises an error
            # When to ping the MySQL server to check that it is available:
            # 0 = None = never, 1 = default = whenever it is requested,
            # 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            ping=0,
            host='192.168.171.45',
            port=3306,
            user='root',
            password='123456',
            database='vincent',
            charset='utf8'
        )
        # Each thread gets its own ``stack`` attribute on this object.
        self.local = threading.local()

    def open(self):
        """Take a connection from the pool and open a cursor on it.

        Returns:
            A ``(conn, cursor)`` tuple.  Hand both to ``close`` when done.
        """
        conn = self.pool.connection()
        try:
            cursor = conn.cursor()
        except Exception:
            # Do not keep the connection checked out if no cursor could be made.
            conn.close()
            raise
        return conn, cursor

    def close(self, cursor, conn):
        """Close ``cursor`` and then ``conn``.

        The connection is closed even when closing the cursor raises.
        """
        try:
            cursor.close()
        finally:
            conn.close()

    def fetchall(self, sql, *args):
        """Execute ``sql`` with ``args`` as parameters and return all rows."""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchall()
        finally:
            # Release the connection whether or not the query succeeded.
            self.close(cursor, conn)

    def fetchone(self, sql, *args):
        """Execute ``sql`` with ``args`` as parameters and return the first row."""
        conn, cursor = self.open()
        try:
            cursor.execute(sql, args)
            return cursor.fetchone()
        finally:
            self.close(cursor, conn)

    def __enter__(self):
        """Open a connection for the current thread and return its cursor.

        The ``(conn, cursor)`` pair is pushed onto the calling thread's stack
        so that nested ``with`` blocks in one thread are closed innermost
        first.
        """
        conn, cursor = self.open()
        stack = getattr(self.local, 'stack', None)
        if stack is None:
            # First use in this thread: create its private stack.
            stack = self.local.stack = []
        stack.append((conn, cursor))
        return cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the pair most recently opened by the current thread.

        Returns ``None``, so an exception raised inside the ``with`` body
        propagates to the caller after the connection has been closed.
        """
        stack = getattr(self.local, 'stack', None)
        if not stack:
            # Nothing was opened by this thread.
            return
        conn, cursor = stack.pop()
        self.close(cursor, conn)
# Single module-level helper instance used by the task() threads below.
db = SqlHelper()
Test:
def task(num):
    """Run a 3-second query through ``db`` used as a context manager.

    Args:
        num: Index of the calling worker (not used by the query itself).

    Returns:
        The rows fetched for the query.
    """
    with db as cursor:
        cursor.execute('select sleep(3)')  # make the database wait for 3 seconds
        return cursor.fetchall()
from threading import Thread
# Start 57 threads, each running task() with its own index as the argument.
for i in range(57):
    Thread(target=task, args=(i,)).start()