協作基礎(wait/notify)
Java的根父類是Object,Java在Object類而非Thread類中,定義了一些線程協作的基本方法,使得每個對象都可以調用這些方法,這些方法有兩類,一類是wait,另一類是notify。
wait方法主要有兩個:
public final void wait() throws InterruptedException public final native void wait(long timeout) throws InterruptedException;
一個帶時間參數,單位是毫秒,表示最多等待這么長時間,參數為0表示無限期等待。一個不帶時間參數,表示無限期等待,實際就是調用wait(0)。在等待期間都可以被中斷,如果被中斷,會拋出InterruptedException。
wait實際上做了什么呢?每個對象都有一把鎖和一個鎖等待隊列,一個線程在進入synchronized代碼塊時,會嘗試獲取鎖,獲取不到的話會把當前線程加入等待隊列中。其實,除了用于鎖的等待隊列,每個對象還有另一個等待隊列,表示條件隊列,該隊列用于線程間的協作。調用wait就會把當前線程放到條件隊列上并阻塞,表示當前線程執行不下去了,它需要等待一個條件,這個條件它自己改變不了,需要其他線程改變。當其他線程改變了條件后,應該調用Object的notify方法:
public final native void notify(); public final native void notifyAll();
notify做的事情就是從條件隊列中選一個線程,將其從隊列中移除并喚醒,notifyAll和notify的區別是,它會移除條件隊列中所有的線程并全部喚醒。
wait/notify方法只能在synchronized代碼塊內被調用,如果調用wait/notify方法時,當前線程沒有持有對象鎖,會拋出異常java.lang.IllegalMonitorStateException。
wait的具體過程是:
- 把當前線程放入條件等待隊列,釋放對象鎖,阻塞等待,線程狀態變為WAITING或TIMED_WAITING
- 等待時間到或被其他線程調用notify/notifyAll從條件隊列中移除,這時,要重新競爭對象鎖
-
- 如果能夠獲得鎖,線程狀態變為RUNNABLE,并從wait調用中返回
- 否則,該線程加入對象鎖等待隊列,線程狀態變為BLOCKED,只有在獲得鎖后才會從wait調用中返回
線程從wait調用中返回后,不代表其等待的條件就一定成立了,它需要重新檢查其等待的條件,一般的調用模式是:
synchronized (obj) {while (條件不成立)obj.wait();... // 條件滿足后的操作 }
?
生產者/消費者模式
下面來看一個生產者和消費者的例子:
/*** @author 沉默哥* */ public class MyProducerConsumerDemo {static class GoodsQueue {private int size;private Queue<String> que = new ArrayDeque<String>();public GoodsQueue(int size) {// 維護一個有界隊列,傳入隊列的最大容量super();this.size = size;}public synchronized void put(String e) throws InterruptedException {while (que.size() == size) {System.out.println("隊列已滿,生產者等待");wait();}que.add(e);System.out.println("生產者生產:" + e);notify();}public synchronized String take() throws InterruptedException {while (que.size() == 0) {System.out.println("隊列為空,消費者等待");wait();}String e = que.poll();System.out.println("消費者消費" + e);notify();return e;}}static class Producer extends Thread {GoodsQueue que;Random rad = new Random();public Producer(GoodsQueue que) {super();this.que = que;}@Overridepublic void run() {int i = 0;try {while (true) {String e = String.valueOf(i);que.put(e);i++;Thread.sleep(rad.nextInt(1000));// 生產者休息準備下一次生產 }} catch (InterruptedException e1) {}}}static class Consumer extends Thread {GoodsQueue que;Random rad = new Random();public Consumer(GoodsQueue que) {super();this.que = que;}@Overridepublic void run() {try {while (true) {que.take();Thread.sleep(rad.nextInt(1000));// 消費者休息準備下一次消費 }} catch (InterruptedException e) {}}}public static void main(String[] args) throws InterruptedException {GoodsQueue que = new GoodsQueue(1);Producer pro = new Producer(que);Consumer con = new Consumer(que);con.start();Thread.sleep(500);pro.start();} }
?