線程通信的方式
線程中通信是指多個線程之間通過某種機制進行協調和交互
線程通信主要可以分為三種方式,分別為共享內存、消息傳遞和管道流。每種方式有不同的方法來實現
volatile共享內存
- 共享內存:線程之間共享程序的公共狀態,線程之間通過讀-寫內存中的公共狀態來隱式通信。
wait/notify等待通知方式
- 消息傳遞:線程之間沒有公共的狀態,線程之間必須通過明確的發送信息來顯示的進行通信。
join方式
管道輸入/輸出流的形式
- 管道流
共享內存
/*** @Author: Simon Lang* @Date: 2020/5/5 15:13*/
public class TestVolatile {private static volatile boolean flag=true;public static void main(String[] args){new Thread(new Runnable() {public void run() {while (true){if(flag){System.out.println("線程A");flag=false;}}}}).start();
?
?new Thread(new Runnable() {public void run() {while (true){if(!flag){System.out.println("線程B");flag=true;}}}}).start();}
}
?
測試結果:線程A和線程B交替執行
消息傳遞-線程等待和通知
線程等待和通知機制是線程通訊的主要手段之一。
在 Java 中有以下三種實現線程等待的手段 :
Object 類提供的 wait(),notify() 和 notifyAll() 方法;
Condition 類下的 await(),signal() ?和 signalAll() 方法;
LockSupport 類下的 park() 和 unpark() 方法。
Object 類提供的 wait(),notify() 和 notifyAll() 方法;
Object lock = new Object(); new Thread(() -> {synchronized (lock) {try {System.out.println("線程1 -> 進入等待");lock.wait();System.out.println("線程1 -> 繼續執行");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("線程1 -> 執行完成");} }).start();Thread.sleep(1000); synchronized (lock) {// 喚醒線程System.out.println("執行 notifyAll()");lock.notifyAll(); }
Condition 類下的 await(),signal() ?和 signalAll() 方法;
// 創建 Condition 對象 (lock 可創建多個 condition 對象) Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // 加鎖 lock.lock(); try {// 一個線程中執行 await()condition.await();// 另一個線程中執行 signal()condition.signal(); } catch (InterruptedException e) {e.printStackTrace(); } finally {lock.unlock(); }
Condition 類它可以創建出多個對象。那為什么有了 Object 類的 wait 和 notify 的方式,還需要 condition 來干嘛呢 ?
因為 Object 類的 wait 和 notify 只適用于一個任務隊列,而 Condition 類的 await 和 signal 適用于多個任務隊列,在多個任務隊列的情況下,使用 Object 類的 wait 和 notify 可能會存在線程餓死的問題。
比如以上這種生產者消費者模型,當生產者,消費者(阻塞式的)都有多個的時候,并且此時任務隊列里面沒有任務了,所以消費者就會進入休眠狀態,此時生產者需要做兩件事情 :?
將任務推送到任務隊列
喚醒線程
【問題所在】① ?此時如果使用 Object 類提供的 wait 和 notify,而喚醒線程是存在兩種可能的:
1)喚醒了消費者?
2)喚醒了生產者
? ? ? ? 如果是喚醒了生產者,那就出問題了,當生產者這邊代碼執行完了就結束了,消費者這邊永遠不會去消費隊列里的任務了,這就會導致線程饑餓問題。
而 Condition 類因為可以被創建多個,所以可以使用兩個 Condition 對象,一個指定喚醒生產者,一個指定喚醒消費者,這樣就不會出現線程饑餓了。所以 Condition 類的 await 和 signal 是對 Object 類的 wait 和 notify 的一個補充,它解決了 Object 類種分組不明確的問題。
LockSupport 類下的 park() 和 unpark() 方法。
public static void main(String[] args) {Thread t1 = new Thread(() -> {LockSupport.park();System.out.println("線程1被喚醒");},"線程1");t1.start();Thread t2 = new Thread(() -> {LockSupport.park();System.out.println("線程2被喚醒");},"線程2");t2.start();Thread t3 = new Thread(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("喚醒線程2");LockSupport.unpark(t2);},"線程3");t3.start(); }
LockSupport 類又是對 Condition 類的一個補充,它可以指定喚醒某一個線程,它解決了前兩種方式不能隨機指定喚醒線程的問題。
join方式
join()方法的作用是:在當前線程A調用線程B的join()方法后,會讓當前線程A阻塞,直到線程B的邏輯執行完成,A線程才會解除阻塞,然后繼續執行自己的業務邏輯,這樣做可以節省計算機中資源。
public class TestJoin {public static void main(String[] args){Thread thread=new Thread(new Runnable() {@Overridepublic void run() {System.out.println("線程0開始執行了");}});thread.start();for (int i=0;i<10;i++){JoinThread jt=new JoinThread(thread,i);jt.start();thread=jt;}
?}
?static class JoinThread extends Thread{private Thread thread;private int i;
?public JoinThread(Thread thread,int i){this.thread=thread;this.i=i;}
?@Overridepublic void run() {try {thread.join();System.out.println("線程"+(i+1)+"執行了");} catch (InterruptedException e) {e.printStackTrace();}}}
}
?
每個線程的終止的前提是前驅線程的終止,每個線程等待前驅線程終止后,才從join方法返回,實際上,這里涉及了等待/通知機制,即下一個線程的執行需要接受前驅線程結束的通知。
管道輸入/輸出流
管道流是是一種使用比較少的線程間通信方式,管道輸入/輸出流和普通文件輸入/輸出流或者網絡輸出/輸出流不同之處在于,它主要用于線程之間的數據傳輸,傳輸的媒介為管道。
管道輸入/輸出流主要包括4種具體的實現:PipedOutputStrean、PipedInputStrean、PipedReader和PipedWriter,前兩種面向字節,后兩種面向字符。
java的管道的輸入和輸出實際上使用的是一個循環緩沖數組來實現的,默認為1024,輸入流從這個數組中讀取數據,輸出流從這個數組中寫入數據,當這個緩沖數組已滿的時候,輸出流所在的線程就會被阻塞,當向這個緩沖數組為空時,輸入流所在的線程就會被阻塞。
public class TestPip {public static void main(String[] args) throws IOException {PipedWriter writer = new PipedWriter();PipedReader reader = new PipedReader();//使用connect方法將輸入流和輸出流連接起來writer.connect(reader);Thread printThread = new Thread(new Print(reader) , "PrintThread");//啟動線程printThreadprintThread.start();int receive = 0;try{//讀取輸入的內容while((receive = System.in.read()) != -1){writer.write(receive);}}finally {writer.close();}}
?private static class Print implements Runnable {private PipedReader reader;
?public Print(PipedReader reader) {this.reader = reader;}
?@Overridepublic void run() {int receive = 0;try{while ((receive = reader.read()) != -1){//字符轉換System.out.print((char) receive);}}catch (IOException e) {System.out.print(e);}}}
}
?
?對于Piped類型的流,必須先進性綁定,也就是調用connect()方法,如果沒有將輸入/輸出流綁定起來,對于該流的訪問將拋出異常。