Stream
- Stream 的核心概念
- 核心特點
- Stream 的操作分類
- 中間操作(Intermediate Operations)
- 終止操作(Terminal Operations)
- Stream 的流分類
- 順序流(Sequential Stream)
- 并行流(Parallel Stream)
- 并行流的注意事項
- 并行流的底層機制
- 順序流 vs 并行流的對比
- 順序流和并行流的示例代碼
- 順序流并行流總結
Stream 的核心概念
Java 8 引入的 Stream API 是一種基于函數式編程的數據處理抽象,允許以聲明式方式操作集合(如過濾、映射、排序等)。Stream 不是數據結構,而是對數據源(集合、數組、I/O 等)的高效計算工具
核心特點
- 鏈式調用:通過多個操作(中間操作 + 終止操作)串聯處理數據
- 惰性求值:中間操作(如 filter, map)不會立即執行,直到遇到終止操作(如 collect, forEach)
- 不可復用:一個 Stream 只能被消費一次,再次使用會拋出 IllegalStateException
- 并行處理:可通過簡單方法(parallel())實現多線程并行計算
Stream 的操作分類
中間操作(Intermediate Operations)
返回新的 Stream,支持鏈式調用,常見方法示例:filter(), map(), sorted(), distinct()。
List<Integer> list = Arrays.asList(1, 2, 2,3);list.stream().distinct().forEach(System.out::println);//用于去除流中的重復元素。list.stream().map(x -> x * x).forEach(System.out::println);//用于對流中的每個元素應用一個函數,并返回一個新的流list.stream().filter(v->v==3).forEach(System.out::println);//用于對流中的元素進行篩選,只保留滿足條件的元素list.stream().flatMap(s -> s.toString().chars().boxed()).forEach(System.out::println);//用于將流中的每個元素轉換為另一個流,然后將這些流連接成一個流list.stream().limit(2).forEach(System.out::println);//用于限制流中元素的數量list.stream().skip(2).forEach(System.out::println);//用于跳過流中前n個元素list.stream().sorted().forEach(System.out::println);//用于對流中的元素進行排序
支持鏈式調用
list.stream().distinct().filter(v->v==3).sorted().map(x -> x * x).forEach(System.out::println);
但是注意,流中間操作的方法返回的結果依然是流,但是上面為了顯示這些方法使用的樣例里面
forEach(System.out::println)這個是終止操作方法,也就是在forEach方法之前的返回的是流,在使用forEach方法之后返回的又將流轉換非 Stream 結果
終止操作(Terminal Operations)
觸發實際計算,返回非 Stream 結果(如集合、數值、void),常見方法示示例:collect(), forEach(), reduce(), count()
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");names.stream().forEach(System.out::println);//對流中的每個元素執行指定的操作 這里是打印names.stream() .map(String::toUpperCase).collect(Collectors.toList()).forEach(System.out::println);//將流中的元素收集到一個容器中System.out.println(names.stream().allMatch(s -> s.contains("s")));//allMatch 檢查是否匹配所有元素System.out.println(names.stream().anyMatch(s -> s.contains("a")));//anyMatch 檢查是否至少匹配一個元素System.out.println(names.stream().noneMatch(s -> s.contains("x")));//noneMatch:檢查是否沒有匹配所有元素System.out.println(names.stream().findFirst());//findFirst:返回當前流中的第一個元素System.out.println(names.stream().findAny());//findAny:返回當前流中的任意元素System.out.println(names.stream().count());//count:返回流中元素總數System.out.println(names.stream().max(Comparator.comparingInt(s->s.length())));//max:返回流中最大值System.out.println(names.stream().min(Comparator.comparingInt(s->s.length())));//min:最小值
Stream 的流分類
順序流(Sequential Stream)
默認模式:所有操作在單線程中按順序執行
List<Integer> list = Arrays.asList(1, 2, 3);
Stream<Integer> stream = list.stream(); // 順序流
適用場景:
數據量較小,或操作本身簡單。
需要保證操作順序(如 sorted() 依賴前序操作結果)。
并行流(Parallel Stream)
Stream<Integer> parallelStream = list.parallelStream(); // 并行流
或
Stream<Integer> parallelStream = list.stream().parallel(); // 轉換為并行流
適用場景:
數據量較大,且任務可獨立拆分(無共享狀態或順序依賴)。
操作耗時(如復雜計算、I/O 等待)。
并行流的注意事項
1.線程安全問題
確保操作中使用的 Lambda 表達式或函數是線程安全的(避免共享可變狀態)。
示例錯誤:
List<Integer> result = new ArrayList<>();
list.parallelStream().forEach(result::add); // 并發修改 ArrayList 導致數據錯誤
應使用線程安全的收集器(如 Collectors.toList()):
List<Integer> result = list.parallelStream().collect(Collectors.toList());
2.性能未必更好
并行流需額外開銷(任務拆分、線程調度),對小數據量可能更慢。
測試性能:通過基準測試(如 JMH) 驗證是否適合并行
3.順序依賴操作
如 limit()、findFirst() 在并行流中可能性能更差,需改用無序流
list.parallelStream().unordered().limit(10); // 提升性能
ORDERED是Spliterator的特征值, 特征要求并行流保持元素順序(如 List),改成unordered就是改成序列流,除此之外
并行流的底層機制
1.Spliterator
并行流通過 Spliterator(可拆分迭代器) 將數據源拆分為多個子任務,是 Stream API 并行處理的底層實現
List<String> list = Arrays.asList("Java", "Python", "C++", "Go");// 使用并行流處理
list.parallelStream() // 將數據源拆分為多個子任務.map(String::toUpperCase).forEach(System.out::println);// 輸出(順序不確定,因為并行處理):
// PYTHON
// JAVA
// GO
// C++
2.ForkJoinPool
并行流默認使用公共的 ForkJoinPool(線程數 = CPU 核心數)。通過系統屬性修改默認線程池大小
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
3.自定義線程池(避免影響全局)
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> list.parallelStream().forEach(...)).get();
順序流 vs 并行流的對比
特性 | 順序流 | 并行流 |
---|---|---|
線程模型 | 單線程 | 多線程(ForkJoinPool 默認線程池) |
性能優勢 | 簡單任務、小數據量 | 大數據量、可并行化的復雜任務 |
開銷 | 低 | 高(線程切換、任務拆分/合并) |
順序保證 | 嚴格按順序處理 | 不保證順序(除非使用有序操作) |
數據源要求 | 無特殊要求 | 數據源需可拆分(如 ArrayList ) |
順序流和并行流的示例代碼
順序流處理
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
List<String> filtered = names.stream().filter(name -> name.length() > 3).map(String::toUpperCase).collect(Collectors.toList()); // [ALICE, CHARLIE]
并行流處理
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numbers.parallelStream()//通過底層Spliterator拆分為多個子任務.filter(n -> n % 2 == 0).mapToInt(n -> n * 2).sum(); // (2+4+6+8+10)*2 = 60
順序流并行流總結
順序流:簡單、低開銷,適合小數據量或順序敏感操作。
并行流:通過多線程加速處理,適合大數據量和可并行化任務,但需注意線程安全和性能開銷。
選擇策略:
優先使用順序流,僅在必要時(且驗證有效)切換為并行流。
避免在并行流中操作共享可變狀態。