分而治之 作為最終示例(你肯定對生動的代碼開始生厭並希望回頭去鑽研關於比較性、抽象性准則的語言學探討),讓我們來做一個小型超級計算。然後喝個咖啡。我們的超級計算程序是個非常典型的並行處理模型。我們有: 一個通風機(ventilator)來產生可以並行處理的任務 一組工人(worker)來處理任務 一個水槽(sink)來回收工人處理的結果 事實上,工人運行於超快的機子,沒准是GPU(圖形處理單元)來做困難運算。這是通風機代碼,生成100個任務,每個任務都是一條消息告訴工人休眠(sleep)幾毫秒。 taskvent: Parallel task ventilator in C [cpp] // // Task ventilator // Binds PUSH socket to tcp://localhost:5557 // Sends batch of tasks to workers via that socket // #include "zhelpers.h" int main (void) { void *context = zmq_ctx_new (); // Socket to send messages on void *sender = zmq_socket (context, ZMQ_PUSH); zmq_bind (sender, "tcp://*:5557"); // Socket to send start of batch message on void *sink = zmq_socket (context, ZMQ_PUSH); zmq_connect (sink, "tcp://localhost:5558"); printf ("Press Enter when the workers are ready: "); getchar (); printf ("Sending tasks to workers…\n"); // The first message is "0" and signals start of batch s_send (sink, "0"); // Initialize random number generator srandom ((unsigned) time (NULL)); // Send 100 tasks int task_nbr; int total_msec = 0; // Total expected cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // Random workload from 1 to 100msecs workload = randof (100) + 1; total_msec += workload; char string [10]; sprintf (string, "%d", workload); s_send (sender, string); } www.2cto.com printf ("Total expected cost: %d msec\n", total_msec); sleep (1); // Give 0MQ time to deliver zmq_close (sink); zmq_close (sender); zmq_ctx_destroy (context); return 0; } 圖 5 - 並行管道 這是工人程序。接收消息,休眠指定的時間,然後表明自己完成任務: taskwork: Parallel task worker in C [cpp] // // Task worker // Connects PULL socket to tcp://localhost:5557 // Collects workloads from ventilator via that socket // Connects PUSH socket to tcp://localhost:5558 // Sends results to sink via that socket // #include "zhelpers.h" int main (void) { void *context = zmq_ctx_new (); // Socket to receive messages on void *receiver = zmq_socket (context, ZMQ_PULL); zmq_connect (receiver, "tcp://localhost:5557"); // Socket to send messages to void *sender = zmq_socket (context, ZMQ_PUSH); zmq_connect (sender, "tcp://localhost:5558"); // Process tasks forever while (1) { char *string = s_recv (receiver); // Simple progress indicator for the viewer fflush (stdout); printf ("%s.", string); // Do the work s_sleep (atoi (string)); free (string); // Send results to sink s_send (sender, ""); } zmq_close (receiver); zmq_close (sender); zmq_ctx_destroy (context); return 0; } 這是水槽程序。它收集這100個任務,然後計算整個處理消耗的時間,讓我們能夠證實如果有多個工人時他們真的是並行運轉的: tasksink: Parallel task sink in C [cpp] // // Task sink // Binds PULL socket to tcp://localhost:5558 // Collects results from workers via that socket // #include "zhelpers.h" int main (void) { // Prepare our context and socket void *context = zmq_ctx_new (); void *receiver = zmq_socket (context, ZMQ_PULL); zmq_bind (receiver, "tcp://*:5558"); // Wait for start of batch char *string = s_recv (receiver); free (string); // Start our clock now int64_t start_time = s_clock (); // Process 100 confirmations int task_nbr; for (task_nbr = 0; task_nbr < 100; task_nbr++) { char *string = s_recv (receiver); free (string); if ((task_nbr / 10) * 10 == task_nbr) printf (":"); else printf ("."); fflush (stdout); } // Calculate and report duration of batch printf ("Total elapsed time: %d msec\n", (int) (s_clock () - start_time)); zmq_close (receiver); zmq_ctx_destroy (context); return 0; } 批處理的平均消耗為5秒。當我們啟動1個、2個、4個工人時,我們從水槽取得的結果是這樣的: [plain] # 1 worker Total elapsed time: 5034 msec # 2 workers Total elapsed time: 2421 msec # 4 workers Total elapsed time: 1018 msec 讓我們更細致的查看這段代碼的某些方面: 工人們上游連接通風機,下游連接水槽。這意味著你可以任意添加工人。如果工人綁定到他們的端點,你會需要(a)更多的端點(b)每添加一個工人都得修改通風機或水槽。我們說通風機和水槽是結構中的“穩定”部分,而工人們是“動態”部分。 我們不得不在批次的開始與所有工人們都起來運行兩者間做出同步。這是一個ØMQ中特別常見的陷阱,也沒有簡單方案。“連接”方法需要一定時間。所以當一組工人連接到通風機,第一個成功連接的工人會在瞬間得到消息的全部負載,而其他人仍在連接。如果你總是不去同步批次的開始,系統完全不會並行運轉。試著移除等待看看。 通風機的推送(PUSH)套接字均勻的分發任務到工人們(假定批次開始送出之前他們都已連接)。這叫做負載均衡,我們會再詳細看看。 水槽的拉取(PULL)套接字均勻的收集工人的成果。這叫做公平隊列。 圖6 - 公平隊列 管道模式也表現出“遲鈍加入者”綜合症,導致了對推送套接字不能正確負載均衡的控訴。如果你使用推送和拉取,而且其中一個工人比其他人得到更多的消息,那是因為他的推送套接字比別人連接的更快,然後在其他人連接達成之前捕獲了一大堆消息。如果你想要正確的負載均衡,你可能想要看看第3章 - 高級請求應答模式中的小節:負載均衡模式。