程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python項目實戰│ Python實現線程池工作模式

編輯:Python

本文章基於蘋果樹病蟲害預測模型,自定義應用層通信邏輯,設計服務器與客戶機。客戶機向服務器發送圖像數據,服務器回送預測結果。為增強服務器的可靠性與可擴展性,服務器端采用線程池工作模式。為了增強客戶機的可操作性,客戶機采用PyQt5完成圖形化界面設計。

01、客戶機/服務器通信邏輯

客戶機與服務器通信邏輯如圖1所示。

 ■ 圖1 智能桌面App的客戶機/服務器通信邏輯

02、數據交換協議

客戶機與服務器之間一次信息往返的協議會話過程,定義為圖2所示的邏輯時序。

 ■ 圖2 應用層通信協議

協議會話邏輯解析:

(1)消息交換基於消息頭機制。消息頭中包含消息類型和消息長度。消息類型包含圖像消息與下線消息。

(2)用Json格式的數據表示消息頭。圖像數據用base64編碼與解碼。

(3)發送數據分兩個步驟完成,首先發送消息頭,然後發送消息內容。

(4)接收數據分兩個步驟完成,首先接收消息頭,然後接收消息內容。

消息頭的結構設計如圖3所示,消息頭的固定長度為128字節,包含消息類型(msg_type)與消息內容長度(msg_len)兩個字段。

         ■ 圖3 消息頭的結構

消息類型包括:

(1)CLIENT_IMAGE:表示收到來自客戶機的圖像數據。

(2)CLIENT_MESSAGE:表示收到來自客戶機的下線消息。

消息內容長度用消息包含的字符數表示。對於圖像數據而言,因為采用base64編碼,其傳輸的數據也是字符消息。

消息頭的長度在服務器與客戶機兩端均約定為128字節,用常量MSG_HEADER_LEN定義。發送消息頭之前,需要檢查消息的長度,如果不足128字節,其左側用字節型空格字符填充。

03、服務器主體邏輯

根據圖1描述的服務器邏輯,完成服務器的主體邏輯設計,如程序段P7.1所示。

第32~39行定義服務器端的主循環,處理客戶機連接,采用的是一客戶一線程模式。服務器會話線程定義為handle_client模塊,主線程向會話線程傳遞三個參數:

(1)client_socket: 會話套接字

(2)client_addr: 客戶機地址

(3)model: 用於預測的智能模型

運行服務器程序,觀察輸出結果,此時服務器雖然處於偵聽連接的狀態,但是由於handle_client模塊還沒有實現,故無法處理來自客戶機的各種消息。

04、服務器會話線程

服務器會話線程包括接收數據與發送數據兩個模塊,對應圖1中的內循環。服務器完成數據接收後,需要回送預測結果或者確認消息給客戶機,所以將接收數據與發送數據的邏輯定義在同一函數模塊handle_client中,收發數據的邏輯流程如圖4所示。

 

■ 圖4 服務器收發數據會話線程邏輯

會話線程的主邏輯是一個循環,循環條件為遠程客戶機是否結束會話,邏輯流程解析如下:

(1)如果客戶機斷開了與服務器的連接,會話線程結束。

(2)在連接正常的情況下,服務器首先接收來自客戶機的消息頭,解析消息頭,根據消息類型,分為一般消息與圖像消息。

(3)如果是圖像消息,則通過一個循環,根據圖像的大小完成數據接收,然後經過base64解碼、圖像變換(調整顏色模式、歸一化、縮放)、模型預測、重構預測結果、定義消息頭、回送消息頭、回送預測結果。回到步驟(1)。

(4)如果是一般消息,則繼續判斷是否為下線消息。

(5)如果是下線消息,則更新連接數量,定義下線消息(原消息加上時間戳),定義消息頭,回送消息頭,回送消息內容,會話線程結束。

(6)如果不是下線消息,則做其他消息處理,為簡化設計,其他消息處理模塊暫不編程,留作擴展。回到步驟(1)。

會話線程handle_client的邏輯實現如程序段P7.2所示。

 

第46行–第51行定義的循環結構,根據圖像數據的長度msg_len完成數據接收工作。

