目錄
wait , notify
wait vs sleep
正確使用方法
同步保護性暫停
join的源碼
Future
異步生產者/消費者模型
定義
Park & Unpark
原理
wait , notify
小故事小南需要煙才能工作,但它又要占這鎖讓別人無法進來。那么這個時候開一個waitSet相當于就是休息室讓小南進去。并且釋放鎖。如果煙到了,那么notify小南就能夠繼續工作了。
Blocked和Waiting區別其實就是waiting是釋放了鎖,blocked是沒有鎖waiting被notify之后仍然需要進入到entrylist進行等待。
@Slf4j(topic = "c.TestWaitNotify")public class Test {?// 鎖對象final static Object obj = new Object();?public static void main(String[] args) throws InterruptedException {?new Thread(() -> {synchronized (obj) {log.debug("執行....");try {obj.wait(); // 讓線程在obj上一直等待下去} catch (InterruptedException e) {e.printStackTrace();}log.debug("其它代碼....");}},"t1").start();?new Thread(() -> {synchronized (obj) {log.debug("執行....");try {obj.wait(); // 讓線程在obj上一直等待下去} catch (InterruptedException e) {e.printStackTrace();}log.debug("其它代碼....");}},"t2").start();?// 主線程兩秒后執行Thread.sleep(5000);log.debug("喚醒 obj 上其它線程");synchronized (obj) {// ? ? ? ? ? obj.notify(); // 喚醒obj上一個線程obj.notifyAll(); // 喚醒obj上所有等待線程}}}
20:17:53.579 [t1] DEBUG c.TestWaitNotify - 執行....20:17:53.581 [t2] DEBUG c.TestWaitNotify - 執行....20:17:58.584 [main] DEBUG c.TestWaitNotify - 喚醒 obj 上其它線程20:17:58.584 [t2] DEBUG c.TestWaitNotify - 其它代碼....20:17:58.584 [t1] DEBUG c.TestWaitNotify - 其它代碼....?進程已結束,退出代碼0
wait vs sleep
sleep:Thread調用,靜態方法,而且不會釋放鎖
wait:所有obj,但是要配合synchronize使用,可以釋放鎖
sleep在睡眠時,不會釋放鎖,wait會釋放對象鎖
通常鎖會加上final防止被修改
正確使用方法
小南需要煙才能工作,如果是使用sleep不釋放鎖,那么其他需要等待干活的人就會干等著,等煙來。但是wait可以讓小南釋放鎖,讓其他線程工作,并且喚醒小南
@Slf4j(topic = "c.TestCorrectPosture")public class Test {static final Object room = new Object();?// 有無煙static boolean hasCigarette = false;static boolean hasTakeout = false;?public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (room) {log.debug("有煙沒?[{}]", hasCigarette);if (!hasCigarette) {log.debug("沒煙,先歇會!");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("有煙沒?[{}]", hasCigarette);if (hasCigarette) {log.debug("可以開始干活了");}}}, "小南").start();?for (int i = 0; i < 5; i++) {new Thread(() -> {synchronized (room) {log.debug("可以開始干活了");}}, "其它人").start();}?Thread.sleep(1000);// 送煙線程new Thread(() -> {synchronized (room) {hasCigarette = true;log.debug("煙到了噢!");}}, "送煙的").start();}?}
20:32:22.014 [小南] DEBUG c.TestCorrectPosture - 有煙沒?[false]20:32:22.019 [小南] DEBUG c.TestCorrectPosture - 沒煙,先歇會!20:32:24.024 [小南] DEBUG c.TestCorrectPosture - 有煙沒?[false]20:32:24.024 [送煙的] DEBUG c.TestCorrectPosture - 煙到了噢!20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以開始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以開始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以開始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以開始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以開始干活了?進程已結束,退出代碼0
存在的問題 :
-
其它干活的線程,都要一致阻塞,效率低
-
就算煙提前送到,也無法立刻醒來
-
送煙加上鎖之后,相當于門一直鎖著,煙送不進去
改進 :
@Slf4j(topic = "c.TestCorrectPosture")public class Test {static final Object room = new Object();?// 有無煙static boolean hasCigarette = false;static boolean hasTakeout = false;?public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (room) {log.debug("有煙沒?[{}]", hasCigarette);if (!hasCigarette) {log.debug("沒煙,先歇會!");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("有煙沒?[{}]", hasCigarette);if (hasCigarette) {log.debug("可以開始干活了");}}}, "小南").start();?for (int i = 0; i < 5; i++) {new Thread(() -> {synchronized (room) {log.debug("可以開始干活了");}}, "其它人").start();}?Thread.sleep(1000);// 送煙線程new Thread(() -> {synchronized (room) {hasCigarette = true;log.debug("煙到了噢!");room.notify(); ? ?// 叫醒小南線程}}, "送煙的").start();}?}
存在問題
會不會有其他線程在等待著鎖?如果是那么會不會喚醒錯了線程?(虛假喚醒)
解決 :
可以通過while多次判斷條件是否成立,直接使用notifyAll來喚醒所有的線程。然后線程被喚醒之后先再次判斷條件是否成立,成立那么往下面執行,如果不成立那么繼續執行wait。
while (!hasCigarette) {log.debug("沒煙,先歇會!");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}}
正確使用 :
synchronized(lock){while(條件不成立){lock.wait();} // 干活}?// 另一個線程synchronized(lock){lock.notifyAll();}
同步保護性暫停
定義
-
t1需要t2的結果,那么就可以通過一個中間對象guardedObject來充當這個中間商,t2執行完就發送消息到obj,然后obj交給t1
-
如果是不斷發送結果那么可以使用消息隊列
-
要等待所以是同步
-
join和future就是用的這個原理
public class Test {public static void main(String[] args) {GuaObj guaObj = new GuaObj();Thread thread = new Thread(() -> {System.out.println("鎖住,等待結果");guaObj.get(2000);System.out.println("解鎖");}, "t1");thread.start();??Thread thread1 = new Thread(() -> {System.out.println("先睡兩秒");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("解鎖,設置對象");guaObj.set(new Object());}, "t2");thread1.start();}}?class GuaObj{// 結果public Object response;?// 獲取結果// timeout表示最多等多久public Object get(long timeout){synchronized (this){// 開始時間long cur = System.currentTimeMillis();// 經歷的時間long paseTime=0;while(response==null){try {// 這一輪應該等的時間long waitTime=timeout-paseTime;//超時就不等了if(waitTime<=0) break;this.wait(waitTime);paseTime=System.currentTimeMillis()-cur;} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("等待結束");return response;}}?// 產生結果public void set(Object response){synchronized (this){this.response=response;this.notifyAll();}}}
鎖住,等待結果先睡兩秒解鎖,設置對象等待結束解鎖?進程已結束,退出代碼0
-
需要記錄超時的時間,并且重新設置waittime,原因是可能會有虛假喚醒,那么這個時候超時時間不是timeout而是timeout-passedTime,也就是線程執行的時間。
-
如果超時的話,那么就會自動結束
join的源碼
public final synchronized void join(long millis)throws InterruptedException {//一開始的時間long base = System.currentTimeMillis();//線程執行的時間long now = 0;?//如果是<0那么就拋出異常if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}?//如果是0那么就一直等待線程執行完,isAlive是否生存if (millis == 0) {while (isAlive()) {wait(0);}} else {//timeout超時那么就結束while (isAlive()) {long delay = millis - now;if (delay <= 0) {break;}wait(delay);now = System.currentTimeMillis() - base;}}}
Future
相當于就是一個信箱,里面裝了很多GuardObject對象,線程可以通過對應的地址訪問對象獲取結果
異步生產者/消費者模型
定義
相當于就是生產者給隊列生產結果,消費者負責處理結果
-
不需要一一對應
-
平衡資源
-
消息隊列有容量控制
-
阻塞隊列控制結果出隊列
public class Test {public static void main(String[] args) {MesageQueue queue = new MesageQueue(2);?for (int i = 0; i < 3; i++) {int id = i;new Thread(() -> {queue.set(new Message(id, "值" + id));},"生產者" + i).start();}?new Thread(() -> {while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}Message message = queue.take();}}, "消費者").start();}}??@Slf4jclass MesageQueue{//存消息的集合private LinkedList<Message> list = new LinkedList();// 消息容量private int capacity;?public MesageQueue(int capacity){this.capacity = capacity;}?// 獲取消息public ?Message take() {// 檢查隊列是否為空synchronized (list){while(list.isEmpty()){try {log.debug("隊列為空,消費者線程等待");list.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}Message message = list.removeFirst();log.debug("已經消費了消息 {}",message);list.notifyAll();return message;}}?// 存入消息public void set(Message message) {// 檢查是不是滿了synchronized (list){while(list.size() == capacity){try {log.debug("隊列已滿,生產者線程等待");list.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}list.addLast(message);log.debug("已經生產了消息 {}",message);list.notifyAll();}}}?// 消息類final class ?Message{private int id;private Object value;?public Message(int id, Object value){this.id = id;this.value = value;}?public int getId() {return id;}?public Object getValue() {return value;}?@Overridepublic String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}}
12:58:24.373 [生產者1] DEBUG MesageQueue - 已經生產了消息 Message{id=1, value=值1}12:58:24.375 [生產者2] DEBUG MesageQueue - 已經生產了消息 Message{id=2, value=值2}12:58:24.377 [生產者0] DEBUG MesageQueue - 隊列已滿,生產者線程等待12:58:25.371 [消費者] DEBUG MesageQueue - 已經消費了消息 Message{id=1, value=值1}12:58:25.371 [生產者0] DEBUG MesageQueue - 已經生產了消息 Message{id=0, value=值0}12:58:26.386 [消費者] DEBUG MesageQueue - 已經消費了消息 Message{id=2, value=值2}12:58:27.397 [消費者] DEBUG MesageQueue - 已經消費了消息 Message{id=0, value=值0}12:58:28.405 [消費者] DEBUG MesageQueue - 隊列為空,消費者線程等待
Park & Unpark
與wait和notify的區別
-
不需要與monitor一起使用
-
可以精準喚醒和阻塞線程
-
可以先unpark,但是不能先notify。但是unpark之后park不起作用。
原理
①park,先去到counter里面判斷是不是0,如果是那么就讓線程進入隊列。接著就是把counter設置為0
②unpark,那么喚醒線程,恢復運行,并且把counter設置為1
③先unpark后park,那么就unpark補充counter為1,那么park判斷counter是1,認為還有體力可以繼續執行。