本文講介紹一個與 Boost.Asio 的示例代碼中的聊天服務器功能類似的網絡服務程序,包括客戶端 與服務端的 muduo 實現。這個例子的主要目的是介紹如何處理分包,並初步涉及 Muduo 的多線程功能 。Muduo 的下載地址: http://muduo.googlecode.com/files/muduo-0.1.7-alpha.tar.gz ,SHA1 873567e43b3c2cae592101ea809b30ba730f2ee6,本文的完整代碼可在線閱讀
http://code.google.com/p/muduo/source/browse/trunk/examples/asio/chat/ 。
TCP 分包
前面一篇《五個簡單 TCP 協議》中處理的協議沒有涉及分包,在 TCP 這種字節流協議上做應用層 分包是網絡編程的基本需求。分包指的是在發生一個消息(message)或一幀(frame)數據時,通過一定的 處理,讓接收方能從字節流中識別並截取(還原)出一個個消息。“粘包問題”是個偽問題。
對於短連接的 TCP 服務,分包不是一個問題,只要發送方主動關閉連接,就表示一條消息發送完畢 ,接收方 read() 返回 0,從而知道消息的結尾。例如前一篇文章裡的 daytime 和 time 協議。
對於長連接的 TCP 服務,分包有四種方法:
1. 消息長度固定,比如 muduo 的 roundtrip 示例就采用了固定的 16 字節消息;
2. 使用特殊的字符或字符串作為消息的邊界,例如 HTTP 協議的 headers 以 "rn" 為字段的分隔 符;
3. 在每條消息的頭部加一個長度字段,這恐怕是最常見的做法,本文的聊天協議也采用這一辦法;
4. 利用消息本身的格式來分包,例如 XML 格式的消息中 <root>...</root> 的配對 ,或者 JSON 格式中的 { ... } 的配對。解析這種消息格式通常會用到狀態機。
在後文的代碼講解中還會仔細討論用長度字段分包的常見陷阱。
聊天服務
本文實現的聊天服務非常簡單,由服務端程序和客戶端程序組成,協議如下:
* 服務端程序中某個端口偵聽 (listen) 新的連接;
* 客戶端向服務端發起連接;
* 連接建立之後,客戶端隨時准備接收服務端的消息並在屏幕上顯示出來;
* 客戶端接受鍵盤輸入,以回車為界,把消息發送給服務端;
* 服務端接收到消息之後,依次發送給每個連接到它的客戶端;原來發送消息的客戶端進程也會收 到這條消息;
* 一個服務端進程可以同時服務多個客戶端進程,當有消息到達服務端後,每個客戶端進程都會收 到同一條消息,服務端廣播發送消息的順序是任意的,不一定哪個客戶端會先收到這條消息。
* (可選)如果消息 A 先於消息 B 到達服務端,那麼每個客戶端都會先收到 A 再收到 B。
這實際上是一個簡單的基於 TCP 的應用層廣播協議,由服務端負責把消息發送給每個連接到它的客 戶端。參與“聊天”的既可以是人,也可以是程序。在以後的文章中,我將介紹一個稍微復雜的一點的 例子 hub,它有“聊天室”的功能,客戶端可以注冊特定的 topic(s),並往某個 topic 發送消息,這 樣代碼更有意思。
消息格式
本聊天服務的消息格式非常簡單,“消息”本身是一個字符串,每條消息的有一個 4 字節的頭部, 以網絡序存放字符串的長度。消息之間沒有間隙,字符串也不一定以 '' 結尾。比方說有兩條消息 "hello" 和 "chenshuo",那麼打包後的字節流是:
0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
共 21 字節。
打包的代碼
這段代碼把 const string& message 打包為 muduo::net::Buffer,並通過 conn 發送。
1: void send(muduo::net::TcpConnection* conn, const string& message) 2: { 3: muduo::net::Buffer buf; 4: buf.append(message.data(), message.size()); 5: int32_t len = muduo::net::sockets::hostToNetwork32(static_cast<int32_t>(message.size())); 6: buf.prepend(&len, sizeof len); 7: conn->send(&buf); 8: }
muduo::Buffer 有一個很好的功能,它在頭部預留了 8 個字節的空間,這樣第 6 行的 prepend() 操作就不需要移動已有的數據,效率較高。
分包的代碼
解析數據往往比生成數據復雜,分包打包也不例外。
1: void onMessage(const muduo::net::TcpConnectionPtr& conn, 2: muduo::net::Buffer* buf, 3: muduo::Timestamp receiveTime) 4: { 5: while (buf->readableBytes() >= kHeaderLen) 6: { 7: const void* data = buf->peek(); 8: int32_t tmp = *static_cast<const int32_t*>(data); 9: int32_t len = muduo::net::sockets::networkToHost32(tmp); 10: if (len > 65536 || len < 0) 11: { 12: LOG_ERROR << "Invalid length " << len; 13: conn->shutdown(); 14: } 15: else if (buf->readableBytes() >= len + kHeaderLen) 16: { 17: buf->retrieve(kHeaderLen); 18: muduo::string message(buf->peek(), len); 19: buf->retrieve(len); 20: messageCallback_(conn, message, receiveTime); // 收到完整的消息,通知用戶 21: } 22: else 23: { 24: break; 25: } 26: } 27: }
上面這段代碼第 7 行用了 while 循環來反復讀取數據,直到 Buffer 中的數據不 夠一條完整的消息。請讀者思考,如果換成 if (buf->readableBytes() >= kHeaderLen) 會有 什麼後果。
以前面提到的兩條消息的字節流為例:
0x00, 0x00, 0x00, 0x05, 'h', 'e', 'l', 'l', 'o', 0x00, 0x00, 0x00, 0x08, 'c', 'h', 'e', 'n', 's', 'h', 'u', 'o'
假設數據最終都全部到達,onMessage() 至少要能正確處理以下各種數據到達的次序,每種情況下 messageCallback_ 都應該被調用兩次:
1. 每次收到一個字節的數據,onMessage() 被調用 21 次;
2. 數據分兩次到達,第一次收到 2 個字節,不足消息的長度字段;
3. 數據分兩 次到達,第一次收到 4 個字節,剛好夠長度字段,但是沒有 body;
4. 數據分兩次到達,第一 次收到 8 個字節,長度完整,但 body 不完整;
5. 數據分兩次到達,第一次收到 9 個字節, 長度完整,body 也完整;
6. 數據分兩次到達,第一次收到 10 個字節,第一條消息的長度完 整、body 也完整,第二條消息長度不完整;
7. 請自行移動分割點,驗證各種情況;
8. 數據一次就全部到達,這時必須用 while 循環來讀出兩條消息,否則消息會堆積。
請讀者驗證 onMessage() 是否做到了以上幾點。這個例子充分說明了 non-blocking read 必須和 input buffer 一起使用。
編解碼器 LengthHeaderCodec
有人評論 Muduo 的接收緩沖區不能設置回調 函數的觸發條件,確實如此。每當 socket 可讀,Muduo 的 TcpConnection 會讀取數據並存入 Input Buffer,然後回調用戶的函數。不過,一個簡單的間接層就能解決問題,讓用戶代碼只關心“消 息到達”而不是“數據到達”,如本例中的 LengthHeaderCodec 所展示的那一樣。
1: #ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H 2: #define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H 3: 4: #include <muduo/base/Logging.h> 5: #include <muduo/net/Buffer.h> 6: #include <muduo/net/SocketsOps.h> 7: #include <muduo/net/TcpConnection.h> 8: 9: #include <boost/function.hpp> 10: #include <boost/noncopyable.hpp> 11: 12: using muduo::Logger; 13: 14: class LengthHeaderCodec : boost::noncopyable 15: { 16: public: 17: typedef boost::function<void (const muduo::net::TcpConnectionPtr&, 18: const muduo::string& message, 19: muduo::Timestamp)> StringMessageCallback; 20: 21: explicit LengthHeaderCodec(const StringMessageCallback& cb) 22: : messageCallback_(cb) 23: { 24: } 25: 26: void onMessage(const muduo::net::TcpConnectionPtr& conn, 27: muduo::net::Buffer* buf, 28: muduo::Timestamp receiveTime) 29: { 同上 } 30: 31: void send(muduo::net::TcpConnection* conn, const muduo::string& message) 32: { 同上 } 33: 34: private: 35: StringMessageCallback messageCallback_; 36: const static size_t kHeaderLen = sizeof(int32_t); 37: }; 38: 39: #endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
這段代碼把以 Buffer* 為參數的 MessageCallback 轉換成了以 const string& 為參數的 StringMessageCallback,讓用戶代碼不必關心分包操作。客戶端和服務端都能從中受益。
服務 端的實現
聊天服務器的服務端代碼小於 100 行,不到 asio 的一半。
請先閱讀第 68 行起的數據成員的定義。除了經常見到的 EventLoop 和 TcpServer,ChatServer 還定義了 codec_ 和 std::set<TcpConnectionPtr> connections_ 作為成員,connections_ 是目前已建立的客戶連 接,在收到消息之後,服務器會遍歷整個容器,把消息廣播給其中每一個 TCP 連接。
首先,在 構造函數裡注冊回調:
1: #include "codec.h" 2: 3: #include <muduo/base/Logging.h> 4: #include <muduo/base/Mutex.h> 5: #include <muduo/net/EventLoop.h> 6: #include <muduo/net/SocketsOps.h> 7: #include <muduo/net/TcpServer.h> 8: 9: #include <boost/bind.hpp> 10: 11: #include <set> 12: #include <stdio.h> 13: 14: using namespace muduo; 15: using namespace muduo::net; 16: 17: class ChatServer : boost::noncopyable 18: { 19: public: 20: ChatServer(EventLoop* loop, 21: const InetAddress& listenAddr) 22: : loop_(loop), 23: server_(loop, listenAddr, "ChatServer"), 24: codec_(boost::bind(&ChatServer::onStringMessage, this, _1, _2, _3)) 25: { 26: server_.setConnectionCallback( 27: boost::bind(&ChatServer::onConnection, this, _1)); 28: server_.setMessageCallback( 29: boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3)); 30: } 31: 32: void start() 33: { 34: server_.start(); 35: } 36:
這裡有幾點值得注意,在以往的代碼裡是直接把本 class 的 onMessage() 注冊給 server_;這裡我們把 LengthHeaderCodec::onMessage() 注冊給 server_,然後向 codec_ 注冊了 ChatServer::onStringMessage(),等於說讓 codec_ 負責解析消息,然後把完整的消息回調給 ChatServer。這正是我前面提到的“一個簡單的間接層”,在不增加 Muduo 庫的復雜度的 前提下,提供了足夠的靈活性讓我們在用戶代碼裡完成需要的工作。
另外,server_.start() 絕對不能在構造函數裡調用,這麼做將來會有線程安全的問題,見我在《當析構函數遇到多線程 ── C++ 中線程安全的對象回調》一文中的論述。
以下是處理連接的建立和斷開的代碼,注意它把 新建的連接加入到 connections_ 容器中,把已斷開的連接從容器中刪除。這麼做是為了避免內存和資 源洩漏,TcpConnectionPtr 是 boost::shared_ptr<TcpConnection>,是 muduo 裡唯一一個默 認采用 shared_ptr 來管理生命期的對象。以後我們會談到這麼做的原因。
37: private: 38: void onConnection(const TcpConnectionPtr& conn) 39: { 40: LOG_INFO << conn->localAddress().toHostPort() << " -> " 41: << conn->peerAddress().toHostPort() << " is " 42: << (conn->connected() ? "UP" : "DOWN"); 43: 44: MutexLockGuard lock(mutex_); 45: if (conn->connected()) 46: { 47: connections_.insert(conn); 48: } 49: else 50: { 51: connections_.erase(conn); 52: } 53: } 54:
以下是服務端處理消息的代碼,它遍歷整個 connections_ 容器,把消息打包發送給 各個客戶連接。
55: void onStringMessage(const TcpConnectionPtr&, 56: const string& message, 57: Timestamp) 58: { 59: MutexLockGuard lock(mutex_); 60: for (ConnectionList::iterator it = connections_.begin(); 61: it != connections_.end(); 62: ++it) 63: { 64: codec_.send(get_pointer(*it), message); 65: } 66: } 67:
數據成員:
68: typedef std::set<TcpConnectionPtr> ConnectionList; 69: EventLoop* loop_; 70: TcpServer server_; 71: LengthHeaderCodec codec_; 72: MutexLock mutex_; 73: ConnectionList connections_; 74: }; 75:
main() 函數裡邊是例行公事的代碼:
76: int main(int argc, char* argv[]) 77: { 78: LOG_INFO << "pid = " << getpid(); 79: if (argc > 1) 80: { 81: EventLoop loop; 82: uint16_t port = static_cast<uint16_t>(atoi(argv[1])); 83: InetAddress serverAddr(port); 84: ChatServer server(&loop, serverAddr); 85: server.start(); 86: loop.loop(); 87: } 88: else 89: { 90: printf("Usage: %s portn", argv[0]); 91: } 92: }
如果你讀過 asio 的對應代碼,會不會覺得 Reactor 往往比 Proactor 容易使用?
客戶端的實現
我有時覺得服務端的程序常常比客戶端的更容易寫,聊天服務器再次驗證 了我的看法。客戶端的復雜性來自於它要讀取鍵盤輸入,而 EventLoop 是獨占線程的,所以我用了兩 個線程,main() 函數所在的線程負責讀鍵盤,另外用一個 EventLoopThread 來處理網絡 IO。我暫時 沒有把標准輸入輸出融入 Reactor 的想法,因為服務器程序的 stdin 和 stdout 往往是重定向了的。
來看代碼,首先,在構造函數裡注冊回調,並使用了跟前面一樣的 LengthHeaderCodec 作為中 間層,負責打包分包。
1: #include "codec.h" 2: 3: #include <muduo/base/Logging.h> 4: #include <muduo/base/Mutex.h> 5: #include <muduo/net/EventLoopThread.h> 6: #include <muduo/net/TcpClient.h> 7: 8: #include <boost/bind.hpp> 9: #include <boost/noncopyable.hpp> 10: 11: #include <iostream> 12: #include <stdio.h> 13: 14: using namespace muduo; 15: using namespace muduo::net; 16: 17: class ChatClient : boost::noncopyable 18: { 19: public: 20: ChatClient(EventLoop* loop, const InetAddress& listenAddr) 21: : loop_(loop), 22: client_(loop, listenAddr, "ChatClient"), 23: codec_(boost::bind(&ChatClient::onStringMessage, this, _1, _2, _3)) 24: { 25: client_.setConnectionCallback( 26: boost::bind(&ChatClient::onConnection, this, _1)); 27: client_.setMessageCallback( 28: boost::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3)); 29: client_.enableRetry(); 30: } 31: 32: void connect() 33: { 34: client_.connect(); 35: } 36:
disconnect() 目前為空,客戶端的連接由操作系統在進程終止時關閉。
37: void disconnect() 38: { 39: // client_.disconnect(); 40: } 41:
write() 會由 main 線程調用,所以要加鎖,這個鎖不是為了保護 TcpConnection, 而是保護 shared_ptr。
42: void write(const string& message) 43: { 44: MutexLockGuard lock(mutex_); 45: if (connection_) 46: { 47: codec_.send(get_pointer(connection_), message); 48: } 49: } 50:
onConnection() 會由 EventLoop 線程調用,所以要加鎖以保護 shared_ptr。
51: private: 52: void onConnection(const TcpConnectionPtr& conn) 53: { 54: LOG_INFO << conn->localAddress().toHostPort() << " -> " 55: << conn->peerAddress().toHostPort() << " is " 56: << (conn->connected() ? "UP" : "DOWN"); 57: 58: MutexLockGuard lock(mutex_); 59: if (conn->connected()) 60: { 61: connection_ = conn; 62: } 63: else 64: { 65: connection_.reset(); 66: } 67: } 68:
查看本欄目