Java為我們提供了許多啟動線程和管理線程的方法。在本文中,我們將介紹一些在Java中進行并發編程的選項。我們將介紹結構化并發的概念,然后討論Java 21中一組預覽類——它使將任務拆分為子任務、收集結果并對其進行操作變得非常容易,而且不會不小心留下任何掛起的任務。
1 基礎方法
通過Lambda表達式啟動平臺線程的這種創建線程的方法最簡單,適用于簡單情況。
// Lambda表達式啟動平臺線程的一種方法。
Thread.ofPlatform().start(() -> {// 在這里執行在獨立線程上運行的操作});
問題
- 創建平臺線程是昂貴的
- 若應用程序用戶量很大,平臺線程數量可能增長到超出JVM支持的限制
顯然,大多數應用程序服務器不鼓勵這種行為。因此,繼續下一種方法——Java Futures。
2 Java Future類
JDK 5引入,開發者需要改變思考方式。不再考慮啟動新線程,而考慮將“任務”提交到線程池以供執行。JDK 5還引入ExecutorService,任務將提交到該服務。ExecutorService是一個定義了提交任務并返回Java Future的機制的接口。提交的任務需實現Runnable或Callable接口。
任務提交給表示單線程線程池
// 將Callable任務提交給表示單線程線程池的ExecutorServiceExecutorService service = Executors.newSingleThreadExecutor();
Future<String> future = service.submit(() -> {// 進行一些工作并返回數據return "Done";
});
// 在這里執行其他任務// 阻塞直到提交的任務完成
String output = future.get();// 打印 "Done"
System.out.println(output);// 繼續執行后續任務
多個任務提交到ExecutorService
try (ExecutorService service = Executors.newFixedThreadPool(3)) {Future<TaskResult> future1 = service.submit(() -> { // 執行任務1并返回TaskResult });Future<TaskResult> future2 = service.submit(() -> { // 執行任務2并返回TaskResult }); Future<TaskResult> future3 = service.submit(() -> { // 執行任務3并返回TaskResult });/* 所有異常上拋 */// get()將阻塞直到任務1完成TaskResult result1 = future1.get();// get()將阻塞直到任務2完成TaskResult result2 = future2.get();// get()將阻塞直到任務3完成TaskResult result3 = future3.get();// 處理result1、result2、result3handleResults(result1, result2, result3);
}
所有這些任務將并行運行,然后父線程可用future.get()方法檢索每個任務的結果。
3 上述實現的問題
如在上面代碼中用Platform線程,則存在一個問題。獲取TaskResult的get()方法將阻塞線程,由于與阻塞Platform線程相關的可擴展性問題,這代價可能很昂貴。然而,使用Java 21——如用Virtual Threads,則在get()期間,底層的平臺線程不會被阻塞。
若task2、task3在task1前完成,須等到task1完成,然后處理task2和task3結果。
若task2或task3執行過程失敗,則問題更糟。假設整個用例應在任何任務失敗時就失敗,代碼將等到task1完成,然后拋異常。這不是我們的期望,它將為最終用戶創建一個非常遲鈍的體驗。
3.1 基本問題
ExecutorService類對提交給它的各種任務之間關系一無所知。因此,它不知道若一個任務失敗,該發生點啥。即示例中提交的三個任務被視為獨立任務,而非用例的一部分。這并不是ExecutorService類的失敗,因為它沒有設計為處理提交的任務之間的任何關系。
3.2 另一個問題
ExecutorService周圍使用try-with-resources塊,確保在try塊退出時調用ExecutorService的close方法。close方法確保所有提交給執行器服務的任務在繼續執行之前終止。
若用例要求在任何任務失敗時立即失敗,那我們運氣不好。close方法將等待所有提交的任務完成。
但若不用try-with-resources塊,則不能保證在塊退出前三個任務都結束。將保留未清理終止的“未明確終止的線程”。任何其他自定義實現都須確保在失敗時立即取消其他任務。
因此,盡管用Java Future是處理可拆分為子任務的任務的一種不錯方法,但還不夠完美。開發須將用例的“感知”編碼到邏輯中,但這很難!
注意,對Platform線程存在于Java Futures的問題之一即阻塞問題——Java 21使用Virtual線程時,這問題不再存在。因為使用Virtual Threads時,使用future.get()方法阻塞線程將釋放底層的Platform線程。
使用CompletableFuture Pipelines也可解決阻塞問題,但這里不深入探討。有更簡單的方法來解決Java 21阻塞問題,沒錯就是Virtual Threads!但我們需要找到一種更好解決方案,以處理可拆分為多個子任務且“知道”用例的任務。這就引出結構化并發的基本思想。
4 結構化并發
想象,從方法內部向ExecutorService提交的任務,然后方法退出。現在更難推斷代碼,因為不知道此提交的任務可能的副作用,且這可能導致難以調試的問題。該問題的圖解:
結構化并發基本思想是從一個塊(方法或塊)內啟動的所有任務應在該塊結束前終止。即:
代碼的結構邊界(塊)
和該塊內提交的任務的運行時邊界
重合。這使應用程序代碼更容易理解,因為一個塊內提交的所有任務的執行效果都被限制在該塊內。塊外查看代碼時,不必擔心任務是否仍在運行。
ExecutorService的try-with-resources塊是對結構化并發的一次良好嘗試,其中從塊內提交的所有任務在塊退出時完成。但它還不夠,因為它可能導致父線程等待時間超過必要時間。其改進版——StructuredTaskScope。
5 StructuredTaskScope
Java 21 Virtual Thread作為一項功能被引入,它在大多情況下實際上消除了阻塞問題。但即使使用Virtual線程和Futures,仍存在“不干凈終止任務”和“等待時間比必要時間長”的問題。
StructuredTaskScope類在Java 21中作為預覽功能提供,旨在解決這問題。它試圖提供比Executor Service的try-with-resources塊更干凈的結構化并發模型。StructuredTaskScope類知道提交的任務之間的關系,因此它可對它們進行更智能假設。
使用StructuredTaskScope的示例
在任一任務失敗時,立即返回用例。
StructuredTaskScope.ShutdownOnFailure()返回一個StructuredTaskScope的引用,該引用知道若一個任務失敗,那其他任務也須終止,因為它“知道”提交的任務之間的關系。
try(var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 想象一下LongRunningTask實現Suppliervar dataTask = new LongRunningTask("dataTask", ...); var restTask = new LongRunningTask("restTask", ...); // 并行運行任務Subtask<TaskResponse> dataSubTask = scope.fork(dataTask); Subtask<TaskResponse> restSubTask = scope.fork(restTask); // 等待所有任務成功完成或第一個子任務失敗。 // 如果一個失敗,向所有其他子任務發送取消請求// 在范圍上調用join方法,等待兩個任務都完成或如果一個任務失敗scope.join(); scope.throwIfFailed(); // 處理成功的子任務結果 System.out.println(dataSubTask.get()); System.out.println(restSubTask.get()); }
企業用例
其中兩個任務可并行運行:
- 一個DB任務
- 一個Rest API任務
目標是并行運行這些任務,然后將結果合并到單個對象中并返回。
調用ShutdownOnFailure()靜態方法創建一個StructuredTaskScope類。然后使用StructuredTaskScope對象fork方法(將fork方法考慮為submit方法)并行運行兩個任務。幕后,StructuredTaskScope類默認使用Virtual線程來運行任務。每次fork一個任務,都創建一個新Virtual線程(Virtual線程永不會被池化)并運行任務。
然后在范圍上調用join方法,等待兩個任務都完成或如果一個任務失敗。更重要的——若一個任務失敗,join()方法將自動向其他任務(剩余運行任務)發送取消請求并等待其終止。這很重要,因為取消請求將確保在塊退出時沒有不必要的懸掛任務。
若其他線程向父線程發取消請求,也是如此。在最后,若塊內部任何位置拋異常——StructuredTaskScope的close方法將確保向子任務發送取消請求并終止任務。StructuredTaskScope美妙在于——若子線程創建自己的StructuredTaskScope(子任務本身有自己的子任務),取消時它們都會得到干凈處理。
開發在這里的一個職責是確保它們編寫的任務須正確處理在取消期間設置在線程上的中斷標志。任務有責任讀取此中斷標志并干凈終止自己。若任務未正確處理中斷標志,那用例的響應性將受影響。
6 使用StructuredTaskScope
當一個用例需要將任務分解為子任務,可能還需將子任務進一步分解為更多子任務時,使用StructuredTaskScope是合適的。本文看到的示例是用例需在任一子任務失敗時立即返回。但StructuredTaskScope遠不止如此。
- 在第一個任務成功時返回
- 在所有任務完成時返回(成功或失敗)
- 制作自己的StructuredTaskScope版本
6.1 StructuredTaskScope優點
- 代碼易閱讀,因為無論哪種用例,代碼看著都一樣
- 子線程失敗時會在適當時被干凈終止。沒有不必要的懸掛線程
- 使用StructuredTaskScope與Virtual Threads一起,意味與阻塞相關可擴展性問題不存在。這也難怪,默認情況下,StructuredTaskScope在底層使用Virtual Threads
7 總結
總的來說,StructuredTaskScope類是Java中處理將任務拆分為多個子任務的用例的良好補充。子線程在失敗時自動取消,不同用例的代碼一致性以及更好地理解代碼的能力,使其成為在Java中實現Structured Concurrency的理想選擇。
Virtual Threads和StructuredTaskScope類共同組成了一個完美的組合。Virtual Threads使我們能夠在JVM中創建數十萬個線程,而StructuredTaskScope類使我們能夠有效地管理這些線程。
讓我們等待它退出預覽并成為一個正式特性!
本文由博客一文多發平臺 OpenWrite 發布!