Master-Worker
- Master-Worker模式是常用的并行計算模式。它的核心思想是系統由兩類進程協作工作:Master進程和Worker進程
- Master負責接收和分配任務,Worker負責處理子任務
- 當各個Worker子進程處理完成后,會將結果返回給Master,由Master做歸納和總結。 其好處是能將一個大任務分解成若干個小任務,并行執行,從而提高系統的吞吐量
- master接收來自client的任務請求,將任務分發給不同的worker任務節點去執行任務,再將最終的任務結果返回給客戶端
- 模擬如下:客戶端、Master和Worker
- master里面用ConcurrentLinkedQueue盛放待處理的任務和HashMap<string,Thread>盛放每個線程,以及將每一個worker的執行結果存放在ConcurrentHashMap?中
- worker需要對任務隊列和線程處理進行映射,并且實現Runnable接口,設立一個集合,存放任務處理完的結果,等處理完之后,將結果集合返還到master的ConcurrentHashMap中,再由Master將結果返回到客戶端
具體代碼如下
- Task.java
package com.example.core.masterworker;public class Task {private int id;private int count;public Task(){}public Task(int id,int count){this.id = id;this.count = count;}public int getId() {return id;}public void setId(int id) {this.id = id;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}
}
- Master.java
package com.example.core.masterworker;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;public class Master {//1 承裝任務的一個容器private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();//2 承裝worker執行器private HashMap<String,Thread>workers = new HashMap<>();//3 接受worker處理成功的結果集合private ConcurrentHashMap<String,Object>resultMap = new ConcurrentHashMap<>();//4 構造方法里面,要對worker進行一個初始化操作public Master(Worker worker,int workerCount) {//4.1 每一個worker 應該有master任務隊列容器對引用worker.setTaskQueue(this.taskQueue);//4.2 每一個worker 應該有master結果集容器對的引用worker.setResultMap(this.resultMap);//4.3 將所有的worker進行初始化,放入workers容器中for(int i=0;i<workerCount;i++){this.workers.put(Integer.toString(i),new Thread(worker));}}//5 需要一個提交任務的方法public void submit(Task task){this.taskQueue.add(task);}//6 需要一個真正Master所有worker進行工作的方法public void execute(){for(Map.Entry<String,Thread>me:this.workers.entrySet()){me.getValue().start();}}//7 需要一個統計的方法,用于合并結果結合public int getResult(){int sum=0;for(Map.Entry<String,Object>me : resultMap.entrySet()){sum += (Integer)me.getValue();}return sum;}//8,判斷是否所有的worker都完成了工作,如果全部完成就返truepublic boolean isComplete(){for(Map.Entry<String,Thread> me : this.workers.entrySet()){if(me.getValue().getState() != Thread.State.TERMINATED){return false;}}return true;}
}
- worker.java
package com.example.core.masterworker;import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;public class Worker implements Runnable{private ConcurrentLinkedQueue<Task> taskQueue;private ConcurrentHashMap<String,Object> resultMap;//設置任務集合public void setTaskQueue(ConcurrentLinkedQueue<Task>taskQueue){this.taskQueue = taskQueue;}//設置結果集合public void setResultMap(ConcurrentHashMap<String,Object>resultMap){this.resultMap = resultMap;}@Overridepublic void run(){while(true){Task task = this.taskQueue.poll();if(task == null){break;}try{Object result = handle(task);this.resultMap.put(Integer.toString(task.getId()),result);}catch(Exception e){e.printStackTrace();}}}private Random r = new Random();//實際做每一個工作private Object handle(Task task)throws Exception{//每一個任務的處理時間Thread.sleep(200);int ret = task.getCount();return ret;}
}
- Main.java
package com.example.core.masterworker;import java.util.Random;public class Main {public static void main(String[] args) {System.out.println("線程數:"+Runtime.getRuntime().availableProcessors());Master master = new Master(new Worker(),Runtime.getRuntime().availableProcessors());Random r = new Random();for(int i=0;i<100;i++){Task t = new Task(i,r.nextInt(1000));master.submit(t);}master.execute();long start = System.currentTimeMillis();while(true){if(master.isComplete()){long end = System.currentTimeMillis();int result = master.getResult();System.out.println("最終結果為:"+result+",總耗時:"+(end-start));break;}}}
}
/*
output:
線程數:12
最終結果為:48834,總耗時:1819*/
?
?
?
?