前言
🍊作者簡介: 不肯過江東丶,一個來自二線城市的程序員,致力于用“猥瑣”辦法解決繁瑣問題,讓復雜的問題變得通俗易懂。
🍊支持作者: 點贊👍、關注💖、留言💌~
相信各位小伙伴對 Stream 都不陌生,它是 Java 8 及以后版本中引入的一個新特性,用于處理集合數據。Stream 是對集合(Collection)對象功能的增強,與 Lambda 表達式結合,可以提高編程效率、間接性和程序可讀性。Stream API 中為我們提供了很多高效且易用的方法,大聰明的好朋友 —— 大明白就對這些方法情有獨鐘,但是就在前幾天,卻因為他在項目中使用了 Stream.parallel() 而引發了一個小小的意外情況… …
這里賣個關子😝~ 在說大明白引發的意外情況之前,我們先來一起看看什么是Stream.parallel()
Stream.parallel()
Stream.parallel() 方法用于將流操作轉換為并行操作,以便在多個線程上并行執行。并行流是一種可以同時在多個線程上執行操作的流,它將流的元素分割成多個子集,每個子集在不同的線程上獨立處理,最后將結果合并。使用 parallel() 方法可以輕松開啟并行流處理模式,無需顯式管理線程和同步。
List<Integer> numbers = ...; // 假設這里有一個包含大量正整數的List集合numbers.stream() // 創建順序流.parallel() // 轉換為并行流.filter(n -> n % 2 == 0) // 并行流操作 - 過濾List集合中的偶數.map(n -> n * 2) // 并行流操作 - 將過濾出來的偶數×2.forEach(System.out::println); // 并行流操作 - 打印結果
在上面的示例中,parallel() 方法將順序流轉換為并行流,后續的 filter()、map() 和 forEach() 操作將在多個線程上并行執行,從而加速數據處理。我們下面再看看它的底層原理👇
當調用 Stream.parallel() 方法時,它實際上會返回一個新的并行流對象,這個流對象可以在多個線程上并行執行流操作。下面是 Stream.parallel() 方法的大致工作原理:
① 并行流的劃分和分治:當我們對并行流進行操作時,Java 會使用 Fork/Join 框架將數據劃分成多個小任務,并將這些小任務分配給多個線程來并行執行。這個過程涉及到遞歸地將大任務分解為小任務,直到小任務足夠簡單可以直接求解。
② 工作竊取(Work Stealing):Fork/Join 框架采用工作竊取算法來實現任務的調度和執行。在工作竊取的過程中,空閑的線程會主動去其他線程的任務隊列中竊取任務執行。這種方式能夠充分利用線程資源,提高并行處理的效率。
③合并結果:在并行流的操作中,各個線程會并行地對數據進行處理,最后需要將各個線程的處理結果進行合并,得到最終的結果。這一過程涉及到結果的收集和合并,確保最終的結果是完整且正確的。
這里我們又引申出了一個新的概念 —— Fork/Join 框架。Fork/Join 框架是 Java 7 中引入的用于支持并行計算的框架,是一種并行計算模式,用于解決可以被分解成更小的可并行任務的問題。該模式包含兩個關鍵操作:Fork(分解)和Join(合并)。在 Fork/Join 模式中,原始問題被遞歸地分解為更小的子問題,直到達到可以并行解決的最小單位。這個過程被稱為 Fork。每個子問題可以獨立地在不同的處理器上執行,并行地求解部分問題。 一旦所有的子問題都被解決,就會進行 Join 操作。Join 操作將所有子問題的結果合并為最終的解決方案。這種分解和合并的過程可以視為樹形結構,其中每個節點代表一個子問題。
Fork/Join 模式最適用于可以自然地分解為多個獨立子問題的計算密集型任務。它適用于多核處理器或并行計算環境,其中可以充分利用并行性。Java 平臺提供了 Fork/Join 框架,用于實現該模式。它包括了一個線程池(ForkJoinPool) 和 任務(ForkJoinTask) 的概念。任務可以是可分解的子問題,也可以是執行最終計算的任務。通過 ForkJoinPool,可以將任務提交給線程池執行,自動實現任務的分解和合并過程。Fork/Join 模式的優點在于它能夠充分利用多核處理器的并行性,提高計算效率。
📌 在這里我們就先對 Fork/Join 框架做一個簡單的介紹,后續大聰明會單獨出一篇博客對 Fork/Join 框架進行詳細的介紹。
咱們言歸正傳,有些小伙伴看到“線程池(ForkJoinPool)”的時候可能就已經猜測到大明白遇倒的意外情況和線程有關系了。Stream.parallel() 并行流默認使用的是 ForkJoinPool.commonPool() 作為線程池,該線程池默認最大線程數就是 CPU 核數。正是因為大明白對并行流操作的原理不清楚,他在沒有配置線程池的情況下,通過并行流做了數據庫的大量批量更新操作,于是最大線程數只有 CPU 核數,最終導致在批量更新的時候出現了線程阻塞的情況,從而出現了這個小小的意外。
通過這件事應該也可以給各位小伙伴提個醒,在實際使用時需要慎重考慮并行化帶來的影響,并確保線程安全性和并發性。
① 線程安全:并行流并不能保證線程安全性,因此,如果流中的元素是共享資源或操作本身不是線程安全的,你需要確保正確同步或使用線程安全的數據結構。
② 資源消耗:并行流默認使用的線程池大小可能與機器的實際物理核心數相適應,但也可能與其他并發任務爭奪系統資源。
③ 結果一致性:并行流并不保證執行的順序性,也就是說,如果流操作的結果依賴于元素的處理順序,則不應該使用并行流。
④ 事務處理:在涉及到事務操作時,通常需要避免在并行流中直接處理,如上述例子所示,應當將事務邊界放在單獨的服務方法內,確保每個線程內的事務獨立完成。
小結
本人經驗有限,有些地方可能講的沒有特別到位,如果您在閱讀的時候想到了什么問題,歡迎在評論區留言,我們后續再一一探討🙇?
希望各位小伙伴動動自己可愛的小手,來一波點贊+關注 (????) 讓更多小伙伴看到這篇文章~ 蟹蟹呦(●’?’●)
如果文章中有錯誤,歡迎大家留言指正;若您有更好、更獨到的理解,歡迎您在留言區留下您的寶貴想法。
你在被打擊時,記起你的珍貴,抵抗惡意;
你在迷茫時,堅信你的珍貴,拋開蜚語;
愛你所愛 行你所行 聽從你心 無問東西