阻塞隊列是什么
阻塞隊列是一種特殊的隊列.也遵循"先進先出"的原則
阻塞隊列能是一種線程安全的數據結構,并且具有以下特性:
當隊列滿的時候,繼續入隊列就會阻塞,直到有其他線程從隊列中取走元素.
當隊列空的時候,繼續出隊列也會阻塞,直到有其他線程往隊列中插入元素.
阻塞隊列的一個典型應用場景就是"生產者消費者模型".這時一種非常典型的開發模型.
生產者消費者模型
實際開發中,經常會涉及到分布式系統.服務器整個功能不是由一個服務器全部完成的.而是每個服務器負責一部分功能.通過服務器間的網絡通信,最終完成整個功能.
生產者消費者模型就是通過一個容器來解決生產者和消費者的強耦合問題.(更好地做到解耦合的能力).
生產者和消費者彼此之間不再進行通訊,而是通過阻塞隊列來進行通訊,所以生產者生產完數據不用再等消費者處理,而是直接丟給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列中取.
示意圖如下:
1.阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力.(削峰填谷)
比如在"秒殺"的場景下,服務器同一時刻可能會受到大量的支付請求.如果直接處理這些支付要求,服務器可能扛不住(每個支付請求的處理都需要比較復雜的流程,即使一個請求消耗的資源少,但加到一起,總的消耗的資源就多了,任何一種硬件資源達到瓶頸,服務器都會掛).這個時候就可以把這些請求都放到一個阻塞隊列中,然后再由消費者線程慢慢來處理每個支付請求.?
這樣做可以有效做到"削峰",防止服務器被突然來到的一波請求直接沖垮(掛的直觀現象:給它發請求,無回應).
2.阻塞隊列也能使生產者和消費者之間"解耦"
比如過年一家人一起包餃子.一般都是有明確分工,比如一個人負責搟餃子皮,其他人負責包.搟餃子皮的人就是"生產者",包餃子的人就是"消費者".
搟餃子皮的人并不關心包餃子的人是誰(能包就行,無論是手工,借助工具還是機器),包餃子的人也不需要關心搟餃子皮的人是誰(有餃子皮就行,無論是用搟面杖搟的,還是用ipadAir5搟的)
補充說明:
(1)上述描述的阻塞隊列,并非是簡單的數據結構,而是基于這個這個數據結構實現的服務器程序,又被部署到單獨的主機上了(消息隊列)
(2)整個系統的結構更復雜了.你要維護的服務器更多了
(3)效率.引入中間商,還是有差價的.比如在上面的圖當中,請求從A出來到B收到.過程中的就經歷隊列的轉發,這個過程有一定開銷.
標準庫中的阻塞隊列
在Java標準庫中內置了阻塞隊列.如果我們需要在一些程序中使用阻塞隊列,直接使用標準庫中的即可.
譬如有:ArrayBlockingQueue, LinkedBlockingQueue,PriorityBlockingQueue.但最常用的是
LinkedBlockingQueue.
BlockingQueue是一個接口.真正實現的類是LinkedBlockingQueue.
put方法用于阻塞式的入隊列,take用于阻塞式的出隊列.
BlockingQueue也有offer,poll,peek等方法,但是這些方法不具有阻塞特性.
簡單的代碼示例:
public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>();//入隊列queue.put("abc");//出隊列.如果沒有put直接take,會阻塞.String elem = queue.take();System.out.println(elem);}
}
生產者消費者模型
實際開發中,生產者消費者模型,往往是多個生產者多個消費者.
這里的生產者和消費者往往不僅是一個線程,也可能是獨立的服務器程序.甚至是一組服務器程序.?
代碼示例如下:
public class TestCustomerAndProducer {public static void main(String[] args) {BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();Thread customer = new Thread(() -> {while(true) {try {int value = blockingQueue.take();System.out.println("消費元素: " + value);Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}, "消費者");Thread producer = new Thread(() -> {Random r = new Random();while(true) {try {int num = r.nextInt(1000);System.out.println("生產元素: " + num);blockingQueue.put(num);Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}}}, "生產者");customer.start();producer.start();}
}
?阻塞隊列的實現
通過"循環隊列"的方式實現.
使用synchronized進行加鎖控制.
put插入元素的時候,判定如果隊列滿了,就進行wait.(注意,要在循環中進行wait.被喚醒時不一定隊列就不滿了,因為同時可能是喚醒了多個線程).
take取出元素的時候如果判定隊列為空,就進行wait(也是循環wait).
下面展示代碼(注意注釋中的重點):
public class MyBlockingQueue {//主題內容指定為一個含有1000個元素的數組public int[] elems = new int[1000];private volatile int size = 0;private volatile int head = 0;private volatile int tail = 0;//鎖對象private Object locker = new Object();public synchronized int getSize() {return size;}public void put(int value) throws InterruptedException {//鎖加到這里和加到方法上的本質是一樣的,加到方法上是給this加鎖,此處是給locker對象加鎖.synchronized (locker) {while(size >= elems.length) {//1//隊列滿了//后續需要讓這個代碼能夠阻塞locker.wait();}//新的元素要放到tail指向的位置上elems[tail] = value;tail = (tail + 1) % elems.length;size++;//入隊之后喚醒(可能有阻塞的take方法)locker.notify();}}public int take() throws InterruptedException {int ret = 0;synchronized (locker) {while(size <= 0) {//1//隊列空了//后續也需要讓這個代碼阻塞locker.wait();}//取出head位置的元素并返回ret = elems[head];head = (head + 1) % elems.length;size--;//元素出隊列成功后,加上喚醒locker.notify();}return ret;}
}
我相信大家應該能了解鎖是怎么加的,這里不過多贅述.
那可能就會有人問,1處的判斷處為什么用的是while,而不是if?
?這主要是因為put和take中使用的是同一把鎖.我們可能會想到,如put中元素滿了阻塞,然后take出元素了,這里就解鎖.但我要說的是,如果是put成功了(這里put之后隊列剛好滿了),又喚醒了另一個阻塞的put,又進行put,顯然會出錯,那么就可以加上循環條件,如果隊列一直是滿的,就不再被喚醒,所以用while.
?