Queue是什么
隊列,是一種數據結構。除了優先級隊列和LIFO隊列外,隊列都是以FIFO(先進先出)的方式對各個元素進行排序的。無論使用哪種排序方式,隊列的頭都是調用remove()或poll()移除元素的。在FIFO隊列中,所有新元素都插入隊列的末尾。隊列都是線程安全的,內部已經實現安全措施,不用我們擔心
?
Queue中的方法
Queue中的方法不難理解,6個,每2對是一個也就是總共3對。看一下JDK API就知道了:
注意一點就好,Queue通常不允許插入Null,盡管某些實現(比如LinkedList)是允許的,但是也不建議。
ArrayBlockingQueue:基于數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。并且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基于鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
DelayQueue:基于PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
注意:
1、必須要使用take()方法在獲取的時候達成阻塞結果
2、使用poll()方法將產生非阻塞效果
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
public ArrayBlockingQueue(int capacity) {
}
public ArrayBlockingQueue(int capacity, boolean fair) {
?
}
public ArrayBlockingQueue(int capacity, boolean fair,
????????????????????????? Collection<? extends E> c) {
}
public void put(E e) throws InterruptedException {
??? if (e == null) throw new NullPointerException();
??? final E[] items = this.items;
??? final ReentrantLock lock = this.lock;
??? lock.lockInterruptibly();
??? try {
??????? try {
??????????? while (count == items.length)
??????????????? notFull.await();
??????? } catch (InterruptedException ie) {
??????????? notFull.signal(); // propagate to non-interrupted thread
??????????? throw ie;
??????? }
??????? insert(e);
??? } finally {
??????? lock.unlock();
??? }
}
private void insert(E x) {
??? items[putIndex] = x;
??? putIndex = inc(putIndex);
??? ++count;
??? notEmpty.signal();
}
public E take() throws InterruptedException {
??? final ReentrantLock lock = this.lock;
??? lock.lockInterruptibly();
??? try {
??????? try {
??????????? while (count == 0)
??????????????? notEmpty.await();
??????? } catch (InterruptedException ie) {
??????????? notEmpty.signal(); // propagate to non-interrupted thread
??????????? throw ie;
??????? }
??????? E x = extract();
??????? return x;
??? } finally {
??????? lock.unlock();
??? }
}
private E extract() {
??? final E[] items = this.items;
??? E x = items[takeIndex];
??? items[takeIndex] = null;
??? takeIndex = inc(takeIndex);
??? --count;
??? notFull.signal();
??? return x;
}
public class Test {
??? private int queueSize = 10;
??? private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
?
??? public static void main(String[] args)? {
??????? Test test = new Test();
??????? Producer producer = test.new Producer();
??????? Consumer consumer = test.new Consumer();
?
??????? producer.start();
??????? consumer.start();
??? }
?
??? class Consumer extends Thread{
?
??????? @Override
??????? public void run() {
??????????? consume();
??????? }
?
??????? private void consume() {
??????????? while(true){
??????????????? synchronized (queue) {
??????????????????? while(queue.size() == 0){
??????????????????????? try {
??????????????????????????? System.out.println("隊列空,等待數據");
??????????????????????????? queue.wait();
??????????????????????? } catch (InterruptedException e) {
??????????????????????????? e.printStackTrace();
??????????????????????????? queue.notify();
??????????????????????? }
??????????????????? }
??????????????????? queue.poll();????????? //每次移走隊首元素
??????????????????? queue.notify();
??????????????????? System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素");
??????????????? }
??????????? }
??????? }
??? }
?
??? class Producer extends Thread{
?
??????? @Override
??????? public void run() {
??????????? produce();
??????? }
?
??????? private void produce() {
??????????? while(true){
??????????????? synchronized (queue) {
??????????????????? while(queue.size() == queueSize){
??????????????????????? try {
??????????????????????????? System.out.println("隊列滿,等待有空余空間");
??????????????????????????? queue.wait();
??????????????????????? } catch (InterruptedException e) {
??????????????????????????? e.printStackTrace();
??????????????????????????? queue.notify();
??????????????????????? }
??????????????????? }
??????????????????? queue.offer(1);??????? //每次插入一個元素
??????????????????? queue.notify();
??????????????????? System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
??????????????? }
??????????? }
??????? }
??? }
}
public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while(true){
try {
queue.take();
System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
while(true){
try {
queue.put(1);
System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}