本文介紹如何使用 timing wheel 來踢掉空閒的連接,一個連接如果若干秒沒有收到數據,就認為 是空閒連接。
本文的代碼見 http://code.google.com/p/muduo/source/browse/trunk/examples/idleconnection
在嚴肅的網絡程序中,應用層的心跳協議是必不可少的。應該用心跳消息來判斷對方進程是否能正 常工作,“踢掉空閒連接”只是一時權宜之計。我這裡想順便講講 shared_ptr 和 weak_ptr 的用法。
如果一個連接連續幾秒鐘(後文以 8s 為例)內沒有收到數據,就把它斷開,為此有兩種簡單 粗暴的做法:
每個連接保存“最後收到數據的時間 lastReceiveTime”,然後用一個定時器,每秒鐘遍歷一遍所 有連接,斷開那些 (now - connection.lastReceiveTime) > 8s 的 connection。這種做法全局只 有一個 repeated timer,不過每次 timeout 都要檢查全部連接,如果連接數目比較大(幾千上萬), 這一步可能會比較費時。
每個連接設置一個 one-shot timer,超時定為 8s,在超時的時候就斷開本連接。當然,每次收到 數據要去更新 timer。這種做法需要很多個 one-shot timer,會頻繁地更新 timers。如果連接數目比 較大,可能對 reactor 的 timer queue 造成壓力。
使用 timing wheel 能避免上述兩種做法的缺點。timing wheel 可以翻譯為“時間輪盤”或“刻度 盤”,本文保留英文。
連接超時不需要精確定時,只要大致 8 秒鐘超時斷開就行,多一秒少一 秒關系不大。處理連接超時可以用一個簡單的數據結構:8 個桶組成的循環隊列。第一個桶放下一秒將 要超時的連接,第二個放下 2 秒將要超時的連接。每個連接一收到數據就把自己放到第 8 個桶,然後 在每秒鐘的 callback 裡把第一個桶裡的連接斷開,把這個空桶挪到隊尾。這樣大致可以做到 8 秒鐘 沒有數據就超時斷開連接。更重要的是,每次不用檢查全部的 connection,只要檢查第一個桶裡的 connections,相當於把任務分散了。
Timing wheel 原理
《Hashed and hierarchical timing wheels: efficient data structures for implementing a timer facility》這篇論文詳細比 較了實現定時器的各種數據結構,並提出了層次化的 timing wheel 與 hash timing wheel 等新結構 。針對本文要解決的問題的特點,我們不需要實現一個通用的定時器,只用實現 simple timing wheel 即可。
Simple timing wheel 的基本結構是一個循環隊列,還有一個指向隊尾的指針 (tail), 這個指針每秒鐘移動一格,就像鐘表上的時針,timing wheel 由此得名。
以下是某一時刻 timing wheel 的狀態,格子裡的數字是倒計時(與通常的 timing wheel 相反),表示這個格子(桶 子)中的連接的剩余壽命。
一秒鐘以後,tail 指針 移動一格,原來四點鐘方向的格子被清空,其中的連接已被斷開。
連接超時 被踢掉的過程
假設在某個時刻,conn 1 到達,把它放到當前格子中,它的剩余壽命是 7 秒。 此後 conn 1 上沒有收到數據。
1 秒鐘之後,tail 指向 下一個格子,conn 1 的剩余壽命是 6 秒。
又過了幾秒鐘,tail 指 向 conn 1 之前的那個格子,conn 1 即將被斷開。
下一秒,tail 重新指向 conn 1 原來所在的格子,清空其中的數據,斷開 conn 1 連接。
連接刷新
如果在斷開 conn 1 之前收到數據,就把它移到當前的格子裡。
收到數據 ,conn 1 的壽命延長為 7 秒。
時間繼續前進,conn 1 壽命遞減,不過它已經比第一種情況長壽了。
多個連接
timing wheel 中的每個格子是個 hash set,可以容納不止一個連接。
比如一開始,conn 1 到達。
隨後,conn 2 到達,這 時候 tail 還沒有移動,兩個連接位於同一個格子中,具有相同的剩余壽命。(下圖中畫成鏈表,代碼 中是哈希表。)
幾秒鐘之後,conn 1 收 到數據,而 conn 2 一直沒有收到數據,那麼 conn 1 被移到當前的格子中。這時 conn 1 的壽命比 conn 2 長。
代碼實現與改進
我們用以前多次出現的 EchoServer 來說明具體如何實現 timing wheel。代碼見 http://code.google.com/p/muduo/source/browse/trunk/examples/idleconnection
在具體實 現中,格子裡放的不是連接,而是一個特制的 Entry struct,每個 Entry 包含 TcpConnection 的 weak_ptr。Entry 的析構函數會判斷連接是否還存在(用 weak_ptr),如果還存在則斷開連接。
數據結構:
typedef boost::weak_ptr WeakTcpConnectionPtr; struct Entry : public muduo::copyable { Entry(const WeakTcpConnectionPtr& weakConn) : weakConn_(weakConn) { } ~Entry() { muduo::net::TcpConnectionPtr conn = weakConn_.lock(); if (conn) { conn->shutdown(); } } WeakTcpConnectionPtr weakConn_; }; typedef boost::shared_ptr EntryPtr; typedef boost::weak_ptr WeakEntryPtr; typedef boost::unordered_set Bucket; typedef boost::circular_buffer WeakConnectionList;
在實現中,為了簡單起見,我們 不會真的把一個連接從一個格子移到另一個格子,而是采用引用計數的辦法,用 shared_ptr 來管理 Entry。如果從連接收到數據,就把對應的 EntryPtr 放到這個格子裡,這樣它的引用計數就遞增了。 當 Entry 的引用計數遞減到零,說明它沒有在任何一個格子裡出現,那麼連接超時,Entry 的析構函 數會斷開連接。
Timing wheel 用 boost::circular_buffer 實現,其中每個 Bucket 元素是個 hash set of EntryPtr。
在構造函數中,注冊每秒鐘的回調(EventLoop::runEvery() 注冊 EchoServer::onTimer() ),然後把 timing wheel 設為適當的大小。
EchoServer::EchoServer(EventLoop* loop, const InetAddress& listenAddr, int idleSeconds) : loop_(loop), server_(loop, listenAddr, "EchoServer"), connectionBuckets_(idleSeconds) { server_.setConnectionCallback( boost::bind(&EchoServer::onConnection, this, _1)); server_.setMessageCallback( boost::bind(&EchoServer::onMessage, this, _1, _2, _3)); loop->runEvery(1.0, boost::bind(&EchoServer::onTimer, this)); connectionBuckets_.resize(idleSeconds); }
其中 EchoServer::onTimer() 的實現只有一行:往隊尾添加一個空的 Bucket,這樣 circular_buffer 會自動彈出隊首的 Bucket,並析構之。在析構 Bucket 的時候,會依次析構其中的 EntryPtr 對象,這樣 Entry 的引用計數就不用我們去操心,C++ 的值語意會幫我們搞定一切。
void EchoServer::onTimer() { connectionBuckets_.push_back(Bucket()); }
查看本欄目
在連接建立時,創建一個 Entry 對象,把它放到 timing wheel 的隊尾。另外,我們還需 要把 Entry 的弱引用保存到 TcpConnection 的 context 裡,因為在收到數據的時候還要用到 Entry 。(思考題:如果 TcpConnection::setContext 保存的是強引用 EntryPtr,會出現什麼情況?)
void EchoServer::onConnection(const TcpConnectionPtr& conn) { LOG_INFO << "EchoServer - " << conn->peerAddress().toHostPort() << " -> " << conn->localAddress().toHostPort() << " is " << (conn->connected() ? "UP" : "DOWN"); if (conn->connected()) { EntryPtr entry(new Entry(conn)); connectionBuckets_.back().insert(entry); WeakEntryPtr weakEntry(entry); conn->setContext(weakEntry); } else { assert(!conn->getContext().empty()); WeakEntryPtr weakEntry(boost::any_cast(conn->getContext())); LOG_DEBUG << "Entry use_count = " << weakEntry.use_count(); } }
在收到消息時,從 TcpConnection 的 context 中取出 Entry 的弱引用,把它提升為強引 用 EntryPtr,然後放到當前的 timing wheel 隊尾。(思考題,為什麼要把 Entry 作為 TcpConnection 的 context 保存,如果這裡再創建一個新的 Entry 會有什麼後果?)
void EchoServer::onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time) { string msg(buf->retrieveAsString()); LOG_INFO << conn->name() << " echo " << msg.size() << " bytes at " << time.toString(); conn->send(msg); assert(!conn->getContext().empty()); WeakEntryPtr weakEntry(boost::any_cast(conn->getContext())); EntryPtr entry(weakEntry.lock()); if (entry) { connectionBuckets_.back().insert(entry); } }
然後呢?沒有然後了,程序已經完成了我們想要的功能。(完整的代碼會打印 circular_buffer 變化的情況,運行一下即可理解。)
希望本文有助於您理解 shared_ptr 和 weak_ptr。
改進
在現在的實現中,每次收到消息都會往隊尾添加 EntryPtr (當然, hash set 會幫我們去重。)一個簡單的改進措施是,在 TcpConnection 裡保存“最後一次往隊尾添加 引用時的 tail 位置”,然後先檢查 tail 是否變化,若無變化則不重復添加 EntryPtr。這樣或許能 提高效率。
以上改進留作練習。