使用
Semaphore是計數信號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然后拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實并沒有實際的許可證這個對象,Semaphore只是維持了一個可獲得許可證的數量。?
Semaphore經常用于限制獲取某種資源的線程數量。下面舉個例子,比如說操場上有5個跑道,一個跑道一次只能有一個學生在上面跑步,一旦所有跑道在使用,那么后面的學生就需要等待,直到有一個學生不跑了,下面是這個例子:
/*** 操場,有5個跑道* Created by Xingfeng on 2016-12-09.*/
public class Playground {/*** 跑道類*/static class Track {private int num;public Track(int num) {this.num = num;}@Overridepublic String toString() {return "Track{" +"num=" + num +'}';}}private Track[] tracks = {new Track(1), new Track(2), new Track(3), new Track(4), new Track(5)};private volatile boolean[] used = new boolean[5];private Semaphore semaphore = new Semaphore(5, true);/*** 獲取一個跑道*/public Track getTrack() throws InterruptedException {semaphore.acquire(1);return getNextAvailableTrack();}/*** 返回一個跑道** @param track*/public void releaseTrack(Track track) {if (makeAsUsed(track))semaphore.release(1);}/*** 遍歷,找到一個沒人用的跑道** @return*/private Track getNextAvailableTrack() {for (int i = 0; i < used.length; i++) {if (!used[i]) {used[i] = true;return tracks[i];}}return null;}/*** 返回一個跑道** @param track*/private boolean makeAsUsed(Track track) {for (int i = 0; i < used.length; i++) {if (tracks[i] == track) {if (used[i]) {used[i] = false;return true;} else {return false;}}}return false;}}
- 從上面可以看到,創建了5個跑道對象,并使用一個boolean類型的數組記錄每個跑道是否被使用了,初始化了5個許可證的Semaphore,在獲取跑道時首先調用acquire(1)獲取一個許可證,在歸還一個跑道是調用release(1)釋放一個許可證。接下來再看啟動程序,如下:
public class SemaphoreDemo {static class Student implements Runnable {private int num;private Playground playground;public Student(int num, Playground playground) {this.num = num;this.playground = playground;}@Overridepublic void run() {try {//獲取跑道Playground.Track track = playground.getTrack();if (track != null) {System.out.println("學生" + num + "在" + track.toString() + "上跑步");TimeUnit.SECONDS.sleep(2);System.out.println("學生" + num + "釋放" + track.toString());//釋放跑道playground.releaseTrack(track);}} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();Playground playground = new Playground();for (int i = 0; i < 100; i++) {executor.execute(new Student(i+1,playground));}}}
- 上面的代碼中,Student類代表學生,首先獲取跑道,一旦獲取到就打印一句話,然后睡眠2s,然后再打印釋放,最后歸還跑道。
源碼解析
Semaphore有兩種模式,公平模式和非公平模式。公平模式就是調用acquire的順序就是獲取許可證的順序,遵循FIFO;而非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。
構造方法
Semaphore有兩個構造方法,如下:
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
- 從上面可以看到兩個構造方法,都必須提供許可的數量,第二個構造方法可以指定是公平模式還是非公平模式,默認非公平模式。?
Semaphore內部基于AQS的共享模式,所以實現都委托給了Sync類。?
這里就看一下NonfairSync的構造方法:
NonfairSync(int permits) {super(permits);}
- 可以看到直接調用了父類的構造方法,Sync的構造方法如下:
Sync(int permits) {setState(permits);}
- 可以看到調用了setState方法,也就是說AQS中的資源就是許可證的數量。
獲取許可
先從獲取一個許可看起,并且先看非公平模式下的實現。首先看acquire方法,acquire方法有幾個重載,但主要是下面這個方法
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}
- 從上面可以看到,調用了Sync的acquireSharedInterruptibly方法,該方法在父類AQS中,如下:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//如果線程被中斷了,拋出異常if (Thread.interrupted())throw new InterruptedException();//獲取許可失敗,將線程加入到等待隊列中if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
- AQS子類如果要使用共享模式的話,需要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}
- 該方法調用了父類中的nonfairTyAcquireShared方法,如下:
final int nonfairTryAcquireShared(int acquires) {for (;;) {//獲取剩余許可數量int available = getState();//計算給完這次許可數量后的個數int remaining = available - acquires;//如果許可不夠或者可以將許可數量重置的話,返回if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
- 從上面可以看到,只有在許可不夠時返回值才會小于0,其余返回的都是剩余許可數量,這也就解釋了,一旦許可不夠,后面的線程將會阻塞。看完了非公平的獲取,再看下公平的獲取,代碼如下:
protected int tryAcquireShared(int acquires) {for (;;) {//如果前面有線程再等待,直接返回-1if (hasQueuedPredecessors())return -1;//后面與非公平一樣int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
- 從上面可以看到,FairSync與NonFairSync的區別就在于會首先判斷當前隊列中有沒有線程在等待,如果有,就老老實實進入到等待隊列;而不像NonfairSync一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。?
看完了獲取許可后,再看一下釋放許可。
釋放許可
釋放許可也有幾個重載方法,但都會調用下面這個帶參數的方法,
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}
- releaseShared方法在AQS中,如下:
public final boolean releaseShared(int arg) {//如果改變許可數量成功if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
- AQS子類實現共享模式的類需要實現tryReleaseShared類來判斷是否釋放成功,實現如下:
protected final boolean tryReleaseShared(int releases) {for (;;) {//獲取當前許可數量int current = getState();//計算回收后的數量int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//CAS改變許可數量成功,返回trueif (compareAndSetState(current, next))return true;}}
- 從上面可以看到,一旦CAS改變許可數量成功,那么就會調用doReleaseShared()方法釋放阻塞的線程。
減小許可數量
Semaphore還有減小許可數量的方法,該方法可以用于用于當資源用完不能再用時,這時就可以減小許可證。代碼如下:
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);}
- 可以看到,委托給了Sync,Sync的reducePermits方法如下:
final void reducePermits(int reductions) {for (;;) {//得到當前剩余許可數量int current = getState();//得到減完之后的許可數量int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");//如果CAS改變成功if (compareAndSetState(current, next))return;}}
- 從上面可以看到,就是CAS改變AQS中的state變量,因為該變量代表許可證的數量。
獲取剩余許可數量
Semaphore還可以一次將剩余的許可數量全部取走,該方法是drain方法,如下:
public int drainPermits() {return sync.drainPermits();}
- Sync的實現如下:
final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}
- 可以看到,就是CAS將許可數量置為0。
總結
Semaphore是信號量,用于管理一組資源。其內部是基于AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,線程將會被掛起;而一旦有一個線程釋放一個資源,那么就有可能重新喚醒等待隊列中的線程繼續執行。
轉自:https://blog.csdn.net/qq_19431333/article/details/70212663