因為業務需求,需要連接數據庫查詢數據
數據庫類型:mysql,mongodb
需求:有中連機制,讀取配置文件,可實例化,有日志記錄
dbconfig.conf
[Mongodbtest]
host=192.168.99.42
port=27018
user=
password=
database=ace_sms
[mysql]
host=192.168.99.42
port=27018
user=
password=
database=ace_sms
# -*- coding: utf-8 -*-
# @Time : 2022/7/12 18:13
# @File : Loggers.py
# @Software: PyCharm
# 加入日志
# 獲取logger實例
import logging
import os
import sys
logger = logging.getLogger("baseSpider")
# 指定輸出格式
formatter = logging.Formatter('%(asctime)s\
%(filename)s-%(lineno)d\
%(levelname)s\
%(message)s')
# 文件日志
# 獲取當前文件路徑
current_path = os.path.abspath(os.path.dirname(__file__))
father_path = os.path.abspath(os.path.dirname(current_path) + os.path.sep + ".")
log_file_path = os.path.join(father_path + "\logs\dingding_message.log")
file_handler = logging.FileHandler(log_file_path)
file_handler.setFormatter(formatter)
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 為logge添加具體的日志處理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
# -*- coding: utf-8 -*-
# @Time : 2022-07-08 21:32
# @File : MyDB.py
# @Software: PyCharm
import time
import pymysql
import configparser
import logging
import sys
from tool.Loggers import logger
class mysql:
def __init__(self, config_file, db):
"""
:param config_file:
:param db:
"""
# 實例化configparser
config = configparser.ConfigParser()
# 從配置文件中讀取數據庫的相關信息
config.read(config_file, encoding='utf-8')
self.host = config[db]['host']
self.port = int(config[db]['port'])
self.user = config[db]['user']
self.password = config[db]['password']
self.database = config[db]['database']
self.db = db
self.conn = None
self._conn()
def _conn(self):
try:
logger.info(f"讀取環境:{self.db},連接信息:主機ip:{self.host},端口:{self.port},用戶:{self.user},連接數據庫:{self.database}")
self.conn = pymysql.Connection(host=self.host, user=self.user, password=self.password,
database=self.database, port=self.port)
logger.info(f"數據庫: {self.database}初始化連接成功")
return True
except Exception as e:
logger.error(f"數據庫: {self.database}初始化連接失敗,錯誤:{e}")
return False
def close(self):
self.conn.close()
logger.info(f"數據庫關閉成功")
def _reConn(self, num=28800, stime=3): # 重試連接總次數為1天,這裡根據實際情況自己設置,如果服務器宕機1天都沒發現就......
_number = 0
_status = True
logger.info(f"檢查數據庫{self.database}連通性,連接IP:{self.host}")
while _status and _number <= num:
try:
self.conn.ping() # cping 校驗連接是否異常
_status = False
logger.info(f"數據庫{self.database}連接============正常,連接IP:{self.host} ")
except:
if self._conn() == True: # 重新連接,成功退出
_status = False
break
_number += 1
logger.info(f"數據庫{self.database}連接============失敗,連接IP:{self.host} ")
time.sleep(stime) # 連接不成功,休眠3秒鐘,繼續循環,知道成功或重試次數結束
def select(self, sql=''):
try:
self._reConn()
logger.info('查詢的語句:%s' % sql)
# 建立游標
db_cursor = self.conn.cursor()
db_cursor.execute(sql)
result = db_cursor.fetchall()
# 返回值和數據表字段組成json格式
lists = []
t = 0
for x in result:
i = 0
onelist = {}
for field in db_cursor.description:
onelist[field[0]] = x[i]
i = i + 1
lists.append(onelist)
logger.info('組合數據:%s' % lists)
return lists
# return result
except pymysql.Error as e:
logger.error('數據庫查詢數據失敗:%s' % e)
return False
def select_limit(self, sql='', offset=0, length=20):
sql = '%s limit %d , %d ;' % (sql, offset, length)
return self.select(sql)
# # 插入
# def execute_insert(self, query):
# print('query:%s' % query)
# try:
# # 建立游標
# db_cursor = self.dbconn.cursor()
# db_cursor.execute(query)
# db_cursor.execute('commit')
# return True
# except Exception as e:
# print('數據庫插入數據失敗:%s' % e)
# # 事務回滾
# db_cursor.execute('rollback')
# db_cursor.close()
# exit()
# -*- coding: utf-8 -*-
# @Time : 2022/7/12 10:44
# @File : MongoDB.py
# @Software: PyCharm
import configparser
import time
import pymongo
from alarm.tool.Loggers import logger
class mongo(object):
def __init__(self, config_file, db):
"""
:param config_file: 配置文件路徑
:param db: 獲取配置的環境
"""
# 實例化configparser
config = configparser.ConfigParser()
# 從配置文件中讀取數據庫的相關信息
config.read(config_file, encoding='utf-8')
self.host = config[db]['host']
self.port = int(config[db]['port'])
self.user = config[db]['user']
self.password = config[db]['password']
self.database = config[db]['database']
self.db = db
self.conn = None
self._conn()
def _conn(self):
try:
logger.info(f"讀取環境:{self.db},連接信息:主機ip:{self.host},端口:{self.port},用戶:{self.user},連接數據庫:{self.database}")
# self.conn = pymongo.MongoClient(host=self.host,
# port=self.port)
self.conn = pymongo.MongoClient(host=self.host,
port=self.port) # username=self.user, password=self.password
# self.db_conn = self.conn[self.database]
# self.db_conn=self.db_conn.authenticate(self.user,self.password)
self.conn[self.database].authenticate(self.user, self.password, self.database)
self.db_conn = self.conn[self.database]
if self.conn.server_info():
logger.info(f"數據庫: {self.database}初始化連接成功")
return True
except Exception as e:
logger.error(f"數據庫: {self.database}初始化連接失敗,錯誤:{e}")
return False
# MongoDB數據庫關閉
def close(self):
self.conn.close()
logger.info(f"數據庫關閉成功")
# 查詢調用狀態
def get_state(self):
return self.conn is not None # and self.db_conn is not None
def _reConn(self, num=28800, stime=3): # 重試連接總次數為1天,這裡根據實際情況自己設置,如果服務器宕機1天都沒發現就......
_number = 0
_status = True
logger.info(f"檢查數據庫{self.database}連通性,連接IP:{self.host}")
while _status and _number <= num:
try:
self.conn.server_info() # 檢查數據庫是否正常連通
_status = False
logger.info(f"數據庫{self.database}連接============正常,連接IP:{self.host} ")
except:
if self._conn() == True: # 重新連接,成功退出
_status = False
break
_number += 1
logger.info(f"數據庫{self.database}連接============失敗,連接IP:{self.host} ")
time.sleep(stime) # 連接不成功,休眠3秒鐘,繼續循環,知道成功或重試次數結束
def insert_one(self, collection, data):
self._reConn()
if self.get_state():
ret = self.db_conn[collection].insert_one(data)
return ret.inserted_id
else:
return ""
def insert_many(self, collection, data):
if self.get_state():
ret = self.db_conn[collection].insert_many(data)
return ret.inserted_id
else:
return ""
def update(self, collection, data):
# data format:
# {key:[old_data,new_data]}
data_filter = {}
data_revised = {}
for key in data.keys():
data_filter[key] = data[key][0]
data_revised[key] = data[key][1]
if self.get_state():
return self.db_conn[collection].update_many(data_filter, {"$set": data_revised}).modified_count
return 0
def find(self, col, condition, column=None):
"""
查詢數據代碼
:param col: 數據庫中的集合
:param condition: 查詢條件,查詢條件必須是個字典
:param column: find 的第二個參數是可選的,可以指定需要返回的鍵。這個特別的 "$slice" 運算符可以返回一個數組鍵中元素的子集。
:return: list 返回查詢到記錄的列表
"""
# print(col, condition)
# data= self.db_conn["sms_log"]
# data=self.db_conn["sms_log"].find({"status":"2","createTime":{"$gte": "2022/07/12 22:18:26"}},{"status":1,"channelCode":1,"_id":0})
# data = self.db_conn["authCode"].find({"use": False,"createdTime": {"$gte": 1657865035}})
# print(list(data))
self._reConn()
if self.get_state():
if column is None:
return list(self.db_conn[col].find(condition))
else:
return list(self.db_conn[col].find(condition, column))
else:
return None
def get_last_data(self, col, number=1):
if self.get_state():
# last_data = list(self.db_conn["authCode"].find().sort("_id", -1 ).limit(50))
last_data = list(self.db_conn[col].find().sort("_id", -1).limit(number))
return last_data
def delete(self, col, condition):
if self.get_state():
return self.db_conn[col].delete_many(filter=condition).deleted_count
return 0
def aggregate(self, col, condition):
if self.get_state():
return list(self.db_conn[col].aggregate(condition))
# 時間戳轉換時間
def timestamp_to_time(timestamp):
timeArray = time.localtime(timestamp) # 轉換為可用的時間,就是下面的%Y %m %d
# day_time = time.strftime("%Y-%m-%d", timeArray) # 取上面的timeArray中的對應值0
second_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray) # 這個一樣的
return second_time # 返回相應的值
# 時間轉換時間戳
def time_to_timestamp(time_str):
# 轉換成時間數組
timeArray = time.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# 轉換成時間戳
timestamp = time.mktime(timeArray)
# print(timestamp)
return timestamp
if __name__ == '__main__':
logger.info("開始實例化數據庫對象 ")
config_file = '../config/dbconfig.conf'
db = 'Devfubaodai'
db = mongo(config_file, db)
# 獲取時間
data_time = '2022-07-18 00:00:00'
timestamp = time_to_timestamp(data_time)
# print(timestamp)
data = db.get_last_data("authCode", 1)
last_time_data = data[0]["createdTime"]
satrt_time_data = data[0]["createdTime"]-600
print(timestamp_to_time(last_time_data))
print(timestamp_to_time(satrt_time_data))
data = db.find("authCode", {"createdTime": {"$gte": satrt_time_data, "$lte": last_time_data}}, {"name": 1, "use": 1, "_id": 0})
num, fail = 0, 0
for i in data:
num += 1
print(i)
print(num)
# logger.info(f"短信---最新50條數據,成功使用數量:{num},未使用的數量:{fail}")
db.close()
logger.info("--------------------------------------------------------------------------------------------------")
def main():
#mysqld_test()
schedule.every(1).minutes.do(mysqld_test)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
res = os.path.abspath(__file__) # 獲取當前文件的絕對路徑
print(res)
base_path = os.path.dirname(os.path.dirname(res)) # 獲取當前文件的上兩級目錄
print(base_path)
base_path2 = os.path.dirname(res)
print(base_path2)
sys.path.append(base_path2)
sys.path.append(base_path)
sys.path.insert(0, base_path) # 加入環境變量
# 以上5行代碼必須要加入到文件的最上方
print(sys.path)
main()