Java Stream 宏觀介紹見:深入解析 Java Stream 設計:從四幕劇看流水線設計與執行機制-CSDN博客
PipelineHelper
PipelineHelper
?是 Java Stream API 內部一個至關重要的輔助類。正如其名,它是一個“管道助手”。可以把它想象成一個執行上下文對象,當一個流管道需要被執行時(即調用終端操作時),這個對象會被創建并傳遞,它封裝了執行該管道所需的所有信息和核心工具方法。
- 在?
PipelineHelper
?這個抽象類的定義中,沒有聲明任何字段。它是一個純粹的行為和契約定義類,其狀態將由它的具體實現類(主要是?AbstractPipeline
)來持有。 PipelineHelper
?是一個純抽象類,它只定義了方法簽名,沒有任何一個方法的具體實現。所有方法的實現都委托給了它的子類。這種設計強制子類必須提供一套完整的管道執行機制。
抽象方法構成了?PipelineHelper
?的核心能力,可以分為幾類:
信息獲取類方法
這類方法用于查詢當前流管道的靜態屬性,是執行優化的關鍵依據。
abstract StreamShape getSourceShape();
- 語義:獲取管道源頭的“形狀”,即流中元素的類型是引用類型 (
REFERENCE
)、int
、long
?還是?double
。
- 語義:獲取管道源頭的“形狀”,即流中元素的類型是引用類型 (
abstract int getStreamAndOpFlags();
- 語義:獲取整個管道(從源頭到當前操作)合并后的特征標記 (
StreamOpFlag
)。這些標記包括?SIZED
(大小已知)、ORDERED
(有序)、DISTINCT
(元素唯一)、SHORT_CIRCUIT
(可短路)等。終端操作會根據這些標記來選擇最高效的執行策略。
- 語義:獲取整個管道(從源頭到當前操作)合并后的特征標記 (
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
- 語義:在已知的情況下,計算處理完給定的?
spliterator
?后,會產生的確切輸出元素數量。如果源是?SIZED
?的,并且中間沒有?filter
?等改變大小的操作,這個方法就能返回一個精確值。這對于?toArray
?等操作預先分配內存空間至關重要。
- 語義:在已知的情況下,計算處理完給定的?
核心執行類方法
這類方法定義了驅動管道數據流動的核心邏輯。
abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
- 語義:這是最核心的方法之一。它接受一個用于接收最終輸出結果的?
Sink
(來自終端操作),然后用當前管道中所有中間操作的邏輯,從后往前地對這個?Sink
?進行層層包裝,最終返回一個包裝好的、位于管道頭部的?Sink
。當向這個返回的?Sink
?推入一個元素時,這個元素會依次流過?filter
、map
?等所有中間操作,最終(如果沒被過濾掉)到達原始的?Sink
。
- 語義:這是最核心的方法之一。它接受一個用于接收最終輸出結果的?
abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
- 語義:將數據從源?
spliterator
?中不斷取出,并喂給(accept
)已經通過?wrapSink
?包裝好的?Sink
。它負責驅動數據的流動。
- 語義:將數據從源?
abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
- 語義:
copyInto
?的一個支持短路(cancellation)的版本。在每次推送元素后,它會檢查?Sink.cancellationRequested()
,如果為?true
(例如?findFirst
?找到了元素),就立刻停止迭代。
- 語義:
abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
- 語義:一個便捷方法,它內部調用?
wrapSink
?和?copyInto
,將包裝和數據拷貝兩步操作合并為一步。
- 語義:一個便捷方法,它內部調用?
結果聚合類方法
這類方法用于有狀態操作(如?sorted
)或部分終端操作(如?toArray
)將結果收集到一個中間容器?Node
?中。
abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
- 語義:執行整個管道,并將所有輸出結果收集到一個?
Node
?對象中返回。Node
?是一個可以表示單個元素或元素樹的內部結構。
- 語義:執行整個管道,并將所有輸出結果收集到一個?
abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
- 語義:創建一個?
Node.Builder
,這是一個用于構建?Node
?的輔助工具。
- 語義:創建一個?
其他方法
abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
- 語義:提供一種不同于?
Sink
?模型的執行方式。它將管道操作包裝到一個新的?Spliterator
?中,當遍歷這個新的?Spliterator
?時,操作會惰性地應用到原始數據上。
- 語義:提供一種不同于?
總結:語義和能力
PipelineHelper
?的核心語義是封裝一條流管道的完整執行方案。它不是管道本身,而是執行管道時所需的 “藍圖”和“工具箱” 。
它的能力可以總結為:
- 信息中心:提供關于管道的元數據(形狀、標志、大小),供終端操作進行查詢和優化決策。
- 執行引擎:定義了將數據源 (
Spliterator
) 和數據處理邏輯 (Sink
?鏈) 連接起來并驅動數據流動的核心方法 (wrapSink
,?copyInto
)。 - 流程編排器:它將流的“聲明式”定義(一系列?
map
,?filter
?調用)轉化為一個“命令式”的執行過程。 - 結果聚合器:為需要緩沖所有元素的有狀態操作和終端操作提供了收集結果到?
Node
?的能力。
AbstractPipeline
?類概述
AbstractPipeline
?類的結構和源碼。這個抽象類是所有類型流(如?ReferencePipeline
?for?Stream<T>
,?IntPipeline
?for?IntStream
?等)實現的基礎,在 Java Stream API 中扮演著至關重要的角色。
StreamSupport.java
?文件中的?stream()
?方法,正是通過實例化?ReferencePipeline.Head
(ReferencePipeline
?的一個內部類,而?ReferencePipeline
?繼承自?AbstractPipeline
)來創建流的。
AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
?是一個抽象類,它實現了?BaseStream
?和?PipelineHelper
?接口。
E_IN
: 上游階段輸入元素的類型。E_OUT
: 當前階段輸出元素的類型。S
: 當前流的類型 (例如?Stream<E_OUT>
)。
主要結構和成員
AbstractPipeline
?的字段主要用于構建和管理流管道這個雙向鏈表,并維護整個管道的狀態。
sourceStage
: 指向管道鏈表的頭節點(源階段)。所有階段都共享同一個?sourceStage
,用于獲取全局信息,如數據源和并行標志。previousStage
: 指向鏈表中的前一個階段。nextStage
: 指向鏈表中的后一個階段。depth
: 當前階段在管道中的深度(從0開始的索引)。sourceOrOpFlags
:?當前階段操作的標志(如?NOT_SIZED
)。combinedFlags
: 從源階段到當前階段累計的所有標志。sourceSpliterator
?/?sourceSupplier
: 數據源,只在?sourceStage
?中有效。parallel
: 是否為并行流,只在?sourceStage
?中有效。linkedOrConsumed
: 一個布爾標志,用于確保流只能被消費一次。一旦一個中間操作被鏈接到下游,或者一個終端操作開始執行,這個標志就會被設為?true
,防止后續操作。sourceCloseAction
: 用于注冊流關閉時需要執行的清理邏輯。
核心職責:構建和驅動流水線
AbstractPipeline
?是整個 Stream API 實現的骨架和引擎。它的核心職責可以概括為以下幾點:
-
流水線構建 (Pipeline Construction):
- 雙向鏈表結構:通過?
previousStage
?和?nextStage
?字段,將各個操作階段(stage)鏈接成一個雙向鏈表。 - 構造器:提供了三種核心構造器:
AbstractPipeline(Spliterator, ...)
?和?AbstractPipeline(Supplier, ...)
?用于創建流水線的源頭 (Head)。AbstractPipeline(previousStage, ...)
?用于追加 (Append)?一個新的中間操作階段。
- 狀態管理:通過?
linkedOrConsumed
?標志,嚴格保證了“流只能被消費一次”的原則。在構造鏈條時,它會將被鏈接的?previousStage
?標記為已消費,防止分叉。
- 雙向鏈表結構:通過?
-
標志位管理 (Flag Management):
sourceOrOpFlags
: 每個階段都持有自己的操作標志(如?SORTED
,?DISTINCT
)。combinedFlags
: 在構建流水線時,通過?StreamOpFlag.combineOpFlags()
?方法,自頂向下地累積和計算從源頭到當前階段的所有標志。這使得任何一個階段都能快速知道整個上游流水線的綜合特性(例如,是否整體有序?ORDERED
,是否包含短路操作?SHORT_CIRCUIT
)。這是非常高效的設計。
-
終端操作的統一入口和驅動 (Terminal Operation Driver):
evaluate(TerminalOp)
: 這是所有終端操作的總入口。它不關心具體的終端操作是什么,只負責根據?isParallel()
?狀態,調用?terminalOp.evaluateParallel(...)
?或?terminalOp.evaluateSequential(...)
。sourceSpliterator(int)
: 這是?evaluate
?方法的核心輔助。它負責為終端操作準備好一個“就緒”的?Spliterator
,尤其是通過預計算來處理并行流中的有狀態操作(屏障),這是整個并行處理機制的精髓所在。
-
并行/順序切換 (Parallel/Sequential Switching):
- 通過?
parallel()
?和?sequential()
?方法,簡單地修改源頭階段的?parallel
?標志位,就能影響整個流水線的執行模式。這種控制集中在源頭的設計非常簡潔。
- 通過?
-
核心?
PipelineHelper
?接口的實現:- 它實現了?
PipelineHelper
?接口中的大部分通用邏輯,如?wrapAndCopyInto
,?copyInto
,?copyIntoWithCancel
?等,這些方法定義了如何將?Spliterator
?中的數據“灌入”到?Sink
?鏈中,并處理了短路操作的邏輯。
- 它實現了?
留給子類實現的職責:與“形狀”相關的具體行為
AbstractPipeline
?完美地抽象了所有 Stream(無論其元素是對象、int
、long
?還是?double
)的通用流水線邏輯。它刻意將所有與**“形狀” (Shape)** 相關的具體實現細節留給了子類。這里的“形狀”指的是流的類型(REFERENCE
,?INT_VALUE
,?LONG_VALUE
,?DOUBLE_VALUE
)。
子類(如?ReferencePipeline
,?IntPipeline
?等)必須實現以下抽象方法:
-
getOutputShape()
: 返回當前階段的輸出“形狀”。這是類型系統的基礎。 -
opWrapSink(int flags, Sink<E_OUT> sink)
:?(核心抽象)?這是實現一個中間操作的關鍵。子類需要提供一個方法,將下游的?Sink
?“包裝”成一個新的?Sink
,這個新的?Sink
?在接收到元素后,會執行當前階段的操作(如?map
?的轉換邏輯,filter
?的判斷邏輯),然后將結果傳遞給原始的下游?Sink
。 -
wrap(...)
?和?lazySpliterator(...)
: 這兩個方法負責創建與“形狀”匹配的?Spliterator
?包裝器。例如,ReferencePipeline
?會創建?StreamSpliterators.WrappingSpliterator
,而?IntPipeline
?會創建?StreamSpliterators.IntWrappingSpliterator
。 -
evaluateToNode(...)
,?makeNodeBuilder(...)
: 這些方法與將流的結果收集到?Node
(一個內部的、用于聚合結果的樹狀結構)中有關。不同“形狀”的流需要不同類型的?Node
?和?Node.Builder
(如?Node.OfReference
,?Node.OfInt
)。 -
opEvaluateParallel(...)
?和?opEvaluateParallelLazy(...)
:?(核心抽象)?這是實現一個有狀態操作的關鍵。子類必須為有狀態操作(如?sorted
,?distinct
)提供具體的并行求值邏輯。
設計原因分析與啟示
這種設計的精妙之處在于 “模板方法模式” (Template Method Pattern)? 和 “職責分離原則” (Separation of Concerns) 的完美應用。
-
模板方法模式:
AbstractPipeline
?定義了流水線執行的骨架(模板),例如?evaluate()
?方法規定了“獲取?Spliterator
?-> 調用終端操作求值”這個流程。- 它將流程中可變的部分(如如何包裝?
Sink
、如何創建?Spliterator
)定義為抽象方法(opWrapSink
,?wrap
),交由子類去實現。 - 啟示: 當我們設計一個具有固定流程但某些步驟細節可變的框架或組件時,模板方法模式是絕佳的選擇。它能鎖定核心邏輯的穩定性,同時提供高度的擴展性。
-
職責分離:
AbstractPipeline
?只關心“如何驅動流水線”,它不關心流水線中流動的數據具體是什么類型,也不關心每個操作的具體業務邏輯。- 具體的操作邏輯被封裝在?
Sink
?的實現中。 - 具體的類型處理被封裝在?
ReferencePipeline
,?IntPipeline
?等子類中。 - 啟示: 在設計復雜系統時,要清晰地劃分不同模塊的職責邊界。一個類或模塊應該只有一個引起它變化的原因。
AbstractPipeline
?的變化原因僅僅是“流水線驅動邏輯的變更”,而不是“增加了一種新的中間操作”或“支持一種新的數據類型”。
-
組合優于繼承:
- 雖然這里用了繼承,但流水線的構建更像是組合。每個?
AbstractPipeline
?對象都持有一個?previousStage
?的引用,形成一個操作鏈。Stream 的行為是由這一系列對象的組合來定義的,而不是通過一個巨大的、多層繼承的類來實現。 - 啟示: 優先考慮使用對象組合來構建復雜的行為,因為它比繼承更加靈活。
- 雖然這里用了繼承,但流水線的構建更像是組合。每個?
通過分析?AbstractPipeline
,我們可以學到以下幾點來提升自己的設計能力:
- 識別變與不變: 在設計之初,就要仔細思考你的系統中哪些是穩定不變的核心流程(不變),哪些是可能變化或需要擴展的細節(變)。將不變的部分固化在抽象基類中,將變化的部分抽象成接口或抽象方法。
- 善用模板方法: 對于有固定步驟的業務流程,使用模板方法模式可以極大地簡化子類的實現,并保證核心流程的正確性。
- 明確職責邊界: 一個好的抽象類應該有一個清晰、單一的職責。
AbstractPipeline
?的職責就是“流水線控制器”,它做得非常出色。避免設計“萬能”的基類。 - 面向接口/抽象編程:?
AbstractPipeline
?內部大量使用了?Spliterator
,?Sink
,?TerminalOp
?等接口,而不是具體的實現類。這使得它可以與任何符合接口定義的組件協同工作,大大增強了靈活性。 - 管理好狀態:?
AbstractPipeline
?通過?linkedOrConsumed
?和?parallel
?等幾個關鍵的狀態字段,清晰地管理了流的生命周期和執行模式,保證了正確性和線程安全。在自己的設計中,也要仔細考慮對象的狀態轉換和并發訪問問題。
為什么叫做pipeline而不是node?
單個?AbstractPipeline
?對象很像一個鏈表中的節點 (Node)。然而,它被命名為?AbstractPipeline
(抽象管道)而不是?AbstractStage
?或?AbstractNode
?是因為它所代表的概念和職責遠超一個簡單的節點。
簡單來說:一個?AbstractPipeline
?實例代表管道中的一個“階段”,但它本身的設計和功能是為了構建和管理整個“管道”。
讓我們從幾個方面來深入理解:
1. 結構上:既是節點,也知全局
AbstractPipeline
?內部通過?previousStage
?和?nextStage
?字段,將各個操作(如?map
,?filter
)鏈接起來,形成一個雙向鏈表結構。這確實是節點的特征。
但是,請看這個關鍵字段:
// ... existing code .../*** Backlink to the head of the pipeline chain (self if this is the source* stage).*/@SuppressWarnings("rawtypes")private final AbstractPipeline sourceStage;
// ... existing code ...
這個?sourceStage
?字段讓每一個“節點”都能直接訪問到整個鏈表的頭部,也就是數據源頭。一個普通的鏈表節點通常只知道它的前驅和后繼。而?AbstractPipeline
?的每個實例都擁有對整個管道起點的認知。
此外,還有?combinedFlags
?字段,它會累積從源頭到當前階段所有操作的特性(比如?SIZED
,?DISTINCT
?等)。
這種設計意味著,雖然它是一個獨立的階段(節點),但它始終攜帶著整個管道(到它為止)的上下文信息。它的行為和優化都依賴于對整個管道的理解,而不僅僅是它自己這個環節。
2. 概念上:代表“數據流動的管道”
“Pipeline”(管道)這個詞更強調的是過程和流動。數據從?sourceStage
?流入,依次穿過每一個由?AbstractPipeline
?實例代表的中間操作,最后從末端流出到終端操作。
- Node?(節點) 是一個靜態的概念,強調“存在于某處”。
- Pipeline?(管道) 是一個動態的概念,強調“從一處流向另一處的過程”。
Stream API 的核心就是建立一個數據處理的流水線。AbstractPipeline
?正是構建這條流水線的基礎組件。所以用“Pipeline”來命名,更能體現其在數據流處理中的角色和作用。
3. 功能上:懶加載的“配方”
整個由?AbstractPipeline
?鏈接起來的結構,在終端操作(如?collect
,?forEach
)被調用之前,什么也不做。它只是一個“配方”或者說一個“藍圖”,描述了數據應該如何被處理。
當終端操作開始時,它會從最后一個?AbstractPipeline
?階段開始,反向遍歷整個鏈表(管道),將每個階段的操作包裝(wrapSink
)成一個最終的、融合的?Sink
。然后,一次性地從源頭拉取數據,讓數據流過這個融合后的?Sink
,完成所有計算。
所以,AbstractPipeline
?的鏈表結構本身就是對整個處理 流程(Pipeline) 的定義。
總結
特性 | 像一個 Node | 為什么叫 Pipeline |
---|---|---|
結構 | 通過 previousStage 和 nextStage 鏈接,形成鏈表。 | 每個實例都通過 sourceStage 了解管道源頭,并用 combinedFlags 累積全局狀態。 |
概念 | 是構成鏈表的一個基本單元。 | 強調數據從源頭流經一系列處理階段的動態過程。 |
功能 | 封裝了單個操作(如 map , filter )。 | 整個鏈表共同定義了一個完整的、可執行的數據處理流水線(配方)。 |
因此,雖然?AbstractPipeline
?的實例在結構上扮演著“節點”的角色,但它的命名著眼于它所構建的、更宏大的“數據處理管道”這一核心概念。這個命名精準地反映了 Stream API 的設計哲學。
啟動Stream:evaluate
有兩個evaluate,它們的關系是:
-
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp)
?(將軍/Orchestrator):?這是高層次的、通用的執行入口。它負責啟動任何類型的終端操作。它不關心終端操作具體做什么,只負責接收一個代表“任務”的?TerminalOp
?對象,然后決定是按順序執行還是并行執行,并把具體的執行工作委托給這個?TerminalOp
?對象。 -
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, ...)
?(工兵/Worker):?這是一個低層次的、專用的執行方法。它只有一個非常具體的目標:將流中的所有元素收集到一個內部數據結構?Node
?中。它是一個“工具”方法,被那些需要先把所有元素聚合起來才能進行下一步操作的終端操作所使用。
evaluate(TerminalOp<E_OUT, R> terminalOp)
?- 通用執行引擎
// ... existing code ...final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
// ... existing code ...
-
職責:
- 接收任務: 它的唯一參數?
TerminalOp<E_OUT, R>
?是一個接口,代表了一個完整的終端操作。像?count()
,?collect()
,?reduce()
,?forEach()
?等操作,在內部都會被封裝成一個實現了?TerminalOp
?接口的對象。這個對象知道如何進行求值、如何合并結果。 - 檢查狀態: 檢查流是否已經被使用過 (
linkedOrConsumed
)。 - 決策與分派: 核心邏輯是?
isParallel()
?判斷。- 如果流是并行的,它就調用?
terminalOp.evaluateParallel(...)
。 - 如果流是順序的,它就調用?
terminalOp.evaluateSequential(...)
。
- 如果流是并行的,它就調用?
- 返回最終結果: 它返回泛型?
<R>
,也就是終端操作的最終用戶可見結果。這個?R
?可以是任何類型,比如?Long
?(對于?count
)、List<T>
?(對于?collect
)、void
?(對于?forEach
)。
- 接收任務: 它的唯一參數?
-
總結: 這個方法是所有終端操作的總指揮。它連接了用戶調用的?
Stream.collect()
?和實際執行該操作的?TerminalOp
?對象,并根據并行狀態選擇正確的執行路徑。
evaluate(Spliterator<P_IN> spliterator, ...)
?- 節點收集器
// ... existing code ...@Overridefinal <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator) {if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);}else {Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);return wrapAndCopyInto(nb, spliterator).build();}}
// ... existing code ...
-
職責:
- 單一目標: 它的目標非常明確,就是執行流水線,并將所有輸出元素收集到一個?
Node
?對象里。Node
?是 Stream API 內部用來暫存元素集合的高效數據結構。 - 需要具體實現: 它需要一個?
generator
?(如?String[]::new
) 來知道如何創建最終存放元素的數組。 - 返回中間結構: 它返回的是?
Node<E_OUT>
,這是一個內部數據結構,而不是通常用戶直接消費的最終結果。
- 單一目標: 它的目標非常明確,就是執行流水線,并將所有輸出元素收集到一個?
-
總結: 這個方法是一個專用工具,用于將流“物化”到內存中。它本身不是一個完整的終端操作,而是某些終端操作(比如?
toArray()
)實現過程中的一個步驟。
一個調用鏈的例子
讓我們以?stream.toArray(String[]::new)
?為例,看看它們是如何協同工作的:
-
用戶調用:?
stream.toArray(String[]::new)
。 -
創建?
TerminalOp
:?toArray
?方法內部會創建一個?TerminalOp
?的實例。這個?TerminalOp
?的邏輯大致是:“請給我一個包含所有流元素的?Node
,然后我會用這個?Node
?和傳入的?String[]::new
?來創建一個?String
?數組作為最終結果”。 -
調用“將軍”:?
toArray
?方法接著會調用高層?evaluate(theToArrayTerminalOp)
。 -
“將軍”做決策:?
evaluate(TerminalOp)
?檢查?isParallel()
。我們假設是順序執行。 -
“將軍”下令: 它調用?
theToArrayTerminalOp.evaluateSequential(this, spliterator)
。 -
TerminalOp
?的實現:?evaluateSequential
?的實現需要一個?Node
。于是,它內部就會調用低層的、作為“工兵”的?evaluate(spliterator, true, generator)
。 -
“工兵”干活: 這個低層?
evaluate
?方法啟動流水線,將所有元素收集到一個?Node
?中,并返回這個?Node
。 -
完成任務:?
TerminalOp
?拿到?Node
?后,從中提取元素,創建用戶期望的?String[]
?數組。 -
返回結果: 最終的數組被層層返回,直到最初的用戶調用處。
這兩個方法的設計體現了優秀的關注點分離(Separation of Concerns):一個負責頂層流程控制,另一個負責具體的數據聚合任務,使得整個 Stream 執行框架既靈活又清晰。
toArray實現沒有使用 teminalOp,而是直接調用evaluateToArrayNode,這個會間接調用
evaluate(Spliterator<P_IN> spliterator, ...)
對于其它op,基本只需要直接調用 wrapAndCopyInto,比如ReduceOp的任務是將流中的所有元素聚合成一個單一的值,它不需要在內存中保留所有元素,可以逐個處理元素,并不斷更新一個累加器。
Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, ...)
evaluate
?的核心作用是啟動流水線并將其所有輸出元素收集到一個內部數據結構?Node
?中。Node
?是 Stream API 用于在內存中高效存儲一系列元素的內部表示,它可以是一個簡單的數組包裝,也可以是一個更復雜的樹形結構(在并行計算中使用)。
此方法是許多終端操作(如?toArray()
,?reduce
?的部分形式)的最終執行邏輯。它的設計思想是根據流的并行/順序狀態,分派到兩種截然不同的執行策略:
- 順序執行:邏輯相對簡單,創建一個?
Node.Builder
(它本身也是一個?Sink
),然后利用我們之前分析過的?wrapAndCopyInto
?機制,將所有元素推入?Builder
?中,最后構建出?Node
。 - 并行執行:邏輯復雜得多,它將任務委托給一個抽象的?
evaluateToNode
?方法。該方法內部會利用 Fork/Join 框架,將數據源?Spliterator
?分割成多個部分,并行處理,然后將各個部分的結果(通常是多個?Node
)合并成一個最終的?Node
。
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator)
final
: 此方法不可被子類重寫,是框架的核心穩定部分。<P_IN>
: 方法級泛型,代表輸入?Spliterator
?的元素類型。Node<E_OUT>
: 返回值。E_OUT
?是當前?Pipeline
?階段的輸出類型。返回一個包含所有流元素的?Node
。Spliterator<P_IN> spliterator
: 數據源。boolean flatten
: 一個標志。在并行計算中,結果可能是一個?Node
?樹。如果?flatten
?為?true
,則要求返回的?Node
?是一個扁平化的結構(即一個包含所有元素的單一數組),而不是樹。IntFunction<E_OUT[]> generator
: 一個函數,用于創建指定類型的數組,例如?String[]::new
。這是 Java 泛型數組創建的標準模式。
evaluate
?的實現是一個清晰的?if-else
?分支:
// ... existing code ...@Overridefinal <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator) {if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);}else {Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);return wrapAndCopyInto(nb, spliterator).build();}}
// ... existing code ...
順序執行路徑
這是相對簡單的路徑,我們先分析它。
Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
第 1 步:?exactOutputSizeIfKnown(spliterator)
?- 計算確切大小
此方法嘗試在流開始處理前,預先計算出最終會輸出多少個元素。這是一個重要的優化,如果大小已知,Node.Builder
?就可以一次性分配足夠大的內存,避免后續動態擴容。
exactOutputSizeIfKnown
:int flags = getStreamAndOpFlags();
?獲取當前流水線的所有標志。long size = StreamOpFlag.SIZED.isKnown(flags) ? spliterator.getExactSizeIfKnown() : -1;
- 檢查流水線是否具有?
SIZED
?特性。像從?ArrayList
?創建的流就有這個特性。 - 如果有,就調用?
spliterator.getExactSizeIfKnown()
?獲取源頭大小。 - 如果沒有,大小未知,返回?
-1
。
- 檢查流水線是否具有?
if (size != -1 && StreamOpFlag.SIZE_ADJUSTING.isKnown(flags) && !isParallel())
- 這是一個針對順序流的進一步計算。如果源大小已知,并且流水線中有會調整大小的操作(
SIZE_ADJUSTING
,例如?limit()
),則需要逐個階段計算最終大小。 for (AbstractPipeline<?, ?, ?> stage = sourceStage.nextStage; ...)
: 循環遍歷從源之后到當前的所有中間操作。size = stage.exactOutputSize(size);
: 每個階段都會根據自己的邏輯調整大小。例如,limit(10)
?的?exactOutputSize
?實現就會返回?Math.min(previousSize, 10)
。
- 這是一個針對順序流的進一步計算。如果源大小已知,并且流水線中有會調整大小的操作(
- 最終返回計算出的?
size
?或?-1
。
第 2 步:?makeNodeBuilder(...)
?- 創建節點構建器
這是一個?abstract
?方法,其具體實現由子類(如?ReferencePipeline
,?IntPipeline
)提供。
- 遞歸分析?
makeNodeBuilder
:- 它接收上一步計算出的?
exactSizeIfKnown
?和數組生成器?generator
。 - 在?
ReferencePipeline
?中,它會調用?Nodes.builder(exactSizeIfKnown, generator)
。 Nodes.builder
?會根據?exactSizeIfKnown
?的值,決定是創建一個固定大小的?Builder
?還是一個可動態擴容的?Builder
。- 這個?
Node.Builder
?同時實現了?Sink
?接口,所以它可以作為數據流的目的地。它的?accept
?方法就是將接收到的元素添加到內部的存儲中。
- 它接收上一步計算出的?
第 3 步:?wrapAndCopyInto(nb, spliterator).build()
?- 封裝、執行、構建
這是執行的核心。
-
遞歸分析?
wrapAndCopyInto
:wrapSink(nb)
: 調用?wrapSink
,將?Node.Builder
?(nb
) 從后向前用流水線中的每個中間操作邏輯進行包裝。例如,map(f).filter(p)
,會先用?filter
?的邏輯包裝?nb
,再用?map
?的邏輯包裝?filter
?后的?Sink
。返回一個包含了所有操作的?wrappedSink
。copyInto(wrappedSink, spliterator)
: 調用?copyInto
,它會啟動數據流,將?spliterator
?的數據推送到?wrappedSink
。- 返回?
nb
:?wrapAndCopyInto
?執行完畢后,返回最初的、未被包裝的?Node.Builder
?實例?nb
。此時,nb
?內部已經包含了所有處理過的元素。
-
.build()
: 在?nb
?上調用?build()
?方法,它會完成構建過程(例如,將內部的動態數組裁剪到合適的大小),并返回一個最終的、不可變的?Node
?對象。
并行執行路徑
if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);
}
當?isParallel()
?返回?true
?時,執行會進入此分支。
evaluateToNode(this, spliterator, flatten, generator)
?- 并行求值
此方法是并行執行的核心,但它本身也是一個?abstract
?方法。
- 遞歸分析?
evaluateToNode
:- 為什么是抽象的??因為針對不同數據類型(引用類型 vs. 原始類型?
int
,?long
,?double
)的并行處理和數據存儲方式有很大差異。原始類型流可以利用連續內存的數組進行高效計算,而引用類型流則不能。將此方法設為抽象,可以強制每個子類(ReferencePipeline
,?IntPipeline
?等)提供最高效的并行實現。 - 內部發生了什么??以?
ReferencePipeline
?為例,它的?evaluateToNode
?實現大致如下:- 調用 Nodes.collect
- 大小已知路徑 (Sized Path): 如果流的大小可以精確預知 (
size >= 0
) 并且源?Spliterator
?支持?SUBSIZED
?特性,系統會采取最高效的策略。它會預先分配一個最終大小的數組,然后啟動一個?SizedCollectorTask
(一個 Fork/Join 任務),讓所有并行線程直接將結果寫入到數組的指定位置。這避免了任何中間數據結構和后續的數據拷貝。 - 大小未知路徑 (Unsized Path): 如果大小未知,系統會啟動一個?
CollectorTask
。這個任務會遞歸地分解,每個子任務都會生成一個?Node
,然后通過?conc
?方法合并成一個?ConcNode
?樹。最終返回的就是這個樹的根節點。flattenTree
?參數決定了是否需要將這棵樹“壓平”后再返回。
- 為什么是抽象的??因為針對不同數據類型(引用類型 vs. 原始類型?
總結
evaluate
?方法是 Stream API 內部一個設計極為精巧的執行引擎入口。它通過一個簡單的?isParallel()
?判斷,將執行流導向兩個完全不同的世界:
- 順序世界:清晰、線性、易于理解。通過?
Sink
?鏈的包裝和?forEachRemaining
?的驅動,一步步完成數據處理。 - 并行世界:復雜、遞歸、高性能。通過抽象化和 Fork/Join 框架,將大規模數據處理任務分解、并行執行并合并結果。
它完美地體現了策略模式,根據上下文(并行或順序)選擇合適的算法來完成“將流元素收集到?Node
?中”這一任務。對它的分析,可以幫助我們深入理解 Stream API 在不同模式下的核心工作原理。
Stream構建過程 wrapSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink)
final
: 這個方法是最終的,不允許任何子類(如?ReferencePipeline
,?IntPipeline
?等)重寫。這表明它是 Stream 框架的核心、穩定且不可更改的機制。<P_IN>
: 這是一個泛型參數。它代表了經過所有包裝后,最終返回的那個?Sink
?所能接受的元素類型。這個類型實際上是當前流水線階段的上一個階段的輸出類型。Sink<P_IN>
: 這是返回類型。它返回一個?Sink
,這個?Sink
?已經將當前?AbstractPipeline
?節點以及它之前的所有節點的操作邏輯都“包裹”起來了。Sink<E_OUT> sink
: 這是輸入參數。它接收一個?Sink
。E_OUT
?是當前?AbstractPipeline
?節點的輸出類型。這個傳入的?sink
?通常是流水線中下一個階段的?Sink
,或者是終端操作(Terminal Operation)提供的最終?Sink
。
一句話概括:wrapSink
?方法接收一個用于處理本階段輸出的?Sink
,然后返回一個能夠處理本階段輸入的、經過層層包裝的全新?Sink
。
wrapSink
?的核心思想是操作融合(Operation Fusion)和責任鏈模式的反向構建。
我們知道 Stream 的中間操作是懶加載的。當寫下?stream.filter(...).map(...).sorted(...)
?時,數據并沒有開始流動。只是在構建一個處理步驟的“配方”(AbstractPipeline
?鏈表)。
當一個終端操作(如?forEach
,?collect
)被調用時,Stream 需要開始處理數據。但它不是低效地讓數據流過?filter
,把結果存起來,再流過?map
... 而是希望一次性完成所有操作。
wrapSink
?就是實現這個“一次性完成”的關鍵。它的工作流程如下:
- 從流水線的末端(最靠近終端操作的那個中間操作)開始。
- 接收終端操作提供的“最終 Sink”(比如,
Collectors.toList()
?會提供一個將元素添加到 List 的 Sink)。 - 將這個 Sink 用當前階段的操作邏輯(比如?
map
?的轉換邏輯)包裝起來,生成一個新的 Sink。 - 拿著這個新生成的 Sink,移動到前一個流水線階段。
- 用前一個階段的操作邏輯(比如?
filter
?的過濾邏輯)再次包裝。 - ...如此循環,直到回到數據源頭。
最終會得到一個“俄羅斯套娃”式的?Sink
,它內部包含了所有中間操作的邏輯。當數據從源頭(Spliterator
)取出后,只需調用這個最終?Sink
?的?accept
?方法,數據就會像穿過一根融合后的管道一樣,瞬間完成所有處理。
現在我們來看具體的實現,它非常精煉:
// ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;}
// ... existing code ...
-
Objects.requireNonNull(sink);
- 標準的非空檢查,確保傳入的下游?
Sink
?是有效的。
- 標準的非空檢查,確保傳入的下游?
-
for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage)
- 初始化:?
AbstractPipeline p = AbstractPipeline.this
。循環從當前?AbstractPipeline
?實例開始。在實際調用中,this
?通常是流水線的最后一個中間操作。 - 循環條件:?
p.depth > 0
。depth
?屬性記錄了當前階段距離源頭的“深度”。源頭(Head)的?depth
?是 0。所以這個循環會一直持續,直到回溯到源頭為止。 - 迭代:?
p = p.previousStage
。在每次循環后,p
?指向鏈表中的上一個節點,實現了從后往前的遍歷。
- 初始化:?
-
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
- 這是循環的核心。它調用了當前階段?
p
?的?opWrapSink
?方法。 opWrapSink
?是一個抽象方法,必須由具體的中間操作(如?map
,?filter
?的匿名內部類)來實現。- 它將當前的?
sink
(也就是下游操作的?Sink
)傳進去。 opWrapSink
?的實現會返回一個新的、包裝后的?Sink
。- 這個新的?
Sink
?被重新賦值給?sink
?變量,用于下一次循環(即交給上一個階段繼續包裝)。
- 這是循環的核心。它調用了當前階段?
-
return (Sink<P_IN>) sink;
- 當循環結束時,
sink
?變量所持有的已經是被流水線上所有中間操作層層包裹后的最終?Sink
。 - 它被強制轉換為?
Sink<P_IN>
?并返回。此時?P_IN
?對應的是第一個中間操作的輸入類型。
- 當循環結束時,
關鍵交互:opWrapSink
?方法
wrapSink
?只是一個驅動循環,真正的魔法發生在每個操作對?opWrapSink
?的具體實現中。我們來看幾個例子(以?LongPipeline
?為例,原理相通):
示例 1:?peek()
?操作?peek
?的作用是在元素流過時執行一個動作,但不對元素做任何修改。
// ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {@Overridepublic void accept(long t) {action.accept(t); // 執行 peek 的動作downstream.accept(t); // 將原始元素傳遞給下游}};}// Sink.javaabstract static class ChainedLong<E_OUT> implements Sink.OfLong {protected final Sink<? super E_OUT> downstream;public ChainedLong(Sink<? super E_OUT> downstream) {this.downstream = Objects.requireNonNull(downstream);}@Overridepublic void begin(long size) {downstream.begin(size);}@Overridepublic void end() {downstream.end();}@Overridepublic boolean cancellationRequested() {return downstream.cancellationRequested();}}
// ... existing code ...
opWrapSink
?返回了一個新的?Sink
。當這個新?Sink
?的?accept
?方法被調用時,它先執行?peek
?的?action
,然后原封不動地調用下游?sink
(下游sink通過構造函數 賦值給?downstream
)的?accept
?方法。
示例 2:?filter()
?操作
// ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {
// ... existing code ...@Overridepublic void accept(long t) {if (predicate.test(t)) // 執行過濾downstream.accept(t); // 滿足條件才傳遞給下游}};}
// ... existing code ...
filter
?的?opWrapSink
?實現中,只有當元素?t
?滿足?predicate
?條件時,才會調用下游?sink
?的?accept
?方法。
示例 3:?map()
?操作
// ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {@Overridepublic void accept(long t) {downstream.accept(mapper.applyAsLong(t)); // 將轉換后的元素傳遞給下游}};}
// ... existing code ...
map
?的?opWrapSink
?實現中,它先用?mapper
?對元素?t
?進行轉換,然后將轉換后的結果傳遞給下游?sink
。
調用時機與完整流程
wrapSink
?通常在終端操作的?evaluate
?方法中被間接調用,例如通過?wrapAndCopyInto
。
一個完整的流程是這樣的:
- 構建:?
Stream.of(1,2,3).filter(i -> i > 1).map(i -> i * 2)
。這會創建一個?AbstractPipeline
?的鏈表。 - 觸發: 調用終端操作?
.forEach(System.out::println)
。 - 準備執行:?
forEach
?操作的?evaluate
?方法被調用。它會創建一個最終的?Sink
,這個?Sink
?的?accept
?方法就是?System.out::println
。 - 包裝 Sink:?
evaluate
?方法內部會調用?wrapSink(finalSink)
。wrapSink
?從?map
?階段開始。它調用?map
?的?opWrapSink
,傳入?finalSink
。返回一個?mapSink
,其?accept
?方法會執行?i -> finalSink.accept(i * 2)
。wrapSink
?移動到?filter
?階段。它調用?filter
?的?opWrapSink
,傳入上一步得到的?mapSink
。返回一個?filterSink
,其?accept
?方法會執行?i -> { if (i > 1) mapSink.accept(i); }
。
- 執行: 循環結束,
wrapSink
?返回了最終的?filterSink
。 - 數據流動: 系統開始從源?
Spliterator
?中獲取數據(1, 2, 3),并逐個調用?filterSink.accept()
。filterSink.accept(1)
?->?1 > 1
?為 false,什么也不做。filterSink.accept(2)
?->?2 > 1
?為 true,調用?mapSink.accept(2)
?-> 調用?finalSink.accept(2 * 2)
?-> 打印 4。filterSink.accept(3)
?->?3 > 1
?為 true,調用?mapSink.accept(3)
?-> 調用?finalSink.accept(3 * 2)
?-> 打印 6。
總結
wrapSink
?是 Stream API 實現高性能的核心機制之一。它不是一個簡單的工具方法,而是將聲明式的操作鏈表轉換為高效的、融合的執行計劃的“編譯器”。
通過從后向前的遍歷和責任鏈模式的運用,它將多個獨立的中間操作邏輯“編織”到一個單一的?Sink
?對象中,使得數據可以在一次遍歷中完成所有處理,極大地減少了中間狀態的存儲和方法調用的開銷。理解了?wrapSink
,就理解了 Java Stream 運行時的精髓。
copyIntoWithCancel
這個方法是處理 短路操作(Short-Circuiting Operations) 的核心,比如?findFirst
,?anyMatch
,?limit
?等。
在 Stream 流水線中,大部分操作(如?map
,?filter
)需要處理完所有元素。但有些操作希望能提前終止,一旦滿足某個條件就不再處理后續元素,這就是“短路”。
findFirst()
: 找到第一個元素后就應該立即停止。limit(n)
: 處理完 n 個元素后就應該立即停止。anyMatch(p)
: 找到任何一個匹配的元素后就應該立即停止。
copyIntoWithCancel
?的核心作用就是為支持短路操作提供一個高效的數據遍歷和推送機制。
它的設計思想是:在數據從源頭(Spliterator
)流向?Sink
?的過程中,每處理完一個元素,就通過?Sink.cancellationRequested()
?方法檢查下游是否發出了“取消”信號。如果收到了取消信號,就立即停止遍歷,不再從?Spliterator
?中拉取更多的數據。
這與非短路操作的?copyInto
?方法形成對比,后者通常會使用?spliterator.forEachRemaining(wrappedSink)
?一次性將所有元素推送給?Sink
,效率更高,但無法中途停止。
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)
final
: 此方法不可被子類重寫,是框架的核心穩定部分。<P_IN>
: 泛型參數,代表?wrappedSink
?能接受的元素類型,也就是數據源?Spliterator
?提供的元素類型。boolean
: 返回值。返回?true
?表示遍歷是因為收到了取消請求而提前終止的;返回?false
?表示遍歷是正常完成的(所有元素都被處理了)。Sink<P_IN> wrappedSink
: 經過?wrapSink
?方法包裝后的最終?Sink
。它內部已經融合了流水線上所有中間操作的邏輯。Spliterator<P_IN> spliterator
: 數據源的?Spliterator
。copyIntoWithCancel
?將從它這里拉取數據。
代碼分析
// ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {@SuppressWarnings("rawtypes")AbstractPipeline p = AbstractPipeline.this;while (p.depth > 0) {p = p.previousStage;}wrappedSink.begin(spliterator.getExactSizeIfKnown());boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);wrappedSink.end();return cancelled;}
// ... existing code ...
-
AbstractPipeline p = AbstractPipeline.this;
?while (p.depth > 0) { p = p.previousStage; }
- 這段代碼的目的是找到流水線的源頭階段(Source Stage)。
AbstractPipeline
?實例構成一個鏈表,depth
?屬性表示當前階段距離源頭的距離。源頭的?depth
?為 0。- 通過?
p = p.previousStage
?不斷回溯,循環結束后,變量?p
?就指向了鏈表的頭部,即代表數據源的那個?AbstractPipeline
?實例。 - 為什么需要找到源頭??因為遍歷的邏輯(特別是針對不同數據類型,如?
int
,?long
,?Object
?的遍歷)是與源頭的?StreamShape
?相關的。forEachWithCancel
?是一個抽象方法,其具體實現位于?ReferencePipeline
,?IntPipeline
?等具體的?Pipeline
?子類中,而調用哪個實現版本取決于源頭的類型。
-
wrappedSink.begin(spliterator.getExactSizeIfKnown());
- 這是?
Sink
?協議的一部分。在開始向?Sink
?推送數據之前,必須調用?begin()
?方法。 - 它會通知?
Sink
?準備接收數據,并可選地告知預計的元素數量。對于短路操作,這個數量可能不準,但協議要求必須調用。
- 這是?
-
boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
- 這是整個方法的核心。
- 它調用了源頭階段?
p
?的?forEachWithCancel
?方法。 forEachWithCancel
?是一個抽象方法,由具體的?Pipeline
?實現(如?ReferencePipeline
)提供。它的職責是:- 從?
spliterator
?中逐個拉取元素。 - 將每個元素傳遞給?
wrappedSink.accept()
。 - 在處理完每個元素后,檢查?
wrappedSink.cancellationRequested()
。 - 如果返回?
true
,則立即停止遍歷,并向上返回?true
。
- 從?
cancelled
?變量記錄了遍歷是否被提前取消。
-
wrappedSink.end();
Sink
?協議的另一部分。在所有數據推送完畢(無論是正常結束還是被取消)后,必須調用?end()
?方法。- 這會通知?
Sink
?數據流結束,可以進行一些最終處理,比如?limit
?操作可以丟棄多余的元素,findFirst
?可以標記已找到結果。
-
return cancelled;
- 將?
forEachWithCancel
?的結果返回,告知上游調用者(通常是終端操作的?evaluate
?方法)數據處理是否被短路了。
- 將?
forEachWithCancel
?方法
copyIntoWithCancel
?將核心的遍歷邏輯委托給了?forEachWithCancel
。我們來看一下?ReferencePipeline
?中?forEachWithCancel
?的典型實現:
// In ReferencePipeline.java
@Override
final boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink) {boolean cancelled = false;// tryAdvance returns false when there are no more elements// sink.cancellationRequested() returns true when short-circuiting is requestedwhile (!cancelled && spliterator.tryAdvance(e -> sink.accept(e))) {cancelled = sink.cancellationRequested();}return cancelled;
}
這段代碼非常清晰地展示了短路機制:
while
?循環的條件是?!cancelled
?并且?spliterator.tryAdvance(...)
。spliterator.tryAdvance(e -> sink.accept(e))
:嘗試從?Spliterator
?獲取一個元素,如果成功,就立即通過 lambda 表達式將其傳遞給?sink.accept()
。如果?Spliterator
?中沒有更多元素了,tryAdvance
?返回?false
,循環正常結束。cancelled = sink.cancellationRequested();
:在?accept
?調用之后,立刻檢查?Sink
?是否請求取消。如果?limit(5)
?的?Sink
?已經收到了 5 個元素,它的?cancellationRequested()
?就會返回?true
。cancelled
?變量被設為?true
?后,下一次?while
?循環的條件?!cancelled
?就不滿足了,循環終止,即使?Spliterator
?中還有很多元素。
調用時機與完整流程
copyIntoWithCancel
?在?copyInto
?方法內部被調用,而?copyInto
?又是在終端操作的?evaluate
?方法中被觸發的。
完整流程示例:Stream.of(1,2,3,4,5).limit(2).forEach(...)
- 構建: 創建?
AbstractPipeline
?鏈表。 - 觸發: 調用終端操作?
forEach
。 - 包裝 Sink:?
forEach
?內部調用?wrapSink
,將?limit(2)
?的邏輯和?forEach
?的邏輯包裝成一個?wrappedSink
。limit(2)
?的?Sink
?會有一個計數器。它的?cancellationRequested()
?在計數器達到 2 之前返回?false
,達到 2 之后返回?true
。
- 調用 copyInto:?
forEach
?的?evaluate
?方法調用?wrapAndCopyInto
,后者再調用?copyInto
。 - 進入短路分支:?
copyInto
?內部檢查到?StreamOpFlag.SHORT_CIRCUIT
?標志位(由?limit
?操作設置),于是調用?copyIntoWithCancel(wrappedSink, spliterator)
。 - 執行?
copyIntoWithCancel
:- 找到源頭階段。
- 調用?
wrappedSink.begin()
。 - 調用源頭階段的?
forEachWithCancel
。 forEachWithCancel
?開始循環:tryAdvance
?獲取元素?1
,調用?wrappedSink.accept(1)
。limit
?的?Sink
?計數器變為 1。cancellationRequested()
?返回?false
。tryAdvance
?獲取元素?2
,調用?wrappedSink.accept(2)
。limit
?的?Sink
?計數器變為 2。cancellationRequested()
?現在返回?true
。cancelled
?變量被設為?true
。while
?循環終止。
forEachWithCancel
?返回?true
。- 調用?
wrappedSink.end()
。 copyIntoWithCancel
?返回?true
。
總結
copyIntoWithCancel
?是 Stream API 實現**短路(short-circuiting)**操作的關鍵樞紐。它通過一個清晰的協議——在推送每個元素后檢查?Sink
?的取消狀態——實現了對數據處理流程的提前終止。
它本身并不執行遍歷,而是負責:
- 定位到正確的?
Pipeline
?實現(源頭階段)。 - 遵循?
Sink
?的?begin/end
?協議。 - 委托帶有取消檢查的遍歷任務給特定于數據類型的?
forEachWithCancel
?方法。
這種設計將框架流程控制(copyIntoWithCancel
)與具體遍歷邏輯(forEachWithCancel
)解耦,是 Stream 內部優雅設計的一個典范。
為什么通過循環找源頭?sourceStage
不是保存了嗎?
原因在于代碼的演進、分段執行(Pipeline Slicing)的復雜性以及對?depth
?和?combinedFlags
?字段的重新計算。
讓我們來看?sourceSpliterator
?這個關鍵方法,它揭示了更深層次的原因:
// ... existing code ...@SuppressWarnings("unchecked")protected Spliterator<?> sourceSpliterator(int terminalFlags) {// ... (獲取初始 spliterator) ...if (isParallel() && hasAnyStateful()) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;// ... (省略一些標志位操作) ...spliterator = p.opEvaluateParallelLazy(u, spliterator);// ... (省略一些標志位操作) ...}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}// ... (應用 terminalFlags) ...return spliterator;}
// ... existing code ...
這個方法揭示了一個非常重要的機制:在并行流遇到有狀態操作(Stateful Operation, 如?sorted
,?distinct
)時,流水線會被“切片”執行。
-
流水線切片(Pipeline Slicing): 當一個并行流包含有狀態操作時,執行過程會被分為多個段(Segment)。每個有狀態操作都會成為一個段的終點。前一個段會先被完整地執行(
opEvaluateParallelLazy
),其結果(一個新的?Spliterator
)會成為下一個段的輸入。 -
depth
?和?combinedFlags
?的動態修改: 請注意?sourceSpliterator
?方法中的?for
?循環。在這個循環里,它會遍歷從?sourceStage
?到當前階段?this
?之間的所有 stage。當遇到一個有狀態操作時,它會:- 立即執行到這個有狀態操作為止的所有上游操作?(
spliterator = p.opEvaluateParallelLazy(u, spliterator);
)。 - 重置?
depth
?(depth = 0;
)。這意味著這個有狀態操作成為了一個新的“源頭”。 - 重新計算后續階段的?
depth
?和?combinedFlags
?(p.depth = depth++; p.combinedFlags = ...
)。
- 立即執行到這個有狀態操作為止的所有上游操作?(
在調用終端操作時,sourceSpliterator
?方法可能會動態地修改流水線中各個階段的內部狀態,特別是?depth
?字段。一個原本?depth > 0
?的階段,在經過?sourceSpliterator
?的處理后,其?depth
?可能變為?0
,因為它成了一個新分段的“源頭”。
copyIntoWithCancel
?中的那個?循環的真正目的,是為了找到當前執行上下文中的“有效源頭”。
- 對于簡單的順序流或無狀態并行流,
sourceStage
?確實就是最終的源頭,它的?depth
?始終是?0
。此時,循環和直接使用?sourceStage
?效果一致。 - 但對于被切片后的有狀態并行流,
this
?所屬的那個分段的“源頭”可能不再是最初的?sourceStage
,而是一個中間的有狀態操作階段。sourceSpliterator
?方法已經把那個階段的?depth
?修改為了?0
。
因此,while (p.depth > 0)
?這個循環是一個健壯的、不依賴于?sourceStage
?字段的、根據當前(可能已被修改過的)depth
?狀態來查找當前分段源頭的機制。它從?this
?開始,沿著?previousStage
?鏈回溯,直到找到第一個?depth
?為?0
?的階段——這才是它當前需要依賴的那個“源頭”,并調用它的?forEachWithCancel
?方法。
copyInto
?
它是 Stream 流水線執行的核心驅動方法之一,負責將數據從源頭(Spliterator
)推送至最終的?Sink
。
copyInto
?的核心作用是啟動并管理整個數據流的處理過程。在 Stream 的懶加載模型中,所有的中間操作都只是構建了一個操作鏈(AbstractPipeline
),并沒有實際處理數據。當終端操作被調用時,就需要一個機制來“啟動引擎”,讓數據真正地流動起來。copyInto
?就是這個引擎的啟動器。
它的設計思想是區分對待兩種不同類型的流水線:
- 非短路流水線(Non-Short-Circuit): 對于那些需要處理所有元素的操作(如?
map
,?filter
,?collect
),copyInto
?會采用最高效的方式,即調用?spliterator.forEachRemaining(wrappedSink)
,一次性地將所有數據從?Spliterator
?推送到?Sink
。 - 短路流水線(Short-Circuit): 對于那些可能提前結束的操作(如?
findFirst
,?anyMatch
,?limit
),copyInto
?會將任務委托給一個專門的方法?copyIntoWithCancel
。這個方法會逐個處理元素,并在每一步檢查是否需要提前終止。
通過這種方式,copyInto
?充當了一個分發器(Dispatcher),根據流水線的特性(是否包含短路操作)選擇最優的執行策略。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)
final
: 此方法不可被子類重寫,是框架的核心穩定部分。<P_IN>
: 泛型參數,代表?wrappedSink
?能接受的元素類型,也就是數據源?Spliterator
?提供的元素類型。void
: 這個方法沒有返回值。它的職責是執行一個動作(將數據從 spliterator 拷貝到 sink),而不是計算一個結果。最終的結果是由?Sink
?自身來構建和持有的。Sink<P_IN> wrappedSink
: 經過?wrapSink
?方法包裝后的最終?Sink
。它內部已經融合了流水線上所有中間操作的邏輯。Spliterator<P_IN> spliterator
: 數據源的?Spliterator
。copyInto
?將從它這里拉取數據。
代碼解析
// ... existing code ...@Overridefinal <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {copyIntoWithCancel(wrappedSink, spliterator);}}
// ... existing code ...
-
Objects.requireNonNull(wrappedSink);
- 標準的非空檢查,確保下游的?
Sink
?是有效的。
- 標準的非空檢查,確保下游的?
-
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags()))
- 這是核心的判斷邏輯。它檢查整個流水線的?
combinedFlags
?中是否包含?SHORT_CIRCUIT
?標志。 getStreamAndOpFlags()
?返回當前流水線所有操作標志的組合。StreamOpFlag.SHORT_CIRCUIT
?是一個標志位,像?limit()
,?findFirst()
,?anyMatch()
?等操作會把它加入到流水線的標志中。- 如果不包含這個標志,說明這是一個需要處理所有元素的普通流水線,進入?
if
?分支。
- 這是核心的判斷邏輯。它檢查整個流水線的?
-
非短路分支(
if
?塊)wrappedSink.begin(spliterator.getExactSizeIfKnown());
- 調用?
Sink
?的?begin
?方法,通知它數據即將開始流動。如果源?Spliterator
?的大小已知,就傳遞給?Sink
,這有助于?Sink
(比如?Collectors.toList()
)預分配內存,提高效率。
- 調用?
spliterator.forEachRemaining(wrappedSink);
- 這是最高效的數據推送方式。
forEachRemaining
?會遍歷?Spliterator
?中所有剩余的元素,并將每個元素傳遞給?wrappedSink
?的?accept
?方法。這個調用是阻塞的,直到所有元素處理完畢。
- 這是最高效的數據推送方式。
wrappedSink.end();
- 所有元素都處理完后,調用?
Sink
?的?end
?方法,通知它數據流結束,可以進行最后的處理(比如?Collector
?的?finisher
?操作)。
- 所有元素都處理完后,調用?
-
短路分支(
else
?塊)else { copyIntoWithCancel(wrappedSink, spliterator); }
- 如果流水線中包含?
SHORT_CIRCUIT
?標志,copyInto
?不會自己處理,而是將所有參數(wrappedSink
?和?spliterator
)直接委托給?copyIntoWithCancel
?方法。 copyIntoWithCancel
?內部實現了逐個元素處理并檢查取消信號的邏輯,我們之前已經詳細分析過它。
調用時機與完整流程
copyInto
?通常由?wrapAndCopyInto
?方法調用,而?wrapAndCopyInto
?是終端操作(如?forEach
,?reduce
,?collect
)執行其?evaluateSequential
?邏輯時的最終步驟。
完整流程示例:Stream.of(1, 2, 3).map(i -> i * 2).collect(Collectors.toList())
- 構建: 創建?
AbstractPipeline
?鏈表 (source
?->?map
)。 - 觸發: 調用終端操作?
collect
。 - 執行:?
collect
?的?evaluateSequential
?方法被調用。 - 準備 Sink:?
collect
?方法會創建一個?Sink
,這個?Sink
?的邏輯是把元素添加到一個?ArrayList
?中。 - 調用?
wrapAndCopyInto
:?evaluateSequential
?內部調用?wrapAndCopyInto(listSink, spliterator)
。 - 包裝 Sink:?
wrapAndCopyInto
?首先調用?wrapSink(listSink)
。wrapSink
?會將?listSink
?用?map
?操作的邏輯包裝起來,返回一個?mapSink
。mapSink.accept(i)
?的效果是?listSink.accept(i * 2)
。 - 調用?
copyInto
:?wrapAndCopyInto
?接著調用?copyInto(mapSink, spliterator)
。 - 進入非短路分支:?
copyInto
?檢查流水線標志,發現沒有?SHORT_CIRCUIT
。- 調用?
mapSink.begin(...)
。 - 調用?
spliterator.forEachRemaining(mapSink)
。forEachRemaining
?從?spliterator
?取出?1
,調用?mapSink.accept(1)
?->?listSink.accept(2)
。forEachRemaining
?從?spliterator
?取出?2
,調用?mapSink.accept(2)
?->?listSink.accept(4)
。forEachRemaining
?從?spliterator
?取出?3
,調用?mapSink.accept(3)
?->?listSink.accept(6)
。
- 調用?
mapSink.end()
。
- 調用?
- 返回結果:?
wrapAndCopyInto
?返回最初的?listSink
,collect
?操作從這個?Sink
?中獲取最終的?List
?結果?[2, 4, 6]
。
總結
copyInto
?是連接 Stream?聲明式構建和命令式執行的關鍵橋梁。它位于執行路徑的核心位置,扮演著一個智能調度員的角色。
- 它通過檢查?
SHORT_CIRCUIT
?標志,為流水線選擇了最優的執行路徑。 - 對于非短路操作,它使用?
forEachRemaining
?進行高效的批量處理。 - 對于短路操作,它委托給專門的?
copyIntoWithCancel
?方法進行精細的、可中斷的控制。
理解?copyInto
?的分發邏輯,是理解 Stream 如何根據不同操作的特性來優化其執行過程的重要一步。
wrapAndCopyInto
wrapAndCopyInto
?的核心作用是封裝(Wrap)并執行(CopyInto)。它是許多終端操作(Terminal Operation)在順序執行(Sequential Execution)時的入口點。
當一個終端操作(如?reduce
,?collect
,?forEach
)被調用時,它提供了一個最終的“目的地”——Sink
,這個?Sink
?知道如何處理最終的元素并產生結果。但是,數據從源頭流到這個最終?Sink
?之前,需要經過所有中間操作(如?map
,?filter
)的處理。
wrapAndCopyInto
?的設計思想就是:
- 封裝 (Wrap): 接收終端操作提供的原始?
Sink
,然后從流水線的末端開始,反向用每一個中間操作的邏輯去“包裝”這個?Sink
,最終形成一個包含了所有操作邏輯的、完整的?wrappedSink
。 - 執行 (CopyInto): 將這個封裝好的?
wrappedSink
?和數據源?Spliterator
?交給?copyInto
?方法,由?copyInto
?啟動數據流,將數據從源頭一個個推送到?wrappedSink
?中進行處理。
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)
<P_IN>
: 方法級的泛型參數。代表整個流水線輸入端的元素類型,即源頭?Spliterator
?提供的元素類型。<S extends Sink<E_OUT>>
: 方法級的泛型參數。S
?代表傳入的?sink
?參數的具體類型。E_OUT
?是?AbstractPipeline
?類的泛型參數,代表當前流水線階段的輸出元素類型。- 這個約束保證了傳入的?
sink
?必須能消費當前階段的輸出。
S sink
: 參數,由終端操作提供,是數據流的最終目的地。例如,對于?collect(Collectors.toList())
,它就是一個能將元素添加到列表中的?Sink
。Spliterator<P_IN> spliterator
: 參數,數據源的?Spliterator
。- 返回?
S
: 方法返回傳入的那個原始?sink
?對象。這一點非常重要,因為它允許調用方在?wrapAndCopyInto
?執行完畢后,能從這個?sink
?對象中提取最終結果(例如,通過調用?sink.build()
?或?sink.get()
)。
代碼解析
// ... existing code ...@Overridefinal <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);return sink;}@Override
// ... existing code ...
這個方法的實現非常簡潔,但每一步都至關重要,它將兩個核心操作串聯了起來:
-
wrapSink(Objects.requireNonNull(sink))
: 這是第一步,即“封裝”階段。Objects.requireNonNull(sink)
:確保傳入的終端?Sink
?不為空。wrapSink(...)
:這是個關鍵的輔助方法。它會從當前?Pipeline
?階段開始,沿著?previousStage
?鏈反向回溯,直到源頭。在回溯的每一步,它都會調用當前階段的?opWrapSink
?方法,將?Sink
?用當前操作的邏輯包裝一層。- 比喻:想象一個俄羅斯套娃。
wrapSink
?從最里面的小娃娃(終端?Sink
)開始,一層層地往外套上更大的娃娃(中間操作的?Sink
),最終返回最外層的那個大娃娃。這個大娃娃就是?wrappedSink
。
-
copyInto(wrappedSink, spliterator)
: 這是第二步,即“執行”階段。- 它接收上一步構建好的、包含了所有流水線邏輯的?
wrappedSink
。 - 然后調用?
copyInto
?方法,將數據源?spliterator
?和?wrappedSink
?連接起來,啟動數據流。 copyInto
?內部會判斷流水線是否包含短路操作,并選擇合適的遍歷策略(forEachRemaining
?或?copyIntoWithCancel
)。
- 它接收上一步構建好的、包含了所有流水線邏輯的?
-
return sink;
:- 在?
copyInto
?執行完畢后(意味著所有數據都處理完了),方法返回最初傳入的、未被包裝的那個?sink
。 - 調用者(終端操作)持有這個原始?
sink
?的引用,現在可以安全地從中獲取計算結果了。
- 在?
調用時機與完整流程
wrapAndCopyInto
?主要在順序執行的終端操作中被調用。一個典型的例子是?collect
?操作。
完整流程示例:List.of("a", "b", "c").stream().map(String::toUpperCase).collect(Collectors.toList())
- 觸發: 調用終端操作?
collect
。 - 執行?
evaluate
:?collect
?內部會調用?evaluate(terminalOp)
。 - 選擇路徑:?
evaluate
?方法發現是順序執行 (isParallel()
?為 false),于是調用?terminalOp.evaluateSequential(this, sourceSpliterator(...))
。 evaluateSequential
:?CollectOp
(collect
?的實現)的?evaluateSequential
?方法會創建一個用于收集元素的?Sink
(我們稱之為?listSink
),然后調用?helper.wrapAndCopyInto(listSink, spliterator)
。這里的?helper
?就是?map
?操作所在的?Pipeline
?實例。- 進入?
wrapAndCopyInto
:wrapSink(listSink)
?被調用:wrapSink
?發現?map
?操作是它的上一個階段。- 它調用?
map
?操作的?opWrapSink
?方法,用?map
?的邏輯(s -> s.toUpperCase()
)包裝?listSink
,生成一個?mapSink
。 wrapSink
?返回這個?mapSink
。
copyInto(mapSink, spliterator)
?被調用:copyInto
?啟動數據流。- 它從?
spliterator
?中取出 "a",交給?mapSink.accept("a")
。mapSink
?內部執行 "A",然后調用?listSink.accept("A")
。 - 這個過程對 "b" 和 "c" 重復。
- 返回?
listSink
:?wrapAndCopyInto
?執行完畢,將原始的?listSink
?返回給?CollectOp
。
- 獲取結果:?
CollectOp
?從?listSink
?中獲取最終的列表?["A", "B", "C"]
?并返回。
總結
wrapAndCopyInto
?是 Stream API 內部一個設計精巧的執行協調器。它本身不處理任何業務邏輯,而是通過精確地調用?wrapSink
?和?copyInto
,完美地將操作鏈的構建和數據的實際流動解耦并銜接起來。
- 職責單一: 它的職責就是“封裝然后執行”。
- 流程清晰:?
wrapSink
?負責準備處理器,copyInto
?負責輸送數據。 - 結果導向: 通過返回原始?
Sink
,使得調用方能方便地獲取最終結果。
理解?wrapAndCopyInto
?的工作方式,是深入理解 Stream 順序執行模型的關鍵。
evaluateToArrayNode
evaluateToArrayNode
?的核心作用是作為?toArray
?操作的專用執行引擎。它封裝了將流轉換為?Node
?的所有邏輯,特別是針對并行流中包含有狀態操作(stateful operation)的場景,它實現了一個重要的優化。
其設計思想是:
- 提供統一入口:為?
toArray
?提供一個比通用?evaluate(TerminalOp)
?更直接、更專門的入口點。 - 并行優化:識別出一種可以優化的特定并行場景(流水線末端是單個有狀態操作),并為其提供一條“快速通道”,避免不必要的數據收集和復制步驟。
- 代碼復用:在通用場景下,它會委托給更底層的?
evaluate(spliterator, flatten, generator)
?方法,復用其核心的“物化”邏輯。
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)
final
: 此方法不可被子類重寫,是框架的核心穩定部分。Node<E_OUT>
: 返回值。E_OUT
?是當前流水線階段(也就是最后一個階段)的輸出元素類型。返回一個包含了所有流元素的?Node
?對象。IntFunction<E_OUT[]> generator
: 一個函數,用于創建指定類型的數組,例如?String[]::new
。這是?toArray
?操作所必需的,因為它需要知道如何創建最終的數組容器。
代碼邏輯深度解析
// ... existing code ...@SuppressWarnings("unchecked")final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;// If the last intermediate operation is stateful then// evaluate directly to avoid an extra collection stepif (isParallel() && previousStage != null && opIsStateful()) {// Set the depth of this, last, pipeline stage to zero to slice the// pipeline such that this operation will not be included in the// upstream slice and upstream operations will not be included// in this slicedepth = 0;return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);}else {return evaluate(sourceSpliterator(0), true, generator);}}
// ... existing code ...
if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
這是所有終端操作的標準起始步驟。它確保一個流實例只能被消費一次,然后將?linkedOrConsumed
?標志位置為?true
,防止后續操作。
特殊并行路徑 (Optimized Parallel Path -?if
?塊)
這是該方法中最精妙的部分,一個針對特定場景的性能優化。
if (isParallel() && previousStage != null && opIsStateful()) {// ...
}
觸發條件:
isParallel()
: 流必須是并行的。previousStage != null
: 當前流水線至少有兩個階段(源+一個操作)。opIsStateful()
:?當前(也就是最后一個)中間操作必須是有狀態的。例如?sorted()
,?distinct()
。
場景示例:?stream.filter(...).sorted().toArray()
。當執行?toArray()
?時,sorted()
?就是那個“最后一個有狀態的中間操作”。
為什么需要優化?
常規的并行執行模型遇到有狀態操作時,會將其視為一個“屏障”。它會先并行執行屏障之前的所有操作,將結果收集到一個中間?Node
?中,然后再對這個中間?Node
?執行有狀態操作。如果?toArray()
?再走一遍這個流程,就會變成:
并行 filter -> 中間 Node1 -> 并行 sorted -> 中間 Node2 -> toArray 從 Node2 創建數組
這中間的?Node2
?是多余的。我們完全可以直接讓?sorted()
?操作的結果直接寫入最終的數組。
優化實現:
depth = 0;
- 這是一個非常巧妙的技巧。通過將當前階段(
toArray
?偽階段)的深度設置為0,它有效地將流水線“切片”。toArray
?之前的?sorted()
?操作現在被視為一個獨立的、完整的流水線。
- 這是一個非常巧妙的技巧。通過將當前階段(
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
opEvaluateParallel
?是一個抽象方法,由有狀態操作(如?SortedOps
)自己實現。- 這個調用相當于對?
sorted()
?操作說:“請你用并行的方式執行,但不要把結果收集到你自己的中間?Node
?里,而是直接使用我(toArray
)提供的?generator
?來創建并填充最終的?Node
。” - 這樣,執行流就變成了:?
并行 filter -> 中間 Node1 -> sorted() 直接并行排序并寫入最終 Node -> toArray 從最終 Node 創建數組
- 這就避免了創建?
Node2
?的開銷,減少了一次大規模的數據收集和可能的復制。
通用路徑 (General Path -?else
?塊)
如果不滿足上述特殊并行場景的條件(例如,是順序流,或者最后一個操作是無狀態的如?map
),則會進入這個更通用的路徑。
else {return evaluate(sourceSpliterator(0), true, generator);
}
實現:
sourceSpliterator(0)
: 獲取流的源?Spliterator
。參數?0
?表示終端操作本身沒有附加的標志。evaluate(..., true, ...)
: 直接調用我們之前分析過的那個底層的“工兵”方法?evaluate
。- 第一個參數: 數據源?
Spliterator
。 - 第二個參數?
flatten
: 設置為?true
。這至關重要,它告訴?evaluate
?方法,即使在并行計算中產生了?Node
?樹,也必須在返回前將其“拍平”為一個包含所有元素的、單一的、由數組支持的?Node
。這正是?toArray
?所需要的結果。 - 第三個參數?
generator
: 將?toArray
?提供的數組生成器傳遞下去,供?evaluate
?在創建最終?Node
?時使用。
- 第一個參數: 數據源?
總結
evaluateToArrayNode
?是一個精心設計的、專用于?toArray
?的執行方法。它不僅僅是一個簡單的包裝,其核心價值在于:
- 識別并優化了一個關鍵的并行場景:通過“切片”流水線和調用?
opEvaluateParallel
,它避免了在“有狀態操作 +?toArray
”組合下的重復數據收集,提升了性能。 - 無縫銜接通用邏輯:在不滿足優化條件時,它能平滑地回退到通用的?
evaluate
?方法,復用其健壯的流物化能力,并確保通過?flatten=true
?參數獲得?toArray
?所需的扁平化?Node
?結果。
這個方法的設計完美體現了在通用框架中嵌入專用優化的思想,是理解 Stream API 高性能實現的一個絕佳范例。
sourceStageSpliterator
sourceStageSpliterator()
?的核心作用是為流管道的源頭階段(Source Stage)提供一個一次性的、安全的獲取其?Spliterator
?的方式。
其設計思想是:
- 所有權和封裝:流的源?
Spliterator
?是整個管道的生命線。它被封裝在源頭階段(sourceStage
)中,不能被管道中的任意階段隨意訪問。 - 單一職責:此方法只做一件事——從源頭階段取出?
Spliterator
。它不關心并行、有狀態操作等復雜的流水線處理邏輯。 - 狀態強制轉換:調用此方法是一個明確的“消費”動作。一旦成功調用,流就被認為是已消費,其內部的?
sourceSpliterator
?或?sourceSupplier
?會被清空,防止重復消費。
final Spliterator<E_OUT> sourceStageSpliterator()
final
: 此方法不可被子類重寫,保證其行為在整個框架中的一致性和穩定性。Spliterator<E_OUT>
: 返回值。E_OUT
?在這里特指源頭階段的輸出類型。它返回流最開始的那個?Spliterator
。- 無參數: 該方法不需要任何外部輸入,它的所有操作都基于當前?
AbstractPipeline
?實例的狀態。
代碼邏輯深度解析
// ... existing code ...final Spliterator<E_OUT> sourceStageSpliterator() {// Ensures that this method is only ever called on the sourceStageif (this != sourceStage)throw new IllegalStateException();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSpliterator;sourceSpliterator = null;return s;}else if (sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSupplier.get();sourceSupplier = null;return s;}else {throw new IllegalStateException(MSG_CONSUMED);}}
// ... existing code ...
調用者身份驗證
// Ensures that this method is only ever called on the sourceStage
if (this != sourceStage)throw new IllegalStateException();
這是此方法最關鍵的檢查。sourceStage
?字段在流水線構建時,會從第一個階段一直傳遞到最后一個階段,它始終指向流水線的源頭。
this
: 代表調用?sourceStageSpliterator()
?方法的當前?AbstractPipeline
?對象。sourceStage
: 指向流管道的第一個階段(源頭)。
這個檢查強制規定:只有源頭階段對象自己才能調用這個方法來獲取?Spliterator
。管道中的任何中間階段(如?map
,?filter
)都無權調用此方法,這保證了數據源的封裝性和安全性。
流狀態檢查
if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
這是標準的流消費檢查。
if (linkedOrConsumed)
: 檢查流是否已經被操作過或關閉。由于這個方法只能在源頭階段調用,這里的?linkedOrConsumed
?實際上檢查的是源頭階段的狀態。linkedOrConsumed = true
: 如果檢查通過,立即將流標記為已消費。這是一個原子性的“檢查并設置”操作,確保一旦?Spliterator
?被取出,流就不能再被用于任何其他操作。
Spliterator
?提取邏輯
Stream 的源可以由一個?Spliterator
?直接提供,也可以由一個?Supplier<Spliterator>
?延遲提供。這部分代碼處理了這兩種情況。
-
Case 1: 直接提供?
Spliterator
if (sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSpliterator;sourceSpliterator = null; // 清空引用,防止重復獲取return s; }
如果?
sourceSpliterator
?字段不為空,說明流是由一個現成的?Spliterator
?創建的。代碼會:- 將其強制轉換為正確的泛型類型。
- 將?
sourceSpliterator
?字段置為?null
。這是關鍵的“消費”動作,確保這個?Spliterator
?實例不會被再次取出。 - 返回取出的?
Spliterator
。
-
Case 2: 延遲提供?
Spliterator
else if (sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSupplier.get();sourceSupplier = null; // 清空引用,防止重復獲取return s; }
如果?
sourceSupplier
?字段不為空,說明流是由一個?Supplier
?創建的。代碼會:- 調用?
sourceSupplier.get()
?來生成一個新的?Spliterator
?實例。 - 將?
sourceSupplier
?字段置為?null
。同樣是關鍵的“消費”動作,確保不能通過這個?Supplier
?再次生成?Spliterator
。 - 返回新生成的?
Spliterator
。
- 調用?
-
Case 3: 異常情況
else {throw new IllegalStateException(MSG_CONSUMED); }
如果?
sourceSpliterator
?和?sourceSupplier
?都為?null
,這意味著流已經被消費過了(它們的引用在上次被獲取時已經被清空)。此時拋出異常,明確告知調用者流已被消費。
與?sourceSpliterator(int)
?的對比
AbstractPipeline
?中還有一個看起來很像的方法?sourceSpliterator(int terminalFlags)
。它們的主要區別在于:
-
sourceStageSpliterator()
?(本方法):- 職責: 簡單、純粹地從源頭階段獲取?
Spliterator
。 - 調用者: 只能是源頭階段自己。
- 復雜度: 非常簡單,不處理任何流水線邏輯。
- 用途: 主要被?
spliterator()
?這個終端操作的實現所使用,當用戶想把一個未消費的流轉換回?Spliterator
?時調用。
- 職責: 簡單、純粹地從源頭階段獲取?
-
sourceSpliterator(int terminalFlags)
:- 職責: 為任意階段準備用于終端操作求值的?
Spliterator
。 - 調用者: 任何一個階段在準備執行終端操作時都可以調用(通常是最后一個階段)。
- 復雜度:?非常復雜。它包含了處理并行流中有狀態操作的核心邏輯。如果流是并行的且包含有狀態操作(如?
sorted
),這個方法會先執行到最后一個有狀態操作為止的所有計算,然后返回一個代表中間結果的?Spliterator
。 - 用途: 被?
evaluate()
?和?evaluateToArrayNode()
?等所有執行終端操作的方法調用,是實際執行計算的起點。
- 職責: 為任意階段準備用于終端操作求值的?
簡單來說,sourceStageSpliterator()
?是一個“檔案管理員”,只負責從檔案室(源頭)取出原始文件(Spliterator
)。而?sourceSpliterator(int)
?是一個“項目經理”,它不僅能拿到原始文件,還能在需要時組織團隊(并行計算)完成一系列加工(有狀態操作),最后交付一份處理好的報告(中間結果的?Spliterator
)。
總結
sourceStageSpliterator()
?是一個高度專用的內部方法,其設計目標是正確性和安全性。通過嚴格的調用者檢查和狀態轉換,它確保了流的源數據只能被源頭階段自己、且僅有一次地取出。它與更復雜的?sourceSpliterator(int)
?方法形成了職責分工,共同構成了 Stream API 內部數據流轉的基石。
sourceSpliterator
這個方法是 Stream API 內部實現中最核心、最復雜的方法之一。它不僅僅是獲取源?Spliterator
,更重要的是,它扮演了啟動終端操作前“預處理”流水線的角色,尤其是處理復雜的并行流場景。可以說,理解了這個方法,就理解了 Stream 并行計算的精髓。
sourceSpliterator(int terminalFlags)
?的核心作用是:為終端操作準備一個“就緒”的?Spliterator
。
這個“就緒”的?Spliterator
?可能是:
- 對于簡單流(順序流或無狀態并行流):就是流的原始?
Spliterator
。 - 對于復雜流(有狀態并行流):是一個代表了部分計算結果的全新?
Spliterator
。
其設計思想是:
- 處理屏障(Barrier):有狀態操作(如?
sorted()
,?distinct()
)在并行流中像一個“屏障”,必須等待上游所有數據都到達并處理后,才能繼續向下游傳遞。此方法就是負責執行到最后一個屏障為止的所有計算。 - 流水線切片與重組:它通過一個循環,動態地“切片”流水線,執行一部分(直到一個有狀態操作),然后用其結果(一個新的?
Spliterator
)作為下一段流水線的輸入,并更新后續階段的元數據(如?depth
,?combinedFlags
)。 - 延遲求值(Lazy Evaluation):在處理有狀態操作時,它調用?
opEvaluateParallelLazy
,這個方法返回的?Spliterator
?封裝了并行計算的結果,但數據本身可能是延遲生成的。
protected Spliterator<?> sourceSpliterator(int terminalFlags)
protected
: 只能被?java.util.stream
?包內或其子類訪問,是內部實現細節。Spliterator<?>
: 返回一個?Spliterator
。注意這里的通配符??
,因為經過有狀態操作后,Spliterator
?的類型可能已經改變,但這個方法本身不關心具體類型,只把它當作一個數據源。int terminalFlags
: 來自終端操作的標志位(StreamOpFlag
)。例如,一個短路操作(like?findFirst
)會傳入?SHORT_CIRCUIT
?標志,這個標志會影響后續流水線的行為。
代碼邏輯深度解析
// ... existing code ...@SuppressWarnings("unchecked")protected Spliterator<?> sourceSpliterator(int terminalFlags) {// Get the source spliterator of the pipelineSpliterator<?> spliterator = null;if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}if (isParallel() && hasAnyStateful()) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {// ... (omitted for brevity)thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;}spliterator = p.opEvaluateParallelLazy(u, spliterator);// Inject or clear SIZED on the source pipeline stage// based on the stage's spliteratorthisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}if (terminalFlags != 0) {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);}return spliterator;}
// ... existing code ...
第一步:獲取原始?Spliterator
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;
}
else {throw new IllegalStateException(MSG_CONSUMED);
}
這部分邏輯與?sourceStageSpliterator()
?中的提取邏輯幾乎完全相同。它從源頭階段(sourceStage
)獲取最原始的?Spliterator
,并立即將源頭的引用(sourceSpliterator
?或?sourceSupplier
)清空,標志著流的消費正式開始。
第二步:處理并行流中的有狀態操作(核心?if
?和?for
?循環)
if (isParallel() && hasAnyStateful()) {// ... for loop ...
}
這是整個方法的核心和靈魂。只有當流是并行的,并且流水線中至少包含一個有狀態操作時,這個?if
?塊才會執行。
for
?循環詳解:?
for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage)
- 初始化:
u
?(upstream): 上游階段,初始為?sourceStage
。p
?(pipeline): 當前處理的階段,初始為源的下一個階段。e
?(end): 循環的終點,即調用?sourceSpliterator
?的那個階段(通常是最后一個階段)。
- 循環條件:?
u != e
,只要上游階段還沒到終點就繼續。 - 迭代:?
u = p, p = p.nextStage
,兩個指針一起向后移動,始終保持?u
?是?p
?的前一個階段。
循環體內部:
-
if (p.opIsStateful()) { ... }
- 循環的主要目的就是找到有狀態的操作階段?
p
。
- 循環的主要目的就是找到有狀態的操作階段?
-
depth = 0;
- 一旦找到一個有狀態操作,就將?
depth
?計數器重置為0。這是一個信號,表示我們即將創建一個新的“邏輯源頭”。
- 一旦找到一個有狀態操作,就將?
-
spliterator = p.opEvaluateParallelLazy(u, spliterator);
- 這是魔法發生的地方。調用當前有狀態操作?
p
?的?opEvaluateParallelLazy
?方法。 - 這個方法會把?
u
(代表了到目前為止的所有上游操作)和當前的?spliterator
(代表了上游的數據源)作為輸入。 - 它會立即觸發并執行從上一個屏障(或源頭)到當前屏障?
p
?之間的所有并行計算。 - 執行完成后,它返回一個新的?
Spliterator
。這個新的?Spliterator
?封裝了?p
?操作(如?sorted
)的計算結果。現在,這個新的?spliterator
?變量就成了下一段流水線的“源”。
- 這是魔法發生的地方。調用當前有狀態操作?
-
更新標志和元數據:
thisOpFlags = spliterator.hasCharacteristics(...)
:根據新生成的?Spliterator
?是否?SIZED
,來更新當前操作的標志。例如,distinct()
?可能會改變流的大小,所以需要重新評估?SIZED
?特性。p.depth = depth++;
:重新設置后續階段的深度。因為?p
?已經被“執行”并物化為一個新的?Spliterator
,所以它的下一個階段的深度就變成了1。p.combinedFlags = StreamOpFlag.combineOpFlags(...)
:基于更新后的?thisOpFlags
,重新計算后續階段的組合標志。
執行示例:?source.parallel().filter(...).sorted().map(...).toArray()
?當?toArray
?調用?sourceSpliterator
?時:
- 循環開始,
p
?指向?filter
。opIsStateful()
?為 false,跳過?if
。 - 循環繼續,
p
?指向?sorted
。opIsStateful()
?為 true,進入?if
。 - 調用?
sorted.opEvaluateParallelLazy(filterHelper, sourceSpliterator)
。 - 這會觸發?
filter
?和?sorted
?的并行計算,其結果被封裝成一個新的?spliterator_after_sorted
。 spliterator
?變量現在被更新為?spliterator_after_sorted
。sorted
?之后的?map
?階段的?depth
?和?combinedFlags
?會被更新,就好像它的上游直接就是?spliterator_after_sorted
?一樣。- 循環結束。方法返回?
spliterator_after_sorted
。
最終,toArray
?的終端操作拿到的?Spliterator
?已經是排好序的數據源了。
?第三步:合并終端操作的標志
if (terminalFlags != 0) {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
最后,將終端操作自己帶來的標志(如?SHORT_CIRCUIT
)合并到最后一個階段的?combinedFlags
?中。這會影響最終的?wrapAndCopyInto
?等方法的行為。
?與?sourceStageSpliterator()
?的對比
sourceStageSpliterator()
:?檔案管理員。職責單一,只負責從源頭取出原始?Spliterator
。簡單、安全、不涉及計算。sourceSpliterator(int)
:?項目經理/總工程師。職責復雜,負責在啟動最終任務前,解決掉所有中途的“硬骨頭”(有狀態并行操作),通過預計算和流水線重組,為最終任務提供一個干凈、就緒的數據源。
總結
sourceSpliterator(int terminalFlags)
?是 Stream API 實現高性能并行計算的引擎室。它通過一個巧妙的循環,將一個復雜的、帶有“屏障”的流水線,動態地、分段地執行,并將每一段的計算結果物化為一個新的?Spliterator
,作為下一段的輸入。這個“評估-重組”的過程,有效地解決了有狀態操作在并行環境下的數據依賴問題,是整個 Stream 框架中最為精妙和關鍵的設計之一。
wrapSpliterator
這個方法是?PipelineHelper
?接口的實現,它的核心作用是:將一個代表上游數據的?Spliterator
,通過當前流水線階段以及其所有下游階段的操作進行“包裝”,最終返回一個包含了所有后續操作邏輯的新的?Spliterator
。
簡單來說,它就是?stream.spliterator()
?這個終端操作的幕后功臣之一。當你想把一個構建好的、包含多個中間操作的 Stream 再轉換回一個?Spliterator
?時,就是這個方法在發揮作用。
wrapSpliterator
?的核心作用是將**數據源(sourceSpliterator
)和操作鏈(從當前階段到末尾)**結合起來,生成一個全新的、功能完備的?Spliterator
。
其設計思想是:
- 延遲執行 (Laziness):這個方法本身不執行任何計算。它只是創建一個新的?
Spliterator
?對象,這個對象內部“知道”需要對源數據執行哪些操作。真正的計算和轉換只會在你調用這個新?Spliterator
?的?tryAdvance
?或?forEachRemaining
?等方法時才會發生。 - 遞歸/委托:它不自己實現復雜的包裝邏輯,而是委托給一個抽象的?
wrap
?方法。這個?wrap
?方法由具體的 Stream 實現(如?ReferencePipeline
,?IntPipeline
)來提供,因為不同類型的 Stream(對象、int、long、double)其?Spliterator
?的包裝方式也不同。 - 流水線封裝:返回的?
Spliterator
?封裝了從當前階段開始的整個下游流水線。對外部調用者來說,它就像一個黑盒,你只管從里面取數據,它會自動幫你完成?map
,?filter
?等所有操作。
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator)
final
: 此方法不可被子類重寫,是框架的核心穩定部分。<P_IN>
: 泛型參數,代表輸入?Spliterator
?的元素類型。P_IN
?代表 "Pipeline Input"。Spliterator<E_OUT>
: 返回值。返回一個新的?Spliterator
,其元素類型是?E_OUT
,即當前流水線階段的輸出類型。Spliterator<P_IN> sourceSpliterator
: 參數,代表上游提供的數據源。
代碼邏輯深度解析
// ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {if (depth == 0) {return (Spliterator<E_OUT>) sourceSpliterator;}else {return wrap(this, () -> sourceSpliterator, isParallel());}}
// ... existing code ...
基本情況 (Base Case)
if (depth == 0) {return (Spliterator<E_OUT>) sourceSpliterator;
}
depth
: 這個字段表示當前階段在流水線中的深度。depth == 0
?是一個特殊標記,它意味著當前?AbstractPipeline
?實例就是源頭階段 (Source Stage)。- 邏輯: 如果當前階段就是源頭,說明它后面沒有任何操作了。因此,不需要進行任何包裝,直接將輸入的?
sourceSpliterator
?原樣返回即可。這里的強制類型轉換是安全的,因為在源頭階段,輸入類型?P_IN
?和輸出類型?E_OUT
?必然是相同的。 - 作用: 這是遞歸包裝的終點。
遞歸包裝 (Recursive Wrapping)
else {return wrap(this, () -> sourceSpliterator, isParallel());
}
當?depth > 0
?時,意味著當前階段是一個中間操作階段(如?map
,?filter
?等)。這時就需要進行包裝。
-
wrap(...)
: 這是一個抽象方法,定義在?AbstractPipeline
?的更下方。abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,Supplier<Spliterator<P_IN>> supplier,boolean isParallel);
它由具體的子類(
ReferencePipeline
,?IntPipeline
?等)實現,因為它們才知道如何創建對應類型的包裝?Spliterator
(例如,StreamSpliterators.WrappingSpliterator
)。 -
wrap
?方法的參數:this
: 將當前?AbstractPipeline
?實例作為?PipelineHelper
?傳遞進去。這個?helper
?對象包含了從當前階段到最后一個階段的所有信息(操作鏈、標志位等)。() -> sourceSpliterator
: 將上游的?sourceSpliterator
?包裝成一個?Supplier
。這是為了支持延遲獲取。包裝后的?Spliterator
?只有在第一次被使用時,才會通過這個?Supplier
?真正拿到上游的?Spliterator
。isParallel()
: 傳遞當前流的并行狀態,以便?wrap
?方法創建出正確的(并行或順序的)Spliterator
?包裝器。
執行流程示例: 假設有?stream.map(...).filter(...)
,我們想從?map
?階段開始獲取?Spliterator
。
- 在?
map
?階段調用?wrapSpliterator(sourceSpliterator)
。 map
?階段的?depth > 0
,進入?else
?分支。- 調用?
wrap(mapPipeline, () -> sourceSpliterator, isParallel)
。 ReferencePipeline
?中實現的?wrap
?方法會創建一個?StreamSpliterators.WrappingSpliterator
。- 這個?
WrappingSpliterator
?內部持有了?mapPipeline
?這個?PipelineHelper
?和上游的?Spliterator
?的?Supplier
。 - 當用戶調用這個新?
Spliterator
?的?tryAdvance
?時,它會:
- a. 從?
Supplier
?獲取上游?Spliterator
?并調用其?tryAdvance
?得到一個元素。 - b. 通過?
mapPipeline
?找到?map
?操作的?Sink
?包裝邏輯。 - c. 將元素推入?
map
?的?Sink
,再推入?filter
?的?Sink
。 - d. 如果元素通過了所有操作,就返回?
true
。
總結
wrapSpliterator
?是連接 Stream 流水線和?Spliterator
?終端操作的關鍵橋梁。它通過一個簡潔的?if-else
?判斷,優雅地處理了兩種情況:
- 當位于流水線源頭時,它作為遞歸的終點,直接返回源?
Spliterator
。 - 當位于中間操作時,它將**“未來的操作”(由?
this
?PipelineHelper 代表)和“現在的數據源”**(由?sourceSpliterator
?代表)委托給具體的?wrap
?方法,創建一個新的、懶加載的、功能完備的?Spliterator
。
這個方法的設計充分體現了職責分離和延遲執行的思想,是 Stream API 能夠靈活地在不同表現形式(Stream
?vs?Spliterator
)之間轉換的核心機制。
spliterator()
?
這個方法是?BaseStream
?接口中定義的一個終端操作。它的作用非常直觀:將一個已經構建好(但未消費)的 Stream 流水線轉換回一個?Spliterator
。這提供了一種機制,允許用戶在構建了復雜的流操作后,不立即進行求值,而是獲取一個代表了整個流水線邏輯的?Spliterator
,以便進行更靈活或自定義的遍歷。
spliterator()
?的核心作用是將一個完整的、聲明式的 Stream 流水線物化為一個可迭代的數據源?Spliterator
。
其設計思想是:
- 延遲執行的橋梁:Stream 的中間操作是延遲執行的,終端操作觸發計算。
spliterator()
?是一個特殊的終端操作,它本身不“計算”出最終結果(如?collect
),而是將計算邏輯封裝到一個新的?Spliterator
?中。真正的計算將在遍歷這個返回的?Spliterator
?時發生。 - 互操作性:提供了從 Stream API 到更底層的?
Spliterator
?API 的一個“出口”,允許開發者將 Stream 的強大聲明性與?Spliterator
?的精細控制(如自定義分割策略)結合起來。 - 消費流:和所有終端操作一樣,調用?
spliterator()
?會消費掉這個流,使其不能再被其他操作使用。
public Spliterator<E_OUT> spliterator()
public
: 這是一個公開的 API,是?BaseStream
?接口的一部分,供所有 Stream 用戶使用。Spliterator<E_OUT>
: 返回一個?Spliterator
。E_OUT
?是當前流水線階段(也就是調用?spliterator()
?的那個 Stream 對象)的輸出元素類型。
代碼邏輯深度解析
// ... existing code ...// Primitive specialization use co-variant overrides, hence is not final@Override@SuppressWarnings("unchecked")public Spliterator<E_OUT> spliterator() {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (this == sourceStage) {if (sourceStage.sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;return s;}else if (sourceStage.sourceSupplier != null) {@SuppressWarnings("unchecked")Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;sourceStage.sourceSupplier = null;return lazySpliterator(s);}else {throw new IllegalStateException(MSG_CONSUMED);}}else {return wrap(this, () -> sourceSpliterator(0), isParallel());}}
// ... existing code ...
首先是標準的狀態檢查:
if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
這確保了流只能被消費一次,然后將流標記為已消費。
接下來,代碼邏輯分為兩大分支:
情況一:當前階段是源頭 (this == sourceStage
)
這個?if
?塊處理的是最簡單的情況:在一個沒有任何中間操作的原始 Stream 上調用?spliterator()
。例如?Stream.of(1, 2, 3).spliterator()
。
-
if (sourceStage.sourceSpliterator != null)
: 如果流是直接由一個?Spliterator
?創建的。- 直接獲取這個?
Spliterator
。 - 將?
sourceStage.sourceSpliterator
?置為?null
,完成消費。 - 返回該?
Spliterator
。
- 直接獲取這個?
-
else if (sourceStage.sourceSupplier != null)
: 如果流是由?Supplier<Spliterator>
?創建的。- 獲取這個?
Supplier
。 - 將?
sourceStage.sourceSupplier
?置為?null
,完成消費。 - 調用?
lazySpliterator(s)
。這是一個關鍵點,它不會立即調用?supplier.get()
,而是返回一個特殊的?Spliterator
(StreamSpliterators.LazySpliterator
),這個?LazySpliterator
?只有在第一次被使用(如調用?tryAdvance
)時,才會真正從?Supplier
?中獲取底層的?Spliterator
。這保持了懶加載的特性。
- 獲取這個?
-
else
: 如果兩者都為?null
,說明流已被消費,拋出異常。
情況二:當前階段是中間操作 (else
?塊)
這個?else
?塊處理的是更復雜、更常見的情況:在一個包含了至少一個中間操作的 Stream 上調用?spliterator()
。例如?Stream.of(1, 2, 3).map(i -> i * 2).spliterator()
。
else {return wrap(this, () -> sourceSpliterator(0), isParallel());
}
這行代碼是整個方法的核心,它做了三件事:
-
() -> sourceSpliterator(0)
: 創建一個?Supplier
。這個?Supplier
?的任務是調用我們之前深入分析過的?sourceSpliterator(0)
?方法。回憶一下,sourceSpliterator
?會負責處理并行流中的有狀態操作(屏障),并返回一個“就緒”的?Spliterator
?作為后續操作的數據源。對于順序流,它就簡單地返回原始?Spliterator
。 -
wrap(this, ..., isParallel())
: 調用?wrap
?方法。this
: 將當前?AbstractPipeline
?實例(例如,代表?map
?操作的那個對象)作為?PipelineHelper
?傳入。這個?helper
?知道從當前階段到最后一個階段的所有操作。() -> sourceSpliterator(0)
: 將準備好的數據源(通過?Supplier
?懶加載)傳入。isParallel()
: 告知?wrap
?方法要創建并行還是順序的包裝器。
-
返回結果:?
wrap
?方法會返回一個全新的、經過包裝的?Spliterator
(例如?StreamSpliterators.WrappingSpliterator
)。這個?WrappingSpliterator
?內部就同時持有了數據源(來自?sourceSpliterator
)和操作鏈(來自?this
?這個?PipelineHelper
)。當遍歷這個新的?Spliterator
?時,它會從數據源取出一個元素,然后依次應用流水線上的所有操作。
與內部?wrapSpliterator
?的關系
spliterator()
?的?else
?塊與我們之前分析的?wrapSpliterator
?的?else
?塊非常相似。
spliterator()
:?return wrap(this, () -> sourceSpliterator(0), isParallel());
wrapSpliterator(...)
:?return wrap(this, () -> sourceSpliterator, isParallel());
它們的主要區別在于?wrap
?方法的第二個參數,即數據源?Supplier
:
spliterator()
?使用?() -> sourceSpliterator(0)
。它調用的是復雜的?sourceSpliterator(int)
?方法。這是因為它作為終端操作,必須處理整個流水線的復雜性,特別是并行流中的有狀態操作。它需要一個能代表“預計算”結果的?Spliterator
。wrapSpliterator(...)
?使用?() -> sourceSpliterator
?(這里的?sourceSpliterator
?是?wrapSpliterator
?的參數)。它接收一個已經準備好的上游?Spliterator
。它的職責相對簡單,只是將這個現成的數據源和下游操作包裝起來,不負責處理上游的屏障問題。
簡單說,spliterator()
?是面向用戶的、發起整個封裝過程的入口;而?wrapSpliterator
?是內部遞歸包裝過程中的一個環節。
總結
public spliterator()
?方法是 Stream API 提供的一個功能強大的“逃生艙口”。它允許用戶在任意階段將一個聲明式的、懶加載的 Stream 流水線,轉換成一個具體的、可迭代的?Spliterator
?對象。
- 對于源 Stream,它直接或懶加載地返回原始的?
Spliterator
。 - 對于帶有中間操作的 Stream,它通過調用核心的?
wrap
?和?sourceSpliterator
?方法,將數據源的獲取邏輯(可能包含預計算)和操作鏈的執行邏輯完美地封裝到一個新的?Spliterator
?中,實現了從?Stream
?到?Spliterator
?的無縫轉換,同時保持了懶加載的特性。
close
Stream 需要?close()
?和?onClose()
?是因為它可能建立在需要顯式關閉的底層 I/O 資源之上(例如文件、網絡套接字等)。如果不關閉 Stream,這些底層資源可能無法被釋放,從而導致資源泄漏。
BaseStream
?接口(Stream
,?IntStream
等的父接口)繼承了?java.lang.AutoCloseable
?接口。這意味著所有 Stream 都可以,并且在某些情況下必須在?try-with-resources
?語句中使用,以確保資源的正確釋放。
想象一個典型的場景:逐行讀取文件。
在 Java 8 之前,我們可能會這么寫:
BufferedReader reader = null;
try {reader = new BufferedReader(new FileReader("file.txt"));String line;while ((line = reader.readLine()) != null) {System.out.println(line);}
} finally {if (reader != null) {reader.close(); // 必須手動關閉}
}
使用 Stream API,代碼可以變得非常簡潔:
// 正確的用法
try (Stream<String> lines = Files.lines(Paths.get("file.txt"))) {lines.forEach(System.out::println);
} catch (IOException e) {e.printStackTrace();
}
// 在 try 塊結束時,lines.close() 會被自動調用
Files.lines(...)
?方法返回一個?Stream<String>
。這個 Stream 的數據源是一個打開的文件句柄。當 Stream 的處理完成時,這個文件句柄必須被關閉。try-with-resources
?結構就保證了?stream.close()
?方法在最后一定會被調用。
如果忘記使用?try-with-resources
,就可能導致文件句柄一直被占用,直到垃圾回收器介入,這在生產環境中是不可接受的。
現在我們來看onClose(Runnable closeHandler)
?方法。
它的作用不是執行關閉操作,而是注冊一個關閉處理器(Runnable
)。這個處理器會在 Stream 的?close()
?方法被調用時執行。
這是一個非常精巧的回調機制。我們來分析一下它的源碼:
// ... existing code ...@Override@SuppressWarnings("unchecked")public S onClose(Runnable closeHandler) {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);Objects.requireNonNull(closeHandler);Runnable existingHandler = sourceStage.sourceCloseAction;sourceStage.sourceCloseAction =(existingHandler == null)? closeHandler: Streams.composeWithExceptions(existingHandler, closeHandler);return (S) this;}
// ... existing code ...
if (linkedOrConsumed)
: 檢查流是否已經被操作或消費,確保只能在流水線構建階段注冊關閉處理器。Runnable existingHandler = sourceStage.sourceCloseAction;
:?這是關鍵。它獲取的是源頭階段 (sourceStage
)?的關閉處理器。這意味著無論你在流水線的哪個階段(map
,?filter
?等)調用?onClose
,這個關閉邏輯最終都會被附加到整個流水線的源頭。這是完全合理的,因為需要關閉的資源(如文件句柄)正是在源頭被打開的。sourceStage.sourceCloseAction = ...
: 它將新的處理器設置回源頭階段。Streams.composeWithExceptions(existingHandler, closeHandler)
: 這個邏輯允許你注冊多個關閉處理器。如果已經存在一個處理器,它會把新舊兩個處理器組合起來,確保它們都會被執行。
那么,這個注冊的?sourceCloseAction
?在哪里被調用呢?答案是在?close()
?方法里:
// ... existing code ...@Overridepublic void close() {linkedOrConsumed = true;sourceSupplier = null;sourceSpliterator = null;Runnable closeAction = sourceStage.sourceCloseAction;if (closeAction != null) {sourceStage.sourceCloseAction = null;closeAction.run();}}
// ... existing code ...
可以看到,close()
?方法會從?sourceStage
?獲取之前注冊的?closeAction
?并執行它(closeAction.run()
)。
設計啟示
- 職責分離:
onClose
?的設計體現了完美的職責分離。- 資源創建者(例如?
Files.lines
?方法)負責提供資源,并使用?onClose
?注冊清理資源的邏輯。它知道如何關閉資源。 - 資源使用者(調用 Stream API 的我們)負責在處理完數據后調用?
close()
?方法(通常通過?try-with-resources
),但不需要關心具體的關閉細節。
- 資源創建者(例如?
- 封裝:
onClose
?機制將資源管理的復雜性封裝在了 Stream 的實現內部。使用者只需要遵循?AutoCloseable
?的標準用法即可,這大大降低了API的復雜度。 - 靈活性:通過組合?
Runnable
,該機制甚至支持為一個 Stream 注冊多個獨立的關閉動作,增加了靈活性。
因此,onClose()
?是一個供 Stream?提供方使用的內部機制,用于將資源清理邏輯與 Stream 本身綁定,而?close()
?則是供 Stream?消費方使用的公開接口,用于觸發這些清理邏輯。
核心抽象方法
這些方法必須由具體的管道實現(如?ReferencePipeline
,?IntPipeline
)或其內部的操作類(如?StatelessOp
,?StatefulOp
)提供:
abstract StreamShape getOutputShape()
: 返回當前管道階段輸出的流類型(REFERENCE
,?INT_VALUE
,?LONG_VALUE
,?DOUBLE_VALUE
)。abstract <P_IN> Sink<P_IN> opWrapSink(int flags, Sink<E_OUT> sink)
:- 這是構建?
Sink
?鏈的核心。 - 參數?
sink
?是下游操作的?Sink
。 - 此方法需要返回一個新的?
Sink
,該?Sink
?實現了當前操作的邏輯,并將結果傳遞給下游的?sink
。 - 例如,一個?
map
?操作會創建一個?Sink
,它對接收到的每個元素應用映射函數,然后將結果傳遞給下游?Sink
。
- 這是構建?
abstract <P_IN> Spliterator<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<E_OUT[]> generator)
:- (主要由有狀態操作實現)定義了有狀態操作如何在并行模式下執行,并返回一個新的?
Spliterator
?包含處理后的元素。
- (主要由有狀態操作實現)定義了有狀態操作如何在并行模式下執行,并返回一個新的?
abstract boolean opIsStateful()
: 返回當前操作是否有狀態(例如?sorted()
,?distinct()
?是有狀態的,而?filter()
,?map()
?是無狀態的)。