Semaphore,如今通常被翻譯為"信號量",過去也曾被翻譯為"信號燈",因為類似于現實生活中的紅綠燈,車輛是否能通行取決于是否是綠燈。同樣,在編程世界中,線程是否能執行取決于信號量是否允許。
信號量是由著名的計算機科學家迪杰斯特拉(Dijkstra)于1965年提出的,直到1980年管程被提出,它一直是并發編程領域的主導方法。如今幾乎所有支持并發編程的語言都支持信號量機制,因此掌握信號量仍然非常必要。
下面我們首先介紹信號量模型,然后介紹如何運用信號量,最后使用信號量來實現一個流量控制器。
信號量模型
信號量模型還是很簡單的,可以簡單概括為: 一個計數器,一個等待隊列,三個方法。在信號量模型里,計數器和等待隊列對外是透明的,所以只能通過信號量模型提供的三個方法來訪問它們,這三個方法分別是:init()、down()和up()。你可以結合下圖來形象化地理解。

這三個操作的具體意義如下所示。
-
初始化(init()):設定計數器的起始值。 -
減少(down()):將計數器的值減1;如果此時計數器的值小于0,則當前線程會被阻塞,否則當前線程可以繼續執行。 -
增加(up()):將計數器的值加1;如果此時計數器的值小于或等于0,則喚醒等待隊列中的一個線程,并將其從等待隊列中移除。
上述提到的init()、down()和up()三個操作都是原子的,并且這種原子性由信號量模型的實現方保證。在Java SDK中,信號量模型由java.util.concurrent.Semaphore類實現,Semaphore類能夠確保這三個操作都是原子操作。
如果你覺得上面的描述有些復雜,可以參考下面的代碼來理解信號量模型的實現。
class?Semaphore{
??//?計數器
??int?count;
??//?等待隊列
??Queue?queue;
??//?初始化操作
??Semaphore(int?c){
????this.count=c;
??}
??//
??void?down(){
????this.count--;
????if(this.count<0){
??????//將當前線程插入等待隊列
??????//阻塞當前線程
????}
??}
??void?up(){
????this.count++;
????if(this.count<=0)?{
??????//移除等待隊列中的某個線程T
??????//喚醒線程T
????}
??}
}
這里再插一句,信號量模型里面,down()、up()這兩個操作歷史上最早稱為P操作和V操作,所以信號量模型也被稱為PV原語。另外,還有些人喜歡用semWait()和semSignal()來稱呼它們,雖然叫法不同,但是語義都是相同的。在Java SDK并發包里,down()和up()對應的則是acquire()和release()。
如何使用信號量
通過前文,你應該會發現信號量的模型還是非常簡單的,那具體應該如何使用呢?其實你可以想象一下紅綠燈。在十字路口,紅綠燈可以控制交通流量,這得益于一個重要的規則:車輛在通過路口前必須檢查是否為綠燈,只有綠燈才能通行。這個規則與我們之前提到的鎖規則很相似,不是嗎?
實際上,信號量的使用也是類似的。讓我們繼續用累加器的例子來說明信號量的使用吧。在累加器的例子中,count += 1操作是一個臨界區,只允許一個線程執行,也就是說需要保證互斥性。那么,在這種情況下,如何使用信號量來控制呢?
實際上非常簡單,就像我們使用互斥鎖一樣,只需要在進入臨界區之前執行一次down()操作,在退出臨界區之前執行一次up()操作就可以了。下面是Java代碼的示例,acquire()就是信號量中的down()操作,release()就是信號量中的up()操作。
static?int?count;
//初始化信號量
static?final?Semaphore?s
????=?new?Semaphore(1);
//用信號量保證互斥
static?void?addOne()?{
??s.acquire();
??try?{
????count+=1;
??}?finally?{
????s.release();
??}
}
接下來我們再來分析一下,信號量是如何確保互斥性的。假設有兩個線程T1和T2同時訪問addOne()方法,當它們同時調用acquire()時,由于acquire()是一個原子操作,因此只能有一個線程(假設是T1)將信號量中的計數器減為0,而另一個線程(T2)將計數器減為-1。對于線程T1來說,信號量中計數器的值是0,大于等于0,所以線程T1會繼續執行;對于線程T2來說,信號量中計數器的值是-1,小于0,根據信號量模型中對down()操作的描述,線程T2將被阻塞。因此,此時只有線程T1可以進入臨界區并執行 count += 1;
。
當線程T1執行release()操作,也就是up()操作時,信號量中計數器的值是-1,經過加1后的值是0,小于等于0,根據信號量模型中對up()操作的描述,此時等待隊列中的T2將被喚醒。于是,在T1執行完臨界區代碼之后,T2才有機會進入臨界區執行,從而確保了互斥性。
快速實現一個限流器
上述的示例,我們通過使用信號量實現了一個簡單的互斥鎖功能。也許你會覺得奇怪,既然Java的SDK中已經提供了Lock,為什么還需要提供Semaphore呢?實際上,互斥鎖只是Semaphore的一部分功能,而Semaphore還有一個Lock無法實現的功能,那就是:允許多個線程訪問臨界區。
實際情況中確實存在這樣的需求。比較常見的例子是我們在工作中遇到的各種資源池,比如連接池、對象池、線程池等等。其中,你可能對數據庫連接池最為熟悉,在同一時刻,允許多個線程同時使用連接池,但每個連接在釋放之前不允許其他線程使用。
實際上,不久前我在工作中也遇到了一個對象池的需求。對象池指的是一次性創建N個對象,然后所有線程重復利用這些對象,當然在對象釋放之前不允許其他線程使用。對象池可以使用List來保存實例對象,這很簡單。但關鍵在于限流器的設計,這里的限流指的是不允許超過N個線程同時進入臨界區。那么如何快速實現這樣的限流器呢?我立刻想到了使用信號量的解決方案。
在上面的例子中,信號量的計數器被設置為1,這個1表示只允許一個線程進入臨界區。但是,如果我們將計數器的值設為對象池中對象的數量N,就可以完美解決對象池的限流問題了。下面是一個對象池的示例代碼。
class?ObjPool<T,?R>?{
??final?List<T>?pool;
??//?用信號量實現限流器
??final?Semaphore?sem;
??//?構造函數
??ObjPool(int?size,?T?t){
????pool?=?new?Vector<T>(){};
????for(int?i=0;?i<size;?i++){
??????pool.add(t);
????}
????sem?=?new?Semaphore(size);
??}
??//?利用對象池的對象,調用func
??R?exec(Function<T,R>?func)?{
????T?t?=?null;
????sem.acquire();
????try?{
??????t?=?pool.remove(0);
??????return?func.apply(t);
????}?finally?{
??????pool.add(t);
??????sem.release();
????}
??}
}
//?創建對象池
ObjPool<Long,?String>?pool?=
??new?ObjPool<Long,?String>(10,?2);
//?通過對象池獲取t,之后執行
pool.exec(t?->?{
????System.out.println(t);
????return?t.toString();
});
我們使用一個List來存儲對象實例,并使用Semaphore實現限流器。關鍵代碼位于ObjPool的exec()方法中,這個方法實現了限流的功能。在這個方法中,我們首先調用acquire()方法(在finally塊中會調用release()方法),假設對象池大小為10,信號量的計數器初始化為10,那么前10個線程調用acquire()方法后可以繼續執行,相當于通過了信號燈,而其他線程將阻塞在acquire()方法上。通過了信號燈的線程,我們為每個線程分配一個對象t(通過pool.remove(0)實現),并執行一個回調函數func,該回調函數的參數正是之前分配的對象t;在執行完回調函數后,它們會釋放對象(通過pool.add(t)實現),同時調用release()方法來更新信號量的計數器。如果此時信號量的計數器值小于等于0,說明有線程在等待,此時會自動喚醒等待的線程。
簡而言之,使用信號量,我們可以輕松實現一個限流器,且使用起來非常簡單。
總結
信號量在Java語言中的知名度相對較低,但在其他編程語言中卻非常有名。Java在并發編程領域取得了很快的發展,重點支持的是管程模型。管程模型在理論上解決了信號量模型的一些不足之處,主要體現在易用性和工程化方面。例如,使用信號量來解決我們前面提到的阻塞隊列問題比使用管程模型要復雜許多。如果你感興趣的話,可以進行了解和嘗試。
本文由 mdnice 多平臺發布