簡介
juc,java.util.concurrent包的簡稱,java1.5時引入。juc中提供了一系列的工具,可以更好地支持高并發任務
juc中提供的工具
可重入鎖 ReentrantLock
可重入鎖:ReentrantLock,可重入是指當一個線程獲取到鎖之后,可以再次獲取到當前鎖。可重入鎖一定程度上防止了死鎖。
ReentrantLock提供的功能:
- 可重入:在獲取到鎖之后還可以再次獲取這把鎖
- 可打斷:獲取鎖時的阻塞狀態可以被interrupt方法打斷
- 可超時:可以指定阻塞時長
- 多條件變量:synchronized只支持一個條件變量,這里條件變量是指調用wait方法、notify方法的鎖對象,ReentrantLock可以實現在多個條件變量上等待和喚醒
- 可以指定內部使用公平鎖還是非公平鎖,默認使用非公平鎖
ReentrantLock和synchronized都支持可重入,但是synchronized沒有ReentrantLock提供的其它功能
使用案例
案例1:基本使用
10個線程,同時對同一個int變量執行1000次加加,確認結果是否正確。
private static final ReentrantLock LOCK = new ReentrantLock();
private static int count = 0;public static void main(String[] args) {List<Thread> list = new ArrayList<>();for (int i = 0; i < 10; i++) {Thread thread = new Thread(() -> {// 加鎖LOCK.lock();try {for(int j = 0; j < 1000; j++) {// 訪問共享資源count++;}} finally {// 釋放鎖LOCK.unlock();}});list.add(thread);thread.start();}for (Thread thread : list) {try {thread.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}// 驗證結果,結果正確System.out.println(count);
}
案例2:屬性方法
獲取ReentrantLock的狀態
private static final ReentrantLock LOCK = new ReentrantLock();public static void main(String[] args) throws InterruptedException {// 等待鎖的線程Thread thread2 = new Thread(() -> {LOCK.lock();try {for (int i = 0; i < 100000; i++) {}} finally {LOCK.unlock();}}, "t2");// 擁有鎖的線程new Thread(() -> {try {LOCK.lock();// 擁有鎖的情況下鎖的狀態System.out.println("---擁有鎖---");System.out.println("鎖是否被某個線程持有 = " + LOCK.isLocked()); // trueSystem.out.println("重入次數 = " + LOCK.getHoldCount()); // 1System.out.println("鎖是否被當前線程持有 = " + LOCK.isHeldByCurrentThread()); // trueSystem.out.println("阻塞隊列中是否有等待鎖的線程 = " + LOCK.hasQueuedThreads()); // trueSystem.out.println("線程2是否在阻塞隊列中 = " + LOCK.hasQueuedThread(thread2)); // trueSystem.out.println("阻塞隊列的長度 = " + LOCK.getQueueLength()); // 1System.out.println("鎖是不是公平鎖 = " + LOCK.isFair()); // false// java.util.concurrent.locks.ReentrantLock@6a53a7e9[Locked by thread t1]System.out.println("鎖.toString方法 = " + LOCK.toString()); Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {LOCK.unlock();}}, "t1").start();thread2.start();Thread.sleep(1000L);// 沒有上鎖的情況下ReentrantLock的狀態System.out.println("---沒有鎖---");System.out.println("鎖是否被某個線程持有 = " + LOCK.isLocked()); // falseSystem.out.println("重入次數 = " + LOCK.getHoldCount()); // 0System.out.println("鎖是否被當前線程持有 = " + LOCK.isHeldByCurrentThread()); // falseSystem.out.println("阻塞隊列中是否有等待鎖的線程 = " + LOCK.hasQueuedThreads()); // falseSystem.out.println("線程2是否在阻塞隊列中 = " + LOCK.hasQueuedThread(thread2)); // falseSystem.out.println("阻塞隊列的長度 = " + LOCK.getQueueLength()); // 0System.out.println("鎖是不是公平鎖 = " + LOCK.isFair()); // false// java.util.concurrent.locks.ReentrantLock@6a53a7e9[Unlocked]System.out.println("鎖.toString方法 = " + LOCK.toString());
}
案例3:可超時
可以指定超時時間,超過指定時長沒有獲取鎖,算失敗
private static final ReentrantLock LOCK = new ReentrantLock();// 測試tryLock方法
public static void main(String[] args) {// 先啟動一個線程占用鎖new Thread(() -> {LOCK.lock();try {Utils.println("當前線程獲取鎖");Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {Utils.println("當前線程釋放鎖");LOCK.unlock();}}).start();boolean tryLock = false;try{tryLock = LOCK.tryLock(4, TimeUnit.SECONDS);if (tryLock) {Utils.println("當前線程獲取鎖成功");} else {Utils.println("當前線程獲取鎖失敗");}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (tryLock) {Utils.println("當前線程釋放鎖");LOCK.unlock();}}
}
案例4:可打斷
使用lockInterruptibly方法獲取鎖,線程的等待狀態可以被interrupt方法打斷,普通的lock方法則不會。
線程在等待狀態下,如果外部調用了線程對象的interrupt方法,線程會結束等待狀態,如果是可打斷地獲取鎖,此時會拋出InterruptedException,結束獲取鎖的操作,然后需要用戶處理這個異常。
private static final ReentrantLock LOCK = new ReentrantLock();// 測試lockInterruptibly方法
public static void main(String[] args) throws InterruptedException {new Thread(() -> {// 主線程先占住鎖資源Utils.println("當前線程獲取鎖");LOCK.lock();try {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}} finally {if (LOCK.isHeldByCurrentThread()) {Utils.println("當前線程釋放鎖");LOCK.unlock();}}}, "線程0").start();Thread.sleep(500);// 可打斷地搶占鎖資源Thread thread = new Thread(() -> {try {LOCK.lockInterruptibly();Utils.println("當前線程獲取鎖");} catch (InterruptedException e) {e.printStackTrace();Utils.println("當前線程被打斷");} finally {if (LOCK.isHeldByCurrentThread()) {Utils.println("當前線程釋放鎖");LOCK.unlock();}}}, "線程1");thread.start();Thread.sleep(500);// 打斷線程1,在lockInterruptibly方法獲取鎖時調用線程的interrupt方法,// lockInterruptibly方法會拋出InterruptedExceptionthread.interrupt();
}
條件對象 Condition
Condition:聯合鎖對象一起使用,表示條件對象,提供了類似于Object類中的wait、notify等方法的功能。
- 當調用Condition實例中的await、signal方法時,如果當前線程沒有持有鎖資源,則拋出非法監視器狀態異常;
- 當線程調用Condition中的await方法時,線程放棄鎖資源,進入等待列表,如果在等待過程中被打斷,拋出中斷異常
通過Condition,可以支持在一個鎖對象上操作多個條件變量
常用api:
- await:
void await() throws InterruptedException
:相當于Object類中的wait方法 - signal:
void signal()
:相當于Object類中的notify方法 - signalAll:
void signalAll()
:相當于Object類中的notifyAll方法
案例1:多條件變量
類似于生產者/消費者模式,只不過這個案例中有兩個生產者、兩個消費者,它們一一對應。
private static final List<Integer> list = new ArrayList<>();
private static final List<Integer> list2 = new ArrayList<>();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Condition condition = LOCK.newCondition();
private static final Condition condition2 = LOCK.newCondition();public static void main(String[] args) throws InterruptedException {// 消費者1new Thread(() -> {while (true) {LOCK.lock();try {if (list.isEmpty()) {condition.await();} else {System.out.println(Thread.currentThread().getName() + " 消費數據 " + list);list.clear();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "消費者1").start();// 生產者1new Thread(() -> {while (true) {LOCK.lock();try {if (list.isEmpty()) {list.addAll(createIntegerList(10));System.out.println(Thread.currentThread().getName() + " 生產數據 " + list);condition.signal();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(3000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "生產者1").start();// 消費者2new Thread(() -> {while (true) {LOCK.lock();try {if (list2.isEmpty()) {condition2.await();} else {System.out.println(Thread.currentThread().getName() + " 消費數據 " + list2);list2.clear();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "消費者2").start();// 生產者2new Thread(() -> {while (true) {LOCK.lock();try {if (list2.isEmpty()) {list2.addAll(createIntegerList(100));System.out.println(Thread.currentThread().getName() + " 生產數據 " + list2);condition2.signal();}} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);} finally {LOCK.unlock();}try {Thread.sleep(3000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, "生產者2").start();
}public static List<Integer> createIntegerList(int bound) {List<Integer> list = new ArrayList<>();Random random = new Random();list.add(random.nextInt(bound));list.add(random.nextInt(bound));list.add(random.nextInt(bound));return list;
}
總結:這個案例比較粗糙,只是演示了一個鎖對象在支持多個條件變量的情況,要注意,如果是一個生產者對應多個消費者,signal方法會喚醒等待隊列中的第一個線程。
讀寫鎖 ReentrantReadWriteLock
ReentrantReadWriteLock:可重入的讀寫鎖,讀鎖和寫鎖使用同一個同步器,讀讀不沖突,讀寫沖突。寫鎖是互斥鎖,讀鎖是共享鎖。如果同步隊列中有寫鎖,讀鎖會排在寫鎖之后
讀寫鎖的使用規則:
- 讀讀不沖突:多個線程是可以同時獲取讀鎖而不需要阻塞等待
- 讀寫沖突:一個線程獲取了讀鎖,那么其他的線程要獲取寫鎖 需要等待;同樣的,一個線程獲取了寫鎖,另外的想要獲取讀鎖或者寫鎖都需要阻塞等待
- 鎖降級:一個獲取寫鎖的線程是可以在釋放寫鎖之前再次獲取讀鎖的,這就是鎖降級
案例:一個使用讀寫鎖來保護數據的容器
第一步:數據容器:
public class DataContainer {private Object data;private final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();private final ReentrantReadWriteLock.ReadLock READ_LOCK = LOCK.readLock(); // 獲取讀鎖private final ReentrantReadWriteLock.WriteLock WRITE_LOCK = LOCK.writeLock(); // 獲取寫鎖/*** 讀取數據*/public Object read(){Utils.println("獲取讀鎖...");READ_LOCK.lock();try{Utils.println("讀取數據...");Thread.sleep(1000);return data;} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {Utils.println("釋放讀鎖...");READ_LOCK.unlock();}return null;}/*** 寫入數據*/public void write(Object data){Utils.println("獲取寫鎖...");WRITE_LOCK.lock();try{this.data = data;Utils.println("寫入數據...");} finally {Utils.println("釋放寫鎖...");WRITE_LOCK.unlock();}}
}
第二步:測試,讀讀不沖突
public static void main(String[] args) {DataContainer container = new DataContainer();// 兩個線程同時獲取讀鎖,讀讀不沖突new Thread(() -> {Object read = container.read();System.out.println("read = " + read);}).start();new Thread(() -> {Object read = container.read();System.out.println("read = " + read);}).start();
}
加戳的讀寫鎖 StampedLock
StampedLock:自jdk8加入,對于讀寫鎖的進一步優化。它提供了一種樂觀讀技術,讀取完畢后需要做一次戳校驗,如果校驗通過,表示這期間確實沒有寫操作,數據可以安全使用,如果校驗沒有通過,需要重新獲取鎖,保證數據安全。適合于讀多寫少的場景
案例:
第一步:一個使用加戳讀寫鎖保護的容器
public class DataContainerStamped {private Object data;private final StampedLock LOCK = new StampedLock();public Object read() {// 第一步:樂觀地讀取數據Object result = null;// 獲取樂觀讀鎖long stamp = LOCK.tryOptimisticRead();Utils.println("樂觀地讀取數據,stamp = " + stamp);try {Thread.sleep(3000L);result = data;} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}// 第二步:讀完數據之后對戳進行校驗if (LOCK.validate(stamp)) {Utils.println("樂觀地讀完數據,stamp = " + stamp);return result;}// 第三步:如果時間戳變了,證明數據有更新,需要重新讀取數據Utils.println("更新讀鎖,stamp = " + stamp);try {stamp = LOCK.readLock();Utils.println("獲取讀鎖, stamp = " + stamp);try {Thread.sleep(3000L);result = data;} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}} finally {Utils.println("釋放讀鎖, stamp = " + stamp);LOCK.unlockRead(stamp);}return result;}public void write(Object newData) {long stamp = LOCK.writeLock();Utils.println("獲取寫鎖,stamp = " + stamp);try {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}this.data = newData;} finally {Utils.println("釋放寫鎖,stamp = " + stamp);LOCK.unlockWrite(stamp);}}
}
第二步:驗證樂觀讀,在讀取數據的時候寫入數據。結論是,寫入數據會改變數據戳,樂觀讀完數據需要會校驗戳,如果數據戳被改變,需要再次重新讀取。
public static void main(String[] args) {DataContainerStamped container = new DataContainerStamped();new Thread(() -> {Object read = container.read();System.out.println("read = " + read);}).start();try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}new Thread(() -> container.write(10)).start();
}
原子類
原子類內部維護了一個數據,并且通過cas算法來操作這個數據,確保對這個數據的操作是線程安全的,同時又避免了鎖競爭。
原子類適合處理單個變量需要在線程間共享的情況。
CAS算法
cas:compare and swap,比較和交換,實現無鎖同步的一種算法
工作機制:CAS算法維護了3個變量:內存位置、預期原值、新值,它會使用預期原值和內存位置存儲的值相比較,如果相同:進行交換操作,如果不同則不進行,整個比較并交換的操作是原子性的。
底層原理:在語法上,CAS算法操作的變量必須被volatile修飾,CAS算法的底層是基于CPU的原語支持,能夠保證 “比較-交換”操作是原子性的。
CAS算法的優缺點:
- 優點:可以避免阻塞,CAS算法適合多核、并發量不高的情況
- 缺點:
- 只能保證一個共享變量的原子操作,如果是多個的話,就需要使用鎖了。
- ABA問題:如果先將預期值A給成B,再改回A,那CAS操作就會誤認為A的值從來沒有被改變過,這時其他線程的CAS操作仍然能夠成功。通過加入一個版本號來解決這個問題
- 如果并發量很大,CAS算法的性能可能會降低。因為如果并發量很大,重試必然頻繁發生,這會導致效率降低
案例:
import sun.misc.Unsafe;
import java.lang.reflect.Field;public class CasTest {private volatile int a = 10;public static void main(String[] args) throws Exception {new CasTest().test();}public void test() throws Exception {Field field = Unsafe.class.getDeclaredField("theUnsafe");field.setAccessible(true);Unsafe unsafe = (Unsafe) field.get(null);// 獲取字段a的內存偏移量Field fieldI = CasTest.class.getDeclaredField("a");long fieldIOffset = unsafe.objectFieldOffset(fieldI);// cas操作// 參數1:操作哪個對象// 參數2:操作對象上的哪個字段// 參數3:預期值// 參數4:新值boolean b = unsafe.compareAndSwapInt(this, fieldIOffset, 10, 11);assert b;assert a == 11;}
}
常用的原子類
AtomicInteger
內部維護了一個int類型的數據,對這個int類型的數據的所有操作都是原子性的。
案例1:兩個線程同時修改原子類變量,只有一個可以修改成功
public static void main(String[] args) {AtomicInteger i = new AtomicInteger(10);int intValue = i.get();new Thread(() -> {boolean b = i.compareAndSet(intValue, 11);// 底層調用cas算法System.out.println(Thread.currentThread().getName() + " b = " + b);}, "t1").start();new Thread(() -> {boolean b = i.compareAndSet(intValue, 12);System.out.println(Thread.currentThread().getName() + " b = " + b);}, "t1").start();System.out.println("i = " + i.get()); // 11
}
案例2:10個線程,對一個原子類的變量各加加1000次,判斷最終結果是否正確
public static void main(String[] args) throws InterruptedException {AtomicInteger i = new AtomicInteger(0);List<Thread> threads = new ArrayList<>();for (int j = 0; j < 10; j++) {Thread thread = new Thread(() -> {for (int k = 0; k < 1000; k++) {int i1 = i.getAndIncrement();}});threads.add(thread);thread.start();}// 等待線程執行結束for (Thread thread : threads) {thread.join();}System.out.println("i = " + i.get()); // 10000,結果正確
}
AtomicReference
內部維護了一個普通JavaBean的原子類,對這個bean的操作是原子性的,但是,不支持單獨操作bean中的某個字段,必須整體替換bean。
案例:兩個線程同時修改原子類變量,只有一個可以修改成功
public static void main(String[] args) throws InterruptedException {AtomicReference<User> userAtomicReference = new AtomicReference<>(new User("張三", 18));User user = userAtomicReference.get();new Thread(() -> {boolean b = userAtomicReference.compareAndSet(user, new User("李四", 19));System.out.println(Thread.currentThread().getName() + " b = " + b);}, "t1").start();new Thread(() -> {boolean b = userAtomicReference.compareAndSet(user, new User("王五", 20));System.out.println(Thread.currentThread().getName() + " b = " + b);}, "t2").start();Thread.sleep(1000);System.out.println("user = " + userAtomicReference.get()); // 李四 19
}
AtomicIntegerArray
操作數組的原子類,支持原子性地修改數組中的某個元素
案例:兩個線程同時更新數組中某個下標處的值
public static void main(String[] args) throws InterruptedException {int[] intArr = new int[10];AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(intArr);new Thread(() -> {boolean b = atomicIntegerArray.compareAndSet(1, 0, 11);System.out.println(Thread.currentThread().getName() + " b = " + b);}).start();new Thread(() -> {boolean b = atomicIntegerArray.compareAndSet(1, 0, 12);System.out.println(Thread.currentThread().getName() + " b = " + b);}).start();Thread.sleep(100L);// 結論:只有一個線程可以更新成功,而且AtomicIntegerArray維護的是數組的拷貝而不是元數據,// 所以在原數組中看不出更新內容System.out.println("intArr[1] = " + atomicIntegerArray.get(1)); // 11System.out.println("intArr[1] = " + intArr[1]); // 0
}
AtomicStampedReference
使用一個版本號來解決ABA問題,每次操作都需要手動更新版本號。
案例:在主線程對變量進行修改的時候,發生了ABA問題
public static void main(String[] args) throws InterruptedException {AtomicReference<String> ref = new AtomicReference<>("A");// 主線程修改數據String prev = ref.get();// ABA操作:將數據從A改為B,再改回來new Thread(() -> Utils.println("change A -> B: "+ ref.compareAndSet("A", "B")), "t1").start();Thread.sleep(100L);new Thread(() -> Utils.println("change B -> A: "+ ref.compareAndSet("B", "A")), "t2").start();Thread.sleep(1000);Utils.println("change A -> C: " + ref.compareAndSet(prev, "C"));
}
案例:使用AtomicStampedReference解決ABA問題
public static void main(String[] args) throws InterruptedException {AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 1);// 主線程執行更新操作String prev = ref.getReference();int stamp = ref.getStamp(); // 1// 在這個過程中,其它線程執行了ABA操作new Thread(() -> {int stamp1 = ref.getStamp();Utils.println("change A -> B: "+ ref.compareAndSet("A", "B", stamp1, stamp1 + 1));}, "t1").start();Thread.sleep(100);new Thread(() -> {int stamp1 = ref.getStamp();Utils.println("change B -> A: "+ ref.compareAndSet("B", "A", stamp1, stamp1 + 1));}, "t2").start();Thread.sleep(1000);// 結果:主線程更新失敗,解決了ABA問題Utils.println("change A -> C: "+ ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
累加器 LongAdder LongAccumulator
它們都是Java 8引入的高性能累加器,原理幾乎一樣,內部維護了一個base變量和Cell數組,將累加操作分散到多個槽中,減少競爭,需要累加值的時候,調用sum方法,把Cell數組中每個槽中的數據相加。累加器用于替換AtomicLong,它們更加適合高并發場景,因為CAS在高并發場景下性能可能會降低。它們的不同之處在于,LongAdder適合于簡單的計算,LongAccumulator適合于需要復雜計算的累加場景,它可以定制計算規則
案例:LongAdder
public static void main(String[] args) {LongAdder longAdder = new LongAdder();longAdder.increment(); // 增加 1longAdder.add(10); // 增加 10System.out.println("總計數值: " + longAdder.sum()); // 輸出:11longAdder.reset(); // 重置計數器System.out.println("重置后總值: " + longAdder.sum()); // 輸出:0
}
案例:LongAccumulator
public static void main(String[] args) {// 定義累加規則為加法LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);for (int i = 0; i < 10; i++) {new Thread(() -> {longAccumulator.accumulate(10);}).start();}System.out.println("longAccumulator.get() = " + longAccumulator.get()); // 100
}
總結
原子類通常以Atomic開頭
juc中提供的原子類:
- 基本類型原子類:AtomicInteger、AtomicLong、AtomicBoolean
- 引用類型原子類:AtomicReference、AtomicStampedReference(解決ABA問題)
- 數組類型原子類:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
- 累加器:LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator
源碼解析
原子類的源碼比較簡單,它的內部調用了Unsafe類的cas算法,通過它來保證線程安全,這里以AtomicInteger為例,了解原子類的工作機制,
public class AtomicInteger extends Number implements java.io.Serializable {// 封裝了int類型的變量,并且變量被volatile修飾private volatile int value;// 獲取int類型的變量在對象中的內存偏移量private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}// 通過cas算法,更新value字段的值public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}
}
總結:原子類中封裝的變量使用volatile修飾,使用cas算法來更新,保證線程的安全性
信號量
信號量:Semaphore,用來限制并發度的工具,避免并發了過大,從而達到保護程序的目的
使用案例:使用semaphore來線程線程的并發數量,同一時刻只能有三個線程同時運行
public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 10; i++) {new Thread(() -> {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}try {Utils.println("running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println("end...");} finally {semaphore.release();}}).start();}
}
常用api:
- acquire:
void acquire()
:從信號量獲取一個許可,如果無可用許可前將一直阻塞等待 - acquire:
void acquire(int permits)
:獲取指定數目的許可,如果無可用許可前也將會一直阻塞等待 - tryAcquire:
boolean tryAcquire()
:從信號量嘗試獲取一個許可,如果無可用許可,直接返回false,不會阻塞。它的重載方法可以獲取指定數目的許可,也可以指定阻塞的時間 - release:
public void release()
:釋放一個許可證,計數器加1
使用方式總結:
- 在構造信號量對象時,指定許可證數量,一個許可證對應一個線程,表示最多有多少個線程可以執行任務,
- 在每個線程中,執行任務前,調用acquire方法,獲取許可證,semaphore對象中許可證數量減1
- 執行完任務后,調用release方法,釋放許可證,semaphore對象中許可證數量加1
- 如果semaphore對象中許可證數量為0,線程調用acquire方法時會進入阻塞狀態,直到其它線程釋放許可證
- 通過這種方式,實現控制并發度的功能
LockSupport
用于創建鎖和同步類的基本線程阻塞原語。當前線程調用LockSupport的park方法,可以進入阻塞狀態,在線程外可以調用unpark方法,同時傳入線程實例,可以讓指定線程退出阻塞狀態。
park和unpark與wait和notify的區別:
- wait、notify必須在同步塊內調用,park、unpark不必
- notify和notifyAll無法精確控制喚醒哪一個線程,park和unpark可以
案例1:基本使用
// 先調用park方法
public static void main(String[] args) {Thread thread = new Thread(() -> {Utils.println("start");LockSupport.park();Utils.println("end");});thread.start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println("解除指定線程的阻塞狀態");// 再調用unpark方法LockSupport.unpark(thread);
}
案例2:在park方法之前調用unpark方法會怎么樣?
public static void main(String[] args) {Thread thread = new Thread(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println("start");LockSupport.park();Utils.println("end");});thread.start(); // 注意,線程必須先啟動,否則針對它調用unpark方法沒有效果。Utils.println("解除指定線程的阻塞狀態");LockSupport.unpark(thread); // LockSupport類似于只有一張許可證的Semaphore
}
結論:先調用unpark方法會,park方法不會阻塞線程
LockSupport的源碼非常簡單,它的底層是基于Unsafe類的park、unpark方法,它只是直接調用了這兩個方法,然后再傳入其它必要的參數,比如超時時間。
倒計時鎖
CountdownLatch:倒計時鎖,做線程間的同步協作,在某個位置等待所有線程完成倒計時,然后再向下執行。
案例:使用CountDownLatch,實現主線程等待所有子線程執行完成后在執行的效果
public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(3);new Thread(() -> {Utils.println("開始執行任務");Utils.sleep(1000);Utils.println("執行完成");latch.countDown();}).start();new Thread(() -> {Utils.println("開始執行任務");Utils.sleep(2000);Utils.println("執行完成");latch.countDown();}).start();new Thread(() -> {Utils.println("開始執行任務");Utils.sleep(1500);Utils.println("執行完成");latch.countDown();}).start();Utils.println("等待中");try {latch.await();} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println("等待結束");
}
使用方式總結:
- 第一步:構建倒計時鎖,構造函數中指定需要等待多少個任務完成
- 第二步:主線程調用await方法,進入阻塞,用來等待計數器歸零
- 第三步:執行任務的線程,執行完任務后,調用countDown方法,計數器減1,表示執行完一個任務了
- 結果:當所有任務都執行完后,await方法結束阻塞
循環柵欄
循環柵欄:CyclicBarrier,允許一組線程互相等待,直到到達某個公共屏障點,并且在釋放等待線程后可以重用。CyclicBarrier的字面意思是可循環使用的屏障。它要做的事情是,讓一組線程到達一個屏障時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續干活。
作用:適用于一組線程中的每一個線程需要都等待所有線程完成任務后再繼續執行下一次任務的場景
CountDownLatch和CyclicBarrier的異同:
- 相同點:都有讓多個線程等待同步然后再開始下一步動作的意思
- 不同點:
- CountDownLatch的下一步的動作實施者是主線程,具有不可重復性;
- CyclicBarrier的下一步動作實施者還是“其他線程”本身,具有往復多次實施動作的特點。
案例:
public static void main(String[] args) {// 第一步:指定需要同步的線程數和所有線程都到達同步點之后需要執行的方法CyclicBarrier barrier = new CyclicBarrier(2, // 只有所有線程到達同步點之后,才會執行這個任務() -> Utils.println("任務結束"));ExecutorService pool = Executors.newFixedThreadPool(2);pool.submit(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}Utils.println("任務1執行");try {// 第二步:執行任務的線程到達同步點barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println("任務1執行完成");});pool.submit(() -> {try {Thread.sleep(2000L);} catch (InterruptedException e) {throw new RuntimeException(e);}Utils.println("任務2執行");try {// 第二步:執行任務的線程到達同步點。barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();Thread.currentThread().interrupt();}Utils.println("任務2執行完成");});pool.shutdown();
}
源碼解析
循環柵欄不像其他工具類那樣,內部依賴AQS,它的內部只使用了可重入鎖。線程會阻塞在wait方法,直到所有的線程都執行到wait方法,再一起向下執行,接下來看一下它是如何做到的?
核心代碼:
// wait方法的內部會調用dowait方法
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock(); // 加鎖try {final Generation g = generation; // 代表循環柵欄的一次運行if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 所有線程都會執行wait方法,--count,表示當前線程執行到了wait方法,數值減1int index = --count;// 如果count等于0,表示所有線程都執行到了wait方法,執行預先定義好的異步任務if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration(); // 循環柵欄的下一次運行return 0;} finally {if (!ranAction)breakBarrier(); // 打破屏障,喚醒進入等待狀態的線程}}// 如果count不等于0,證明需要等待,在上面count=0的分支中,會喚醒等待中的線程// loop until tripped, broken, interrupted, or timed outfor (;;) { // 自旋try {// 進入等待狀態if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;// 如果等待超時,同樣打破屏障if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}// 打破屏障
private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();
}
總結:wait方法表示屏障點,線程會被阻塞在wait方法處,直到所有線程都執行到wait方法,會執行指定的異步任務,然后喚醒阻塞的線程。
線程安全的集合類
jdk1.8之前提供的安全集合:HashTable、Vector,它們的實現比較粗糙,直接使用synchronized關鍵字修飾整個方法。
使用Collections工具類中提供的方法,修飾一個集合。案例:List<String> list = Collections.synchronizedList(new ArrayList<String>())
,使用一個線程安全的集合來包裝用戶提供的集合,但是線程安全的集合中,所有的方法都是被synchronized修飾的,效率比較低下。
juc的安全集合:
- BlockingXXX:LinkedBlockingQueue、ArrayBlockingQueue,基于JUC中提供的鎖,線程池使用這兩個個隊列作為阻塞隊列
- CopyOnWriteXXX:CopyOnWriteArrayList、CopyOnWriteArraySet,基于鎖,寫時復制,適合讀多寫少的場景。
- ConcurrentXXX:ConcurrentHashMap,通過cas算法和局部加鎖的方式優化了性能
ArrayBlockingQueue
基于數組的同步隊列,內部使用ReentrantLock,所有的讀寫操作全部加鎖
// 元素入隊的方法
public boolean offer(E e) {checkNotNull(e);// 加鎖final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e); // 入隊return true;}} finally {lock.unlock(); // 釋放鎖}
}// 查看隊列頭部的元素,內部也會加鎖
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}
}
CopyOnWriteArrayList
copy on write,寫時復制,當對集合進行修改操作時,不會直接修改原數組,而是創建一個新的數組副本,在副本上進行修改,然后將原數組替換為新數組。這種機制確保了在修改過程中,讀操作不會受到影響,因為讀操作始終基于原數組進行。
適用于讀多寫少的場景:由于寫操作需要創建數組副本,寫操作的性能開銷較大,但讀操作的性能非常高效,因此,非常適合讀操作頻繁且寫操作較少的場景
// 向集合中添加數據的源碼
public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock(); // 加鎖try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1); // 創建原先數組的復制品newElements[len] = e;setArray(newElements); // 使用復制好的數組替換原數組return true;} finally {lock.unlock();}
}
ConcurrentHashMap
使用方法和HashMap相同,但它是線程安全的。通過cas算法和局部加鎖(只鎖住某個節點)的方式,盡可能的避免鎖和減小鎖的粒度,以此來優化性能。這里簡單了解一下它的工作機制
// 首先,存儲數據的數組,使用volatile修飾,確保修改可以立刻被其他線程看到
transient volatile Node<K,V>[] table;
添加元素的核心方法:
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 計算key的哈希值int hash = spread(key.hashCode());int binCount = 0;// 死循環,自旋for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 如果存儲元素的數組為null,初始化數組,這里就是在第一次put元素的時候初始化數組if (tab == null || (n = tab.length) == 0)tab = initTable();// 根據哈希值計算元素的下標,如果下標處沒有值,進入當前分支else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 使用cas算法,更新下標處的值,如果更新成功,退出,// 這個過程是不加鎖的,使用cas算法可以保證線程安全if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// 判斷是否正在擴容,如果是,幫助擴容else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {// 下標處有值,發生哈希沖突V oldVal = null;// 只在這一個節點上加鎖synchronized (f) {// 再次判斷,判斷當前節點沒有發生變化,否則它有可能已經被其他線程更新了。if (tabAt(tab, i) == f) {// 處理鏈表if (fh >= 0) { // 哈希值大于0,證明它沒有在擴容,并且不是樹節點binCount = 1;// 遍歷鏈表for (Node<K,V> e = f;; ++binCount) { K ek;// 如果找到相同的key,更新valueif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}// 將新節點掛載到鏈表的尾部Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 處理紅黑樹else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 判斷是否需要擴容if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i); // 擴容,或者將鏈表轉換為紅黑樹if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;
}
總結:通過cas算法來向數組中寫入元素,寫元素時如果發生哈希沖突,只在發生沖突的節點上加鎖,盡可能減小鎖的粒度。
讀取元素是不需要加鎖的,因為元素使用volatile修飾,其它線程可以立刻看到元素的變化