Connexion de référence:
python3 mysql Pool de connexion_yFwillhBlog de-CSDNBlogs_python3 Pool de connexion à la base de données
Le texte original existe Problèmes multithreadés,Plusieurs Threads créent plusieurs pools de connexion.
Après modification,Problème de sécurité du fil.Quand2Lorsque les Threads utilisent le pool de Threads en même temps,Sera créé en même temps2Pools de Threads.Si plus d'un thread,Les décalages utilisent des pools de fils,Un seul pool de Threads sera créé,Partagera un pool de Threads.
Le mode Singleton que j'ai utilisé pour l'annotation,J'ai l'impression que c'est un exemple de cette note,Problème multithreadé résolu,Mais n'a pas résolu le problème de sécurité du fil,Ce mode Singleton doit être optimisé.
Voici mes modifications.
Principalement par PooledDB Mise en œuvre du module.
db_config.py
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -19
# @File: db_config.py
# -*- coding: UTF-8 -*-
import pymysql
# Informations sur la base de données
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3308
DB_TEST_DBNAME = "bt"
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "123456"
# Code de connexion à la base de données
DB_CHARSET = "utf8"
# mincached : Nombre de connexions inactives ouvertes au démarrage (Par défaut 0 Ne pas créer de connexion au début )
DB_MIN_CACHED = 5
# maxcached : Nombre maximum de connexions autorisées inactives dans le pool de connexions (Par défaut 0 Représente la taille du pool de connexion non inactif )
DB_MAX_CACHED = 0
# maxshared : Nombre maximum de connexions partagées autorisées (Par défaut 0 Représente que toutes les connexions sont dédiées ) Si la quantité maximale est atteinte , La connexion demandée pour le partage sera partagée
DB_MAX_SHARED = 5
# maxconnecyions : Nombre maximum de pools de connexion créés (Par défaut 0 La représentation n'est pas limitée)
DB_MAX_CONNECYIONS = 300
# blocking : Définir le comportement lorsque le nombre maximum de pools de connexion est atteint (Par défaut 0 Ou False Le représentant a renvoyé une erreur <toMany......> Les autres représentent un blocage jusqu'à ce que le nombre de connexions diminue , Connexion assignée )
DB_BLOCKING = True
# maxusage : Nombre maximum de multiplexages autorisés pour une seule connexion (Par défaut 0 Ou False Représente un multiplexage illimité ). Quand le nombre maximum est atteint , La connexion se reconnecte automatiquement ( Fermer et rouvrir )
DB_MAX_USAGE = 0
# setsession : Une optionSQL La liste des commandes est utilisée pour préparer chaque session ,Par exemple:["set datestyle to german", ...]
DB_SET_SESSION = None
# creator : Utiliser un module pour se connecter à la base de données
DB_CREATOR = pymysql
Définir le pool de connexion à un minimum maximum de 5- Oui.. Lors du démarrage du pool de connexion ,Et ça va créer5Connexions.
singleton.py
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -10
# @File: singleton.py
#coding:utf-8
#Fonction de mode Singleton,Pour modifier la classe
def singleton(cls,*args,**kw):
instances = {}
def _singleton():
if cls not in instances:
instances[cls] = cls(*args,**kw)
return instances[cls]
return _singleton
db_dbutils_init.py
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -19
# @File: db_dbutils_init.py
from dbutils.pooled_db import PooledDB
import db_config as config
# import random
from singleton import singleton
"""
@Fonction:Créer un pool de connexion à la base de données
"""
class MyConnectionPool(object):
# Propriété privée
# Accès direct par objet ,Mais accessible à l'intérieur de cette classe;
__pool = None
# def __init__(self):
# self.conn = self.__getConn()
# self.cursor = self.conn.cursor()
# Créer une connexion à la base de donnéesconnEt le curseurcursor
def __enter__(self):
self.conn = self.__getconn()
self.cursor = self.conn.cursor()
# Créer un pool de connexion à la base de données
def __getconn(self):
if self.__pool is None:
# i = random.randint(1, 100)
# print(" Nombre de pools de Threads créés "+str(i))
self.__pool = PooledDB(
creator=config.DB_CREATOR,
mincached=config.DB_MIN_CACHED,
maxcached=config.DB_MAX_CACHED,
maxshared=config.DB_MAX_SHARED,
maxconnections=config.DB_MAX_CONNECYIONS,
blocking=config.DB_BLOCKING,
maxusage=config.DB_MAX_USAGE,
setsession=config.DB_SET_SESSION,
host=config.DB_TEST_HOST,
port=config.DB_TEST_PORT,
user=config.DB_TEST_USER,
passwd=config.DB_TEST_PASSWORD,
db=config.DB_TEST_DBNAME,
use_unicode=False,
charset=config.DB_CHARSET
)
return self.__pool.connection()
# Libérer les ressources du pool de connexion
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
# Fermer la connexion pour retourner au pool de liens
# def close(self):
# self.cursor.close()
# self.conn.close()
# Retirer une connexion du pool de connexion
def getconn(self):
conn = self.__getconn()
cursor = conn.cursor()
return cursor, conn
# Obtenir le pool de connexion,Instanciation
@singleton
def get_my_connection():
return MyConnectionPool()
mysqlhelper.py
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -19
# @File: mysqlhelper.py
import time
from db_dbutils_init import get_my_connection
""" Exécution d'une requête d'instruction avec résultat retourné aucun résultat retourné 0;Ajouter/Supprimer/ Changez le nombre d'entrées de données retournées pour changer ,Pas de retour0"""
class MySqLHelper(object):
def __init__(self):
self.db = get_my_connection() # Obtenir une connexion à partir d'un pool de données
#
# def __new__(cls, *args, **kwargs):
# if not hasattr(cls, 'inst'): # Cas unique
# cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
# return cls.inst
# Encapsulation des commandes d'exécution
def execute(self, sql, param=None, autoclose=False):
"""
【 Il s'agit principalement de déterminer s'il y a des paramètres et de libérer la connexion après l'exécution 】
:param sql: Type de chaîne,sqlDéclarations
:param param: sql Paramètre à remplacer dans l'instruction "select %s from tab where id=%s" Dont:%sC'est le paramètre.
:param autoclose: Si la connexion est fermée
:return: Retour à la connexionconnEt le curseurcursor
"""
cursor, conn = self.db.getconn() # Obtenir la connexion à partir du pool de connexion
count = 0
try:
# count : Pour le nombre de données modifiées
if param:
count = cursor.execute(sql, param)
else:
count = cursor.execute(sql)
conn.commit()
if autoclose:
self.close(cursor, conn)
except Exception as e:
pass
return cursor, conn, count
# Relâchez la connexion
def close(self, cursor, conn):
""" Libérez la connexion et retournez - la au pool de connexion """
cursor.close()
conn.close()
# Rechercher tout
def selectall(self, sql, param=None):
cursor = None
conn = None
count = None
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchall()
return res
except Exception as e:
print(e)
self.close(cursor, conn)
return count
# Requête simple
def selectone(self, sql, param=None):
cursor = None
conn = None
count = None
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
print("error_msg:", e.args)
self.close(cursor, conn)
return count
# Ajouter
def insertone(self, sql, param):
cursor = None
conn = None
count = None
try:
cursor, conn, count = self.execute(sql, param)
# _id = cursor.lastrowid() # Obtient la clé primaire des données actuellement insérées id,Leid Il devrait être bon pour la génération automatique
conn.commit()
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# Ajouter plusieurs lignes
def insertmany(self, sql, param):
"""
:param sql:
:param param: Doit être un Tuple ou une liste [(),()]Ou((),())
:return:
"""
cursor, conn, count = self.db.getconn()
try:
cursor.executemany(sql, param)
conn.commit()
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# Supprimer
def delete(self, sql, param=None):
cursor = None
conn = None
count = None
try:
cursor, conn, count = self.execute(sql, param)
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# Mise à jour
def update(self, sql, param=None):
cursor = None
conn = None
count = None
try:
cursor, conn, count = self.execute(sql, param)
conn.commit()
self.close(cursor, conn)
return count
except Exception as e:
print(e)
conn.rollback()
self.close(cursor, conn)
return count
# if __name__ == '__main__':
# db = MySqLHelper()
# sql = "SELECT SLEEP(10)"
# db.execute(sql)
# time.sleep(20)
# TODO Requête simple
# sql1 = 'select * from userinfo where name=%s'
# args = 'python'
# ret = db.selectone(sql=sql1, param=args)
# print(ret) # (None, b'python', b'123456', b'0')
# TODO Ajouter un seul article
# sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)'
# ret = db.insertone(sql2, ('1', '2', '1', '2', '2'))
# print(ret)
# TODO Ajouter plusieurs
# sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
# li = li = [
# ('Sous - Province', '123'),
# ('Arrivée','456')
# ]
# ret = db.insertmany(sql3,li)
# print(ret)
# TODO Supprimer
# sql4 = 'delete from userinfo WHERE name=%s'
# args = 'xxxx'
# ret = db.delete(sql4, args)
# print(ret)
# TODO Mise à jour
# sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
# args = ('993333993', '%old%')
# ret = db.update(sql5, args)
# print(ret)
--------------------------------------------- Explication et test du pool de connexion ----------
Modifier db_dbutils_init.py Documentation, Création d'un pool de connexion def __getconn(self):Méthode 2, Plus un nombre aléatoire imprimé , Il est pratique pour nous de localiser un pool de thread unique à l'avenir .
Après modificationdb_dbutils_init.py Documentation
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -19
# @File: db_dbutils_init.py
from dbutils.pooled_db import PooledDB
import db_config as config
import random
from singleton import singleton
"""
@Fonction:Créer un pool de connexion à la base de données
"""
class MyConnectionPool(object):
# Propriété privée
# Accès direct par objet ,Mais accessible à l'intérieur de cette classe;
__pool = None
# def __init__(self):
# self.conn = self.__getConn()
# self.cursor = self.conn.cursor()
# Créer une connexion à la base de donnéesconnEt le curseurcursor
def __enter__(self):
self.conn = self.__getconn()
self.cursor = self.conn.cursor()
# Créer un pool de connexion à la base de données
def __getconn(self):
if self.__pool is None:
i = random.randint(1, 100)
print(" Nombre aléatoire de pools de Threads "+str(i))
self.__pool = PooledDB(
creator=config.DB_CREATOR,
mincached=config.DB_MIN_CACHED,
maxcached=config.DB_MAX_CACHED,
maxshared=config.DB_MAX_SHARED,
maxconnections=config.DB_MAX_CONNECYIONS,
blocking=config.DB_BLOCKING,
maxusage=config.DB_MAX_USAGE,
setsession=config.DB_SET_SESSION,
host=config.DB_TEST_HOST,
port=config.DB_TEST_PORT,
user=config.DB_TEST_USER,
passwd=config.DB_TEST_PASSWORD,
db=config.DB_TEST_DBNAME,
use_unicode=False,
charset=config.DB_CHARSET
)
return self.__pool.connection()
# Libérer les ressources du pool de connexion
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
# Fermer la connexion pour retourner au pool de liens
# def close(self):
# self.cursor.close()
# self.conn.close()
# Retirer une connexion du pool de connexion
def getconn(self):
conn = self.__getconn()
cursor = conn.cursor()
return cursor, conn
# Obtenir le pool de connexion,Instanciation
@singleton
def get_my_connection():
return MyConnectionPool()
Début du test
from mysqlhelper import MySqLHelper
import time
if __name__ == '__main__':
sql = "SELECT SLEEP(10)"
sql1 = "SELECT SLEEP(15)"
db = MySqLHelper()
db.execute(sql)
db.execute(sql1)
time.sleep(20)
Dans la base de données,Utiliser show processlist;
show processlist;
Quand le premier sqlHeure. Affichage de la connexion à la base de données .
Quand le second sqlHeure. Affichage de la connexion à la base de données .
Quand l'exécution sera terminéesql,ProcéduresleepHeure. Affichage de la connexion à la base de données .
Programme imprimer les résultats :
Nombre aléatoire de pools de Threads 43
Les conclusions peuvent être tirées de ce qui précède.:
Après le démarrage du pool de Threads,Produit5Connexions.Exécuter le premiersqlHeure,Utilisé1Connexions. Après avoir exécuté le premier sqlAprès, J'ai utilisé autre chose 1Connexions. C'est linéaire , Il y a un total de 5Connexions,Mais à chaque fois,Un seul a été utilisé..
J'ai une question., Le pool de connexion n'a aucun sens s'il ne supporte pas la concurrence ?
Comme ci - dessus, Bien que le pool de Threads soit ouvert 5Connexions,Mais à chaque foissql, Il n'y a qu'une seule connexion . Alors pourquoi ne pas définir la taille du pool de Threads à 1Et alors?? Qu'est - ce que ça veut dire de définir la taille du pool de Threads ?( Si dans un scénario non simultané , Est - ce que le réglage de la taille n'a aucun sens ?)
Avantages par rapport à l'absence de pool de Threads :
Si vous n'utilisez pas le pool de Threads , Un à la fois sqlCréer、Déconnecter. Utiliser le pool de connexion comme nous le faisons , Pas besoin de recréer 、Déconnecter, Prenez les connexions prêtes à l'emploi et utilisez - les directement .
from mysqlhelper import MySqLHelper
import time
if __name__ == '__main__':
db = MySqLHelper()
db1 = MySqLHelper()
sql = "SELECT SLEEP(10)"
sql1 = "SELECT SLEEP(15)"
db.execute(sql)
db1.execute(sql1)
time.sleep(20)
Premier exempledb,Mise en œuvresql. Le pool de Threads a démarré 5Connexions
Deuxième exempledb1,Mise en œuvresql
Pendant le sommeil programmé ,Total5Pools de Threads.
Imprimer les résultats:
Il s'avère que:
Bien que nous ayons créé à tour de rôle 2Exemples,Mais(1) Créer des résultats d'impression pour le pool de Threads ,Imprimer seulement1Une fois,Et du début à la fin, Le pool de Threads n'a démarré que 5Connexions, Et connecté idIl n'y a pas eu de changement, Ça veut dire que ça a toujours été 5Connexions.
Preuve,Bien que nous ayons créé2Exemples,Mais ça...2 Les instances sont en fait une instance .( Le mode Singleton est en vigueur )
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -19
# @File: test1.py
# Exécution simultanée
import threading
from mysqlhelper import MySqLHelper
import time
def sl1():
time.sleep(2)
db = MySqLHelper()
sql = "SELECT SLEEP(6)"
db.execute(sql)
def sl2():
time.sleep(4)
db = MySqLHelper()
sql = "SELECT SLEEP(15)"
db.execute(sql)
if __name__ == '__main__':
threads = []
t1 = threading.Thread(target=sl1)
threads.append(t1)
t2 = threading.Thread(target=sl2)
threads.append(t2)
for t in threads:
t.setDaemon(True)
t.start()
time.sleep(20)
2 Les fils sont séparés 2Secondes.
Observez le nombre de connexions à la base de données
Imprimer les résultats:
En exécution simultanée2- Oui.sqlHeure, Partager ceci 5Connexions, Et le résultat n'a été imprimé qu'une seule fois , Description bien que la création simultanée 2Exemples secondaires, Mais un seul pool de connexion a été créé .
# -*- coding:utf-8 -*-
# @Author: La sauce MIAOU
# @time: 2022 - 06 -19
# @File: testconnect.py
import threading
from mysqlhelper import MySqLHelper
import time
if __name__ == '__main__':
db = MySqLHelper()
sql = "SELECT SLEEP(6)"
sql1 = "SELECT SLEEP(15)"
threads = []
t1 = threading.Thread(target=db.execute, args=(sql,))
threads.append(t1)
t2 = threading.Thread(target=db.execute, args=(sql1,))
threads.append(t2)
for t in threads:
t.setDaemon(True)
t.start()
time.sleep(20)
Observer les connexions à la base de données
Imprimer les résultats:
Les résultats montrent que:
Le terminal est imprimé 2Une fois,La base de données a été créée10Connexions,Description créer2Pools de Threads.
Ce mode Singleton ,Problème de sécurité du fil.
Comme l'expérience ci - dessus ,Problème de thread dangereux. Causes profondes, Ça devrait être en mode Singleton .
Quand je veux appliquer ce mode Singleton .