必備知識:
三種創建線程的方式
java線程池
CompletionService 是 Java 並發庫(java.util.concurrent)中的一個接口,用於簡化一組異步任務的執行和結果收集。它結合了 Executor 和 BlockingQueue 的功能:任務交給 Executor 執行,已完成任務的 Future 會放入隊列,調用方可以按完成順序取得結果。CompletionService 的主要實現類是 ExecutorCompletionService。
ExecutorCompletionService
例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Demonstrates {@link CompletionService}: a batch of tasks is submitted to a fixed
 * thread pool and their results are consumed as the tasks finish, not in the order
 * in which they were submitted.
 */
public class CompletionServiceExample {

    /**
     * Submits five tasks (task {@code i} sleeps {@code i} seconds and returns {@code i})
     * to a three-thread pool and prints each result as soon as it becomes available.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for a result
     * @throws ExecutionException if one of the tasks threw an exception
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executorService);

            // Build the group of tasks.
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int index = i;
                tasks.add(() -> {
                    TimeUnit.SECONDS.sleep(index);
                    return index;
                });
            }

            // Submit every task to the completion service.
            for (Callable<Integer> task : tasks) {
                completionService.submit(task);
            }

            // Collect the results: take() blocks until the next task has completed.
            for (int i = 0; i < tasks.size(); i++) {
                Future<Integer> future = completionService.take();
                Integer result = future.get();
                System.out.println("Task completed with result: " + result);
            }
        } finally {
            // Shut the pool down even when take()/get() throws; otherwise the pool's
            // non-daemon worker threads would keep the JVM alive.
            executorService.shutdown();
        }
    }
}
/**
 * Simplified {@link CompletionService} implementation: tasks are handed to an
 * {@link Executor}, and the {@link Future} of every task is placed on a queue when
 * that task completes, so consumers can retrieve results in completion order.
 *
 * @param <V> the result type of the submitted tasks
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * Creates a service that uses an unbounded {@link LinkedBlockingQueue} as its
     * completion queue.
     *
     * @param executor the executor that runs the submitted tasks
     * @throws NullPointerException if {@code executor} is null
     */
    public ExecutorCompletionService(Executor executor) {
        this(executor, new LinkedBlockingQueue<Future<V>>());
    }

    /**
     * Creates a service that uses the supplied queue as its completion queue.
     *
     * @param executor the executor that runs the submitted tasks
     * @param completionQueue the queue that receives the futures of completed tasks
     * @throws NullPointerException if either argument is null
     */
    public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
        // Fail at construction time instead of on the first submit()/take().
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        if (completionQueue == null) {
            throw new NullPointerException("completionQueue");
        }
        this.executor = executor;
        this.completionQueue = completionQueue;
    }

    /**
     * Submits a value-returning task for execution.
     *
     * @param task the task to run
     * @return the future for the task; the same object is later returned by
     *         {@link #take()} / {@link #poll()} once the task has completed
     */
    @Override
    public Future<V> submit(Callable<V> task) {
        RunnableFuture<V> f = new FutureTask<>(task);
        executor.execute(new QueueingFuture<>(f, completionQueue));
        return f;
    }

    /**
     * Submits a {@link Runnable} for execution together with the value its future
     * should report.
     *
     * @param task the task to run
     * @param result the value the future returns after the task has run
     * @return the future for the task; the same object is later returned by
     *         {@link #take()} / {@link #poll()} once the task has completed
     */
    @Override
    public Future<V> submit(Runnable task, V result) {
        RunnableFuture<V> f = new FutureTask<>(task, result);
        executor.execute(new QueueingFuture<>(f, completionQueue));
        return f;
    }

    /**
     * Retrieves and removes the future of the next completed task, waiting if none
     * has completed yet.
     *
     * @return the future of a completed task
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    /**
     * Retrieves and removes the future of the next completed task without waiting.
     *
     * @return the future of a completed task, or {@code null} if none is available
     */
    @Override
    public Future<V> poll() {
        return completionQueue.poll();
    }

    /**
     * Retrieves and removes the future of the next completed task, waiting up to the
     * given timeout.
     *
     * @param timeout how long to wait, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the future of a completed task, or {@code null} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

    /**
     * Wrapper that runs a task and, once the run is finished, enqueues the wrapped
     * task's future on the completion queue.
     */
    private static final class QueueingFuture<V> extends FutureTask<Void> {
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;

        QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) {
            // FutureTask has no single-argument Runnable constructor; the wrapper's own
            // result is never read, so null is used as its placeholder value.
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }

        @Override
        protected void done() {
            // Enqueue the wrapped task, not this wrapper: the wrapped future is the one
            // that carries the real result and that submit() returned to the caller.
            completionQueue.add(task);
        }
    }
}