程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

如何用python快速的搭建一個websocket服務器?

編輯:Python

准備工作

python:3.9.x

  • python 依賴
    pip3 install asyncio
    pip3 install json
    pip3 install websockets
    pip3 install pyvisa
    pip3 install configparser

目錄結構

-case(功能代碼模塊文件夾)
--OutputPower.py(功能代碼,接收到客戶端信息,判斷進入對應的功能模塊)
-websocket(webSocket服務器代碼,主要負責維護連接、接收數據、發送數據)
--server.py(服務器代碼)
-app.py(程序入口)

代碼

server.py
功能包括:
1、建立websocket連接
2、收取客戶端信息
3、發送客戶端信息

服務器數據處理模塊

此處我使用了多線程,主要是想到以後的功能模塊交互問題


import asyncio
import websockets
import time
import json
import threading
# 功能模塊
from case.OutputPower import OutputPower
# 存儲所有的客戶端
Clients = []
class Server():
# 發消息給客戶端的回調函數
async def s(self,msg,websocket=None):
await self.sendMsg(msg,websocket)
# 針對不同的信息進行請求,可以考慮json文本
async def runCase(self,jsonMsg,websocket):
print('runCase')
# await OutputPower(jsonMsg,self.s,websocket)
op = OutputPower()
await op.run(jsonMsg,self.s,websocket)
# 每一個客戶端鏈接上來就會進一個循環
async def echo(self,websocket, path):
Clients.append(websocket)
await websocket.send(json.dumps({"type": "handshake"}))
while True:
try:
recv_text = await websocket.recv()
message = "I got your message: {}".format(recv_text)
# 直接返回客戶端收到的信息
await websocket.send(message)
print(message)
# 分析當前的消息 json格式,跳進功能模塊分析
await self.runCase(jsonMsg='',websocket=websocket)
except websockets.ConnectionClosed:
print("ConnectionClosed...", path) # 鏈接斷開
Clients.remove(websocket)
break
except websockets.InvalidState:
print("InvalidState...") # 無效狀態
Clients.remove(websocket)
break
except Exception as e:
print(e)
Clients.remove(websocket)
break
# 發送消息
async def sendMsg(self,msg,websocket):
print('sendMsg:',msg)
if websocket != None:
await websocket.send(msg)
else:
await self.broadcastMsg(msg)
# 群發消息
async def broadcastMsg(self,msg):
for user in Clients:
await user.send(msg)
# 啟動服務器
async def runServer(self):
async with websockets.serve(self.echo, 'localhost', 8888):
await asyncio.Future() # run forever
# 多線程模式,防止阻塞主線程無法做其他事情
def WebSocketServer(self):
asyncio.run(self.runServer())
def startServer(self):
# 多線程啟動,否則會堵塞
thread = threading.Thread(target=self.WebSocketServer)
thread.start()
thread.join()
print("go!!!")

建立功能模塊

做到業務分離(OutputPower.py)

import asyncio
class OutputPower():
async def run(self,arg,s,websocket):
# 發送消息方法,單獨和請求的客戶端發消息
await s('sssss', websocket)
# 群發消息
await s('起來hi')

啟動服務

(app.py)


from webSocket.server import Server
if __name__=='__main__':
s = Server()
s.startServer()

測試

直接找一個在線的websocket進行連接測試即可,如 http://www.websocket-test.com/

結果如圖:

  • 第一個客戶端鏈接
  • 發送:12
  • 服務器返回:I got your message:12
  • 繼續返回功能模塊的:sssss
  • 返回群發的信息:起來hi
  • 返回群發的信息:起來hi
  • 第二個客戶端鏈接上去
  • 重復第一個客戶端工作,但是第一個客戶端多接收到了一個:起來hi

測試源碼

見:https://download.csdn.net/download/weixin_44635546/85645105


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved