綁定( Bindings)
之前的文章中我們已經創建過bindings,代碼如下:
channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
綁定(bindings)是指交換機(exchange)與隊列(queue)之間的關系。可以簡單的理解為:隊列(queue)對所綁定的交換機(exchange)上的消息感興趣,交換機(exchange)要把它接收到的消息推送到隊列(queue)中。
綁定的時候需要帶上一個額外的參數routingKey,為避免與BasicPublish中的路由鍵(routing key)參數混淆,我們稱之為綁定鍵(binding key),以下是如何創建一個綁定。
channel.QueueBind(queue: queue, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);
注意:
直連交換機(direct exchange)
在之前的發布訂閱中我們已經講到直連交換機,我們了解到直連交換機的工作方式為——交換機(exchange)會對綁定鍵(binding key)與 路由鍵(routing key)進行精確匹配,然後將消息發送到能夠匹配成功的隊列中。
下圖能夠很好的描述整個場景:
在這個場景中,可以看出直連交換機X和隊列(Q1與Q2)進行了綁定。Q1隊列使用orange為綁定鍵(binding key),Q2有兩個綁定,分別以black和green作為綁定鍵(binding key)。
這樣以來,當路由鍵為orange的消息發送到交換機,就會被路由到隊列Q1,路由鍵為black和green的下拍戲就會被路由到Q2,其它的消息將會被丟棄。
多重綁定(multiple bindings)
多重綁定即使用一個綁定鍵(binding key)綁定到多個隊列,這是完全合法的,而且每個隊列都能得到完全相同的信息。
示例
接下來我們就使用direct exchange完善之前的日志功能
1.日志級別為error的日志保存的到txt文件中
2.日志級別為log的日志輸出到控制台面板
3.輸出所有的日志到控制台面板
生產者 RoutingProducer.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using System.Threading;
namespace RabbitMQProducer
{
public class RoutingProducer
{
const string EXCHANGE_NAME = "ROUTING_EXCHANGE";
static readonly List<string> LEVELS = new List<string>() { "error", "log" };
public static void Send()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
//創建交換機類型為 direct 的交換機
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
for (int i = 0; i < 20; i++)
{
Thread.Sleep(100);
string level = GetLevels();
string message = $"日志信息:{i}——日志等級:{level}";
//發送消息至之前創建的交換機,並設置路由鍵為 日志級別
channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: level, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine(message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
private static string GetLevels()
{
return LEVELS[new Random().Next(0, 2)];
}
}
}
消費者 RoutingConsumer.cs
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using System.IO;
namespace RabbitMQConsumer
{
public class RoutingConsumer
{
const string EXCHANGE_NAME = "ROUTING_EXCHANGE";
/// <summary>
/// 是否使用多重綁定將所有日志級別消息輸出到控制台
/// 默認只是輸出日志級別為log的內容到控制台
/// </summary>
/// <param name="all"></param>
public static void Log(bool all = false)
{
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1"
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
//每次運行consumer客戶端都創建一個新的queue,並且綁定到對應的exchange,這樣使每次發送消息到exchange時就能把消息由exchange傳遞到所綁定的queue
QueueDeclareOk queue = channel.QueueDeclare();
string queueName = queue.QueueName;
channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: "log", arguments: null);
if (all)
{
channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);
}
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
string message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine($"LOG——日志信息:{message}");
};
channel.BasicConsume(queueName, noAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
public static void Error()
{
var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
//創建交換機類型為 direct 的交換機
channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
//創建一個未命名的新的消息隊列,該隊列名稱有系統自動分配,並且為非持久化,在該隊列沒有訂閱時自動刪除的排它隊列
QueueDeclareOk queue = channel.QueueDeclare();
string queueName = queue.QueueName;
//綁定exchange 與 queue 並設置路由鍵為日志級別error
channel.QueueBind(queue: queue, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, arg) =>
{
string message = Encoding.UTF8.GetString(arg.Body);
//寫入日志到txt文件
using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))
{
writer.WriteLine(message);
writer.Close();
}
};
channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
}
}
}
}
}