目錄
前言
Tornado_program
common
handler_base
mysql_base
sqlalchemy_base
redis_base
model
log
user
server
change_user
register_user
search_user
user_server
settings
config
結語
手冊目錄:
軟件下載及環境安裝:
實習手冊一(基於Tornado框架的接口響應服務)軟件下載與環境配置
Tornado基本框架搭建:
實習手冊二(基於Tornado框架的接口響應服務)Tornado基本框架的搭建
Tornado框架中日志的記錄,路由的分發,接口的響應:
實習手冊三(基於Tornado框架的接口響應服務)Tornado框架中日志的記錄,路由的分發,接口的響應
通過PyMySQL,SQLAlchemy在PyCharm中實現對數據的增刪改查:
實習手冊四(基於Tornado框架的接口響應服務)通過PyMySQL,SQLAlchemy在PyCharm中實現對數據的增刪改查
SqlAlchemy的再封裝,PyMySQL和SqlAlchemy的結合使用:
實習手冊五(基於Tornado框架的接口響應服務)SqlAlchemy的再封裝,PyMySQL和SqlAlchemy的結合使用
在Tornado實現對數據庫的操作功能:
實習手冊六(基於Tornado框架的接口響應服務)在Tornado實現對數據庫的操作功能
在Tornado中使用Redis來緩存數據
實習手冊七(基於Tornado框架的接口響應服務)使用Redis來緩存數據
本章我們將前面幾章的項目整合起來,形成一個大的項目。
項目目錄如下:
其中,common用來存放公共類,logs用來存放日志,model用來存放數據模型,server用來存放服務,user用來存放和用戶有關的服務,settings用來存放設置
# handler基類,用來重載get和post請求,以及log生成器,on_finish時的日志寫入,還有報錯
import datetime
import json
import logging
from tornado.web import RequestHandler
from model.log import Log
class HandlerBase(RequestHandler):
data = None
def get(self, *args, **kwargs):
path = self.request.path.split('/')
method = path[-1]
if callable(getattr(self, method)):
getattr(self, method)()
else:
self.write("404 not found")
def post(self, *args, **kwargs):
path = self.request.path.split('/')
self.data = json.loads(self.request.body)
method = path[-1]
if callable(getattr(self, method)):
getattr(self, method)()
else:
self.write("404 not found")
def log(self, msg):
logger = logging.getLogger('logger')
logger.setLevel(level=logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
file_handler = logging.FileHandler('../logs/log.txt')
file_handler.setLevel(level=logging.INFO)
file_handler.setFormatter(formatter)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.info(msg)
def on_finish(self):
msg = str(self.request.path) + str(self.request.method) + str(self.request.headers)
self.log(msg)
new_log = Log(path=self.request.path, method=self.request.method, create_time=str(datetime.datetime.now()), msg=str(self.request.headers))
Log.add(new_log)
# 數據庫處理基類
# sqlalchemy增刪改查等方法封裝,以及測試數據庫連接
from sqlalchemy import text
import pymysql
from common.sqlalchemy_base import SqlAchBase
from common.redis_base import RedisBase
class MysqlBase():
session = SqlAchBase.get_session()
conn = None
@classmethod
def TestConn(cls):
if not cls.conn:
cls.conn = pymysql.connect(user='root', host='localhost', password=你的密碼, db=數據庫名稱, port=3306)
print("trying to connect to '%s'" % cls.conn)
else:
print("already connected to '%s'" % cls.conn)
@classmethod
def add(cls, user):
MysqlBase.TestConn()
cls.session.add(user)
cls.session.commit()
@classmethod
def delete(cls, filter):
cls.TestConn()
cls.session.query(cls).filter(text(filter)).delete(synchronize_session=False)
cls.session.commit()
@classmethod
def update(cls, filter, value):
cls.TestConn()
cls.session.query(cls).filter(text(filter)).update(value, synchronize_session=False)
cls.session.commit()
@classmethod
def query(cls, key, *columns):
cls.TestConn()
search_list = key.split("|")
if RedisBase.get(key):
print('successfully get data from redis')
return RedisBase.get(key)
else:
item_list = cls.session.query(*columns or cls).filter(text(search_list[2]+' and '+search_list[3])).all()
print(item_list)
RedisBase.set(key, str(item_list), expire_time=10)
return item_list
# sqlalchemy基類,用來獲取session
from settings.config import URL
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData
class SqlAchBase():
engine = create_engine(URL)
base = declarative_base()
session = sessionmaker(bind=engine)()
metadata = MetaData()
@classmethod
def get_session(cls):
return cls.session
# redis基類,用來存放和redis相關的操作
import redis
from settings.config import redis_host, redis_port
class RedisBase():
redis_tool = None
@classmethod
def TestConn(cls):
if cls.redis_tool:
print("already connect to redis_tool\n")
pass
else:
pool = redis.ConnectionPool(host=redis_host, port=redis_port, db=0, decode_responses=True)
cls.redis_tool = redis.Redis(connection_pool=pool)
print("trying to connect to redis_tool\n")
@classmethod
def get(cls, key):
cls.TestConn()
return cls.redis_tool.get(key)
@classmethod
def set(cls, key, target, expire_time):
cls.TestConn()
cls.redis_tool.set(key, target, ex=expire_time)
@classmethod
def expire_seconds(cls, key):
cls.TestConn()
# 查詢某個鍵的剩余時間
return cls.redis_tool.ttl(key)
@classmethod
def keys(cls):
cls.TestConn()
return cls.redis_tool.keys()
@classmethod
def delete(cls, key):
cls.TestConn()
cls.redis_tool.delete(key)
@classmethod
def exists(cls, key):
cls.TestConn()
return cls.redis_tool.exists(key)
# 日志模型,用來映射
from common.sqlalchemy_base import SqlAchBase
from common.mysql_base import MysqlBase
from sqlalchemy import Column, String, Integer
class Log(SqlAchBase().base, MysqlBase):
__tablename__ = 'log'
id = Column(Integer, autoincrement=True, primary_key=True)
path = Column(String(30))
method = Column(String(30))
create_time = Column(String(30))
msg = Column(String(255))
# 用戶模型,用來映射
from common.sqlalchemy_base import SqlAchBase
from common.mysql_base import MysqlBase
from sqlalchemy import Column, String, Integer
class User(SqlAchBase().base, MysqlBase):
__tablename__ = 'user'
userid = Column(Integer, autoincrement=True, primary_key=True)
uname = Column(String(30))
pwd = Column(String(30))
create_time = Column(String(30))
from common.handler_base import HandlerBase
from model.user import User
# 用戶更改的相關操作
class ChangeHandler(HandlerBase):
# 刪除用戶
def delete_user(self):
User.delete(self.data['filter'])
# 更改用戶密碼
def update_pwd(self):
filter = self.data['filter']
value = eval(self.data['value'])
User.update(filter, value)
# 用戶注冊處理器
import datetime
from common.handler_base import HandlerBase
from model.user import User
class RegisterHandler(HandlerBase):
def register_user(self):
user = User(uname=self.data['uname'], pwd=self.data['pwd'], create_time=datetime.datetime.now())
User.add(user)
from model.user import User
class SearchHandler(HandlerBase):
def search_user(self):
# 獲取表名
table_name = User.__tablename__
# 獲取傳遞參數
uname = self.get_argument('uname')
userid = self.get_argument('userid')
key = 'uname,userid,create_time' + '|' + table_name
# 判斷參數是否存在
if uname:
key += '|' + 'uname="{}"'.format(uname)
if userid:
key += '|' + 'userid={}'.format(userid)
# 打印key
print(key)
# 獲取返回值
result = User.query(key, *[User.userid, User.uname, User.create_time])
# 將返回的內容呈現到界面中
self.write(str(result))
# 啟動Tornado服務
from tornado.web import Application
from tornado.ioloop import IOLoop
from user.register_user import RegisterHandler
from user.change_user import ChangeHandler
from user.search_user import SearchHandler
app = Application([
(r'/user/register/.*', RegisterHandler),
(r'/user/search/.*', SearchHandler),
(r'/user/change/.*', ChangeHandler)
])
if __name__ == '__main__':
app.listen(8000)
IOLoop.current().start()
# 基本配置
URL = "mysql+pymysql://root:你的密碼@localhost:3306/數據庫名稱"
redis_host = '127.0.0.1'
redis_port = 6379
所有代碼呈現完畢,有些部分如數據庫名字,mysql密碼,端口號這些需要你根據自身情況進行更改,代碼寫的比較差勁,各位多多包涵。