這篇文章內容會很短,主要是想給大家分享下我最近在做一個簡單的rabbitmq客戶端類庫的封裝的經驗總結,說是簡單其實一點都不簡單。為了節省時間我主要按照Library的執行順序來介紹,在你看來這裡僅僅是一個簡單的經驗總結,但是在我看來這些經驗只有在你真正的封裝rabbitmq客戶端庫的時候且將你的客戶端安全穩定的發布上線後才會真的發現這些問題。
比如你的庫只是鏈接單個Node的時候和鏈接高可用集群的HAProxy時候是完全兩回事。當你未能在你的庫裡使用反向注入LOG接口的時候一旦在線上發生網絡解析和序列化等一系列在線問題時候你是多麼無能為力。當你使用同步循環獲取隊列消息的時候一旦發生異常你的鏈接就會斷掉等等這些細節。我總結了我在編寫這個library的時候慢慢穩定下來的過程和經驗。至少目前來看網絡上的文章,當然我是指.NET/C#方面的,都沒有講到這些問題,大部分的文章都是簡單的介紹了一個最最基本的使用和最最基本的demo而已,達不到企業級使用的要求。在這個過程中,感謝我的團隊和給過我指導的同事,讓我明白了一些技術道理。
好東西不能石沉大海,尤其是.NET領域更需要這樣的東西來填補這一空缺。廢話不多說了,進入主題,那些編寫框架和組件的大道理這裡就不講了,我只說重點。
就是說你的接受Channel和發送的Channel要分離開,如果不分開會出現偶發性的消息串掉的錯誤,我這裡現在沒有環境無法重現截圖。我是在做壓力測試的時候,用了一個Channel的時候Debug拋出來的異常。如果你有潔癖建議把IConnection也分離開。這樣不容易出錯,就算出錯排錯也會很容易。
(圖1:分開接受和發送的IConnection、Channel)
還有一點,不要將這些對象直接散落在直接使用的Client類中,要建立起一個使用上下文,就算你暴露在外面的是一個具體的類但是那個類也是一個空殼子。
我們可以在創建隊列的時候設置此隊列是持久化的,但是隊列中的消息要在我們發送某個消息的時候打上需要持久化的狀態標記。
(圖2:標記此消息是需要持久化的)
(圖1:在線程內部方法中加try{}catch{})
這個點很多做封裝的人會容易忽視掉,我這裡補充下為了保持這個文章的完整性。
其實在我之前的“.NET應用架構設計—服務端開發多線程使用小結(多線程使用常識)”一文中有講到過。
這個時候你的try{}catch{}其實是不會捕獲到任何ListenInit方法中的異常的,因為他在另外一個線程上下文中執行的。具體原理這裡就不解釋了。但是可以很容易的理解就是,你這個方法一旦執行就會立馬返回了。
(圖4:監聽Shutdown事件,記錄下LOG便於排查和監管服務的穩定性)
(圖5:組件內部的LOG接口)
此接口就是你內部用來將信息傳輸出去的渠道,而且這個渠道是活的,有各個應用系統決定怎麼記錄。
簡單處理你還需要一個LOG接口服務定位器對象,要不然你拿不到這個接口實例。
(圖6:LOG location對象)
如果我們是使用死循環的方式在接受消息,那麼一旦當你的接受消息的程序出現異常那麼你的while直接就會跳出,你的鏈接可能是還鏈接在服務器上但是你的channel已經斷開,說白了你的消息是不會接受到的,而且這樣的開發方法很不穩定也不優雅。我們可以使用面向事件的消費者來接受消息。
(圖7:使用Eventing類型的消費者接受消息)
默認情況下,一個隊列裡不管多少消息當你一個TCP連接打上去之後會LOCK住所有的消息,也就是說一個連接徹底占用了所有的消息,此時消息不會被其他集群的機器消費。
(圖8:一次只取一個消息進行消費)
但是如果你對消息的處理的前後順序有要求就不能這麼做,你需要獨立注冊一個隊列,然後將這樣的一此只消費一個消息配置話。
(圖9:創建出一個會自動重連的Connection對象)
(圖10:設置心跳超時時間)
如果你連接單台節點的時候不設置這個值是沒問題的,但是如果你連接的是類似HAProxy虛擬節點的時候就會出現TCP被斷開的可能性。如果你不設置這個心跳超時時間,它默認是不進行心跳保持的,就會出現網絡中的某個設置斷開空閒的TCP連接資源。就這個問題一直搞的我們的團隊到第二天兩點鐘。大家要記住這個點。
(圖11:重新放入隊列,推送給其他消費著)
最後,我是基於Rabbitmq.Client 版本3.5.3.0的基礎上開發的,這個大家要注意。版本不一樣會有一定的差異性。希望此文對大家在使用rabbitmq的同志有一點幫助,謝謝。
作者:王清培
出處:http://www.cnblogs.com/wangiqngpei557/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面