放出消息 第二個經典模式是單向數據分發,服務器推送更新到一組客戶端。讓我們看一個推送天氣情況變化的例子,包含地區編碼、溫度、和相對濕度。我們會生成隨機值來模擬真實氣象站。 這是服務器代碼,這個程序我們使用5556端口。 wuserver: Weather update server in C [cpp] // // Weather update server // Binds PUB socket to tcp://*:5556 // Publishes random weather updates // #include "zhelpers.h" int main (void) { // Prepare our context and publisher void *context = zmq_ctx_new (); void *publisher = zmq_socket (context, ZMQ_PUB); int rc = zmq_bind (publisher, "tcp://*:5556"); assert (rc == 0); rc = zmq_bind (publisher, "ipc://weather.ipc"); assert (rc == 0); // Initialize random number generator srandom ((unsigned) time (NULL)); while (1) { // Get values that will fool the boss int zipcode, temperature, relhumidity; zipcode = randof (100000); temperature = randof (215) - 80; relhumidity = randof (50) + 10; // Send message to all subscribers char update [20]; sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity); s_send (publisher, update); } zmq_close (publisher); zmq_ctx_destroy (context); return 0; } 更新流既無開始也無結束,像一個永不結束的天氣預報。 圖 4 – 發布-訂閱 這是客戶端程序,監聽更新流並捕獲符合特定地區編碼的所有消息,默認為紐約市因為那是個冒險的好地方: wuclient: Weather update client in C [cpp] // // Weather update client // Connects SUB socket to tcp://localhost:5556 // Collects weather updates and finds avg temp in zipcode // #include "zhelpers.h" int main (int argc, char *argv []) { void *context = zmq_ctx_new (); // Socket to talk to server printf ("Collecting updates from weather server…\n"); void *subscriber = zmq_socket (context, ZMQ_SUB); int rc = zmq_connect (subscriber, "tcp://localhost:5556"); assert (rc == 0); // Subscribe to zipcode, default is NYC, 10001 char *filter = (argc > 1) ? argv [1] : "10001 "; rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter)); assert (rc == 0); // Process 100 updates int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { char *string = s_recv (subscriber); int zipcode, temperature, relhumidity; sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity); total_temp += temperature; free (string); } printf ("Average temperature for zipcode '%s' was %dF\n", filter, (int) (total_temp / update_nbr)); zmq_close (subscriber); zmq_ctx_destroy (context); return 0; } 注意當你使用一個訂閱套接字時你必須使用zmq_setsockopt()和SUBSCRIBE設置一個訂閱,就像這段代碼中那樣。如果你不設置任何訂閱,就得不到任何消息。這是初學者的常見錯誤。訂閱者可以設置很多訂閱,會合並到一起。就是說,如果一個更新匹配任意一個訂閱,訂閱者都會接收。訂閱者也可以取消特定的訂閱。一個訂閱通常是一個可打印字符串,但也不是必須的。參考zmq_setsockopt()看這是怎麼工作的。 發布訂閱套接字對是異步的。客戶端在循環中(或單次如果有必要)做zmq_msg_recv()。嘗試發送消息到訂閱套接字將導致錯誤。同樣的,服務按所需頻率做zmq_msg_send(),但絕不能對發布套接字做zmq_msg_recv()。 理論上,ØMQ套接字不在乎哪一端來連接哪一端來綁定。但是在實踐中會有未公開的差異,待會我會提及。現在,綁定發布並連接訂閱,除非你的網絡設計導致這無法實現。 還有一個關於發布訂閱套接字的重要事項:你無法精確知道訂閱者什麼時候開始獲取消息。即使你先啟動一個訂閱者,過一會再啟動發布者,訂閱者總是會錯過發布者發送的第一條消息。這是因為訂閱者連接到發布者時(占用了短暫但非零的時間),發布者可能已經將消息發送出去了。 這種“遲鈍加入者”症狀擊中了很多人、很多次,我們需要詳細解釋一下。記住ØMQ是異步I/O的,也就是在後台。比如說你有兩個節點按這個順序這麼做: 訂閱者連接到一個端點並接收和計數消息 發布者綁定到一個端點並立刻發送1000條消息 那麼訂閱者很可能不會接收到任何東西。你可以眨眨眼,檢查一下是否設置了正確的過濾器,再試試,然而訂閱者還是不會接收任何東西。 建立TCP連接牽涉到大概幾毫秒的來回握手,這取決於網絡狀況和節點之間跳躍的次數。這個時間裡,ØMQ已經可以發送好多消息了。為了證明,假設花費5毫秒來建立連接,而相同的鏈路可以搞定1M每秒的消息。在訂閱者連接到發布者的5毫秒裡,發布者僅耗費1毫秒就將這1K的消息發送出去了。 在第2章 - 套接字與模式中我們會解釋如何讓發布者和訂閱者同步,以便無需等待訂閱者(們)都已連接並就緒時才開始發布數據。有一個簡單的笨辦法來延遲發布者,通過睡眠sleep。但是在真實程序中可不要這麼做,因為這極為脆弱、不雅而緩慢。先用sleep向自己證明到底發生了什麼,然後等到第2章再看正確做法。 同步的另一種替代方案是簡單的假設發布的數據流是無限的,沒有開始也沒有結束。還假設訂閱者不在乎它啟動之前發生過什麼。這就是我們建造的例子天氣客戶端的方式。 客戶端訂閱到選定的地區編碼並收集1000個更新。也就是大概1000萬服務器更新,如果地區編碼是隨機分發的。你可以先啟動客戶端,再啟動服務器,而客戶端會持續工作。你可以隨時停止並重啟服務器,而客戶端會持續工作。當客戶端收集到了1000個更新,它計算出平均值,打印輸出,然後退出。 關於發布訂閱模式的一些要點: 訂閱者可以每次調用“connect”來連接到一個以上的發布者。數據會交錯到達(“公平隊列”)以避免發布者相互淹沒。 如果一個發布者沒有已連接的訂閱者,那麼它直接丟棄所有消息。 如果你使用TCP時一個訂閱者很慢,消息會在發布者那裡排上隊。我們待會看看這種情況下如何用“高水位線”來保護發布者。 從ØMQ 3.x開始,使用已連接協議(tcp: 或 ipc:)將在發布者方面進行過濾,使用epgm://協議時,會在訂閱者一方進行過濾。在ØMQ2.x中所有過濾都在訂閱者方面進行。 如下是在我的筆記本電腦中接收過濾10M消息花費的時間,電腦配置是2011-era Intel i5,體面但也沒啥特殊的。 [plain] $ time wuclient Collecting updates from weather server... Average temperature for zipcode '10001 ' was 28F real 0m4.470s user 0m0.000s sys 0m0.008s