原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap
AtomicInteger
java.concurrent.atomic
包下有很多原子操作的類。 在有些情況下,原子操作可以在不使用 synchronized
關鍵字和鎖的情況下解決多線程安全問題。
在內部,原子類大量使用 CAS
, 這是大多數現在 CPU 支持的原子操作指令, 這些指令通常情況下比鎖同步要快得多。如果需要同時改變一個變量, 使用原子類是極其優雅的。
現在選擇一個原子類 AtomicInteger
作為例子
AtomicInteger atomicInt = new AtomicInteger(0);ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 1000).forEach(i -> executor.submit(atomicInt::incrementAndGet));stop(executor);System.out.println(atomicInt.get()); // => 1000復制代碼
使用 AtomicInteger
代替 Integer
可以在線程安全的環境中增加變量, 而不要同步訪問變量。incrementAndGet()
方法是一個原子操作, 我們可以在多線程中安全的調用。
AtomicInteger
支持多種的原子操作, updateAndGet()
方法接受一個 lambda
表達式,以便對整數做任何的算術運算。
AtomicInteger atomicInt = new AtomicInteger(0);ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 1000).forEach(i -> {Runnable task = () ->atomicInt.updateAndGet(n -> n + 2);executor.submit(task);});stop(executor);System.out.println(atomicInt.get()); // => 2000
復制代碼
accumulateAndGet()
方法接受一個 IntBinaryOperator
類型的另一種 lambda
表達式, 我們是用這種方法來計算 1 -- 999 的和:
AtomicInteger atomicInt = new AtomicInteger(0);ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 1000).forEach(i -> {Runnable task = () ->atomicInt.accumulateAndGet(i, (n, m) -> n + m);executor.submit(task);});stop(executor);System.out.println(atomicInt.get()); // => 499500
復制代碼
還有一些其他的原子操作類: AtomicBoolean AtomicLong AtomicReference
LongAdder
作為 AtomicLong
的替代, LongAdder
類可以用來連續地向數字添加值。
ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 1000).forEach(i -> executor.submit(adder::increment));stop(executor);System.out.println(adder.sumThenReset()); // => 1000
復制代碼
LongAdder
類和其他的整數原子操作類一樣提供了 add()
和 increment()
方法, 同時也是線程安全的。但其內部的結果不是一個單一的值, 這個類的內部維護了一組變量來減少多線程的爭用。實際結果可以通過調用 sum()
和 sumThenReset()
來獲取。
當來自多線程的更新比讀取更頻繁時, 這個類往往優于其他的原子類。通常作為統計數據, 比如要統計 web 服務器的請求數量。 LongAdder
的缺點是會消耗更多的內存, 因為有一組變量保存在內存中。
LongAccumulator
LongAccumulator
是 LongAdder
的一個更通用的版本。它不是執行簡單的添加操作, 類 LongAccumulator
圍繞 LongBinaryOperator
類型的lambda表達式構建,如代碼示例中所示:
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));stop(executor);System.out.println(accumulator.getThenReset()); // => 2539
復制代碼
我們使用函數 2 * x + y
和初始值1
創建一個 LongAccumulator
。 每次調用 accumulate(i)
, 當前結果和值i
都作為參數傳遞給``lambda` 表達式。
像 LongAdder
一樣, LongAccumulator
在內部維護一組變量以減少對線程的爭用。
ConcurrentMap
ConcurrentMap
接口擴展了 Map
接口,并定義了最有用的并發集合類型之一。 Java 8
通過向此接口添加新方法引入了函數式編程。
在下面的代碼片段中, 來演示這些新的方法:
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
復制代碼
forEach()
接受一個類型為 BiConsumer
的 lambda
表達式, 并將 map
的 key
和 value
作為參數傳遞。
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
復制代碼
putIfAbsent()
方法只有當給定的 key
不存在時才將數據存入 map
中, 這個方法和 put
一樣是線程安全的, 當多個線程訪問 map
時不要做同步操作。
String value = map.putIfAbsent("c3", "p1");
System.out.println(value); // p0
復制代碼
getOrDefault()
方法返回給定 key
的 value
, 當 key
不存在時返回給定的值。
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there
復制代碼
replaceAll()
方法接受一個 BiFunction
類型的 lambda
表達式, 并將 key
和 value
作為參數傳遞,用來更新 value
。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3
復制代碼
compute()
方法和 replaceAll()
方法有些相同, 不同的是它多一個參數, 用來更新指定 key
的 value
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar
復制代碼
ConcurrentHashMap
以上所有方法都是 ConcurrentMap
接口的一部分,因此可用于該接口的所有實現。 此外,最重要的實現 ConcurrentHashMap
已經進一步增強了一些新的方法來在 Map
上執行并發操作。
就像并行流一樣,這些方法在 Java 8
中通過 ForkJoinPool.commonPool()
提供特殊的 ForkJoinPool
。該池使用預設的并行性, 這取決于可用內核的數量。 我的機器上有四個CPU內核可以實現三種并行性:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
復制代碼
通過設置以下 JVM
參數可以減少或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
復制代碼
我們使用相同的示例來演示, 不過下面使用 ConcurrentHashMap
類型, 這樣可以調用更多的方法。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
復制代碼
Java 8
引入了三種并行操作:forEach
, search
和 reduce
。 每個操作都有四種形式, 分別用 key
, value
, entries
和 key-value
來作為參數。
所有這些方法的第一個參數都是 parallelismThreshold
閥值。 該閾值表示操作并行執行時的最小收集大小。 例如, 如果傳遞的閾值為500
,并且 map
的實際大小為499
, 則操作將在單個線程上按順序執行。 在下面的例子中,我們使用一個閾值來強制并行操作。
ForEach
方法 forEach()
能夠并行地迭代 map
的鍵值對。 BiConsumer
類型的 lambda
表達式接受當前迭代的 key
和 value
。 為了可視化并行執行,我們將當前線程名稱打印到控制臺。 請記住,在我的情況下,底層的 ForkJoinPool
最多使用三個線程。
map.forEach(1, (key, value) ->System.out.printf("key: %s; value: %s; thread: %s\n",key, value, Thread.currentThread().getName()));// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
復制代碼
Search
search()
方法接受一個 BiFunction
類型的 lambda
表達式, 它能對 map
做搜索操作, 如果當前迭代不符合所需的搜索條件,則返回 null
。 請記住,ConcurrentHashMap
是無序的。 搜索功能不應該取決于地圖的實際處理順序。 如果有多個匹配結果, 則結果可能是不確定的。
String result = map.search(1, (key, value) -> {System.out.println(Thread.currentThread().getName());if ("foo".equals(key)) {return value;}return null;
});
System.out.println("Result: " + result);// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar
復制代碼
下面是對 value 的搜索
String result = map.searchValues(1, value -> {System.out.println(Thread.currentThread().getName());if (value.length() > 3) {return value;}return null;
});System.out.println("Result: " + result);// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
復制代碼
Reduce
reduce()
方法接受兩個類型為 BiFunction
的 lambda
表達式。 第一個函數將每個鍵值對轉換為任何類型的單個值。 第二個函數將所有這些轉換后的值組合成一個結果, 其中火忽略 null
值。
String result = map.reduce(1,(key, value) -> {System.out.println("Transform: " + Thread.currentThread().getName());return key + "=" + value;},(s1, s2) -> {System.out.println("Reduce: " + Thread.currentThread().getName());return s1 + ", " + s2;});System.out.println("Result: " + result);// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar
復制代碼