前言:前面我們講解的都是本地服務器,現在如果需要遠程計算機上運行一個函數,等待結果。這就是一個不同的故事了,這種模式通常被稱為遠程過程調用或者RPC。
本章教程我們使用RabbitMQ搭建一個RPC系統,一個客戶端和一個可擴展的RPC服務器,現在我們開始吧。
一般做rpc在RabbitMQ是比較容易的,一個客戶端發送一個請求信息和一個響應信息的服務器回復,為了得到一個響應,我們需要發送一個回調隊列地址請求。如下
Message屬性:
AMQP協議一共預定義了14個屬性,但是大多數屬性很少使用,下面幾個可能用的比較多
deliveryMode:有2個值,一個是持久,另一個表示短暫(第二篇說過)
contentType:內容類型:用來描述編碼的MIME類型。例如,經常使用JSON編碼是將此屬性設置為一個很好的做法:application/json。
replyTo:經常使用的是回調隊列的名字
correlationid:RPC響應請求的相關應用
在隊列上接收到一個響應,但它並不清楚響應屬於哪一個,當我們使用CorrelationId屬性的時候,我們就可以將它設置為每個請求的唯一值,稍後當我們在回調隊列中接收消息的時候,我們會看到這個屬性,如果我們看到一個未知的CorrelationId,我們就可以安全地忽略信息-它不屬於我們的請求。為什麼我們應該忽略未知的消息在回調隊列中,而不是失敗的錯誤?這是由於服務器端的一個競爭條件的可能性。比如還未發送了一個確認信息給請求,但是此時RPC服務器掛了。如果這種情況發生,將再次重啟RPC服務器處理請求。這就是為什麼在客戶端必須處理重復的反應。
我們的rpc工作方式如下:
1:當客戶端啟動時,它創建一個匿名的獨占回調隊列。
2:對於rpc請求,客戶端發送2個屬性,一個是replyTo設置回調隊列,另一是correlationId為每個隊列設置唯一值
3:請求被發送到一個rpc_queue隊列中
4:rpc服務器是等待隊列的請求,當收到一個請求的時候,他就把消息返回的結果返回給客戶端,使請求結束。
5:客戶端等待回調隊列上的數據,當消息出現的時候,他檢查correlationId,如果它和從請求返回的值匹配,就進行響應。
RPCServer.Java
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) { return 0; } if (n == 1) { return 1; } return fib(n - 1) + fib(n - 1); } public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("RPCServer Awating RPC request"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new AMQP.BasicProperties.Builder(). correlationId(props.getCorrelationId()).build(); String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println("RPCServer fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
服務器代碼比較簡單
1:建立連接,通道,隊列
2:我們可能運行多個服務器進程,為了分散負載服務器壓力,我們設置channel.basicQos(1);
3:我們用basicconsume訪問隊列。然後進入循環,在其中我們等待請求消息並處理消息然後發送響應。
RPCClient.java
public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws IOException, InterruptedException { String response; String corrID = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(corrID).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrID)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] args) throws Exception { RPCClient rpcClient = null; String response; try { rpcClient = new RPCClient(); System.out.println("RPCClient Requesting fib(20)"); response = rpcClient.call("20"); System.out.println("RPCClient Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (rpcClient != null) { rpcClient.close(); } } } }
客戶端代碼解讀
1:建立一個連接和通道,並聲明了一個唯一的“回調”隊列的答復
2:我們訂閱回調隊列,這樣就可以得到RPC的響應
3:定義一個call方法用於發送當前的回調請求
4:生成一個唯一的correlationid,然後通過while循環來捕獲合適的回應
5:我們請求信息,發送2個屬性,replyTo 和correlationId
6:然後就是等待直到有合適的回應到達
7:while循環是做一個非常簡單的工作,對於每一個響應消息,它檢查是否有correlationid然後進行匹配。然後是就進行響應。
8:最後把響應返回到客戶端。