運行服務器程序,輸出結果為:

服務器開始在('192.168.0.102', 5050)偵聽...

待客戶機程序完成後,再做聯合測試。

05、客戶機主體邏輯

新建主程序MyClient.py。根據圖1描述的客戶機邏輯,完成客戶機的主體邏輯設計,其主要模塊如圖5所示。

模塊send_image_data發送圖像數據,模塊send_down_msg發送下線消息,模塊recv_message是用於接收服務器消息的會話線程,類模塊GUI(QMainWindow)負責構建客戶機圖形化界面。主程序完成主控邏輯設計。

■ 圖5 客戶機程序模塊結構

客戶機的消息結構定義如圖3所示,與服務器保持一致。消息的收發邏輯,如圖2所示,亦與服務器保持一致。

客戶機主體邏輯如程序段P7.3所示。

 

首先運行服務器程序,然後運行測試客戶機程序。目前客戶機還做不了具體工作,輸入字符Q退出客戶機主循環。

06、客戶機發送數據

客戶機向服務器發送的數據有兩種類型,一是圖像數據,一是下線消息。發送圖像數據的流程如圖6所示。

 

■ 圖6 發送圖像數據流程

程序段P7.4描述了發送圖像數據模塊send_image_data的完整邏輯。

07、客戶機接收數據

客戶機定義了線程函數recv_message,用於接收兩類數據,一是普通消息(下線消息等),二是預測消息(預測結果)。消息處理流程如圖7所示,分步描述如下。

(1)進入消息循環,接收消息頭。

(2)如果消息頭為空,轉到步驟(1)。

(3)如果消息頭非空,則解析消息頭,獲取消息類型與消息長度。

(4)如果是普通消息,則接收消息內容,進一步判斷是否為下線消息。

(5)如果是下線消息,則跳出消息循環,轉到步驟(9)。

(6)如果非下線消息,則轉到步驟(1)。

(7)如果不是普通消息,則判斷是否為預測消息,如果不是預測消息,則轉到步驟(1)。

(8)如果是預測消息,則接收消息內容,解析消息內容,將預測結果存入隊列中,顯示預測結果。轉到步驟(1)。

(9)顯示下線消息,消息接收線程結束。

■ 圖7 客戶機接收消息邏輯流程

程序段P7.6描述了接收消息線程函數recv_message的完整邏輯。

 

將\dataset\images目錄下的圖像文件Test_0.jpg、Test_7.jpg拷貝到根目錄下。

運行服務器程序,然後運行客戶機程序,做聯合測試。

客戶機輸入待遇測的圖像文件名稱Test_0.jpg,回車後發送圖像數據,服務器返回預測結果。客戶機輸入字符Q,結束客戶機。完成此次客戶機與服務器的通信後,服務器與客戶機的狀態信息如圖8所示。

■ 圖8 客戶機與服務器聯合測試

此時服務器工作於一客戶一線程模式,啟動多個客戶端,可做聯合測試。

08、客戶機界面設計

為了增強客戶機的可操作性,基於PyQt5框架為客戶機設計圖形化界面,界面布局及其控件名稱如圖9所示。

■ 圖9 客戶機圖形化界面布局

定義圖形化界面類GUI(QMainWindow)封裝圖9所示的控件及其事件函數。

運行服務器,然後運行客戶機,從chapter7的根目錄中加載圖像Test_0.jpg,觀察圖像特點。然後單擊“預測”按鈕,觀察服務器反饋的預測結果,如圖10所示。

■ 圖10 客戶機圖形化界面測試結果

09、線程池

服務器現有的工作模式為一客戶一線程,即為每一個連接到服務器的客戶機創建獨立的會話線程,當客戶機並發量較大時,服務器往往面臨資源枯竭的挑戰。

線程池模式可以有效平衡服務器負載能力,與一客戶一線程模式相比,其主要優點有:

(1)通過重用已存在的線程,降低線程創建和銷毀造成的額外消耗。

(2)提高系統響應速度,當有新任務到達時,通過復用已存在的線程便能立即執行,無需等待新線程的創建。

(3)控制資源消耗,將並發線程數量限制在合理的區間。

