程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> 分享一個c#寫的開源分布式消息隊列equeue

分享一個c#寫的開源分布式消息隊列equeue

編輯:C#入門知識

前言   本文想介紹一下前段時間在寫enode時,順便實現的一個分布式消息隊列equeue。這個消息隊列的思想不是我想出來的,而是通過學習阿裡的rocketmq後,自己用c#實現了一個輕量級的簡單版本。一方面可以通過寫這個隊列讓自己更深入的掌握消息隊列的一些常見問題;另一方面也可以用來和enode集成,為enode中的command和domain event的消息傳遞提供支持。目前在.net平台,比較好用的消息隊列,最常見的是微軟的MSMQ了吧,還有像rabbitmq也有.net的client端。這些消息隊列都很強大和成熟。但當我學習了kafka以及阿裡的rocketmq(早期版本叫metaq,自metaq 3.0後改名為rocketmq)後,覺得rocketmq的設計思想深深吸引了我,因為我不僅能理解其思想,還有其完整的源代碼可以學習。但是rocketmq是java寫的,且目前還沒有.net的client端,所以不能直接使用(有興趣的朋友可以為其寫一個.net的client端),所以在學習了rocketmq的設計文檔以及大部分代碼後,決定自己用c#寫一個出來。   項目開源地址:https://github.com/tangxuehua/equeue,項目中包含了隊列的全部源代碼以及如何使用的示例。也可以在enode項目中看到如何使用。   EQUEUE消息隊列中的專業術語   Topic   一個topic就是一個主題。一個系統中,我們可以對消息劃分為一些topic,這樣我們就能通過topic,將消息發送到不同的queue。   Queue   一個topic下,我們可以設置多個queue,每個queue就是我們平時所說的消息隊列;因為queue是完全從屬於某個特定的topic的,所以當我們要發送消息時,總是要指定該消息所屬的topic是什麼。然後equeue就能知道該topic下有幾個queue了。但是到底發送到哪個queue呢?比如一個topic下有4個queue,那對於這個topic下的消息,發送時,到底該發送到哪個queue呢?那必定有個消息被路由的過程。目前equeue的做法是在發送一個消息時,需要用戶指定這個消息對應的topic以及一個用來路由的一個object類型的參數。equeue會根據topic得到所有的queue,然後根據該object參數通過hash code然後取模queue的個數最後得到要發送的queue的編號,從而知道該發送到哪個queue。這個路由消息的過程是在發送消息的這一方做的,也就是下面要說的producer。之所以不在消息服務器上做是因為這樣可以讓用戶自己決定該如何路由消息,具有更大的靈活性。   Producer   就是消息隊列的生產者。我們知道,消息隊列的本質就是實現了publish-subscribe的模式,即生產者-消費者模式。生產者生產消息,消費者消費消息。所以這裡的Producer就是用來生產和發送消息的。   Consumer   就是消息隊列的消費者,一個消息可以有多個消費者。   Consumer Group   消費者分組,這可能對大家來說是一個新概念。之所以要搞出一個消費者分組,是為了實現下面要說的集群消費。一個消費者分組中包含了一些消費者,如果這些消費者是要集群消費,那這些消費者會平均消費該分組中的消息。   Broker   equeue中的broker負責消息的中轉,即接收producer發送過來的消息,然後持久化消息到磁盤,然後接收consumer發送過來的拉取消息的請求,然後根據請求拉取相應的消息給consumer。所以,broker可以理解為消息隊列服務器,提供消息的接收、存儲、拉取服務。可見,broker對於equeue來說是核心,它絕對不能掛,一旦掛了,那producer,consumer就無法實現publish-subscribe了。   集群消費   集群消費是指,一個consumer group下的consumer,平均消費topic下的queue。具體如何平均可以看一下下面的架構圖,這裡先用文字簡單描述一下。假如一個topic下有4個queue,然後當前有一個consumer group,該分組下有4個consumer,那每個consumer就被分配到該topic下的一個queue,這樣就達到了平均消費topic下的queue的目的。如果consumer group下只有兩個consumer,那每個consumer就消費2個queue。如果有3個consumer,則第一個消費2個queue,後面兩個每個消費一個queue,從而達到盡量平均消費。所以,可以看出,我們應該盡量讓consumer group下的consumer的數目和topic的queue的數目一致或成倍數關系。這樣每個consumer消費的queue的數量總是一樣的,這樣每個consumer服務器的壓力才會差不多。當前前提是這個topic下的每個queue裡的消息的數量總是差不多多的。這點我們可以對消息根據某個用戶自己定義的key來進行hash路由來保證。   廣播消費   廣播消費是指一個consumer只要訂閱了某個topic的消息,那它就會收到該topic下的所有queue裡的消息,而不管這個consumer的group是什麼。所以對於廣播消費來說,consumer group沒什麼實際意義。consumer可以在實例化時,我們可以指定是集群消費還是廣播消費。   消費進度(offset)   消費進度是指,當一個consumer group裡的consumer在消費某個queue裡的消息時,equeue是通過記錄消費位置(offset)來知道當前消費到哪裡了。以便該consumer重啟後繼續從該位置開始消費。比如一個topic有4個queue,一個consumer group有4個consumer,則每個consumer分配到一個queue,然後每個consumer分別消費自己的queue裡的消息。equeue會分別記錄每個consumer對其queue的消費進度,從而保證每個consumer重啟後知道下次從哪裡開始繼續消費。實際上,也許下次重啟後不是由該consumer消費該queue了,而是由group裡的其他consumer消費了,這樣也沒關系,因為我們已經記錄了這個queue的消費位置了。所以可以看出,消費位置和consumer其實無關,消費位置完全是queue的一個屬性,用來記錄當前被消費到哪裡了。另外一點很重要的是,一個topic可以被多個consumer group裡的consumer訂閱。不同consumer group裡的consumer即便是消費同一個topic下的同一個queue,那消費進度也是分開存儲的。也就是說,不同的consumer group內的consumer的消費完全隔離,彼此不受影響。還有一點就是,對於集群消費和廣播消費,消費進度持久化的地方是不同的,集群消費的消費進度是放在broker,也就是消息隊列服務器上的,而廣播消費的消費進度是存儲在consumer本地磁盤上的。之所以這樣設計是因為,對於集群消費,由於一個queue的消費者可能會更換,因為consumer group下的consumer數量可能會增加或減少,然後就會重新計算每個consumer該消費的queue是哪些,這個能理解的把?所以,當出現一個queue的consumer變動的時候,新的consumer如何知道該從哪裡開始消費這個queue呢?如果這個queue的消費進度是存儲在前一個consumer服務器上的,那就很難拿到這個消費進度了,因為有可能那個服務器已經掛了,或者下架了,都有可能。而因為broker對於所有的consumer總是在服務的,所以,在集群消費的情況下,被訂閱的topic的queue的消費位置是存儲在broker上的,存儲的時候按照不同的consumer group做隔離,以確保不同的consumer group下的consumer的消費進度互補影響。然後,對於廣播消費,由於不會出現一個queue的consumer會變動的情況,所以我們沒必要讓broker來保存消費位置,所以是保存在consumer自己的服務器上。   EQUEUE是什麼?       通過上圖,我們能直觀的理解equeue。這個圖是從rocketmq的設計文檔中拿來的,呵呵。由於equeue的設計思想完全和rocketmq一致,所以我就拿過來用了。每個producer可以向某個topic發消息,發送的時候根據某種路由策略(producer可自定義)將消息發送到某個特定的queue。然後consumer可以消費特定topic下的queue裡的消息。上圖中,TOPIC_A有兩個消費者,這兩個消費者是在一個group裡,所以應該平均消費TOPIC_A下的queue但由於有三個queue,所以第一個consumer分到了2個queue,第二個consumer分到了1個。對於TOPIC_B,由於只有一個消費者,那TOPIC_B下的所有queue都由它消費。所有的topic信息、queue信息、還有消息本身,都存儲在broker服務器上。這點上圖中沒有體現出來。上圖主要關注producer,consumer,topic,queue這四個東西之間的關系,並不關注物理服務器的部署架構。   關鍵問題的思考   1.producer,broker,consumer三者之間如何通信   由於是用c#實現,且因為一般是在局域網內部署,為了實現高性能通信,我們可以利用異步socket來通信。.net本身提供了很好的異步socket通信的支持;我們也可以用zeromq來實現高性能的socket通信。本來想直接使用zeromq來實現通信模塊就好了,但後來自己學習了一下.net自帶的socket通信相關知識,發現也不難,所以就自己實現了一個,呵呵。自己實現的好處是我可以自己定義消息的協議,目前這部分實現代碼在ecommon基礎類庫中,是一個獨立的可服用的與業務場景無關的基礎類庫。有興趣的可以去下載下來看看代碼。經過了自己的一些性能測試,發現通信模塊的性能還是不錯的。一台broker,四台producer同時向這個broker發送消息,每秒能發送的消息4W沒有問題,更多的producer還沒測試。   2.消息如何持久化   消息持久化方面主要考慮的是性能問題,還有就是消息如何快速的讀取。   1. 首先,一台broker上的消息不需要一直保存在該broker服務器上,因為這些消息總會被消費掉。根據阿裡rocketmq的設計,默認會1天刪除一次已經被消費過的消息。所以,我們可以理解,broker上的消息應該不會無限制增長,因為會被定期刪除。所以不必考慮一台broker上消息放不下的問題。   2. 如何快速的持久化消息?一般來說,我覺得有兩種方式:1)順序寫磁盤文件;2)用現成的key,value的nosql產品來存儲;rocketmq目前用的是自己寫文件的方式,這種方式的難點是寫文件比較復雜,因為所有消息都是順序append到文件末尾,雖然性能非常高,但復雜度也很高;比如所有消息不能全寫在一個文件裡,一個文件到達一定大小後需要拆分,一旦拆分就會產生很多問題,呵呵。拆分後如何讀取也是比較復雜的問題。還有由於是順序寫入文件的,那我們還需要把每一個消息在文件中的起始位置和長度需要記錄下來,這樣consumer在消費消息時,才能根據offset從文件中拿到該消息。總之需要考慮的問題很多。如果是用nosql來持久化消息,那可以省去我們寫文件時遇到的各種問題,我們只需要關心如何把消息的key和該消息在queue中的offset對應起來即可。另外一點疑問是,queue裡的信息要持久化嗎?先要想清楚queue裡放的是什麼東西。當broker接收到一個消息後,首先肯定是要先持久化,完成後需要把消息放入queue裡。但由於內存很有限,我們不可能把這個消息直接放入queue裡,我們其實要放的只需要時該消息在nosql裡的key即可,或者如果是用文件來持久化,那放的是該消息在文件中的偏移量offset,即存儲在文件的那個位置(比如是哪個行號)。所以,實際上,queue只是一個消息的索引。那有必要持久化queue嗎?可以持久化,這樣畢竟在broker重啟的時候,恢復queue的時間可以縮短。那需要和持久化消息同步持久化嗎?顯然不需要,我們可以異步定時持久化每個queue,然後恢復queue的時候,可以先從持久化的部分恢復,然後再把剩下的部分通過持久化的消息來補充以達到queue因為異步持久化而慢的部分可以追平。所以,經過上面的分析,消息本身都是放在nosql中,queue全部在內存中。   那消息如何持久化呢?我覺得最好的辦法是讓每個消息有一個全局的順序號,一旦消息被寫入nosql後,該消息的全局順序號就確定了,然後我們在更新對應的queue的信息時,把該消息的全局順序號傳給queue,這樣queue就能把queue自己對該消息的本地順序號和該消息的全局順序號建立映射關系。相關代碼如下:   復制代碼 public MessageStoreResult StoreMessage(Message message, int queueId) {     var queues = GetQueues(message.Topic);     var queueCount = queues.Count;     if (queueId >= queueCount || queueId < 0)     {         throw new InvalidQueueIdException(message.Topic, queueCount, queueId);     }     var queue = queues[queueId];     var queueOffset = queue.IncrementCurrentOffset();     var storeResult = _messageStore.StoreMessage(message, queue.QueueId, queueOffset);     queue.SetMessageOffset(queueOffset, storeResult.MessageOffset);     return storeResult; } 復制代碼 沒什麼比代碼更能說明問題了,呵呵。上的代碼的思路是,接收一個消息對象和一個queueId,queueId表示當前消息要放到第幾個queue裡。然後內部邏輯是,先獲取該消息的topic的所有queue,由於queue和topic都在內存,所以這裡沒性能問題。然後檢查一下當前傳遞進來的queueId是否合法。如果合法,那就定位到該queue,然後通過IncrementCurrentOffset方法,將queue的內部序號加1並返回,然後持久化消息,持久化的時候把queueId以及queueOffset一起持久化,完成後返回一個消息的全局序列號。由於messageStore內部會把消息內容、queueId、queueOffset,以及消息的全局順序號一起作為一個整體保存到nosql中,key就是消息的全局序列號,value就是前面說的整體(被序列化為二進制)。然後,在調用queue的SetMessageOffset方法,把queueOffset和message的全局offset建立映射關系即可。最後返回一個結果。messageStore.StoreMessage的內存實現大致如下:   復制代碼 public MessageStoreResult StoreMessage(Message message, int queueId, long queueOffset) {     var offset = GetNextOffset();     _queueCurrentOffsetDict[offset] = new QueueMessage(message.Topic, message.Body, offset, queueId, queueOffset, DateTime.Now);     return new MessageStoreResult(offset, queueId, queueOffset); } 復制代碼 GetNextOffset就是獲取下一個全局的消息序列號,QueueMessage就是上面所說的“整體”,因為是內存實現,所以就用了一個ConcurrentDictionary來保存一下queueMessage對象。如果是用nosql來實現messageStore,則這裡需要寫入nosql,key就是消息的全局序列號,value就是queueMessage的二進制序列化數據。通過上面的分析我們可以知道我們會將消息的全局序列號+queueId+queueOffset一起整體作為一條記錄持久化起來。這樣做有兩個非常好的特性:1)實現了消息持久化和消息在queue中的位置的持久化的原子事務;2)我們總是可以根據這些持久化的queueMessage還原出所有的queue的信息,因為queueMessage裡包含了消息和消息在queue的中的位置信息;   基於這樣的消息存儲,當某個consumer要消費某個位置的消息時,我們可以通過先通過queueId找到queue,然後通過消息在queueOffset(由consumer傳遞過來的)獲取消息的全局offset,然後根據該全局的offset作為key從nosql拿到消息。實際上現在的equeue是批量拉取消息的,也就是一次socket請求不是拉一個消息,而是拉一批,默認是32個消息。這樣consumer可以用更少的網絡請求拿到更多的消息,可以加快消息消費的速度。   3.producer發送消息時的消息路由的細節   producer在發送消息時,如何知道當前topic下有多少個queue呢?每次發送消息時都要去broker上查一下嗎?顯然不行,這樣發送消息的性能就上不去了。那怎麼辦呢?就是異步,呵呵。producer可以定時向broker發送請求,獲取topic下的queue數量,然後保存起來。這樣每次producer在發送消息時,就只要從本地緩存裡拿即可。因為broker上topic的queue的數量一般不會變化,所以這樣的緩存很有意義。那還有一個問題,當前producer第一次對某個topic發送消息時,queue哪裡來呢?因為定時線程不知道要向broker拿哪個topic下的queue數量,因為此時producer端還沒有一個topic呢,因為一個消息都還沒發送過。那就是需要判斷一下,如果當前topic沒有queue的count信息,則直接從broker上獲取queue的count信息。然後再緩存起來,在發送當前消息。然後第二次發送時,因為緩存裡已經有了該消息,所以就不必再從broker拿了,且後續定時線程也會自動去更新該topic下的queue的count了。好,producer有了topic的queue的count,那用戶在發送消息時,框架就能把這個topic的queueCount傳遞給用戶,然後用戶就能根據自己的需要將消息路由到第幾個queue了。   4.consumer負載均衡如何實現   consumer負載均衡的意思是指,在消費者集群消費的情況下,如何讓同一個consumer group裡的消費者平均消費同一個topic下的queue。所以這個負載均衡本質上是一個將queue平均分配給consumer的過程。那麼怎麼實現呢?通過上面負載均衡的定義,我們只要,要做負載均衡,必須要確定consumer group和topic;然後拿到consumer group下的所有consumer,以及topic下的所有queue;然後對於當前的consumer,就能計算出來當前consumer應該被分配到哪些queue了。我們可以通過如下的函數來得到當前的consumer應該被分配到哪幾個queue。   復制代碼 public class AverageAllocateMessageQueueStrategy : IAllocateMessageQueueStrategy {     public IEnumerable<MessageQueue> Allocate(string currentConsumerId, IList<MessageQueue> totalMessageQueues, IList<string> totalConsumerIds)     {         var result = new List<MessageQueue>();           if (!totalConsumerIds.Contains(currentConsumerId))         {             return result;         }           var index = totalConsumerIds.IndexOf(currentConsumerId);         var totalMessageQueueCount = totalMessageQueues.Count;         var totalConsumerCount = totalConsumerIds.Count;         var mod = totalMessageQueues.Count() % totalConsumerCount;         var size = mod > 0 && index < mod ? totalMessageQueueCount / totalConsumerCount + 1 : totalMessageQueueCount / totalConsumerCount;         var averageSize = totalMessageQueueCount <= totalConsumerCount ? 1 : size;         var startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;         var range = Math.Min(averageSize, totalMessageQueueCount - startIndex);           for (var i = 0; i < range; i++)         {             result.Add(totalMessageQueues[(startIndex + i) % totalMessageQueueCount]);         }           return result;     } } 復制代碼 函數裡的實現就不多分析了。這個函數的目的就是根據給定的輸入,返回當前consumer該分配到的queue。分配的原則就是平均分配。好了,有了這個函數,我們就能很方便的實現負載均衡了。我們可以對每一個正在運行的consumer內部開一個定時job,該job每隔一段時間進行一次負載均衡,也就是執行一次上面的函數,得到當前consumer該綁定的最新queue。因為每個consumer都有一個groupName屬性,用於表示當前consumer屬於哪個group。所以,我們就可以在負載均衡時到broker獲取當前group下的所有consumer;另一方面,因為每個consumer都知道它自己訂閱了哪些topic,所以有了topic信息,就能獲取topic下的所有queue的信息了,有了這兩樣信息,每個consumer就能自己做負載均衡了。先看一下下面的代碼:   _scheduleService.ScheduleTask(Rebalance, Setting.RebalanceInterval, Setting.RebalanceInterval); _scheduleService.ScheduleTask(UpdateAllTopicQueues, Setting.UpdateTopicQueueCountInterval, Setting.UpdateTopicQueueCountInterval); _scheduleService.ScheduleTask(SendHeartbeat, Setting.HeartbeatBrokerInterval, Setting.HeartbeatBrokerInterval); 每個consumer內部都會啟動三個定時的task,第一個task表示要定時做一次負載均衡;第二個task表示要定時更新當前consumer訂閱的所有topic的queueCount信息,並把最新的queueCount信息都保存在本地;第三個task表示當前consumer會向broker定時發送心跳,這樣broker就能通過心跳知道某個consumer是否還活著,broker上維護了所有的consumer信息。一旦有新增或者發現沒有及時發送心跳過來的consumer,就會認為有新增或者死掉的consumer。因為broker上維護了所有的consumer信息,所以他就能提供查詢服務,比如根據某個consumer group查詢該group下的consumer。   通過這三個定時任務,就能完成消費者的負載均衡了。先看一下Rebalance方法:   復制代碼 private void Rebalance() {     foreach (var subscriptionTopic in _subscriptionTopics)     {         try         {             RebalanceClustering(subscriptionTopic);         }         catch (Exception ex)         {             _logger.Error(string.Format("[{0}]: rebalanceClustering for topic [{1}] has exception", Id, subscriptionTopic), ex);         }     } }

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved