DistinctOps
DistinctOps
?是一個專門用于實現?Stream.distinct()
?操作的工廠類。正如它的名字所示,它的核心職責就是創建能夠去除流中重復元素的操作。distinct()
?是一個有狀態的中間操作 (stateful intermediate operation),這意味著它通常需要看到所有元素才能決定哪些元素可以進入下一階段,這使得它的實現比無狀態操作(如?filter
,?map
)要復雜得多。
ReferencePipeline(這個本身就是頂層的stream)中調用
@Overridepublic final Stream<P_OUT> distinct() {return DistinctOps.makeRef(this);}
下面我們分幾個部分來詳細解析?DistinctOps
?類。
類的定位和結構
final class DistinctOps
: 這是一個?final
?類,并且構造函數是?private
?的,表明它是一個純粹的工具類(Utility Class),不能被繼承或實例化。所有的功能都通過靜態方法?makeRef
?提供。makeRef(AbstractPipeline<?, T, ?> upstream)
: 這是該類的唯一入口。當調用?stream.distinct()
?時,底層實際上就是調用了這個?DistinctOps.makeRef()
?方法。它接收上游的?Pipeline
?作為輸入,然后返回一個新的?Pipeline
?階段,這個新階段就包含了去重邏輯。// ... existing code ... static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {// ... 具體的實現 ...}; } // ... existing code ...
這里它創建了一個?
ReferencePipeline.StatefulOp
?的匿名子類實例。這立即告訴我們幾點重要信息:distinct()
?是一個有狀態操作 (StatefulOp
)。- 它向上游聲明了?
IS_DISTINCT
?標志,表示從這個階段輸出的流是已經去重的。 - 它還聲明了?
NOT_SIZED
?標志,因為去重后元素的數量是未知的,除非處理完所有元素。
核心實現:串行 vs 并行,有序 vs 無序
distinct()
?的復雜性在于它必須根據流的 并行性 (parallel)?和?有序性 (ordered) 采取截然不同的策略。DistinctOps
?內部通過重寫?StatefulOp
?的幾個關鍵方法來處理這些不同情況。
串行執行 (opWrapSink
)
這是最簡單的情況,對應的是流的串行 (sequential)?執行。邏輯在?opWrapSink
?方法中實現。
// ... existing code ...@OverrideSink<T> opWrapSink(int flags, Sink<T> sink) {Objects.requireNonNull(sink);if (StreamOpFlag.DISTINCT.isKnown(flags)) {// 如果上游已經去重,什么都不做,直接返回下游的 Sinkreturn sink;} else if (StreamOpFlag.SORTED.isKnown(flags)) {// 優化:如果流已排序,只需和前一個元素比較return new Sink.ChainedReference<>(sink) {boolean seenNull;T lastSeen;// ...@Overridepublic void accept(T t) {if (t == null) {if (!seenNull) { /* ... */ }} else if (lastSeen == null || !t.equals(lastSeen)) {downstream.accept(lastSeen = t);}}};} else {// 通用情況:使用 Set 來記錄見過的元素return new Sink.ChainedReference<>(sink) {Set<T> seen;@Overridepublic void begin(long size) {seen = new HashSet<>();downstream.begin(-1);}// ...@Overridepublic void accept(T t) {if (seen.add(t)) { // Set.add() 返回 true 表示添加成功(即之前沒有)downstream.accept(t);}}};}}
// ... existing code ...
邏輯分析:
- 已去重 (
DISTINCT.isKnown
): 如果流已經被標記為去重的,distinct()
?就成了一個空操作(no-op),直接把元素傳給下游。 - 已排序 (
SORTED.isKnown
): 這是一個非常重要的優化。如果流是排序的,那么重復的元素必然是相鄰的。因此,我們不需要一個?Set
?來存儲所有見過的元素,只需要記住上一個元素 (lastSeen
) 即可。這極大地降低了空間復雜度。 - 通用情況: 對于無序的流,唯一的辦法就是用一個?
HashSet
?來存儲所有已經見過的元素。每次來一個新元素,嘗試添加到?Set
?中。如果?add
?方法返回?true
,說明這是個新元素,就把它傳遞給下游。
并行執行 (opEvaluateParallel
)
這是最復雜的情況,對應流的并行 (parallel)?執行。并行執行需要一個屏障 (barrier)?操作,即必須處理完所有分片(Spliterator)的數據,合并結果后才能繼續。
// ... existing code ...@Override<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator,IntFunction<T[]> generator) {if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {// 已去重,空操作return helper.evaluate(spliterator, false, generator);}else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {// 有序并行:退化為串行處理模式來保證順序return reduce(helper, spliterator);}else {// 無序并行:最高效的并行模式AtomicBoolean seenNull = new AtomicBoolean(false);ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();// ... 并行地將所有元素放入 ConcurrentHashMap 來去重forEachOp.evaluateParallel(helper, spliterator);// ...return Nodes.node(keys);}}
// ... existing code ...
邏輯分析:
- 有序并行 (
ORDERED.isKnown
): 為了保證元素的順序,并行?distinct
?無法做到真正的“懶加載”。它必須收集所有元素,在一個地方完成去重,然后再把結果交給下游。這里的?reduce
?方法內部使用了?LinkedHashSet
?來保證順序,這實際上是一個代價高昂的屏障操作。 - 無序并行: 這是并行?
distinct
?性能最好的場景。因為它不關心順序,所以可以充分利用并行性。- 它使用?
ConcurrentHashMap
?作為共享的?Set
(因為?ConcurrentHashMap
?是線程安全的,而?HashSet
?不是)。 - 每個線程都把自己分片中的元素嘗試放入這個共享的?
map
?中。putIfAbsent
?是一個原子操作,可以保證只有一個線程能成功放入一個特定的元素。 - 所有線程完成后,
map
?的?keySet
?就是最終去重后的結果集。
- 它使用?
- ConcurrentHashMap 有一個限制:它的 key 和 value 都不能為 null。
為了處理流中可能存在的 null 元素,代碼使用了一個額外的 AtomicBoolean seenNull 標志位。如果流中出現了 null,就將這個標志設為 true。
并行懶加載 (opEvaluateParallelLazy
)
這個方法用于支持可以“懶加載”的并行操作。
// ... existing code ...@Override<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {// ...}else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {// 有序流不支持懶加載,必須退化為屏障操作return reduce(helper, spliterator).spliterator();}else {// 無序流可以懶加載return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));}}
// ... existing code ...
邏輯分析:
- 對于有序流,
distinct
?無法實現懶加載,因為它需要看到所有元素才能確定最終順序。所以它退化成了?opEvaluateParallel
?中的?reduce
?邏輯,收集所有元素再返回一個新的?Spliterator
。 - 對于無序流,可以實現懶加載(逐個處理)。它返回一個特制的?
DistinctSpliterator
,這個?Spliterator
?在內部包裝了原始的?Spliterator
,并在?tryAdvance
?時進行去重判斷。
MatchOps
MatchOps
?是 Stream API 內部一個至關重要的工廠類,它專門負責創建和管理 短路(short-circuiting) 的終端操作,也就是我們常用的?anyMatch()
,?allMatch()
,?noneMatch()
。首先看類的定義:
final class MatchOps {private MatchOps() { }
//...
和?SortedOps
、ReduceOps
?類似,它是一個包可見的、擁有私有構造函數的?final
?類。這表明它是一個內部使用的工具工廠,專門用于創建?TerminalOp
(終端操作)的實例。
它的核心職責是:根據用戶調用的匹配方法(anyMatch
?等)和傳入的謂詞(Predicate
),構建一個能夠高效執行匹配邏輯的終端操作。
短路(Short-circuiting)是理解?MatchOps
?的關鍵。anyMatch
,?allMatch
,?noneMatch
?都是短路操作。這意味著它們不一定需要處理流中的所有元素就能得出最終結果。
anyMatch(p)
: 只要找到一個滿足謂詞?p
?的元素,結果就確定為?true
,無需再檢查后續元素。allMatch(p)
: 只要找到一個不滿足謂詞?p
?的元素,結果就確定為?false
,無需再檢查后續元素。noneMatch(p)
: 只要找到一個滿足謂詞?p
?的元素,結果就確定為?false
,無需再檢查后續元素。
這種“提前退出”的能力就是短路,它能極大地提升在某些數據場景下的執行效率。
MatchKind
?枚舉:統一匹配邏輯
為了用一套統一的邏輯來處理這三種不同的匹配規則,MatchOps
?設計了一個非常精巧的枚舉?MatchKind
。
// ... existing code ...enum MatchKind {/** Do any elements match the predicate? */ANY(true, true),/** Do all elements match the predicate? */ALL(false, false),/** Do no elements match the predicate? */NONE(true, false);private final boolean stopOnPredicateMatches;private final boolean shortCircuitResult;private MatchKind(boolean stopOnPredicateMatches,boolean shortCircuitResult) {this.stopOnPredicateMatches = stopOnPredicateMatches;this.shortCircuitResult = shortCircuitResult;}}
// ... existing code ...
這個枚舉通過兩個布爾值,巧妙地描述了三種匹配行為的共性與差異:
stopOnPredicateMatches
: 當謂詞(predicate
)的計算結果為?true
?時,是否應該停止處理?ANY
: 是。找到一個匹配的就停。ALL
: 否。找到一個匹配的還不夠,必須繼續找,直到找到不匹配的或者遍歷完。NONE
: 是。找到一個匹配的就停(因為結果已經確定是?false
)。
shortCircuitResult
: 如果發生了短路(提前停止),那么最終的結果應該是什么?ANY
:?true
。ALL
:?false
。NONE
:?false
。
通過這個枚舉,后續的實現代碼(如下面的?MatchSink
)就可以寫出通用的邏輯,而無需為?any
,?all
,?none
?分別寫?if-else
?分支。
工廠方法與?MatchSink
MatchOps
?提供了一系列?make...
?工廠方法,用于為不同類型的流(Stream
,?IntStream
?等)創建匹配操作。我們以?makeRef
?為例:
// ... existing code ...public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,MatchKind matchKind) {Objects.requireNonNull(predicate);Objects.requireNonNull(matchKind);class MatchSink extends BooleanTerminalSink<T> {MatchSink() {super(matchKind);}@Overridepublic void accept(T t) {if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {stop = true;value = matchKind.shortCircuitResult;}}}return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);}
// ... existing code ...
- 它接收一個?
Predicate
?和一個?MatchKind
。 - 內部定義了一個局部類?
MatchSink
,它繼承自?BooleanTerminalSink
。Sink
?是流中處理元素的末端。 MatchSink
?的?accept(T t)
?方法是核心邏輯所在:!stop
: 檢查是否已經有其他元素觸發了短路。如果已經?stop
,則直接忽略當前元素。predicate.test(t) == matchKind.stopOnPredicateMatches
: 這是一個非常優雅的判斷。它將當前元素的匹配結果 (true
?或?false
) 與?MatchKind
?中定義的“停止條件”進行比較。- 對于?
ANY
:?stopOnPredicateMatches
?是?true
。當?predicate.test(t)
?為?true
?時,條件成立。 - 對于?
ALL
:?stopOnPredicateMatches
?是?false
。當?predicate.test(t)
?為?false
?時,條件成立。 - 對于?
NONE
:?stopOnPredicateMatches
?是?true
。當?predicate.test(t)
?為?true
?時,條件成立。
- 對于?
- 如果條件成立,就設置?
stop = true
?來通知上游停止發送數據,并設置?value = matchKind.shortCircuitResult
?來記錄短路時的結果。
- 最后,它創建一個?
MatchOp
?實例并返回。MatchOp
?是?TerminalOp
?的一個具體實現,它封裝了?MatchKind
?和用于創建?MatchSink
?的?Supplier
。
并行執行:MatchTask
對于并行流,MatchOp
?的?evaluateParallel
?方法會創建一個?MatchTask
。
// ... existing code ...@SuppressWarnings("serial")private static final class MatchTask<P_IN, P_OUT>extends AbstractShortCircuitTask<P_IN, P_OUT, Boolean, MatchTask<P_IN, P_OUT>> {
// ... (constructors) ...@Overrideprotected Boolean doLeaf() {boolean b = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).getAndClearState();if (b == op.matchKind.shortCircuitResult)shortCircuit(b);return null;}@Overrideprotected Boolean getEmptyResult() {return !op.matchKind.shortCircuitResult;}}
// ... existing code ...
MatchTask
?繼承自?AbstractShortCircuitTask
,這是一個專為并行短路操作設計的 Fork/Join 任務。
doLeaf()
: 這是葉子任務執行的邏輯。它會對分配給自己的那一小部分數據執行匹配操作。helper.wrapAndCopyInto(...)
: 執行匹配,得到這部分數據的結果?b
。if (b == op.matchKind.shortCircuitResult)
: 判斷這個局部結果?b
?是否就是最終的短路結果。例如,對于?anyMatch
,如果這個葉子任務發現了一個匹配項,它的結果?b
?就會是?true
,這恰好等于?ANY
?的?shortCircuitResult
。shortCircuit(b)
: 如果發現了可以導致整個流短路的結果,它會調用?shortCircuit
。這個方法會通知 Fork/Join 框架,一個最終結果已經找到了,所有其他正在運行的、或者尚未開始的?MatchTask
?都可以被取消了。這實現了并行的短路。
getEmptyResult()
: 如果所有任務都正常執行完畢,沒有發生短路,那么這個方法返回最終的結果。例如,對于?allMatch
,如果所有元素都匹配,沒有發生短路,最終結果就是?true
,即?!op.matchKind.shortCircuitResult
?(!false
)。
總結
MatchOps
?是一個設計精良的內部工廠,它通過以下方式優雅地實現了?anyMatch
,?allMatch
,?noneMatch
:
MatchKind
?枚舉: 用兩個布爾標志統一了三種匹配模式的邏輯,避免了重復代碼。MatchSink
: 作為串行執行的核心,它利用?MatchKind
?的配置來實現通用的、可短路的元素處理邏輯。MatchTask
: 作為并行執行的核心,它利用?AbstractShortCircuitTask
?的能力,在 Fork/Join 框架下實現了高效的并行短路,一旦任何一個子任務找到了決定性的結果,就能迅速終止整個計算。
FindOps
它與我們之前討論的?MatchOps
?非常相似,但專注于實現?findFirst()
?和?findAny()
?這兩個終端操作。FindOps
?是一個內部使用的工廠類,其核心職責是創建用于“查找”操作的終端操作(TerminalOp
)實例。
final class FindOps {private FindOps() { }
//...
和?MatchOps
?一樣,它也是一個?final
?的、擁有私有構造函數的工具類,專門用于 Stream API 內部。它處理的?findFirst()
?和?findAny()
?都是短路操作,一旦找到符合條件的元素,就可以立即終止流的處理。
findFirst()
?vs?findAny()
這是理解?FindOps
?的關鍵區別點:
findFirst()
: 必須返回流中遭遇順序(encounter order)的第一個元素。這是一個有序操作。在并行流中,即使其他線程更快地找到了一個元素,也必須等待遭遇順序更靠前的任務完成,以確保返回的是“最左邊”的那個結果。findAny()
: 可以返回流中的任意一個元素。這是一個無序操作。在并行流中,它允許任何一個線程只要找到了一個元素,就可以立即短路整個計算,而無需關心這個元素在原始流中的位置。這使得?findAny()
?在并行流中的性能通常優于?findFirst()
。
FindOps
?內部通過一個布爾標志?mustFindFirst
?來區分這兩種行為。
FindOps
?提供了一系列?make...
?方法,用于為不同類型的流(對象、int、long、double)創建查找操作。
// ... existing code ...@SuppressWarnings("unchecked")public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {return (TerminalOp<T, Optional<T>>)(mustFindFirst ? FindSink.OfRef.OP_FIND_FIRST : FindSink.OfRef.OP_FIND_ANY);}
// ... existing code ...
這些方法非常簡潔。它們根據?mustFindFirst
?參數,直接返回一個預先創建好的、靜態的?TerminalOp
?實例。這些實例(如?OP_FIND_FIRST
)被定義在內部類?FindSink
?中。這種方式避免了重復創建對象,提高了效率。
FindSink
?- 串行執行的核心
FindSink
?是實現查找邏輯的?TerminalSink
。
// ... existing code ...private abstract static class FindSink<T, O> implements TerminalSink<T, O> {boolean hasValue;T value;// ...@Overridepublic void accept(T value) {if (!hasValue) {hasValue = true;this.value = value;}}@Overridepublic boolean cancellationRequested() {return hasValue;}
// ... existing code ...
它的邏輯非常直接:
accept(T value)
: 當接收到第一個元素時,將其保存在?this.value
?中,并將?hasValue
?標志設為?true
。由于?if (!hasValue)
?的判斷,后續所有到達的元素都會被忽略。cancellationRequested()
: 一旦?hasValue
?變為?true
,此方法就返回?true
。這會通知上游的流管道:“我已經找到結果了,請不要再發送更多元素了”,從而實現串行流的短路。
FindSink
?有多個靜態內部子類(OfRef
,?OfInt
?等),用于處理不同數據類型,并預先創建了?OP_FIND_FIRST
?和?OP_FIND_ANY
?這兩個靜態常量。
FindOp
?- 終端操作的封裝
FindOp
?是?TerminalOp
?接口的具體實現,它封裝了查找操作的所有元信息。
// ... existing code ...private static final class FindOp<T, O> implements TerminalOp<T, O> {private final StreamShape shape;final int opFlags;final O emptyValue;// ...final Supplier<TerminalSink<T, O>> sinkSupplier;FindOp(boolean mustFindFirst,StreamShape shape,O emptyValue,Predicate<O> presentPredicate,Supplier<TerminalSink<T, O>> sinkSupplier) {this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);// ...}
// ... existing code ...
opFlags
: 操作標志。所有查找操作都是?IS_SHORT_CIRCUIT
。對于?findAny
?(mustFindFirst
?為?false
),還會額外加上?NOT_ORDERED
?標志,告知流管道可以進行無序優化。evaluateSequential(...)
: 定義了串行執行的邏輯,即創建一個?FindSink
?并讓流把元素送入其中。evaluateParallel(...)
: 定義了并行執行的邏輯,它會創建一個?FindTask
?并啟動它。
FindTask
?- 并行執行的核心
FindTask
?是?AbstractShortCircuitTask
?的子類,負責并行查找。
// ... existing code ...private static final class FindTask<P_IN, P_OUT, O>extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {private final FindOp<P_OUT, O> op;private final boolean mustFindFirst;
// ...@Overrideprotected O doLeaf() {O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();if (!mustFindFirst) { // findAny 邏輯if (result != null)shortCircuit(result); // 找到任何一個,立即短路return null;}else { // findFirst 邏輯if (result != null) {foundResult(result); // 找到一個,需要通過 cancelLaterNodes 確保有序性return result;}elsereturn null;}}
// ...}
// ... existing code ...
doLeaf()
?方法清晰地展示了?findFirst
?和?findAny
?在并行時的區別:
findAny
?(!mustFindFirst
): 葉子任務只要處理自己的數據分片并找到了一個元素 (result != null
),就立刻調用?shortCircuit(result)
。這會嘗試將結果寫入共享的?AtomicReference
,并觸發全局的短路,所有其他任務都會盡快停止。findFirst
?(mustFindFirst
): 葉子任務找到一個元素后,不能直接宣布勝利。它需要調用?foundResult(result)
,這個方法內部會調用?cancelLaterNodes()
。這會取消掉所有處理“更右邊”(遭遇順序更靠后)數據的任務,同時允許“更左邊”的任務繼續執行。最終,只有最左邊的那個找到了結果的任務,其結果才會被采納為最終結果。這個過程通過?onCompletion
?方法中的邏輯來保證。
private void foundResult(O answer) {if (isLeftmostNode())shortCircuit(answer);elsecancelLaterNodes();}AbstractTask::protected boolean isLeftmostNode() {@SuppressWarnings("unchecked")K node = (K) this;while (node != null) {K parent = node.getParent();if (parent != null && parent.leftChild != node)return false;node = parent;}return true;}
總結
FindOps
?通過一系列精心設計的內部類,為?findFirst
?和?findAny
?提供了統一且高效的實現:
- 工廠方法 (
make...
): 提供簡潔的入口,并復用預創建的?TerminalOp
?實例。 FindSink
: 實現了串行查找的短路邏輯。FindOp
: 封裝了操作的元數據,并區分了?findFirst
?和?findAny
?的標志。FindTask
: 作為?AbstractShortCircuitTask
?的子類,它為并行查找提供了核心實現,并巧妙地利用父類的短路和取消機制,分別實現了?findAny
?的“快速響應”和?findFirst
?的“有序保證”。