本文是《一種自動反射消息類型的 Google Protobuf 網絡傳輸方案》的延續,介紹如何將前文介紹 的打包方案與 muduo::net::Buffer 結合,實現了 protobuf codec 和 dispatcher。
Muduo 的 下載地址: http://muduo.googlecode.com/files/muduo-0.1.9-alpha.tar.gz ,SHA1 dc0bb5f7becdfc0277fb35f6dfaafee8209213bc ,本文的完整代碼可在線閱讀 http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/ 。
考慮 到不是每個人都安裝了 Google Protobuf,muduo 中的 protobuf 相關示例默認是不 build 的,如果 你的機器上安裝了 protobuf 2.3.0 或 2.4.0a,那麼可以用 ./build.sh protobuf_all 來構建 protobuf 相關的 examples。
在介紹 codec 和 dispatcher 之前,先講講前文的一個未決問題 。
為什麼 Protobuf 的默認序列化格式沒有包含消息的長度與類型?
Protobuf 是經過 深思熟慮的消息打包方案,它的默認序列化格式沒有包含消息的長度與類型,自然有其道理。哪些情況 下不需要在 protobuf 序列化得到的字節流中包含消息的長度和(或)類型?我能想到的答案有:
如果把消息寫入文件,一個文件存一個消息,那麼序列化結果中不需要包含長度和類型,因為從文 件名和文件長度中可以得知消息的類型與長度。
如果把消息寫入文件,一個文件存多個消息,那麼序列化結果中不需要包含類型,因為文件名就代 表了消息的類型。
如果把消息存入數據庫(或者 NoSQL),以 VARBINARY 字段保存,那麼序列化結果中不需要包含長 度和類型,因為從字段名和字段長度中可以得知消息的類型與長度。
如果把消息以 UDP 方式發生給對方,而且對方一個 UDP port 只接收一種消息類型,那麼序列化結 果中不需要包含長度和類型,因為從 port 和 UDP packet 長度中可以得知消息的類型與長度。
如果把消息以 TCP 短連接方式發給對方,而且對方一個 TCP port 只接收一種消息類型,那麼序列 化結果中不需要包含長度和類型,因為從 port 和 TCP 字節流長度中可以得知消息的類型與長度。
如果把消息以 TCP 長連接方式發給對方,但是對方一個 TCP port 只接收一種消息類型,那麼序列 化結果中不需要包含類型,因為 port 代表了消息的類型。
如果采用 RPC 方式通信,那麼只需要告訴對方 method name,對方自然能推斷出 Request 和 Response 的消息類型,這些可以由 protoc 生成的 RPC stubs 自動搞定。
對於最後一點,比方說 sudoku.proto 定義為:
service SudokuService { rpc Solve (SudokuRequest) returns (SudokuResponse); }
那麼 RPC method Sudoku.Solve 對應的請求和響應分別是 SudokuRequest 和 SudokuResponse。在發送 RPC 請求的時候,不需要包含 SudokuRequest 的類型,只需要發送 method name Sudoku.Solve,對方自知道應該按照 SudokuRequest 來解析(parse)請求。這個例子來自我的半 成品項目 evproto,見 http://blog.csdn.net/Solstice/archive/2010/04/17/5497699.aspx 。
對於上述這些情況,如果 protobuf 無條件地把長度和類型放到序列化的字節串中,只會浪費 網絡帶寬和存儲。可見 protobuf 默認不發送長度和類型是正確的決定。Protobuf 為消息格式的設計 樹立了典范,哪些該自己搞定,哪些留給外部系統去解決,這些都考慮得很清楚。
只有在使用 TCP 長連接,且在一個連接上傳遞不止一種消息的情況下(比方同時發 Heartbeat 和 Request/Response),才需要我前文提到的那種打包方案。(為什麼要在一個連接上同時發 Heartbeat 和業務消息?請見陳碩《分布式系統的工程化開發方法》 p.51 心跳協議的設計。)這時候我們需要一 個分發器 dispatcher,把不同類型的消息分給各個消息處理函數,這正是本文的主題之一。
以 下均只考慮 TCP 長連接這一應用場景。
先談談編解碼器。
什麼是編解碼器 codec?
Codec 是 encoder 和 decoder 的縮寫,這是一個到軟硬件都在使用的術語,這裡我借指“把 網絡數據和業務消息之間互相轉換”的代碼。
在最簡單的網絡編程中,沒有消息 message 只有 字節流數據,這時候是用不到 codec 的。比如我們前面講過的 echo server,它只需要把收到的數據 原封不動地發送回去,它不必關心消息的邊界(也沒有“消息”的概念),收多少就發多少,這種情況 下它干脆直接使用 muduo::net::Buffer,取到數據再交給 TcpConnection 發送回去,見下圖。
non-trivial 的網絡服務程序通常會以消息為單位來通信,每條消息有明確的長度與界限。程序每 次收到一個完整的消息的時候才開始處理,發送的時候也是把一個完整的消息交給網絡庫。比如我們前 面講過的 asio chat 服務,它的一條聊天記錄就是一條消息,我們設計了一個簡單的消息格式,即在 聊天記錄前面加上 4 字節的 length header,LengthHeaderCodec 代碼及解說見《Muduo 網絡編程示 例之二:Boost.Asio 的聊天服務器》一文。
codec 的基本功能之一是做 TCP 分包:確定每條 消息的長度,為消息劃分界限。在 non-blocking 網絡編程中,codec 幾乎是必不可少的。如果只收到 了半條消息,那麼不會觸發消息回調,數據會停留在 Buffer 裡(數據已經讀到 Buffer 中了),等待 收到一個完整的消息再通知處理函數。既然這個任務太常見,我們干脆做一個 utility class,避免服 務端和客戶端程序都要自己處理分包,這就有了 LengthHeaderCodec。這個 codec 的使用有點奇怪, 不需要繼承,它也沒有基類,只要把它當成普通 data member 來用,把 TcpConnection 的數據喂給它 ,然後向它注冊 onXXXMessage() 回調,代碼見 asio chat 示例。muduo 裡的 codec 都是這樣的風格 ,通過 boost::function 粘合到一起。
codec 是一層間接性,它位於 TcpConnection 和 ChatServer 之間,攔截處理收到的數據,在收到完整的消息之後再調用 CharServer 對應的處理函數 ,注意 CharServer::onStringMessage() 的參數是 std::string,不再是 muduo::net::Buffer,也就 是說 LengthHeaderCodec 把 Buffer 解碼成了 string。另外,在發送消息的時候,ChatServer 通過 LengthHeaderCodec::send() 來發送 string,LengthHeaderCodec 負責把它編碼成 Buffer。這正是“ 編解碼器”名字的由來。
Protobuf codec 與此非常類似,只不過消息類型從 std::string 變成了 protobuf::Message。對 於只接收處理 Query 消息的 QueryServer 來說,用 ProtobufCodec 非常方便,收到 protobuf::Message 之後 down cast 成 Query 來用就行。如果要接收處理不止一種消息, ProtobufCodec 恐怕還不能單獨完成工作,請繼續閱讀下文。
實現 ProtobufCodec
Protobuf 的打包方案我已經在《一種自動反射消息類型的 Google Protobuf 網絡傳輸方案》中講過,並以 string 為載體演示了 encode 和 decode 操作。在 muduo 裡 ,我們有專門的 Buffer class,編碼更輕松。
編碼算法很直截了當,按照前文定義的消息格式 一路打包下來,最後更新一下首部的長度即可。
解碼算法有幾個要點:
protobuf::Message 是 new 出來的對象,它的生命期如何管理?muduo 采用 shared_ptr 來自動管 理對象生命期,這與其他地方的做法是一致的。
出錯如何處理?比方說長度超出范圍、check sum 不正確、message type name 不能識別、message parse 出錯等等。ProtobufCodec 定義了 ErrorCallback,用戶代碼可以注冊這個回調。如果不注冊, 默認的處理是斷開連接,讓客戶重連重試。codec 的單元測試裡模擬了各種出錯情況。
如何處理一次收到半條消息、一條消息、一條半消息、兩條消息等等情況?這是每個 non-blocking 網絡程序中的 codec 都要面對的問題。
ProtobufCodec 在實際使用中有明顯的不足:它只負責把 muduo::net::Buffer 轉換為具體類型的 protobuf::Message,應用程序拿到 Message 之後還有再根據其具體類型做一次分發。我們可以考慮做 一個簡單通用的分發器 dispatcher,以簡化客戶代碼。
此外,目前 ProtobufCodec 的實現非 常初級,它沒有充分利用 ZeroCopyInputStream 和 ZeroCopyOutputStream,而是把收到的數據作為 byte array 交給 protobuf Message 去解析,這給性能優化留下了空間。protobuf Message 不要求數 據連續(像 vector 那樣),只要求數據分段連續(像 deque 那樣),這給 buffer 管理帶來性能上的好 處(避免重新分配內存,減少內存碎片),當然也使得代碼變復雜。muduo::net::Buffer 非常簡單, 它內部是 vector,我目前不想讓 protobuf 影響 muduo 本身的設計,畢竟 muduo 是個通用的網絡庫 ,不是為實現 protobuf RPC 而特制的。
消息分發器 dispatcher 有什麼用?
前面提到 ,在使用 TCP 長連接,且在一個連接上傳遞不止一種 protobuf 消息的情況下,客戶代碼需要對收到 的消息按類型做分發。比方說,收到 Logon 消息就交給 QueryServer::onLogon() 去處理,收到 Query 消息就交給 QueryServer::onQuery() 去處理。這個消息分派機制可以做得稍微有點通用性,讓 所有 muduo+protobuf 程序收益,而且不增加復雜性。
換句話說,又是一層間接性, ProtobufCodec 攔截了 TcpConnection 的數據,把它轉換為 Message,ProtobufDispatcher 攔截了 ProtobufCodec 的 callback,按消息具體類型把它分派給多個 callbacks。
ProtobufCodec 與 ProtobufDispatcher 的綜合運用
我寫了兩個示例代碼,client 和 server,把 ProtobufCodec 和 ProtobufDispatcher 串聯起來使用。server 響應 Query 消息,發生 回 Answer 消息,如果收到未知消息類型,則斷開連接。client 可以選擇發送 Query 或 Empty 消息 ,由命令行控制。這樣可以測試 unknown message callback。
為節省篇幅,這裡就不列出代碼 了,請移步閱讀
http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/client.c c
http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/server.c c
在構造函數中,通過注冊回調函數把四方 (TcpConnection、codec、dispatcher、 QueryServer) 結合起來。
ProtobufDispatcher 的兩種實現
要完成消息分發,那麼就是 對消息做 type-switch,這似乎是一個 bad smell,但是 protobuf Message 的 Descriptor 沒有留下 定制點(比如暴露一個 boost::any 成員),我們只好硬來了。
先定義
typedef boost::function ProtobufMessageCallback;
注意,本節出現的不是 muduo dispatcher 真實 的代碼,僅為示意,突出重點,便於畫圖。
ProtobufDispatcherLite 的結構非常簡單,它有一 個 map 成員,客戶代碼可以以 Descriptor* 為 key 注冊回調(recall: 每個具體消息類型都有一個 全局的 Descriptor 對象,其地址是不變的,可以用來當 key)。在收到 protobuf Message 之後,在 map 中找到對應的 ProtobufMessageCallback,然後調用之。如果找不到,就調用 defaultCallback。
當然,它的設計也有小小的缺陷,那就是 ProtobufMessageCallback 限制了客戶代碼只能接受基類 Message,客戶代碼需要自己做向下轉型,比如:
如果我希望 QueryServer 這麼設計:不想每個消息處理函數自己做 down casting,而是交給 dispatcher 去處理,客戶代碼拿到的就已經是想要的具體類型。如下:
查看本欄目
那麼該該 如何實現 ProtobufDispatcher 呢?它如何與多個未知的消息類型合作?做 down cast 需要知道目標 類型,難道我們要用一長串模板類型參數嗎?
有一個辦法,把多態與模板結合,利用 templated derived class 來提供類型上的靈活性。設計如下。
ProtobufDispatcher 有一個模板成員函數,可以接受注冊任意消息類型 T 的回調,然後它創建一 個模板化的派生類 CallbackT,這樣消息的類新信息就保存在了 CallbackT 中,做 down casting 就 簡單了。
比方說,我們有兩個具體消息類型 Query 和 Answer。
然後我們 這樣注冊回調:
dispatcher_.registerMessageCallback( boost::bind(&QueryServer::onQuery, this, _1, _2, _3)); dispatcher_.registerMessageCallback( boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));
這樣會具現化 (instantiation) 出兩個 CallbackT 實體,如下:
以上設計參考了 shared_ptr 的 deleter,Scott Meyers 也談到過。
ProtobufCodec 和 ProtobufDispatcher 有何意義?
ProtobufCodec 和 ProtobufDispatcher 把每個直接收發 protobuf Message 的網絡程序都會用到的功能提煉出來做成了公用的 utility,這樣以後新寫 protobuf 網絡程序就不必為打包分包和消息分發勞神了。它倆以庫的形式存在,是兩個可以拿來就當 data member 用的 class,它們沒有基類,也沒有用到虛函數或者別的什麼面向對象特征,不侵入 muduo::net 或者你的代碼。如果不這麼做,那將來每個 protobuf 網絡程序都要自己重新實現類似的 功能,徒增負擔。
下一篇文章講《分布式程序的自動回歸測試》會介紹利用 protobuf 的跨語 言特性,采用 Java 為 C++ 服務程序編寫 test harness。