RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現的產品,RabbitMQ是一個消息代理,從“生產者”接收消息並傳遞消息至“消費者”,期間可根據規則路由、緩存、持久化消息。“生產者”也即message發送者以下簡稱P,相對應的“消費者”乃message接收者以下簡稱C,message通過queue由P到C,queue存在於RabbitMQ,可存儲盡可能多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message
Broker:消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接裡,可建立多個channel,每個channel代表一個會話任務。
1、客戶端連接到消息隊列服務器,打開一個channel
2、客戶端聲明一個exchange,並設置相關屬性
3、客戶端聲明一個queue,並設置相關屬性
4、客戶端使用routing key,在exchange和queue之間建立好綁定關系
5、客戶端投遞消息到exchange
6、exchange接收到消息後,就根據消息的key和已經設置的binding,將消息投遞到一個或多個隊列裡
# 注:在聲明一個隊列後,如果將其持久化,則下次不需要進行聲明,因為該隊列已經在rabbitMQ中了!!!
1、direct交換機
特點:依據key進行投遞
例如綁定時設置了routing key為”hello”,那麼客戶端提交的消息,只有設置了key為”hello”的才會投遞到隊列。
2、topic交換機
特點:對key模式匹配後進行投遞,符號”#”匹配一個或多個詞,符號” * ”匹配一個詞
例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
3、fanout交換機
特點:不需要key,采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列
""" 當客戶端從隊列中取出消息之後,可能需要一段時間才能處理完成,如果在這個過程中,客戶端出錯了,異常退出了,而數據還沒有處理完成,那麼非常不幸,這段數據就丟失了,因為rabbitmq默認會把此消息標記為已完成,然後從隊列中移除, """
消息確認是客戶端從rabbitmq中取出消息,並處理完成之後,會發送一個ack告訴rabbitmq,消息處理完成,當rabbitmq收到客戶端的獲取消息請求之後,或標記為處理中,當再次收到ack之後,才會標記為已完成,然後從隊列中刪除。當rabbitmq檢測到客戶端和自己斷開鏈接之後,還沒收到ack,則會重新將消息放回消息隊列,交給下一個客戶端處理,保證消息不丟失,也就是說,RabbitMQ給了客戶端足夠長的時間來做數據處理。
在客戶端使用no_ack來標記是否需要發送ack,默認是False,開啟狀態
https://juejin.cn/post/6992383077435572260
python使用rabbitmq服務,可以使用現成的類庫pika、txAMQP或者py-amqplib,這裡選擇了pika
在命令行中直接使用pip命令:
pip install pika
示例測試內容就是從producer.py發送消息到RabbitMQ,consumer從RabbitMQ接受消息
producer.py
# 消息生產者
import json
import time
import pika
# 隊列名稱
queue = 'queue_test'
# 路由關鍵字
routing_key = 'hello'
# 消息交換機名稱
exchange = 'exchange_test'
# 建立連接
hostname = '127.0.0.1'
port = 5672
Credentials = pika.PlainCredentials('guest', 'guest')
Connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname,
port=port,
virtual_host='/', # 虛擬主機
credentials=Credentials
)
)
# 連接建立成功後,建立通道
channel = Connection.channel()
# 創建exchange
channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
# 聲明隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue=queue, durable=True)
# 把隊列和消息交換機綁定
channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
# 交換機; 隊列名,寫明將消息發往哪個隊列; 消息內容
# routing_key在使用匿名交換機的時候才需要指定,表示發送到哪個隊列,注意當未定義exchange時,routing_key需和queue的值保持一致
for i in range(200):
data = f'hello word{
i}'
time.sleep(2)
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=data)
print(f'發送。。。{
i}')
Connection.close()
consumer.py
# 消息消費者
import pika
queue = 'queue_test'
hostname = 'localhost'
port = 5672
Credentials = pika.PlainCredentials('guest', 'guest')
ConnectionParam= pika.ConnectionParameters(host=hostname,port=port,credentials=Credentials)
Connection = pika.BlockingConnection(parameters=ConnectionParam)
# 連接建立成功後,建立通道
channel = Connection.channel()
# 創建exchange
channel.queue_declare(queue=queue,durable=True)
# 接受數據
def call_back(ch, method, properties, body):
print(" [x] Received %r" % (body,))
ch.basic_ack(delivery_tag=method.delivery_tag) # 發送ack消息
# 告訴RabbitMQ使用call_back來接受數據
channel.basic_consume(queue=queue,on_message_callback=call_back,auto_ack=False)#no_ack來標記是否需要發送ack,默認是False,開啟狀態
# 開始接收信息,並進入阻塞狀態,隊列裡有信息才會調用callback進行處理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
先運行producer.py發送消息
在運行consumer.py接收消息
再看web端的展示:http://localhost:15672/#/
可以看到隊列中消息的數量以及雙方放消息和取消息的時間段
顯示一個隊列
消息持久化 消息確認機制使得客戶端在崩潰的時候,服務端消息不丟失,但是如果rabbitmq奔潰了呢?該如何保證隊列中的消息不丟失? 此就需要product在往隊列中push消息的時候,告訴rabbitmq,此隊列中的消息需要持久化,用到的參數:durable=True,再次強調,Producer和client都應該去創建這個queue,盡管只有一個地方的創建是真正起作用的:
channel.basic_publish(exchange='',
routing_key="test",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
具體代碼:
# 建立連接
hostname = '127.0.0.1'
port = 5672
Credentials = pika.PlainCredentials('guest', 'guest')
Connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname,
port=port,
virtual_host='/', # 虛擬主機
credentials=Credentials
)
)
# 連接建立成功後,建立通道
channel = Connection.channel()
# 創建exchange
channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
# 聲明隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue=queue, durable=True)
# 把隊列和消息交換機綁定
channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
# 交換機; 隊列名,寫明將消息發往哪個隊列; 消息內容
# routing_key在使用匿名交換機的時候才需要指定,表示發送到哪個隊列,注意當未定義exchange時,routing_key需和queue的值保持一致
for i in range(200):
data = f'hello word{
i}'
time.sleep(2)
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=data,properties=pika.BasicProperties(delivery_mode=2))
print(f'發送。。。{
i}')
Connection.close()
# 消息消費者
import pika
queue = 'queue_test'
hostname = 'localhost'
port = 5672
Credentials = pika.PlainCredentials('guest', 'guest')
ConnectionParam= pika.ConnectionParameters(host=hostname,port=port,credentials=Credentials)
Connection = pika.BlockingConnection(parameters=ConnectionParam)
# 連接建立成功後,建立通道
channel = Connection.channel()
# 創建exchange
channel.queue_declare(queue=queue,durable=True)
# 接受數據
def call_back(ch, method, properties, body):
print(" [x] Received %r" % (body,))
ch.basic_ack(delivery_tag=method.delivery_tag) # 發送ack消息
# 告訴RabbitMQ使用call_back來接受數據
channel.basic_consume(queue=queue,on_message_callback=call_back,auto_ack=False)#no_ack來標記是否需要發送ack,默認是False,開啟狀態
# 開始接收信息,並進入阻塞狀態,隊列裡有信息才會調用callback進行處理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
配置完之後,發現product往rabbitmq端push消息之後,重啟rabbitmq,消息依然存在