開源QQ群: .net 開源基礎服務 238543768
開源地址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ
## 業務消息隊列 ##
業務消息隊列是應用於業務的解耦和分離,應具備分布式,高可靠性,高性能,高實時性,高穩定性,高擴展性等特性。
## 優點: ##
- 大量的業務消息堆積能力
- 無單點故障及故障監控,異常提醒
- 生產者端負載均衡,故障轉移,故障自動恢復,並行消息插入。
- 消費者端負載均衡,故障保持,故障自動恢復,並行消息消費。
- 消息高可靠性持久化,較高性能,較高實時性,高穩定性,高擴展性。
- 支持99*99個消息分區,單個消息分區單天支持近1億的消息存儲。
- 消費者拉方式獲取消息,在高並發,大量消息湧入的情況下,只要消費能力足夠,不會有消息延遲,消息越多性能越好。
## 缺點: ##
- 能保證消息順序插入,保證相同分區的消息是順序的(排除網絡延遲),但是多個分區之間的可能是亂序的。
- 消息並行消費或者多個分區並行消費或者負載均衡情況下的,消息消費順序是亂序。
## 缺點原因: ##
- 消息的負載均衡是基於消息的分區存儲,故多個分區之間的消息是亂序的,但是相同分區的消息是順序的。
- 消息的消費者負載均衡也是基於消息的分區進行均衡的,同時單個消費者訂閱多個分區的情況下,也可並行進行消費。意味著不同分區的消息的消費是亂序的,但是相同分區的消息消費是順序的。
## 缺點解決方案: ##
- 生產者自定義負載均衡算法,按照業務維度(用戶,商戶)等進行分區(多個用戶之間可以消息亂序,單個用戶的消息必須是順序的),不同維度可以指向不同的分區,但是單個維度的消息是可以保證順序的。
- 本解決方案在故障的情況下,故障會移除某些故障節點,意味著故障節點會立即報錯(當然也可自己指定故障節點進行轉移,但是轉移的節點消息會被提前消費,故障的消息會在恢復故障後重新消費,這樣也會出現故障程度上的消息亂序消費)。
- 本解決方案在線上無縫擴容和擴展性能方面也會有限制,看要具體的負載均衡算法,但是一般情況下,如果要擴容還是會進行部分消息遷移的情況。
## 問答: ##
### *1.大量的業務消息堆積能力,如何實現?* ###
每個分區表支持約1億的消息存儲,可以通過增加分區表進行擴容。消費者進行消息消費,內部僅保留某個分區上一次消費的指針,所以不會影響消費者。
消息持久化到磁盤,不會在內存駐留,理論上不影響內存。
### *2.無單點故障及故障監控,異常提醒?* ###
故障一般會發生在redis,數據節點,管理中心,日志中心。
redis節點故障會影響消費者的消息消費響應及時度,一般延遲5s以內。不會影響消息消費速度和消息消費QPS
數據節點故障會影響生產者和消費者的消息,並造成消息暫時丟失(但是都是可恢復的,具體的看數據庫的高可用做到什麼程度)。
生產者端會無縫的進行節點移除,但是會默認1分鐘重新嘗試重連。消費者會持續報錯至日志中,但是不會影響其他分區消費。
管理中心故障會影響生產者和消費者的心跳檢測和新注冊的生產者,消費者,但不會影響生產者和消費者具體的消息存儲和發送接收。
日志中心故障不會影響生產者和消費者,但是影響日志的打印,日志中心故障會通知公司內部監控平台。
雖然故障不會影響線上已有的消息運行,但是還是會在高並發情況下出現性能問題,和系統穩定性,所以一旦發現要重視和及時處理。
### *3.生產者端負載均衡,故障轉移,故障自動恢復,並行消息插入?* ###
默認負載均衡采用多個分區順序輪詢插入,在並發情況下輪詢插入是並行插入到不同分區的;某個數據節點出現故障,會移除相關數據節點的所有分區;
默認1分鐘會重新載入故障分區進行重試。
### *4.消費者端負載均衡,故障保持,故障自動恢復,並行消息消費。* ###
默認消費者端負載均衡是根據消費者訂閱的分區進行的(一個消費者可以訂閱多個分區,多個相同業務的消費者可以訂閱多個不同分區進行負載)。
一個消費者訂閱多個分區,這個消費者可以開啟並行進行多分區消費。並行度=分區數,效果理論上最佳。
分區節點出現故障等,單個分區或者數據節點就會暫停消費,並通知日志中心打印錯誤日志。當故障恢復後,消費繼續進行。
### *5.消息高可靠性持久化,較高性能,較高實時性,高穩定性,高穩定性。* ###
消息傳遞到消息中心後,立即持久化到磁盤,故不會丟失消息。生產者可以采用多個分區進行並行插入,消費者可以采用並行進行消息消費,故理論上性能是可擴展無限量的。
消息是通過拉取的方式獲取的,發送消息會由redis進行即時通知消費者拉取(即時消息默認會合並在500ms內redis通知消息),一般在20ms內消息會被消費掉。
批量拉消息的方式相對push的消息推送方式在高並發和大量消息處理的情況下,消息發送性能應該是更優的。
穩定性是基於數據庫的穩定性和故障轉移層面來確保的,擴展性體現在線上無縫的遷移和擴容。
### *6.支持9999個消息分區,單個消息分區單天支持近1億的消息存儲。* ###
數據節點是01~99個,節點裡面的表分區是01~99個,所以可以支持近1萬個分區節點。單表的mqid最大應該是(1億-1)條,應該滿足一般的業務需求,
若不能滿足,可以通過多個分區的方式擴容。
### *7.消費者拉方式獲取消息,在高並發,大量消息湧入的情況下,只要消費能力足夠,不會有消息延遲,消息越多性能越好。* ###
push推消息的模式能保證更高的實時性,但是在大量消息的情況下,消息堆積的情況更嚴重,性能會有所影響。
pull拉消息的模式在保證消息實時性方面會略差,但是在大量消息湧入的情況下,批量拉消息效率更加。而且會將消息分發的負載轉移到多個消費者端上。
## 未來改進: ##
1. 未來采用leveldb重寫存儲。
1. 剝離broker服務用於支持相對可靠的消息服務。
1. 消息完成標記本地緩存/持久化(或者存儲redis),每秒提交更新至數據庫,消除頻繁消費導致的瓶頸。
## 架構示意圖 ##
## 使用demo示例 ##
/// <summary> /// 發送消息 /// </summary> /// <param name="msg"></param> public void SendMessageDemo(string msg) { //發送字符串示例 var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() { ManageConnectString = "server=192.168.17.201;Initial Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" },//管理中心數據庫 "dyd.mytest3");//隊列路徑 .分隔,類似類的namespace,是隊列的唯一標識,要提前告知運維在消息中心注冊,方可使用。 p.SendMessage(@"1"); //發送對象示例 /* var obj = new message2 { text = "文字", num = 1 }; var p = ProducterPoolHelper.GetPool(new BusinessMQConfig() { ManageConnectString = "server=192.168.17.237;Initial Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" },//管理中心數據庫 "test.diayadian.obj");//隊列路徑 .分隔,類似類的namespace,是隊列的唯一標識,要提前告知運維在消息中心注冊,方可使用。 p.SendMessage<message>(obj); */ } private ConsumerProvider Consumer; /// <summary> /// 接收消息 /// </summary> /// <param name="action"></param> public void ReceiveMessageDemo(Action<string> action) { if (Consumer == null) { Consumer = new ConsumerProvider(); Consumer.Client = "dyd.mytest3.customer1";//clientid,接收消息的(消費者)唯一標示,一旦注冊以後,不能更改,業務下線廢棄後必須要告知運維,刪除消費者注冊。 Consumer.ClientName = "客戶端名稱";//這個相對隨意些,主要是用來自己識別的,要簡短 Consumer.Config = new BusinessMQConfig() { ManageConnectString = "server=192.168.17.201;Initial Catalog=dyd_bs_MQ_manage;User ID=sa;Password=Xx~!@#;" }; Consumer.MaxReceiveMQThread = 1;//並行處理的線程數,一般為1足夠,若消息處理慢,又想並行消費,則考慮 正在使用的分區=並行處理線程數 為並行效率極端最優,但cpu消耗應該不小。 Consumer.MQPath = "dyd.mytest3";//接收的隊列要正確 Consumer.PartitionIndexs = new List<int>() { 1, 2, 3,4, 5, 6, 7, 8 };//消費者訂閱的分區順序號,從1開始 Consumer.RegisterReceiveMQListener<string>((r) => { /* * 這些編寫業務代碼 * 編寫的時候要注意考慮,業務處理失敗的情況。 * 1.重試失敗n次。 * 2.重試還不行,則標記消息已被處理。然後跳過該消息處理,自己另外文檔記錄這種情況。 * 消息被消費完畢,一定要調用MarkFinished,標記消息被消費完畢。 */ action.Invoke(r.ObjMsg); r.MarkFinished(); }); } } /// <summary> /// 關閉消息訂閱連接 /// </summary> public void CloseReceiveMessage() { //注冊消費者消息,消費者務必要在程序關閉後關掉(dispose)。否則導致異常終止,要人工等待連接超時後,方可重新注冊。 if (Consumer != null) { Consumer.Dispose(); Consumer = null; } } }
部分截圖
備注:.net開源的消息隊列很少,特別是針對業務的高可靠性的消息隊列;希望這個開源的消息隊列,能夠為.net領域帶來更多解決方案,更多的思路和架構設計;同時也希望了解消息隊列的人能夠給於這個解決方案更多的建議和完善意見。
作者:車江毅