在內部,這些實現巧妙地利用了Java 1.5中引入的另一種并發功能-阻塞隊列。
隊列
首先,簡要回顧一下什么是標準隊列。 在計算機科學中,隊列只是一個集合,始終將元素添加到末尾,并始終從頭開始獲取元素。 表達式先進先出(FIFO)通常用于描述標準隊列。 在Java 1.6中引入的是Deque或雙端隊列,該接口現在在LinkedList上實現。 Java中的某些隊列允許其他排序,例如使用Comparator甚至編寫自己的排序實現。 雖然擴展功能很好,但是我們今天關注的是BlockingQueues如何真正在并發開發中大放異彩。
阻塞隊列
阻塞隊列是一些隊列,它們還公開了在沒有可用元素的情況下阻止檢索元素的請求的功能,該附加選項可以限制等待時間。 在受限制的大小隊列上,嘗試添加時可以使用相同的阻止功能。 讓我們深入探討一下BlockingQueue用法的示例。
讓我們假設一個簡單的場景。 您有一個處理線程,其功能只是執行命令。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();public void addCommand(Command command) {workQueue.offer(command);
}public Object call() throws Exception {try {Command command = workQueue.take();command.execute();} catch (InterruptedException e) {throw new WorkException(e);}
}
當然,這是一個非常簡單的示例,但它向您展示了對多個線程使用BlockingQueue的基本知識。 讓我們嘗試一些更多的事情。 在此示例中,我們需要創建一個具有限制的連接池。 它僅應根據需要創建連接。 沒有客戶端將等待超過5秒的可用連接。
private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>(10);
private AtomicInteger connCount = new AtomicInteger();public Connection getConnection() {Connection conn = pool.poll(5, TimeUnit.SECONDS);if (conn == null) {synchronized (connCount) {if (connCount.get() < 10) {conn = getNewConnection();pool.offer(conn);connCount.incrementAndGet();}}if (conn == null) {throw new ConnUnavailException();} else {return conn;}}
}
最后,讓我們考慮一個有趣的實現示例示例SynchronousQueue 。
在此示例中,類似于我們的第一個示例,我們想要執行一個Command,但是需要知道它何時完成,最多等待2分鐘。
private BlockingQueue workQueue = new LinkedBlockingQueue();
private Map commandQueueMap = new ConcurrentHashMap(); public SynchronousQueue addCommand(Command command) {SynchronousQueue queue = new SynchronousQueue();commandQueueMap.put(command, queue);workQueue.offer(command);return queue;
}public Object call() throws Exception {try {Command command = workQueue.take();Result result = command.execute();SynchronousQueue queue = commandQueueMap.get(command);queue.offer(result);return null;} catch (InterruptedException e) {throw new WorkException(e);}
}
現在,使用者可以安全地輪詢其請求以執行其命令的超時。
Command command;
SynchronousQueue queue = commandRunner.addCommand(command);
Result result = queue.poll(2, TimeUnit.MINUTES);
if (result == null) {throw new CommandTooLongException(command);
} else {return result;
}
正如您開始看到的那樣,java中的BlockingQueues提供了很大的靈活性,并為您提供了相對簡單的結構來滿足多線程應用程序中的許多(如果不是全部)需求。 我們甚至沒有審查過一些非常整潔的BlockingQueues ,例如PriorityBlockingQueue和DelayQueue 。 看看他們并取得聯系。 我們喜歡與開發人員交談。
參考: Carfey Software博客上的JCG合作伙伴的Java并發第5部分-阻塞隊列 。
- Java并發教程–信號量
- Java并發教程–重入鎖
- Java并發教程–線程池
- Java并發教程–可調用,將來
- Java并發教程– CountDownLatch
- Exchanger和無GC的Java
- Java Fork / Join進行并行編程
- 使用迭代器時如何避免ConcurrentModificationException
- 改善Java應用程序性能的快速技巧
翻譯自: https://www.javacodegeeks.com/2011/09/java-concurrency-tutorial-blocking.html