Stream流——串行版
? Stream流是java8引入的特性,極大的方便了我們對于程序內數據的操作,提高了性能。通過函數式編程解決復雜問題。
1.BaseStream<T,S extense BaseStream<T,S>>
? 他是流處理的基石概念,重點不在于這個接口定義了什么方法,而是它獨特的參數類型。
首先約定好:T——入參類型 S——出參類型
S是繼承于自己的同樣的類型,從而形成一種遞歸,每一次返回的結果類型都是自己或它的子類。這樣做是因為我們在流處理時,不會在原有的流上進行操作,而是形成新的流返回會去。這樣設計免去了類型轉換出錯和增強了靈活性
2.Stream extends BaseStream<T, Stream>
BaseStream有4大子類,我們講一個使用范圍最廣的——Stream
它定義了我們常用的一些方法如
Stream filter (Predicate<? super T> predicate)
這里的Predicate就是一個函數式例如判斷對象是否為空 s->s!=null
中間操作 (Intermediate operations)
無狀態 (Stateless) | 有狀態 (Stateful) |
---|---|
unordered() | distinct() 去重 |
filter() 過濾元素 | sorted() 排序 |
map() 轉換元素類型 | limit() |
mapToInt() | skip() 跳過前n個元素 |
mapToLong() | |
mapToDouble() | |
flatMap() | |
flatMapToInt() | |
flatMapToLong() | |
flatMapToDouble() | |
peek() |
結束操作 (Terminal operations)
非短路操作 | 短路操作 (short-circuiting) |
---|---|
forEach() | anyMatch() |
forEachOrdered() | allMatch() |
toArray() | noneMatch() |
reduce() 歸約 | findFirst() |
collect() | findAny() |
max() | |
min() | |
count() |
咱們這里通過它的一個實現類ReferencePipeline來舉個例子來體驗一下
List<Integer> numbers = Arrays.asList(2, 1, 3, 8, 5, 6, 7, 4, 9, 10);
List<String> evenNumbers = numbers.stream().map(o->o.toString())//將元素轉為字符串.filter(n -> n.length() == 1)//剔除大于兩位數的元素.sorted()//排序.collect(Collectors.toList());//整合出一個新的流返回System.out.println(evenNumbers);
先定義了一個List,通過.stream()新建一個流管道,函數式編程的好處就是他可以把操作整合到一起,這里的 o->o.toString()和n->n.length()==1會被Java整合為
(o -> o.toString()) -> (n -> n.length() == 1)一個操作鏈
接下來,您是否好奇這個鏈條是如何組裝的,反正我很好奇,let's dive into water
3.ReferencePipeline
它繼承了AbstractPipeline,而在其中保存了三個引用,類型都是自己,分別是sourceStage指向第一個Sink(后續展開),接下來就是previousStage和nextStage分別鏈接上下Sink。
下圖給出了具體的流程
中間操作是一種爛加載處理,只有當觸發了collect()方法才會真正的調用每個Steam流中的wrapSink方法去處理數據。之后調用sort()進行排序。我們來具體看一個方法是如何處理的
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {@Overridepublic void accept(P_OUT u) {downstream.accept(mapper.apply(u));
在ReferencePipeline中它定義三個靜態內部類——StatelessOp,StatafulOp,Head。先是對函數式判空,然后返回一個無狀態流。關鍵在于它內部定義的opWrapSinlk
通過返回一個Sink類并在其中定義了具體的操作accept()。然后調用函數式,并通過accept()觸發下游Stream進行進一步處理。
4.Sink
抽象的來講,上面所說的ReferencePipeline就像是流水線上不停流動的傳輸帶,而真正在加工物品的就是我們的Sink類
這里的Consumer接口就是我們將不同的流處理函數式拼接起來的關鍵
public interface Consumer<T> {void accept(T t);default Consumer<T> andThen(Consumer<? super T> after) {Objects.requireNonNull(after);return (T t) -> { accept(t); after.accept(t); };}
}
andThen()方法,首先是自己調用自己的accept(),再調用下游的accept()
像我們前面寫到的例子,map(o->o.toString)和filter(n->n.length)中的參數就是一個Consumer
5.歸約
我們將最后一步collect()定義為歸約,將流中的所有元素歸約為一個最終的結果。
它通過操作一個Collector進行操作,包含三個步驟
累加(accumulation):將流中的每個元素依次累加到一個容器中。
合并(combining):如果存在并行流,多個部分的結果需要合并。
完成(finishing):在所有元素處理完后,生成最終的結果。
public interface Collector<T, A, R> {Supplier<A> supplier(); // 提供一個容器,容器類型是 ABiConsumer<A, T> accumulator(); // 累加器,負責將元素添加到容器BinaryOperator<A> combiner(); // 合并器,用于并行處理時合并多個容器Function<A, R> finisher(); // 結果轉換器,返回最終的結果Set<Collector.Characteristics> characteristics(); // 一些特征,指示這個 Collector 是否具有某些優化特性
}
例如我們最常用的toList()將流中的所有元素收集到一個Lsit中。
public static <T>Collector<T, ?, List<T>> toList() {return new CollectorImpl<>(ArrayList::new, List::add,(left, right) -> { left.addAll(right); return left; },CH_ID);}
ArrayList::new,返回一個List容器,通過List::add方法添加進入容器,后續處理并行流。
以下是關于 Java Stream API 中 collect
歸約操作的表格,其中總結了常見的歸約操作、作用說明及示例:
歸約操作 | 作用說明 | 示例代碼 |
---|---|---|
toList() | 將流中的元素收集到一個 List 中。 | List<String> result = names.stream().collect(Collectors.toList()); |
toSet() | 將流中的元素收集到一個 Set 中,自動去重。 | Set<String> result = names.stream().collect(Collectors.toSet()); |
joining() | 將流中的元素連接成一個字符串,支持指定分隔符、前綴和后綴。 | String result = names.stream().collect(Collectors.joining(", ", "[", "]")); |
groupingBy() | 根據某個條件將流中的元素分組,返回一個 Map 。 | Map<Integer, List<String>> groupedByLength = names.stream().collect(Collectors.groupingBy(String::length)); |
partitioningBy() | 將流中的元素分成兩組,通常用于二元分類。 | Map<Boolean, List<String>> partitioned = names.stream().collect(Collectors.partitioningBy(name -> name.length() > 3)); |
summarizingInt() | 對流中的元素進行統計,返回 IntSummaryStatistics ,包括計數、求和、最小值、最大值、平均值等。 | IntSummaryStatistics stats = names.stream().collect(Collectors.summarizingInt(String::length)); |
reducing() | 對流中的元素進行歸約操作(例如累加、求最大值等),返回一個單一結果。 | Optional<String> result = names.stream().collect(Collectors.reducing((s1, s2) -> s1.length() > s2.length() ? s1 : s2)); |
toMap() | 將流中的元素根據某個鍵值映射規則收集到一個 Map 中。 | Map<Integer, String> map = names.stream().collect(Collectors.toMap(String::length, name -> name)); |
| 將流中的元素根據某個鍵值映射規則收集到一個 `Map` 中。 | `Map<Integer, String> map = names.stream().collect(Collectors.toMap(String::length, name -> name));` |