分而治之
作為最終示例(你肯定對生動的代碼開始生厭並希望回頭去鑽研關於比較性、抽象性准則的語言學探討),讓我們來做一個小型超級計算。然後喝個咖啡。我們的超級計算程序是個非常典型的並行處理模型。我們有:
一個通風機(ventilator)來產生可以並行處理的任務
一組工人(worker)來處理任務
一個水槽(sink)來回收工人處理的結果
事實上,工人運行於超快的機子,沒准是GPU(圖形處理單元)來做困難運算。這是通風機代碼,生成100個任務,每個任務都是一條消息告訴工人休眠(sleep)幾毫秒。
taskvent: Parallel task ventilator in C
[cpp]
//
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers…\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
} www.2cto.com
printf ("Total expected cost: %d msec\n", total_msec);
sleep (1); // Give 0MQ time to deliver
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
圖 5 - 並行管道
這是工人程序。接收消息,休眠指定的時間,然後表明自己完成任務:
taskwork: Parallel task worker in C
[cpp]
//
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to receive messages on
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
// Simple progress indicator for the viewer
fflush (stdout);
printf ("%s.", string);
// Do the work
s_sleep (atoi (string));
free (string);
// Send results to sink
s_send (sender, "");
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
這是水槽程序。它收集這100個任務,然後計算整個處理消耗的時間,讓我們能夠證實如果有多個工人時他們真的是並行運轉的:
tasksink: Parallel task sink in C
[cpp]
//
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
#include "zhelpers.h"
int main (void)
{
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
// Start our clock now
int64_t start_time = s_clock ();
// Process 100 confirmations
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
// Calculate and report duration of batch
printf ("Total elapsed time: %d msec\n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_ctx_destroy (context);
return 0;
}
批處理的平均消耗為5秒。當我們啟動1個、2個、4個工人時,我們從水槽取得的結果是這樣的:
[plain]
# 1 worker
Total elapsed time: 5034 msec
# 2 workers
Total elapsed time: 2421 msec
# 4 workers
Total elapsed time: 1018 msec
讓我們更細致的查看這段代碼的某些方面:
工人們上游連接通風機,下游連接水槽。這意味著你可以任意添加工人。如果工人綁定到他們的端點,你會需要(a)更多的端點(b)每添加一個工人都得修改通風機或水槽。我們說通風機和水槽是結構中的“穩定”部分,而工人們是“動態”部分。
我們不得不在批次的開始與所有工人們都起來運行兩者間做出同步。這是一個ØMQ中特別常見的陷阱,也沒有簡單方案。“連接”方法需要一定時間。所以當一組工人連接到通風機,第一個成功連接的工人會在瞬間得到消息的全部負載,而其他人仍在連接。如果你總是不去同步批次的開始,系統完全不會並行運轉。試著移除等待看看。
通風機的推送(PUSH)套接字均勻的分發任務到工人們(假定批次開始送出之前他們都已連接)。這叫做負載均衡,我們會再詳細看看。
水槽的拉取(PULL)套接字均勻的收集工人的成果。這叫做公平隊列。
圖6 - 公平隊列
管道模式也表現出“遲鈍加入者”綜合症,導致了對推送套接字不能正確負載均衡的控訴。如果你使用推送和拉取,而且其中一個工人比其他人得到更多的消息,那是因為他的推送套接字比別人連接的更快,然後在其他人連接達成之前捕獲了一大堆消息。如果你想要正確的負載均衡,你可能想要看看第3章 - 高級請求應答模式中的小節:負載均衡模式。