一 Semaphore入門
1.1 什么是Semaphore
Semaphore,俗稱信號量,它是操作系統中PV操作的原語在java的實現,它也是基于AbstractQueuedSynchronizer實現的。
Semaphore的功能非常強大,大小為1的信號量就類似于互斥鎖,通過同時只能有一個線程獲取信號量實現。大小為n(n>0)的信號量可以實現限流的功能,它可以實現只能有n個線程同時獲取信號量。
什么是pv操作?
PV操作是操作系統一種實現進程互斥與同步的有效方法。PV操作與信號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執行的正確性。
P操作的主要動作是:
①S減1;
②若S減1后仍大于或等于0,則進程繼續執行;
③若S減1后小于0,則該進程被阻塞后放入等待該信號量的等待隊列中,然后轉進程調度。
V操作的主要動作是:
①S加1;
②若相加后結果大于0,則進程繼續執行;
③若相加后結果小于或等于0,則從該信號的等待隊列中釋放一個等待進程,然后再返回原進程繼續執行或轉進程調度。
1.2 Semaphore的常用方法
構造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
● permits 表示許可證的數量(資源數)
● fair 表示公平性,如果這個設為 true 的話,下次執行的線程會是等待最久的線程
常用方法
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
● acquire() 表示嘗試獲取許可(獲取不到則阻塞)。
● tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的線程不會阻塞。
● release() 表示釋放許可。
● int availablePermits():返回此信號量中當前可用的許可證數。
● int getQueueLength():返回正在等待獲取許可證的線程數。
● boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
● void reducePermit(int reduction):減少 reduction 個許可證
● Collection getQueuedThreads():返回所有等待獲取許可證的線程集合
1.3 應用場景
可以用于做流量控制,特別是公用資源有限的應用場景
代碼演示:
模擬一個每5S只能處理5個請求的限流Demo
/**
-
限流測試
-
@author wcy
*/
@Slf4j
public class SemaphoneDemo1 {/**
- 實現一個同時只能處理5個請求的限流器
*/
private static Semaphore semaphore = new Semaphore(5);
/**
- 定義一個線程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 50,
60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
/**
- 模擬執行方法
*/
public static void exec() {
try {
semaphore.acquire(1);
// 模擬真實方法執行
log.info(“執行exec方法”);
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release(1);
}
}
public static void main(String[] args) throws InterruptedException {
{for (; ; ) {Thread.sleep(100);// 模擬請求以10個/s的速度executor.execute(() -> exec());}}
}
}
運行結果: - 實現一個同時只能處理5個請求的限流器
可以看出,每個周期內只能5個線程執行了方法
二 Semaphore原理
學習Semaphore源碼的時候我們有兩個關注點:
-
Semaphore的加鎖解鎖(共享鎖)邏輯實現
-
線程競爭鎖失敗入隊阻塞邏輯和獲取鎖的線程釋放鎖喚醒阻塞線程競爭鎖的邏輯實現
加鎖邏輯(acquire)
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
這里調用了同步器的acquireSharedInterruptibly(int arg)方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
先看判斷邏輯tryAcquireShared(arg)方法,這是同步器子類實現的
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
再看nonfairTryAcquireShared(acquires)方法
final int nonfairTryAcquireShared(int acquires) {
for (;😉 {
//獲取許可證數量
int available = getState();
//減去當前線程使用的許可數
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
返回可用的許可數,如果<0,說明沒有可用的許可,就會進入 doAcquireSharedInterruptibly(arg)方法,這個方法也是同步器實現的
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;😉 {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
注意:Node node = addWaiter(Node.SHARED);這是構建隊列的方法,但是和ReentrantLock不同的是,這里參數傳的是Node.SHARED,第一個if邏輯是當同步隊列中第一個線程被喚醒后,會進入這里重新競爭鎖,競爭成功后,做出隊的操作,我們假設這里是第一次構建隊列,先看addWaiter(Node.SHARED)方法
static final Node SHARED = new Node();
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這里第一個線程入隊,會調用enq方法構建隊列,后來的線程會進入if分支,加入隊列尾部。當前線程Node的nextWaiter=Node.SHARED
private Node enq(final Node node) {
for (;😉 {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
tail仍然為空,通過cas操作,新建一個頭節點,這就是并發的精髓了,通過一個死循環,第二次循環的時候tail不為空,進入else邏輯,把當前線程所在的節點的前驅節點指向前邊的結點,并把當前線程節點設置為尾結點。(這里通過cas保證線程安全問題),至此,我們的隊列構建完成,回到doAcquireSharedInterruptibly方法中,可以看出,如果當前線程節點的前驅節點如果是頭節點是話還會進行一次cas操作去嘗試獲取許可,假設還沒有線程釋放許可,返回負數,進入第二個if邏輯中,有兩個判斷方法,shouldParkAfterFailedAcquire(p, node)和parkAndCheckInterrupt()方法
先看shouldParkAfterFailedAcquire(p, node)方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
/
return true;
if (ws > 0) {
/
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don’t park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
先判斷前驅節點的waitStatus是否為-1.如果不是-1,通過cas操作改為-1,返回false,外邊是一個死循環,會第二次進入這個方法,這次判斷為-1,返回true,進入parkAndCheckInterrupt()方法,
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在這里,我們的線程就阻塞著了。
解鎖邏輯(release):
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
接著看releaseShared方法,這個是由同步器來實現的
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
接著看tryReleaseShared,這個是同步器子類實現的,主要目的就是釋放一個資源許可。
protected final boolean tryReleaseShared(int releases) {
for (;😉 {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error(“Maximum permit count exceeded”);
if (compareAndSetState(current, next))
return true;
}
}
這里釋放鎖后,許可加1,執行doReleaseShared()方法
doReleaseShared()方法,
private void doReleaseShared() {
/
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
/
for (;😉 {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
把頭節點的waitStatus置為0,調用unparkSuccessor方法
unparkSuccessor方法
private void unparkSuccessor(Node node) {
/
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
}
拿到頭結點的下一個節點,喚醒同步隊列中阻塞的第一個線程,此時又會回到阻塞的地方doAcquireSharedInterruptibly方法中
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;😉 {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這時候,被阻塞的第一個線程被喚醒,重新進入循環,會進入第一個if中,此時調用tryAcquireShared方法可以拿到一個許可,也就是r>=0,
然后調用setHeadAndPropagate方法,這就是共享鎖和獨占鎖的區別之一
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//設置新的頭節點
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//喚醒下一個節點
doReleaseShared();
}
}
在這里先把當前線程的節點設置為新的頭節點,再次嘗試喚醒下一個節點,這樣有個好處,就是資源釋放得快的話,線程就持續被喚醒,這也就保證了Semaphone可以限流的原因,同時刻,只要有線程釋放資源,其他線程就可以拿到許可進而執行自己的業務。