根據摩爾定律(Moore’s law),集成電路晶體管的數量差不多每兩年就會翻一倍。但是晶體管數量指數級的增長不一定會導致 CPU 性能的指數級增長。處理器制造商花了很多年來提高時鐘頻率和指令并行。在新一代的處理器上,單線程程序的執行速率確實有所提高。但是,時鐘頻率不可能無限制地提高,如處理器 AMD FX-9590 的時鐘頻率達到5 GHz,這已經非常困難了。如今處理器制造商更喜歡采用多核處理器(multi-core processors)。擁有4核的智能手機已經非常普遍,更不用提手提電腦和臺式機。結果,軟件不得不采用多線程的方式,以便能夠更好的使用硬件。線程池可以幫助程序員更好地利用多核 CPU。
?
線程池
?
好的軟件設計不建議手動創建和銷毀線程。線程的創建和銷毀是非常耗 CPU 和內存的,因為這需要 JVM 和操作系統的參與。64位 JVM 默認線程棧是大小1 MB。這就是為什么說在請求頻繁時為每個小的請求創建線程是一種資源的浪費。線程池可以根據創建時選擇的策略自動處理線程的生命周期。重點在于:在資源(如內存、CPU)充足的情況下,線程池沒有明顯的優勢,否則沒有線程池將導致服務器奔潰。有很多的理由可以解釋為什么沒有更多的資源。例如,在拒絕服務(denial-of-service)攻擊時會引起的許多線程并行執行,從而導致線程饑餓(thread starvation)。除此之外,手動執行線程時,可能會因為異常導致線程死亡,程序員必須記得處理這種異常情況。
?
即使在你的應用中沒有顯式地使用線程池,但是像 Tomcat、Undertow這樣的web服務器,都大量使用了線程池。所以了解線程池是如何工作的,怎樣調整,對系統性能優化非常有幫助。
?
線程池可以很容易地通過 Executors 工廠方法來創建。JDK 中實現 ExecutorService 的類有:
?
-
ForkJoinPool
-
ThreadPoolExecutor
-
ScheduledThreadPoolExecutor
?
這些類都實現了線程池的抽象。下面的一小段代碼展示了 ExecutorService 的生命周期:
?
1 public List<Future<T>> executeTasks(Collection<Callable<T>> tasks) { 2 3 // create an ExecutorService 4 // 創建 ExecutorService 5 final ExecutorService executorService = Executors.newSingleThreadExecutor(); 6 7 // execute all tasks 8 // 執行所有任務 9 final List<Future<T>> executedTasks = executorService.invokeAll(tasks); 10 11 // shutdown the ExecutorService after all tasks have completed 12 // 所有任務執行完后關閉 ExecutorService 13 executorService.shutdown(); 14 15 return executedTasks; 16 17 }?
?
首先,創建一個最簡單的 ExecutorService —— 一個單線程的執行器(executor)。它用一個線程來處理所有的任務。當然,你也可以通過各種方式自定義 ExecutorService,或者使用 Executors 類的工程方法來創建 ExecutorService:
?
newCachedThreadPool() :創建一個 ExecutorService,該 ExecutorService 根據需要來創建線程,可以重復利用已存在的線程來執行任務。
?
newFixedThreadPool(int numberOfThreads) :創建一個可重復使用的、固定線程數量的 ExecutorService。
?
newScheduledThreadPool(int corePoolSize):根據時間計劃,延遲給定時間后創建 ExecutorService(或者周期性地創建 ExecutorService)。
?
newSingleThreadExecutor():創建單個工作線程 ExecutorService。
?
newSingleThreadScheduledExecutor():根據時間計劃延遲創建單個工作線程 ExecutorService(或者周期性的創建)。
?
newWorkStealingPool():創建一個擁有多個任務隊列(以便減少連接數)的 ExecutorService。
?
在上面這個例子里,所有的任務都只執行一次,你也可以使用其他方法來執行任務:
?
-
void execute(Runnable)
-
Future submit(Callable)
-
Future submit(Runnable)
?
最后,關閉 executorService。Shutdown() 是一個非阻塞式方法。調用該方法后,ExecutorService 進入“關閉模式(shutdown mode)”,在該模式下,之前提交的任務都會執行完成,但是不會接收新的任務。如果想要等待任務執行完成,需要調用 awaitTermination() 方法。
?
ExecutorService 是一個非常有用的工具,可以幫助我們很方便執行所有的任務。它的好處在什么地方呢?我們不需要手動創建工作線程。一個工作線程就是 ExecutorService 內部使用的線程。值得注意的是,ExecutorService 管理線程的生命周期。它可以在負載增加的時候增加工作線程。另一方面,在一定周期內,它也可以減少空閑的線程。當我們使用線程池的時候,我們就不再需要考慮線程本身。我們只需要考慮異步處理的任務。此外,當出現不可預期的異常時,我們不再需要重復創建線程,我們也不需要擔心當一個線程執行完任務后的重復使用問題。最后,一個任務提交以后,我們可以獲取一個未來結果的抽象——Future。當然,在 Java 8中,我們可以使用更優秀的 CompletableFuture,如何將一個 Future 轉換為 CompletableFuture 已超出了本文所討論的范圍。但是請記住,只有提交的任務是一個 Callable 時,Future 才有意義,因為 Callable 有輸出結果,而 Runnable 沒有。
?
內部組成
?
每個線程池由幾個模塊組成:
?
-
一個任務隊列,
-
一個工作線程的集合,
-
一個線程工廠,
-
管理線程狀態的元數據。
?
ExecutorService 接口有很多實現,我們重點關注一下最常用的 ThreadPoolExecutor。實際上,newCachedThreadPool()、newFixedThreadPool() 和 newSingleThreadExecutor() 三個方法返回的都是 ThreadPoolExecutor 類的實例。如果要手動創建一個ThreadPoolExecutor 類的實例,至少需要5個參數:
?
-
int corePoolSize:線程池保存的線程數量。
-
int maximumPoolSize:線程的最大數量。
-
long keepAlive and TimeUnit unit:超出 corePoolSize 大小后,線程空閑的時間到達給定時間后將會關閉。
-
BlockingQueue workQueue:提交的任務將被放置在該隊列中等待執行。
-
thread-pool
?
?
阻塞隊列
?
LinkedBlockingQueue 是調用 Executors 類中的方法生成 ThreadPoolExecutor 實例時使用的默認隊列,PriorityBlockingQueue 實際上也是一個BlockingQueue,不過,根據設定的優先級來處理任務也是一個棘手的問題。首先,提交一個 Runnable 或 Callable 任務,該任務被包裝成一個 RunnableFuture,然后添加到隊列中,ProrityBlockingQueue 比較每個對象來決定執行的優先權(比較對象是包裝后的RunnableFuture而不是任務的內容)。不僅如此,當 corePoolSize 大于1并且工作線程空閑時,ThreadPoolExecutor 可能會根據插入順序來執行,而不是 PriorityBlockingQueue 所期望的優先級順序。
?
默認情況下,ThreadPoolExecutor 的工作隊列(workQueue)是沒有邊界的。通常這是沒問題的,但是請記住,沒有邊界的工作隊列可能導致應用出現內存溢出(out of memory)錯誤。如果要限制任務隊列的大小,可以設置 RejectionExecutionHandler。你可以自定義處理器或者從4個已有處理器(默認AbortPolicy)中選擇一個:
?
-
CallerRunsPolicy
-
AbortPolicy
-
DiscardPolicy
-
DiscardOldestPolicy
?
線程工廠
?
線程工廠通常用于創建自定義的線程。例如,你可以增加自定義的 Thread.UncaughtExceptionHandler 或者設置線程名稱。在下面的例子中,使用線程名稱和線程的序號來記錄未捕獲的異常。
?
1 public class LoggingThreadFactory implements ThreadFactory { 2 3 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); 4 private static final String THREAD_NAME_PREFIX = "worker-thread-"; 5 private final AtomicInteger threadCreationCounter = new AtomicInteger(); 6 7 @Override 8 public Thread newThread(Runnable task) { 9 10 int threadNumber = threadCreationCounter.incrementAndGet(); 11 Thread workerThread = new Thread(task, THREAD_NAME_PREFIX + threadNumber); 12 13 workerThread.setUncaughtExceptionHandler(thread, throwable -> logger.error("Thread {} {}", thread.getName(), throwable)); 14 15 return workerThread; 16 17 } 18 }?
?
生產者消費者實例
?
生產者消費者是一種常見的同步多線程處理問題。在這個例子中,我們使用 ExecutorService 解決此問題。但是,這不是解決該問題的教科書例子。我們的目標是演示線程池來處理所有的同步問題,從而程序員可以集中精力去實現業務邏輯。
?
Producer 定期的從數據庫獲取新的數據來創建任務,并將任務提交給 ExecutorService。ExecutorService 管理的線程池中的一個工作線程代表一個 Consumer,用于處理業務任務(如計算價格并返回給客戶)。
?
首先,我們使用 Spring 來配置:
?
1 @Configuration 2 public class ProducerConsumerConfiguration { 3 4 @Bean 5 public ExecutorService executorService() { 6 7 // single consumer 8 return Executors.newSingleThreadExecutor(); 9 } 10 11 // other beans such as a data source, a scheduler, etc. 12 13 }?
?
然后,建立一個 Consumer 及一個 ConsumerFactory。該工程方法通過生產者調用來創建一個任務,在未來的某一個時間點,會有一個工作線程執行該任務。
?
1 public class Consumer implements Runnable { 2 3 private final BusinessTask businessTask; 4 private final BusinessLogic businessLogic; 5 6 public Consumer(BusinessTask businessTask, BusinessLogic businessLogic) { 7 8 this.businessTask = businessTask; 9 this.businessLogic = businessLogic; 10 11 } 12 13 @Override 14 public void run() { 15 16 businessLogic.processTask(businessTask); 17 } 18 19 } 20 21 @Component 22 public class ConsumerFactory { 23 24 private final BusinessLogic businessLogic; 25 26 public ConsumerFactory(BusinessLogic businessLogic) { 27 this.businessLogic = businessLogic; 28 } 29 30 public Consumer newConsumer(BusinessTask businessTask) { 31 return new Consumer(businessTask, businessLogic); 32 } 33 34 }?
?
最后,有一個 Producer 類,用于從數據庫中獲取數據并創建業務任務。在這個例子中,我們假定 fetchData() 是通過 scheduler 周期性調用的。
?
1 @Component 2 public class Producer { 3 4 private final DataRepository dataRepository; 5 private final ExecutorService executorService; 6 private final ConsumerFactory consumerFactory; 7 8 @Autowired 9 public Producer(DataRepository dataRepository, ExecutorService executorService, 10 11 ConsumerFactory consumerFactory) { 12 13 this.dataRepository = dataRepository; 14 this.executorService = executorService; 15 this.consumerFactory = consumerFactory; 16 17 } 18 19 public void fetchAndSubmitForProcessing() { 20 21 List<Data> data = dataRepository.fetchNew(); 22 23 data.stream() 24 // create a business task from data fetched from the database 25 .map(BusinessTask::fromData) 26 // create a consumer for each business task 27 .map(consumerFactory::newConsumer) 28 // submit the task for further processing in the future (submit is a non-blocking method) 29 .forEach(executorService::submit); 30 31 } 32 }?
?
非常感謝 ExecutorService,這樣我們就可以集中精力實現業務邏輯,我們不需要擔心同步問題。上面的演示代碼只用了一個生產者和一個消費者。但是,很容易擴展為多個生產者和多個消費者的情況。
?
總結
?
JDK 5 誕生于2004年,提供很多有用的并發工具,ExecutorService 類就是其中的一個。線程池通常應用于服務器的底層(如 Tomcat 和 Undertow)。當然,線程池也不僅僅局限于服務器環境。在任何密集并行(embarrassingly parallel)難題中它們都非常有用。由于現在越來越多的軟件運行于多核系統上,線程池就更值得關注了。