目錄
ConcurrentHashMap 一定是線程安全的嗎
CAS 機制的注意事項
使用java 并行流 ,您要留意了
??????ConcurrentHashMap 在JDK1.8中ConcurrentHashMap 內部使用的是數組加鏈表加紅黑樹的結構,通過CAS+volatile或synchronized的方式來保證線程安全的,這些原理已毋庸置疑,一言不合上代碼.
? ? ?1.? 模擬2個線程累計,通過ConcurrentHashMap 儲存累計的結果。
/*** @description: ConcurrentHashMap 真的安全嗎* @author: ppx* @date: 2023/8/17 14:11* @version: 1.0*/
public class TestMap {private static ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();private static String key = "hello";/*** @description: 測試2個線程 執行計算* @param:* @return: void* @author: ppx* @date: 2023/8/17 16:43*/private static void testRun() {ExecutorService executor = new ThreadPoolExecutor(2, 5,2L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());for (int i = 0; i < 2; i++) {executor.submit(() -> {for (int j = 0; j < 5; j++) {// 第一步讀取int value = concurrentHashMap.getOrDefault(key, 0);// 第二步+1value++;// 第三補+ 回寫mapconcurrentHashMap.put(key, value);}});}executor.shutdown();// 直到線程執行完成while(!executor.isTerminated()){}System.out.println("執行結果:" + concurrentHashMap.get(key));}public static void main(String[] args) {testRun();}
}
2.出乎意料執行多次輸出不同的結果:
?
?
3. 分析原理:ConcurrentHashMap 本身是線程安全的,但for 里面的獲map取值、加加操作及回寫map 這三步是非原子性。要保證操作的安全性,這三步實現原子性即可。
?優化后代碼:
private static void testRun() {ExecutorService executor = new ThreadPoolExecutor(2, 5,2L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());for (int i = 0; i < 2; i++) {executor.submit(() -> {for (int j = 0; j < 5; j++) {synchronized (TestMap.class) {int value = concurrentHashMap.getOrDefault(key, 0);value ++;concurrentHashMap.put(key, value);}}});}executor.shutdown();while (!executor.isTerminated()) {}System.out.println("執行結果:" + concurrentHashMap.get(key));}
CAS 機制的注意事項
? ? ? ?某線程把數據A更新了B,隨后又從B更新成A,恰好此時另一線程讀取該數據,發現數據的值還是A沒有變化,誤認為還是原來的A,但此時A的一些屬性或狀態已經發生過變化。
? ? ? CAS操作中將判斷“V的值是否仍然為A?”,如果是的話將執行更新操作,在某些CAS操作中,如果V的值首先由A變為B,再由B變為A,那么CAS仍然將會操作成功。
ABA問題:
? ????????線程A 的操作,cas中的值由1變成99,再由99變成1,此次線程B 發現AtomicInteger 的值還是1,于是更新到50,產生ABA的問題。
private static AtomicInteger atomicInteger = new AtomicInteger(1);public static void main(String[] args) {Thread threadA = new Thread(() -> {atomicInteger.compareAndSet(1, 99);atomicInteger.compareAndSet(99, 1);System.out.println("線程A進行CAS后的值:"+atomicInteger.get());try {Thread.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}, "線程A");Thread threadB = new Thread(() -> {try{atomicInteger.compareAndSet(1, 50);System.out.println("線程B進行CAS后的值:"+atomicInteger.get());}catch (Exception e) {e.printStackTrace();}}, "線程B");threadA.start();try {threadA.join();} catch (InterruptedException e) {e.printStackTrace();}threadB.start();}
基于AtomicStampedReference類實現
AtomicStampedReference內部增加了版本號的概念,只有期待的值與版本號分別匹配后,才滿足條件,更新最新的值。
案例:
? ? ? 線程 A? 進行CAS 操作更新時,發布版本已發生變動,CAS更新?失敗。線程B? ?進行CAS 操作更新時,匹配對應的版本,期待值,更新成功。
public static void main(String[] args) {new Thread(() -> {// 讓線程B 獲取最新版本號,成功 執行更新try {Thread.sleep(11);} catch (InterruptedException e) {e.printStackTrace();}int stamp = atomicStampedReference.getStamp();System.out.println(Thread.currentThread().getName() + ", 當前版本號為:" + stamp);boolean firstCasFlag = atomicStampedReference.compareAndSet(100, 99, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);System.out.println("當前版本號:"+atomicStampedReference.getStamp()+", 線程A進行CAS后的值:" + atomicInteger.get() + ",第1次操作是否修改成功: " + firstCasFlag);}, "線程A").start();new Thread(() -> {try {int stamp = atomicStampedReference.getStamp();System.out.println(Thread.currentThread().getName() + ", 版本號為:" + atomicStampedReference.getStamp());boolean flag = atomicStampedReference.compareAndSet(100, 888, stamp, atomicStampedReference.getStamp() + 1);System.out.println("線程B進行CAS后的值:" + atomicStampedReference.getReference() + ", 此次操作是否修改成功: " + flag);} catch (Exception e) {e.printStackTrace();}}, "線程B").start();}
執行結果:
線程B, 版本號為:1
線程B進行CAS后的值:888, 此次操作是否修改成功: true
線程A, 當前版本號為:2
當前版本號:2, 線程A進行CAS后的值:1,第1次操作是否修改成功: false
?