計數信號量用來控制同時訪問某個特定資源的操作數或同時執行某個指定操作的數量
A counting semaphore.Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is?available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
從概念上來說,Semaphore中維護了一組許可,許可的數量在構造函數中指定。acquire方法將獲取一個可用的許可,如果沒有可用的許可,該方法會被阻塞,直到Semaphore中有可用的許可。release方法釋放一個許可,如果此時存在阻塞中的acqure方法,將釋放一個阻塞中的acquire
事實上,Semaphore中只維護可用請求數量,并不包含實際的請求對象
示例一:數據庫連接池
在初始化Semaphore時可以設置其公平性,如果為公平Semaphore,則按照請求時間獲得許可,即先發送的請求先獲得許可,如果為非公平Semaphore,則先發送的請求未必先獲得許可,這有助于提高程序的吞吐量,但是有可能導致某些請求始終獲取不到許可(tryAcquire方法不使用公平性設置)
- import?java.sql.Connection;??
- import?java.sql.DriverManager;??
- import?java.util.HashMap;??
- import?java.util.LinkedList;??
- import?java.util.Map;??
- import?java.util.concurrent.Semaphore;??
- ??
- public?class?MyConnPool?{??
- ????private?LinkedList<Connection>?unusedConns?=???
- ????????new?LinkedList<Connection>();??
- ????//釋放連接時對查找性能要求較高,故使用哈希表??
- ????private?Map<Connection,String>?usedConns?=???
- ????????????new?HashMap<Connection,String>();??
- ????private?final?Semaphore?available;??
- ??????
- ????public?MyConnPool(int?size)?throws?Exception{??
- ????????StringBuilder?builder?=?new?StringBuilder();??
- ????????builder.append("-----pool-----\n");??
- ????????available?=?new?Semaphore(size,?true);//公平性Semaphore??
- ????????String?url?=?"jdbc:mysql://ip:port/name?user=user&password=pwd";??
- ????????for(int?i?=?0?;?i?<?size?;?i++){??
- ????????????Connection?conn?=?DriverManager.getConnection(url);??
- ????????????unusedConns.add(conn);??
- ????????????builder.append("conn-"?+?i?+?":"?+?conn.hashCode()?+?"\n");??
- ????????}??
- ????????builder.append("--------------\n");??
- ????????System.out.print(builder.toString());??
- ????}??
- ??????
- ????public?Connection?getConn()?throws?InterruptedException{??
- ????????//獲取Semaphore中的許可??
- ????????available.acquire();??
- ????????Connection?conn?=?null;??
- ????????synchronized(this){??
- ????????????conn?=?unusedConns.removeFirst();??
- ????????????usedConns.put(conn,?"");??
- ??????????????
- ????????????System.out.println(Thread.currentThread().getName()??
- ????????????????????+?":"?+?conn.hashCode()?+?"[got]");??
- ????????????System.out.println(display());??
- ????????}??
- ????????return?conn;??
- ????}??
- ??????
- ????public?void?close(Connection?conn){??
- ????????synchronized(this){??
- ????????????if(usedConns.containsKey(conn)){??
- ????????????????usedConns.remove(conn);??
- ????????????????unusedConns.addLast(conn);??
- ??????????????????
- ????????????????System.out.println(Thread.currentThread().getName()??
- ????????????????????????+?":"?+?conn.hashCode()?+?"[closed]");??
- ????????????????System.out.println(display());??
- ????????????}??
- ????????}??
- ????????//釋放線程獲取的許可??
- ????????available.release();??
- ????}??
- ??????
- ????private?final?synchronized?String?display(){??
- ????????String?str?=?"";??
- ????????if(unusedConns.size()?>?0){??
- ????????????str?=?"";??
- ????????????for(Connection?conn?:?unusedConns){??
- ????????????????str?+=?conn.hashCode()?+?"|";??
- ????????????}??
- ????????}??
- ????????if(!str.equals(""))??
- ????????????return?str;??
- ????????else??
- ????????????return?"empty";??
- ????}??
- }??
- import?java.sql.Connection;??
- import?java.util.concurrent.CountDownLatch;??
- ??
- public?class?Test?implements?Runnable{??
- ????private?static?CountDownLatch?latch???
- ????????????=?new?CountDownLatch(1);??
- ????private?MyConnPool?pool;??
- ??????
- ????public?Test(MyConnPool?pool){??
- ????????this.pool?=?pool;??
- ????}??
- ??????
- ????@Override??
- ????public?void?run(){???
- ????????try?{??
- ????????????latch.await();??
- ????????????Connection?conn?=?pool.getConn();??
- ????????????Thread.sleep(1*1000);??
- ????????????pool.close(conn);??
- ????????}?catch?(InterruptedException?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- ??????
- ????public?static?void?main(String[]?args)?throws?Exception{??
- ????????MyConnPool?pool?=?new?MyConnPool(2);??
- ????????for(int?i?=?0?;?i?<?4?;?i++){??
- ????????????Thread?t?=?new?Thread(new?Test(pool));??
- ????????????t.start();??
- ????????}??
- ????????//保證4個線程同時運行??
- ????????latch.countDown();??
- ????}??
- }??
運行結果如下:
- -----pool-----??
- conn-0:11631043??
- conn-1:14872264??
- --------------??
- Thread-4:11631043[got]??
- 14872264|??
- Thread-1:14872264[got]??
- empty??
- Thread-4:11631043[closed]??
- 11631043|??
- Thread-2:11631043[got]??
- empty??
- Thread-1:14872264[closed]??
- 14872264|??
- Thread-3:14872264[got]??
- empty??
- Thread-2:11631043[closed]??
- 11631043|??
- Thread-3:14872264[closed]??
- 11631043|14872264|??
特別注意如果getConn方法和close方法都為同步方法,將產生死鎖:
- public?synchronized?Connection?getConn()?throws?InterruptedException{??
- ????......??
- }??
- ??????
- public?synchronized?void?close(Connection?conn){??
- ????......??
- }??
同一時刻只能有一個線程調用連接池的getConn方法或close方法,當Semaphore中沒有可用的許可,并且此時恰好有一個線程成功調用連接池的getConn方法,則該線程將一直阻塞在acquire方法上,其它線程將沒有辦法獲取連接池上的鎖并調用close方法釋放許可,程序將會卡死
阻塞方法上不要加鎖,否則將導致鎖長時間不釋放,如果該鎖為互斥鎖,將導致程序卡住
acquire方法本身使用樂觀鎖實現,也不需要再加互斥鎖
示例二:不可重入互斥鎖
- import?java.util.concurrent.CountDownLatch;??
- import?java.util.concurrent.Semaphore;??
- ??
- public?class?Test?implements?Runnable{??
- ????private?static?CountDownLatch?latch?=??
- ????????????new?CountDownLatch(1);??
- ????private?static?Semaphore?lock?=??
- ????????????new?Semaphore(1,?true);??
- ??????
- ????@Override??
- ????public?void?run(){???
- ????????try?{??
- ????????????latch.await();??
- ????????????this.work();??
- ????????}?catch?(InterruptedException?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- ??????
- ????private?void?work()?throws?InterruptedException{??
- ????????????lock.acquire();??
- ????????????System.out.println("Locking?by?"???
- ????????????????????+?Thread.currentThread().getName());??
- ????????????Thread.sleep(1*1000);??
- ????????????lock.release();??
- ????}??
- ??????
- ????public?static?void?main(String[]?args)?throws?Exception{??
- ????????for(int?i?=?0?;?i?<?4?;?i++){??
- ????????????Thread?t?=?new?Thread(new?Test());??
- ????????????t.start();??
- ????????}??
- ????????//保證4個線程同時運行??
- ????????latch.countDown();??
- ????}??
- }??
運行結果如下:
- Locking?by?Thread-3??
- Locking?by?Thread-0??
- Locking?by?Thread-1??
- Locking?by?Thread-2??
?