目錄
前言
Redis
Redis簡介
Python操作redis
結語
手冊目錄:
軟件下載及環境安裝:
實習手冊一(基於Tornado框架的接口響應服務)軟件下載與環境配置
Tornado基本框架搭建:
實習手冊二(基於Tornado框架的接口響應服務)Tornado基本框架的搭建
Tornado框架中日志的記錄,路由的分發,接口的響應:
實習手冊三(基於Tornado框架的接口響應服務)Tornado框架中日志的記錄,路由的分發,接口的響應
通過PyMySQL,SQLAlchemy在PyCharm中實現對數據的增刪改查:
實習手冊四(基於Tornado框架的接口響應服務)通過PyMySQL,SQLAlchemy在PyCharm中實現對數據的增刪改查
SqlAlchemy的再封裝,PyMySQL和SqlAlchemy的結合使用:
實習手冊五(基於Tornado框架的接口響應服務)SqlAlchemy的再封裝,PyMySQL和SqlAlchemy的結合使用
在Tornado實現對數據庫的操作功能:
實習手冊六(基於Tornado框架的接口響應服務)在Tornado實現對數據庫的操作功能
不知不覺已經到實習手冊七了,本章我們要實現利用redis來對數據進行緩存。
簡而言之,redis也跟我們的mysql一樣,是一種數據庫,跟mysql不同的是,redis是一款高性能的NOSQL系列的非關系型數據庫,它是利用內存來存儲數據的,而mysql是利用磁盤來存儲數據的,這樣的特性使得我們利用redis進行數據查詢時,速度會比利用mysql要快得多。
Redis的下載及安裝已在實習手冊一中介紹過,這裡不再贅述。
沒有學習過redis的同學可以點擊下方這個鏈接系統化地學習一下:
python操作redis詳細教程
使用redis後,我們的數據庫查詢步驟應該變成這樣:
獲取到查詢請求→將請求傳遞的參數轉化為key→將key傳入到查詢接口→查詢接口調用query方法→先去redis中進行查詢,看是否有數據,有則返回,無則去mysql中進行查詢,還沒有就返回無結果
因此,我們先確定key的格式,key應該是由select語句中的主要參數構成:查什麼+去哪查+篩選條件
這裡我們以查詢用戶id,用戶名,創建時間為例來進行演示:
# User表數據模型
class User(SqlAchBase().base, MysqlBase):
__tablename__ = 'user'
userid = Column(Integer, autoincrement=True, primary_key=True)
uname = Column(String(30))
pwd = Column(String(30))
create_time = Column(String(30))
def search_user(self):
# 獲取表名(去哪查)
table_name = User.__tablename__
# 獲取請求參數
uname = self.get_argument('uname')
userid = self.get_argument('userid')
# key建立:查詢內容+查詢表
key = 'uname,userid,create_time' + '|' + table_name
# 判斷篩選條件是否存在,存在的話就拼接到key中
if uname:
key += '|' + 'uname="{}"'.format(uname)
if userid:
key += '|' + 'userid={}'.format(userid)
# 獲取查詢結果
result = User.query(key, *[User.userid, User.uname, User.create_time])
# 將結果呈現出來
self.write(str(result))
因為查詢一般調用的是get請求,所以獲取參數的方式變成了self.get_argument('參數名稱')
然後是MysqlBase中的query方法更改:
@classmethod
def query(cls, key, *columns):
cls.TestConn()
search_list = key.split('|')
# 判斷是否獲取的到這個key
if RedisBase.get(key):
print('successfully get data from redis')
return RedisBase.get(key)
else:
item_list = cls.session.query(*columns or cls).filter(text(search_list[2]+' and '+search_list[3])).all()
print(item_list)
RedisBase.set(key, str(item_list), expire_time=10)
return item_list
然後是RedisBase:
import redis
from settings.config import redis_host, redis_port
class RedisBase():
redis_tool = None
@classmethod
def TestConn(cls):
if cls.redis_tool:
print("already connect to redis_tool\n")
pass
else:
pool = redis.ConnectionPool(host=redis_host, port=redis_port, db=0, decode_responses=True)
cls.redis_tool = redis.Redis(connection_pool=pool)
print("trying to connect to redis_tool\n")
@classmethod
def get(cls, key):
cls.TestConn()
return cls.redis_tool.get(key)
@classmethod
def set(cls, key, target, expire_time):
cls.TestConn()
cls.redis_tool.set(key, target, ex=expire_time)
@classmethod
def expire_seconds(cls, key):
cls.TestConn()
# 查詢某個鍵的剩余時間
return cls.redis_tool.ttl(key)
@classmethod
def keys(cls):
cls.TestConn()
return cls.redis_tool.keys()
@classmethod
def delete(cls, key):
cls.TestConn()
cls.redis_tool.delete(key)
@classmethod
def exists(cls, key):
cls.TestConn()
return cls.redis_tool.exists(key)
然後開啟redis服務,在訪達中前往/usr/local/bin中打開redis-cli和redis-server兩個文件,啟動redis服務:
然後在Postman中輸入對應的參數,點擊Send:
可以看到,我們已經成功查詢到了對應的數據。
完整的代碼如下:
import redis
from tornado.web import RequestHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
import pymysql
from sqlalchemy import create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData
from sqlalchemy import Column, String, Integer
# Redis部分
class RedisBase():
redis_tool = None
redis_host = '127.0.0.1'
redis_port = 6379
@classmethod
def TestConn(cls):
if cls.redis_tool:
print("already connect to redis_tool\n")
pass
else:
pool = redis.ConnectionPool(host=cls.redis_host, port=cls.redis_port, db=0, decode_responses=True)
cls.redis_tool = redis.Redis(connection_pool=pool)
print("trying to connect to redis_tool\n")
# 獲取鍵
@classmethod
def get(cls, key):
cls.TestConn()
return cls.redis_tool.get(key)
# 添加鍵
@classmethod
def set(cls, key, target, expire_time):
cls.TestConn()
cls.redis_tool.set(key, target, ex=expire_time)
# 查詢鍵剩余時間
@classmethod
def expire_seconds(cls, key):
cls.TestConn()
# 查詢某個鍵的剩余時間
return cls.redis_tool.ttl(key)
# 返回全部鍵
@classmethod
def keys(cls):
cls.TestConn()
return cls.redis_tool.keys()
# 刪除鍵
@classmethod
def delete(cls, key):
cls.TestConn()
cls.redis_tool.delete(key)
# 判斷鍵是否存在
@classmethod
def exists(cls, key):
cls.TestConn()
return cls.redis_tool.exists(key)
# SqlAch部分
# URL用來作為SqlAlchemy創造引擎的url
# 格式為:"mysql+pymysql://root:你的mysql密碼@localhost:3306/數據庫名字"
URL = "mysql+pymysql://root:你的密碼@localhost:3306/數據庫名字"
class SqlAchBase():
engine = create_engine(URL)
base = declarative_base()
session = sessionmaker(bind=engine)()
metadata = MetaData()
@classmethod
def get_session(cls):
return cls.session
# Mysql部分
class MysqlBase():
session = SqlAchBase.get_session()
conn = None
# 測試連接方法
@classmethod
def TestConn(cls):
if not cls.conn:
cls.conn = pymysql.connect(user='root', host='localhost', password=你的密碼, db=數據庫名稱, port=3306)
print("trying to connect to '%s'" % cls.conn)
else:
print("already connected to '%s'" % cls.conn)
# 增
@classmethod
def add(cls, user):
cls.TestConn()
cls.session.add(user)
cls.session.commit()
# 刪
@classmethod
def delete(cls, filter):
cls.TestConn()
cls.session.query(cls).filter(text(filter)).delete(synchronize_session=False)
cls.session.commit()
# 改
@classmethod
def update(cls, filter, value):
cls.TestConn()
cls.session.query(cls).filter(text(filter)).update(value, synchronize_session=False)
cls.session.commit()
# 查
@classmethod
def query(cls, key, *columns):
cls.TestConn()
search_list = key.split("|")
if RedisBase.get(key):
print('successfully get data from redis')
return RedisBase.get(key)
else:
item_list = cls.session.query(*columns or cls).filter(text(search_list[2] + ' and ' + search_list[3])).all()
print(item_list)
RedisBase.set(key, str(item_list), expire_time=10)
return item_list
# User數據模型
class User(SqlAchBase().base, MysqlBase):
__tablename__ = 'user'
userid = Column(Integer, autoincrement=True, primary_key=True)
uname = Column(String(30))
pwd = Column(String(30))
create_time = Column(String(30))
# Tornado框架
class SearchHandler(RequestHandler):
# get請求分發
def get(self, *args, **kwargs):
path = self.request.path.split('/')
method = path[-1]
if callable(getattr(self, method)):
getattr(self, method)()
else:
self.write("404 not found")
# search_user接口
def search_user(self):
table_name = User.__tablename__
uname = self.get_argument('uname')
userid = self.get_argument('userid')
key = 'uname,userid,create_time' + '|' + table_name
if uname:
key += '|' + 'uname="{}"'.format(uname)
if userid:
key += '|' + 'userid={}'.format(userid)
print(key)
result = User.query(key, *[User.userid, User.uname, User.create_time])
self.write(str(result))
app = Application([
(r'/user/search/.*', SearchHandler)
])
if __name__ == '__main__':
app.listen(8000)
IOLoop.current().start()
當然,你也可以自己設定redis中鍵存在的時間,根據需求進行更改就行。
至此,所有的分區板塊都已經講完,接下來將會進入整體項目,會有規范的項目目錄,終於不需要將所有代碼打在一個程序裡了:)。