分析下SemaPhore吧,也是基于AQS實現的,對并發進行控制的工具類,看下其怎么實現的,
Semaphore semaphore = new Semaphore(3);semaphore.acquire();semaphore.release();
Semaphore 常用于控制并發量,比如這里設置為3,就可以只有三個線程可以acquire拿到資源,后續來的線程需要排隊,等原有線程release釋放之后,才可以接入新的請求,用于控制最大并發。
acquire 實現
// 默認非公平的
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果獲取不到,走的下面阻塞進行入等待隊列if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
// 執行的AQS的獲取資源
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 添加共享節點final Node node = addWaiter(Node.SHARED);try {for (;;) {// 死循環判斷,park之后喚醒,還是走這里final Node p = node.predecessor();// 如果前面是頭節點的話if (p == head) {// 執行的子類實現的嘗試方法int r = tryAcquireShared(arg);// 獲取成功的話if (r >= 0) {// 對其進行喚醒setHeadAndPropagate(node, r);p.next = null; // help GCreturn;}}// 如果不是頭節點,判斷需要park不,前節點是signal就進行park// park之前檢查是不是被打斷// 如果第一次不是,會給前節點設置signal,然后下一次再循環到,就park了if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} catch (Throwable t) {cancelAcquire(node);throw t;}
}// 實際獲取到鎖之后,改頭,然后傳播,這里是不是傳播根據子類返回的是0還是大于0private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);// 大于0,頭節點為空(執行完了),狀態小于0,// 新的頭節點(當前節點)為空,或者狀態小于0if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//如果有后節點為空或者是共享的,釋放if (s == null || s.isShared())doReleaseShared();}}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 這里會先把狀態改為0,改成功了會是釋放,成功釋放之后if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 如果為0 改為傳播else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}// 判斷等于頭,就是沒改變頭就breakif (h == head) // loop if head changedbreak;}}
可以看到這是在獲取資源,獲取不到的時候進入隊列等待,默認的是非公平的,去看下怎么實現的
Sync
Semaphore 內部類Sync實現了AQS,看下怎么實現的
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;// 初始設置的資源數,也是通過stateSync(int permits) {setState(permits);}final int getPermits() {return getState();}// 非公平的獲取資源final int nonfairTryAcquireShared(int acquires) {for (;;) {// 獲取可用的資源int available = getState();// 如果可用的小于需要獲取的int remaining = available - acquires;// 小于0直接返回了,如果不小于0,就cas設置,設置成功就返回對應的值了大于等于0的if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}// 釋放資源protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();// 給對應的數量加上釋放的數量int next = current + releases;// 釋放的不能為負數,也不能超過限制if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// cas設置,成功返回釋放完成if (compareAndSetState(current, next))return true;}}// 減去對應的statefinal void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {// 是不是為0,不為0的時候嘗試設置為0int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}
}
// 看下對應的公平鎖實現,非公平直接使用Sync的方法獲取
static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {// 是不是有在等待的,有就返回-1了,差的就是這個判斷if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
}
可以看到Sync繼承AQS之后實現的獲取資源方法就是對對應的state進行減,確保其大于等于0,有就可以獲取,公平非公平的實現就是判斷喜愛是不是有在等待的,有的話直接返回-1,不進行嘗試。
release
public void release() {sync.releaseShared(1);
}public final boolean releaseShared(int arg) {// 先改,成功就實際釋放if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
protected final boolean tryReleaseShared(int releases) {for (;;) {// 改了state的值int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 喚醒后面if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck cases// 實際喚醒線程unparkSuccessor(h);}else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}
}// 喚醒線程
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)node.compareAndSetWaitStatus(ws, 0);// 獲取下一個節點,不為空的時候直接喚醒Node s = node.next;// 如果是空或者取消狀態的話if (s == null || s.waitStatus > 0) {s = null;// 從后向前遍歷,然后喚醒,這里喚醒之后應該去繼續拿資源for (Node p = tail; p != node && p != null; p = p.prev)if (p.waitStatus <= 0)s = p;}if (s != null)LockSupport.unpark(s.thread);
}
總結
簡單總結下吧,Semaphore 通過AQS的state來控制并發數量,也分為公平和非公平,但是使用的是共享鎖,這樣就能根據數量進行喚醒,AQS提供的方法tryAcquire 讓子類實現的,返回正數代表可以繼續向后喚醒,返回0自己得到資源可以執行,就通過這樣的形式來控制并發