(4)針對工作線程提供了更多的控制能力,例如線程延時、定時等。

Python的線程池定義在concurrent.futures包中,使用ThreadPoolExecutor類創建線程池。線程池調度任務過程如圖11所示。

■ 圖11 線程池調度任務示意圖

將一客戶一線程模式修改為線程池模式,只需做以下改動:

(1)導入線程池類ThreadPoolExecutor。在服務器端添加語句:

from concurrent.futures import ThreadPoolExecutor # 線程池類

(2)在服務器主線程的while循環前面添加創建線程池的語句:

pool = ThreadPoolExecutor(max_workers=5) # 創建線程池,指定工作線程數量為5

此處如果省略參數max_workers,則線程池默認工作線程數量是CPU數量的5倍。考慮到線程池往往應用於需要大量I/O交換的場景,而不是CPU計算密集型的場景,故工作線程的數量應該超過CPU的數量。

(3)用線程池調度語句替換原有的線程創建語句。

# 建立與客戶機會話的線程,一客戶一線程
client_thread = threading.Thread(target=handle_client, args=(new_socket, new_addr, model))
client_thread.start()

替換為:

pool.submit(handle_client,new_socket, new_addr, model) # 創建線程任務,提交到線程池

(4)在主程序末尾,while循環外部,添加關閉線程池的語句,釋放資源:

pool.shutdown(wait=True) # 關閉線程池

執行shutdown後,線程池將不再接受新任務。參數wait默認為True,表示關閉線程池之前需要等待所有工作線程結束。

10、聯合測試

為便於觀察,將服務器線程池的工作線程數量調整為2。啟動服務器,然後啟動四個客戶機,標識為客戶機1、客戶機2、客戶機3、客戶機4。

四個客戶機從dataset\images目錄中選擇四幅不同的測試圖片,

假定客戶機1選擇的圖片是Test_17.jpg,客戶機2選擇的是Test_152.jpg,客戶機3選擇的是Test_190.jpg,客戶機4選擇的是Test_1572.jpg,然後依次點擊客戶機1、客戶機2、客戶機3、客戶機4的“預測”按鈕,觀察預測結果。

可以看到,只有客戶機1、客戶機2立即反饋了預測結果,而客戶機3、客戶機4雖然已經連接到服務器,卻並沒有立即得到預測結果,原因是服務器線程池大小為2,客戶機3、客戶機4需要在任務隊列等待。

客戶機1顯示結果如圖12所示。

■ 圖12 客戶機1的預測結果

客戶機2顯示結果圖13所示。

■ 圖13 客戶機2的預測結果

客戶機3顯示結果如圖14所示。由於服務器線程池大小為2,所以客戶機1與客戶機2占用工作線程後,客戶機3只能進入任務隊列等待。

■ 圖14 客戶機3處於等待中

客戶機4顯示結果如圖15所示。同樣,客戶機4也只能進入服務器的任務隊列等待。

■ 圖15 客戶機4處於等待中

關閉客戶機1,則會自動釋放客戶機1占用的工作線程,此時排隊中的客戶機3會立即得到相應,其結果如圖16所示。

■ 圖16 客戶機3得到服務器響應

此時只有客戶機4仍處於等待中。如果繼續關閉客戶機2,則客戶機4會得到立即響應,其預測結果如圖17所示。

■ 圖17 客戶機4得到服務器響應

關閉客戶機3、關閉客戶機4。整個會話期間,服務器狀態監控界面的信息提示如下:

仔細閱讀服務器的狀態提示信息,與客戶機的操作相對照,可以更精准地把握客戶機與服務器的全程會話邏輯。

11、小結

本文基於Socket通信方法,自定義數據交換協議,圍繞蘋果樹病蟲害識別需求,迭代構建了客戶機/服務器模式的智能桌面App。圖像數據的發送采用base64編碼方式,消息頭、消息內容采用Json數據格式。服務器端采用一客戶一線程和線程池技術支持並發訪問,客戶機采用基於PyQt5的圖像化界面技術提高其可操作性。基於Socket技術的網絡編程,在客戶機與服務器兩端提供了更多的設計靈活性。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved