Master-Worker模式是常用的並行設計模式。它的核心思想是,系統有兩個進程協議工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完後,將結果返回給Master進程,由Master進行歸納和匯總,從而得到系統結果。處理過程如下圖:
Master-Worker模式的好處是,它能將大任務分解成若干個小任務,並發執行,從而提高系統性能。而對於系統請求者Client來說,任務一旦提交,Master進程就會立刻分配任務並立即返回,並不會等系統處理完全部任務再返回,其處理過程是異步的。
Master-Worker模式的主要結構如下圖:
如上圖所示,Master進程是主要進程,它維護著一個Worker進程隊列、子任務隊列和子結果集,Worker進程中的Worker進程不斷的從任務隊列中提取要處理的子任務,並將子任務的處理結果放入到子結果集中。
在上圖中,Master:用於任務的分配和最終結果的合並;Worker:用於實際處理一個任務;客戶端進程:用於啟動系統,調度開啟Master。
Master代碼實現:
1 public class Master { 2 //任務隊列 3 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); 4 //worker進程隊列 5 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); 6 //結果集 7 protected Map<String, Object> resultMap = new HashMap<String,Object>(); 8 9 //是否所有的子任務都結束 10 11 public boolean isComplete(){ 12 for(Map.Entry<String, Thread> entry:threadMap.entrySet()){ 13 if(entry.getValue().getState()!=Thread.State.TERMINATED){ 14 return false; 15 } 16 } 17 return true; 18 } 19 20 public Master(Worker worker,int countWorker) { 21 worker.setResultMap(resultMap); 22 worker.setWorkQueue(workQueue); 23 for (int i = 0; i < countWorker; i++) { 24 threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i))); 25 } 26 } 27 28 //提交任務 29 30 public void submit(Object obj){ 31 workQueue.add(obj); 32 //System.out.println(obj.toString()); 33 } 34 35 36 37 //返回子任務結果集 38 public Map<String, Object> getResultMap() { 39 return resultMap; 40 } 41 42 //開始運行所有worker進程,並進行處理 43 44 public void execute(){ 45 for(Map.Entry<String, Thread> entry:threadMap.entrySet()){ 46 entry.getValue().start(); 47 } 48 } 49 50 }
Worker代碼實現:
1 public class Worker implements Runnable { 2 //任務隊列 3 protected Queue<Object> workQueue; 4 //子任務結果集 5 protected Map<String,Object> resultMap = new HashMap<String, Object>(); 6 7 8 public void setWorkQueue(Queue<Object> workQueue) { 9 this.workQueue = workQueue; 10 } 11 public void setResultMap(Map<String, Object> resultMap) { 12 this.resultMap = resultMap; 13 } 14 15 public Object handle(Object input){ 16 return input; 17 } 18 @Override 19 public void run() { 20 while(true){ 21 Object input = workQueue.poll(); 22 23 if(null==input) break; 24 //處理子任務 25 Object re = handle(input); 26 resultMap.put(Integer.toString(input.hashCode()),re); 27 //System.out.println(re.toString()); 28 } 29 } 30 31 }
Master-Worker模式是一種串行任務並行化的方法,被分解的子任務在系統中可以並行處理。同時,如果有需要,Master進程不需要所有子任務都執行完成,就可以根據已有的部分結果集計算最終的結果。
現在以上面的Master-Worker實現為基礎,來實現計算1-100的立方和。計算將被分解為100個子任務,每個子任務僅用於計算單獨的立方和。Master產生固定數目Worker,來處理這些子任務。Worker不斷的從任務集合中取出這些計算立方和的子任務,並將計算結果放入到Master的結果集中。Master負責將所有Worker的任務結果進行累加,從而產生最終的立方和。整個計算過程,Worker和Master的運算也是完全異步的,Master進程不必等所有的Worker進程都執行完成,就可以進行求和操作了。也就是所,Master在獲取部分子任務的結果集時,就可以對最終結果進行計算了,從而提高了系統的並發性和吞吐量。
計算子任務的實現如下:
1 public class PlusWorker extends Worker { 2 3 @Override 4 public Object handle(Object input) { 5 Integer i = (Integer) input; 6 return i*i*i; 7 } 8 9 }
客戶端代碼如下:
1 public class Client { 2 public static void main(String[] args) { 3 Master m = new Master(new PlusWorker(), 5);//啟動五個線程處理 4 for (int i = 0; i < 100; i++) { 5 m.submit(i); 6 } 7 m.execute(); 8 int re = 0; 9 Map<String, Object> resultMap = m.getResultMap(); 10 while(resultMap.size()>0||!m.isComplete()){ 11 Set<String> keys = resultMap.keySet(); 12 String key = null; 13 for(String k:keys){ 14 key=k; 15 break; 16 } 17 Integer i = null; 18 if(key != null){ 19 i = (Integer) resultMap.get(key); 20 } 21 if(i!=null){ 22 re+=i;//並行計算結果集 23 } 24 25 if(key!=null){ 26 resultMap.remove(key);//將計算完成的結果移除 27 } 28 } 29 30 System.out.println(re); 31 } 32 }
通過Master創建5個Worker工作線程和PlusWorker工作實例。提交完100個任務後,就開始計算子任務。這些子任務,由生成的5個Worker線程共同完成。Master並不等所有的子任務都計算完成,就開始訪問子結果集進行最終結果的計算,直到子結果集中所有的數據都被處理,並且5個活躍的Worker線程全部終止,才能求出最終結果。