程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> RabbitMQ入門教程——路由(Routing),rabbitmqrouting

RabbitMQ入門教程——路由(Routing),rabbitmqrouting

編輯:C#入門知識

RabbitMQ入門教程——路由(Routing),rabbitmqrouting


綁定( Bindings)

 

之前的文章中我們已經創建過bindings,代碼如下:

 

      channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

 

綁定(bindings)是指交換機(exchange)與隊列(queue)之間的關系。可以簡單的理解為:隊列(queue)對所綁定的交換機(exchange)上的消息感興趣,交換機(exchange)要把它接收到的消息推送到隊列(queue)中。

綁定的時候需要帶上一個額外的參數routingKey,為避免與BasicPublish中的路由鍵(routing key)參數混淆,我們稱之為綁定鍵(binding key),以下是如何創建一個綁定。

 

    channel.QueueBind(queue: queue, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);

 

注意:

  • 參數routingKey為空時,也是一個綁定鍵
  • 綁定鍵的意義依賴於exchange type。如:如果exchange type 為 fanout 時,綁定鍵沒有任何意義。

 

直連交換機(direct exchange)

 

在之前的發布訂閱中我們已經講到直連交換機,我們了解到直連交換機的工作方式為——交換機(exchange)會對綁定鍵(binding key)與 路由鍵(routing key)進行精確匹配,然後將消息發送到能夠匹配成功的隊列中。

下圖能夠很好的描述整個場景:

在這個場景中,可以看出直連交換機X和隊列(Q1與Q2)進行了綁定。Q1隊列使用orange為綁定鍵(binding key),Q2有兩個綁定,分別以black和green作為綁定鍵(binding key)。

這樣以來,當路由鍵為orange的消息發送到交換機,就會被路由到隊列Q1,路由鍵為black和green的下拍戲就會被路由到Q2,其它的消息將會被丟棄。

 

多重綁定(multiple bindings)

 

多重綁定即使用一個綁定鍵(binding key)綁定到多個隊列,這是完全合法的,而且每個隊列都能得到完全相同的信息。

 

示例

接下來我們就使用direct exchange完善之前的日志功能

1.日志級別為error的日志保存的到txt文件中

2.日志級別為log的日志輸出到控制台面板

3.輸出所有的日志到控制台面板

 

生產者 RoutingProducer.cs

 

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client;

using System.Threading;

 

namespace RabbitMQProducer

{

    public class RoutingProducer

    {

        const string EXCHANGE_NAME = "ROUTING_EXCHANGE";

        static readonly List<string> LEVELS = new List<string>() { "error", "log" };

        public static void Send()

        {

 

            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };

            using (IConnection connection = factory.CreateConnection())

            {

                using (IModel channel = connection.CreateModel())

                {

 

                    //創建交換機類型為 direct 的交換機

                    channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);

                    for (int i = 0; i < 20; i++)

                    {

                        Thread.Sleep(100);

                        string level = GetLevels();

                        string message = $"日志信息:{i}——日志等級:{level}";

 

                        //發送消息至之前創建的交換機,並設置路由鍵為 日志級別

                        channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: level, basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                        Console.WriteLine(message);

                    }

 

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

 

        private static string GetLevels()

        {

            return LEVELS[new Random().Next(0, 2)];

        }

    }

}

 

消費者 RoutingConsumer.cs

 

using RabbitMQ.Client;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client.Events;

using System.IO;

namespace RabbitMQConsumer

{

    public class RoutingConsumer

    {

        const string EXCHANGE_NAME = "ROUTING_EXCHANGE";

        /// <summary>

        /// 是否使用多重綁定將所有日志級別消息輸出到控制台

        /// 默認只是輸出日志級別為log的內容到控制台

        /// </summary>

        /// <param name="all"></param>

        public static void Log(bool all = false)

        {

            var factory = new ConnectionFactory()

            {

                HostName = "127.0.0.1"

            };

            using (var connection = factory.CreateConnection())

            {

                using (var channel = connection.CreateModel())

                {

                    channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);

                    //每次運行consumer客戶端都創建一個新的queue,並且綁定到對應的exchange,這樣使每次發送消息到exchange時就能把消息由exchange傳遞到所綁定的queue

                    QueueDeclareOk queue = channel.QueueDeclare();

                    string queueName = queue.QueueName;

                    channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: "log", arguments: null);

                    if (all)

                    {

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);

                    }

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (sender, e) =>

                    {

                        string message = Encoding.UTF8.GetString(e.Body);

                        Console.WriteLine($"LOG——日志信息:{message}");

                    };

                    channel.BasicConsume(queueName, noAck: true, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");

                    Console.ReadLine();

                }

            }

        }

        public static void Error()

        {

            var factory = new ConnectionFactory() { HostName = "127.0.0.1" };

            using (IConnection connection = factory.CreateConnection())

            {

                using (IModel channel = connection.CreateModel())

                {

                    //創建交換機類型為 direct 的交換機

                    channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);

                    //創建一個未命名的新的消息隊列,該隊列名稱有系統自動分配,並且為非持久化,在該隊列沒有訂閱時自動刪除的排它隊列

                    QueueDeclareOk queue = channel.QueueDeclare();

                    string queueName = queue.QueueName;

                    //綁定exchange 與 queue 並設置路由鍵為日志級別error

                    channel.QueueBind(queue: queue, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (sender, arg) =>

                    {

                        string message = Encoding.UTF8.GetString(arg.Body);

                        //寫入日志到txt文件

                        using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))

                        {

                            writer.WriteLine(message);

                            writer.Close();

                        }

                    };

                    channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

                }

            }

        }

    }

}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved