放出消息
第二個經典模式是單向數據分發,服務器推送更新到一組客戶端。讓我們看一個推送天氣情況變化的例子,包含地區編碼、溫度、和相對濕度。我們會生成隨機值來模擬真實氣象站。
這是服務器代碼,這個程序我們使用5556端口。
wuserver: Weather update server in C
[cpp]
//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
#include "zhelpers.h"
int main (void)
{
// Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
assert (rc == 0);
rc = zmq_bind (publisher, "ipc://weather.ipc");
assert (rc == 0);
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// Send message to all subscribers
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
}
更新流既無開始也無結束,像一個永不結束的天氣預報。
圖 4 – 發布-訂閱
這是客戶端程序,監聽更新流並捕獲符合特定地區編碼的所有消息,默認為紐約市因為那是個冒險的好地方:
wuclient: Weather update client in C
[cpp]
//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
#include "zhelpers.h"
int main (int argc, char *argv [])
{
void *context = zmq_ctx_new ();
// Socket to talk to server
printf ("Collecting updates from weather server…\n");
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
assert (rc == 0);
// Subscribe to zipcode, default is NYC, 10001
char *filter = (argc > 1) ? argv [1] : "10001 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
assert (rc == 0);
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("Average temperature for zipcode '%s' was %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}
注意當你使用一個訂閱套接字時你必須使用zmq_setsockopt()和SUBSCRIBE設置一個訂閱,就像這段代碼中那樣。如果你不設置任何訂閱,就得不到任何消息。這是初學者的常見錯誤。訂閱者可以設置很多訂閱,會合並到一起。就是說,如果一個更新匹配任意一個訂閱,訂閱者都會接收。訂閱者也可以取消特定的訂閱。一個訂閱通常是一個可打印字符串,但也不是必須的。參考zmq_setsockopt()看這是怎麼工作的。
發布訂閱套接字對是異步的。客戶端在循環中(或單次如果有必要)做zmq_msg_recv()。嘗試發送消息到訂閱套接字將導致錯誤。同樣的,服務按所需頻率做zmq_msg_send(),但絕不能對發布套接字做zmq_msg_recv()。
理論上,ØMQ套接字不在乎哪一端來連接哪一端來綁定。但是在實踐中會有未公開的差異,待會我會提及。現在,綁定發布並連接訂閱,除非你的網絡設計導致這無法實現。
還有一個關於發布訂閱套接字的重要事項:你無法精確知道訂閱者什麼時候開始獲取消息。即使你先啟動一個訂閱者,過一會再啟動發布者,訂閱者總是會錯過發布者發送的第一條消息。這是因為訂閱者連接到發布者時(占用了短暫但非零的時間),發布者可能已經將消息發送出去了。
這種“遲鈍加入者”症狀擊中了很多人、很多次,我們需要詳細解釋一下。記住ØMQ是異步I/O的,也就是在後台。比如說你有兩個節點按這個順序這麼做:
訂閱者連接到一個端點並接收和計數消息
發布者綁定到一個端點並立刻發送1000條消息
那麼訂閱者很可能不會接收到任何東西。你可以眨眨眼,檢查一下是否設置了正確的過濾器,再試試,然而訂閱者還是不會接收任何東西。
建立TCP連接牽涉到大概幾毫秒的來回握手,這取決於網絡狀況和節點之間跳躍的次數。這個時間裡,ØMQ已經可以發送好多消息了。為了證明,假設花費5毫秒來建立連接,而相同的鏈路可以搞定1M每秒的消息。在訂閱者連接到發布者的5毫秒裡,發布者僅耗費1毫秒就將這1K的消息發送出去了。
在第2章 - 套接字與模式中我們會解釋如何讓發布者和訂閱者同步,以便無需等待訂閱者(們)都已連接並就緒時才開始發布數據。有一個簡單的笨辦法來延遲發布者,通過睡眠sleep。但是在真實程序中可不要這麼做,因為這極為脆弱、不雅而緩慢。先用sleep向自己證明到底發生了什麼,然後等到第2章再看正確做法。
同步的另一種替代方案是簡單的假設發布的數據流是無限的,沒有開始也沒有結束。還假設訂閱者不在乎它啟動之前發生過什麼。這就是我們建造的例子天氣客戶端的方式。
客戶端訂閱到選定的地區編碼並收集1000個更新。也就是大概1000萬服務器更新,如果地區編碼是隨機分發的。你可以先啟動客戶端,再啟動服務器,而客戶端會持續工作。你可以隨時停止並重啟服務器,而客戶端會持續工作。當客戶端收集到了1000個更新,它計算出平均值,打印輸出,然後退出。
關於發布訂閱模式的一些要點:
訂閱者可以每次調用“connect”來連接到一個以上的發布者。數據會交錯到達(“公平隊列”)以避免發布者相互淹沒。
如果一個發布者沒有已連接的訂閱者,那麼它直接丟棄所有消息。
如果你使用TCP時一個訂閱者很慢,消息會在發布者那裡排上隊。我們待會看看這種情況下如何用“高水位線”來保護發布者。
從ØMQ 3.x開始,使用已連接協議(tcp: 或 ipc:)將在發布者方面進行過濾,使用epgm://協議時,會在訂閱者一方進行過濾。在ØMQ2.x中所有過濾都在訂閱者方面進行。
如下是在我的筆記本電腦中接收過濾10M消息花費的時間,電腦配置是2011-era Intel i5,體面但也沒啥特殊的。
[plain]
$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F
real 0m4.470s
user 0m0.000s
sys 0m0.008s