mysql_comm.py
# -*- coding:utf8 -*-
import pymysql
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB
MYSQL_HOST=""
MYSQL_PORT=3306
MYSQL_USER="crawler"
MYSQL_DATABASE="crawler_deliver"
MYSQL_PASSWORD=""
#父類連接池,用於初始化數據庫連接
class BasePymysqlPool(object):
def __init__(self):
self.db_host = MYSQL_HOST
self.db_port = int(MYSQL_PORT)
self.user = MYSQL_USER
self.password = str(MYSQL_PASSWORD)
self.db = MYSQL_DATABASE
self.conn = None
self.cursor = None
class PymysqlPool(BasePymysqlPool):
""" MYSQL數據庫對象,負責產生數據庫連接 , 此類中的連接采用連接池實現獲取連接對象:conn = Mysql.getConn() 釋放連接對象;conn.close()或del conn """
# 連接池對象
__pool = None
def __init__(self):
super(PymysqlPool, self).__init__()
# 數據庫構造函數,從連接池中取出連接,並生成操作游標
self._conn = self.__getConn()
self._cursor = self._conn.cursor()
def __getConn(self):
""" @summary: 靜態方法,從連接池中取出連接 @return MySQLdb.connection """
if PymysqlPool.__pool is None:
__pool = PooledDB(creator=pymysql,
mincached=1,
maxcached=20,
host=self.db_host,
port=self.db_port,
user=self.user,
passwd=self.password,
db=self.db,
use_unicode=True,#此處應設置為True,否則查詢出來的數據會變成bytes類型
charset="utf8",
cursorclass=DictCursor,
)
return __pool.connection()
def getAll(self, sql, param=None):
""" @summary: 執行查詢,並取出所有結果集 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param param: 可選參數,條件列表值(元組/列表) @return: result list(字典對象)/boolean 查詢到的結果集 """
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchall()
else:
result = False
return result
def getOne(self, sql, param=None):
""" @summary: 執行查詢,並取出第一條 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param param: 可選參數,條件列表值(元組/列表) @return: result list/boolean 查詢到的結果集 """
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchone()
else:
result = False
return result
def getMany(self, sql, num, param=None):
""" @summary: 執行查詢,並取出num條結果 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param num:取得的結果條數 @param param: 可選參數,條件列表值(元組/列表) @return: result list/boolean 查詢到的結果集 """
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchmany(num)
else:
result = False
return result
def insertMany(self, sql, values):
""" @summary: 向數據表插入多條記錄 @param sql:要插入的SQL格式 @param values:要插入的記錄數據tuple(tuple)/list[list] @return: count 受影響的行數 """
count = self._cursor.executemany(sql, values)
return count
def __query(self, sql, param=None):
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
return count
def update(self, sql, param=None):
""" @summary: 更新數據表記錄 @param sql: SQL格式及條件,使用(%s,%s) @param param: 要更新的 值 tuple/list @return: count 受影響的行數 """
return self.__query(sql, param)
def insert(self, sql, param=None):
""" @summary: 更新數據表記錄 @param sql: SQL格式及條件,使用(%s,%s) @param param: 要更新的 值 tuple/list @return: count 受影響的行數 """
return self.__query(sql, param)
'''執行建表語句'''
def create_table(self,sql):
return self._cursor.execute(sql)
def insert_data(self, table_name: str, data) -> bool:
''' 插入數據,根據傳入的數據類型進行判斷,自動選者插入方式 @:param table_name 表名 @:param data 要插入的數據 '''
try:
count = 0
if isinstance(data, list):
for item in data:
keys = ",".join(list(item.keys()))
values = ",".join([f"'{
x}'" for x in list(item.values())])
sql = f"INSERT IGNORE INTO {
table_name} ({
keys}) VALUES ({
values});"
count+=self._cursor.execute(sql)
elif isinstance(data, dict):
keys = ",".join(list(data.keys()))
values = ",".join([f"'{
x}'" for x in list(data.values())])
sql = f"INSERT IGNORE INTO {
table_name} ({
keys}) VALUES ({
values});"
print(sql)
count+=self._cursor.execute(sql)
return count
except Exception as ex:
raise Exception(ex)
def delete(self, sql, param=None):
""" @summary: 刪除數據表記錄 @param sql: SQL格式及條件,使用(%s,%s) @param param: 要刪除的條件 值 tuple/list @return: count 受影響的行數 """
return self.__query(sql, param)
def begin(self):
""" @summary: 開啟事務 """
self._conn.autocommit(0)
def end(self, option='commit'):
""" @summary: 結束事務 """
if option == 'commit':
self._conn.commit()
else:
self._conn.rollback()
def dispose(self, isEnd=1):
""" @summary: 釋放連接池資源 """
if isEnd == 1:
self.end('commit')
else:
self.end('rollback')
self._cursor.close()
self._conn.close()