功能需求
這個連接服務器把多個客戶連接匯聚為一個內部 TCP 連接,起到“數據串並轉換”的作用,讓 backend 的邏輯服務器專心處理業務,而無需顧及多連接的並發性。以下是系統的框圖:
這個連接服務器的作用與數字電路中的數據選擇器 (multiplexer) 類似,所以我把它命名為 multiplexer。(其實 IO-Multiplexing 也是取的這個意思,讓一個 thread-of-control 能有選擇地處理多個 IO 文件描述符。)
(上圖取自 wikipedia,是 public domain 版權)
實現
Multiplexer 的功能需求不復雜,無非是在 backend connection 和 client connections 之間倒騰數據。具體來說,主要是處理四種事件:
對每個新 client connection 分配一個新的整數 id,如果 id 用完了,則斷開新連接(這樣通過控制 id 的數目就能控制最大連接數)。另外,為了避免 id 過快地被復用(有可能造成 backend 串話),multiplexer 采用 queue 來管理 free id,每次從隊列的頭部取 id,用完之後放回 queue 的尾部。
當 client connection 到達或斷開時,向 backend 發出通知。onClientConnection() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#54
當從 client connection 收到數據時,把數據連同 connection id 一同發給 backend。onClientMessage() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#117
當從 backend connection 收到數據時,辨別數據是發給哪個 client connection,並執行相應的轉發操作。onBackendMessage() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#194
如果 backend connection 斷開連接,則斷開所有 client connections(假設 client 會自動重試)。 onBackendConnection() http://code.google.com/p/muduo/source/browse/tags/0.2.0/examples/multiplexer/multiplexer_simple.cc#162
由上可見,multiplexer 的功能與 proxy 頗為類似。multiplexer_simple.cc 是一個線程版的實現,借助 muduo 的 io-multiplexing 特性,可以方便地處理多個並發連接。
在實現的時候有兩點值得注意:
TcpConnection 的 id 如何存放?當從 backend 收到數據,如何根據 id 找到對應的 client connection?當從 client connection 收到數據,如何得知其 id ?
第一個問題比較好解決,用 std::map〈int, TcpConnectionPtr〉 clientConns_; 保存從 id 到 client connection 的映射就行。
第二個問題固然可以用類似的辦法解決,但是我想借此介紹一下 muduo::net::TcpConnection 的 context 功能。每個 TcpConnection 都有一個 boost::any 成員,可由客戶代碼自由支配(get/set),代碼如下。這個 boost::any 是 TcpConnection 的 context,可以用於保存與 connection 綁定的任意數據(比方說 connection id、connection 的最後數據到達時間、connection 所代表的用戶的名字等等)。這樣客戶代碼不必繼承 TcpConnection 就能 attach 自己的狀態,而且也用不著 TcpConnectionFactory 了(如果允許繼承,那麼必然要向 TcpServer 注入此 factory)。
class TcpConnection : public boost::enable_shared_from_this,
boost::noncopyable
{
public:
void setContext(const boost::any& context)
{ context_ = context; }
boost::any& getContext()
{ return context_; }
const boost::any& getContext() const
{ return context_; }
// ...
private:
// ...
boost::any context_;
};
typedef boost::shared_ptr TcpConnectionPtr;
對於 Multiplexer,在 onClientConnection() 裡調用 conn->setContext(id),把 id 存到 TcpConnection 對象中。onClientMessage() 從 TcpConnection 對象中取得 id,連同數據一起發送給 backend,完整實現如下:
void onClientMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
if (!conn->getContext().empty())
{
int id = boost::any_cast(conn->getContext());
sendBackendBuffer(id, buf);
}
else
{
buf->retrieveAll();
}
}
TcpConnection 的生命期如何管理?由於 Client Connection 是動態創建並銷毀,其生與滅完全由客戶決定,如何保證 backend 想向它發送數據的時候,這個 TcpConnection 對象還活著?解決思路是用 reference counting,當然,不用自己寫,用 boost::shared_ptr 即可。TcpConnection 是 muduo 中唯一默認采用 shared_ptr 來管理生命期的對象,蓋由其動態生命期的本質決定。更多內容請參考陳碩《當析構函數遇到多線程──C++ 中線程安全的對象回調》
multiplexer 是二進制協議,如何測試呢?
自動化測試
Multiplexer 是 muduo 網絡編程示例中第一個具有 non-trivial 業務邏輯的網絡程序,根據陳碩《分布式程序的自動化回歸測試》一文的思想,我為它編寫了 test harness。代碼見 http://code.google.com/p/muduo/source/browse/trunk/examples/multiplexer/harness/src/com/chenshuo/muduo/example/multiplexer
這個 Test harness 采用 Java 編寫,用的是 Netty 庫。這個 test harness 要扮演 clients 和 backend,也就是既要主動發起連接,也要被動接受連接。結構如下:
Test harness 會把各種 event 匯聚到一個 blocking queue 裡邊,方便編寫 test case。Test case 則操縱 test harness,發起連接、發送數據、檢查收到的數據,例如以下是其中一個 test case
http://code.google.com/p/muduo/source/browse/trunk/examples/multiplexer/harness/src/com/chenshuo/muduo/example/multiplexer/testcase/TestOneClientSend.java
這裡的幾個 test cases 都以用 java 直接寫的,如果有必要,也可以采用 Groovy 來編寫,這樣可以在不重啟 test harness 的情況下隨時修改添加 test cases。具體做法見陳碩《“過家家”版的移動離線計費系統實現》。
將來的改進
有了這個自動化的 test harness,我們可以比較方便且安全地修改(甚至重新設計)multiplexer。例如
增加“backend 發送指令斷開 client connection”的功能。有了自動化測試,這個新功能可以被單獨測試(指開發者測試),而不需要真正的 backend 參與進來。
將 Multiplexer 改用多線程重寫。有了自動化回歸測試,我們不用擔心破壞原有的功能,可以放心大膽地重寫。而且由於 test harness 是從外部測試,不是單元測試,重寫 multiplexer 的時候不用動 test cases,這樣保證了測試的穩定性。另外,這個 test harness 稍作改進還可以進行 stress test