目錄
一 同步與協調模式
?1 保護性暫停模式
?2 順序控制模式
?3 生產者消費者模式
4 Balking模式(猶豫模式)
二 線程管理/生命周期模式
1 兩階段終止模式
一 同步與協調模式
?1 保護性暫停模式
?一個線程需要等待另一個線程提供特定條件(通常是某個結果)滿足后才能繼續執行。
Guarded Suspension 保護性暫停是一種節點的多線程設計模式,用于在條件不滿足時暫停線程執行,直到條件滿足后在繼續執行。(線程不滿足等待條件,手動實現主動等待)
代碼實現:
package day01.mysynchronized;import java.util.ArrayList;public class example6 {public static void main(String[] args) {GuardObject<ArrayList<Integer>> guardObject = new GuardObject<>();// 線程1等待線程2的結果new Thread(() -> {// 等待結果System.out.println("t1線程開始執行... 等待結果");ArrayList<Integer> result = guardObject.get();result.forEach(System.out::println);}, "t1").start();new Thread(() -> {System.out.println("t2線程開始執行...");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}ArrayList<Integer> objects = new ArrayList<>();objects.add(1);objects.add(2);guardObject.complete(objects);}, "t2").start();}
}class GuardObject<T> {private T response;public T get() {synchronized (this) {while (response == null) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}return response;}}public void complete(T response) {synchronized (this) {this.response = response;this.notifyAll();}}
}
更完善的:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class Example7 {public static void main(String[] args) {GuardObject<List<Integer>> guardObject = new GuardObject<>();// 線程1等待線程2的結果Thread t1 = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "開始執行... 等待結果");try {// 設置5秒超時List<Integer> result = guardObject.get(5000);System.out.println(Thread.currentThread().getName() + "收到結果:");result.forEach(System.out::println);} catch (TimeoutException e) {System.out.println(Thread.currentThread().getName() + "等待結果超時");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "被中斷");}}, "t1");Thread t2 = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "開始執行...");try {// 模擬耗時操作Thread.sleep(3000);// 創建結果List<Integer> objects = new ArrayList<>();objects.add(1);objects.add(2);// 設置結果guardObject.complete(objects);System.out.println(Thread.currentThread().getName() + "已發送結果");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "被中斷");}}, "t2");t1.start();t2.start();// 確保主線程等待子線程完成try {t1.join();t2.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("所有線程執行完成");}
}/*** 增強版保護對象類* @param <T> 結果類型*/
class GuardObject<T> {private T response;private boolean done = false; // 完成狀態標志/*** 獲取結果(無限等待)* @return 結果對象* @throws InterruptedException 如果等待時被中斷*/public T get() throws InterruptedException, TimeoutException {return get(0);}/*** 獲取結果(帶超時)* @param timeout 超時時間(毫秒),0表示無限等待* @return 結果對象* @throws InterruptedException 如果等待時被中斷* @throws TimeoutException 如果超過指定的超時時間*/public T get(long timeout) throws InterruptedException, TimeoutException {synchronized (this) {long start = System.currentTimeMillis();long remaining = timeout;while (!done) {if (timeout > 0 && remaining <= 0) {throw new TimeoutException("等待結果超時");}if (timeout == 0) {this.wait();} else {this.wait(remaining);remaining = timeout - (System.currentTimeMillis() - start);}}return response;}}/*** 設置結果* @param response 結果對象* @throws IllegalStateException 如果結果已被設置*/public void complete(T response) {synchronized (this) {if (done) {throw new IllegalStateException("結果已被設置");}this.response = response;this.done = true;this.notifyAll();}}
}
一個使用等待通知機制的例子
信箱類:借助HashMap用于管理收信人id與對應的異步結果對象GuardObject
收信人:借助信箱類的管理,設置對應的id與新建異步結果對象,接著調用對應的get進行等待。
郵遞員:傳遞收件人的id,并設置對應的content,借助信箱類根據收件人id得到對應的GuardObject異步結果對象,調用compete通知(將content傳遞)。
異步結果對象:兩個方法,get/compete一個等待,一個通知,實現多線程的保護性暫停模式。
1 收信人
在收信人當中需要內置變量信箱,在初始化創建時就需要對應調用創建信箱的方法,然后重寫run方法,因為我們需要實現的是每創建一個收信人就需要新建一個線程執行業務代碼,run方法當中使用get進行等待,若郵遞員發送通知之后再將結果接收,接收成功之后還需從信箱當中移除。
/*** 模擬收信人*/
class People extends Thread {private final GuardObject<String> guardObject;public People() {super("People-" + System.currentTimeMillis());this.guardObject = MailBox.createGuardObject();}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "開始收信...");try {// 等待信件,帶超時String response = guardObject.get(5000);System.out.println(Thread.currentThread().getName() + "收到信:" + response);} catch (TimeoutException e) {System.out.println(Thread.currentThread().getName() + "收信超時");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "收信被中斷");Thread.currentThread().interrupt(); // 恢復中斷狀態} finally {// 確保從MailBox中移除GuardObjectMailBox.removeGuardObject(guardObject.getId());}}
}
2 郵遞員
郵遞員需要內置兩個成員變量,一個是收信人的id用來獲取尋找對應的收信人,同時也是開啟新的線程繼承Thread類重寫run方法,首先獲取收信人的對象,調用compete方法通知對應的收信人
/*** 郵遞員類*/
class PostMan extends Thread {private final int id;private final String content;public PostMan(int id, String content) {super("PostMan-" + id);this.id = id;this.content = content;}@Overridepublic void run() {GuardObject<String> guardObject = MailBox.getGuardObject(id);if (guardObject == null) {System.out.println(Thread.currentThread().getName() + "錯誤:收信人不存在");return;}System.out.println(Thread.currentThread().getName() + "開始發送信件...");try {// 模擬投遞延遲Thread.sleep(1000 + (int)(Math.random() * 2000));// 發送信件guardObject.complete(content);System.out.println(Thread.currentThread().getName() + "已發送信件");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "發送被中斷");Thread.currentThread().interrupt(); // 恢復中斷狀態} catch (IllegalStateException e) {System.out.println(Thread.currentThread().getName() + "錯誤:" + e.getMessage());}}
}
3 信箱類
信箱當中維護的是一個HashMap集合,id存儲收件人信息,id,GuardObject<String>對象
/*** 用于管理多個 GuardObject 的信箱類*/
class MailBox {private static final AtomicInteger idGenerator = new AtomicInteger(1);private static final Map<Integer, GuardObject<?>> map = new ConcurrentHashMap<>();/*** 創建并返回一個泛型 GuardObject*/public static <T> GuardObject<T> createGuardObject() {int id = idGenerator.getAndIncrement();GuardObject<T> guardObject = new GuardObject<>(id);map.put(id, guardObject);return guardObject;}/*** 獲取所有 GuardObject 的 ID*/public static Set<Integer> getGuardObjectIds() {return map.keySet();}/*** 根據id獲取GuardObject*/@SuppressWarnings("unchecked")public static <T> GuardObject<T> getGuardObject(int id) {return (GuardObject<T>) map.get(id);}/*** 移除GuardObject*/public static void removeGuardObject(int id) {map.remove(id);}
}
4 保護對象
這里同時也會根據id維護獨立的GuardObject對象,里面實現了get與compete的邏輯代碼
/*** 增強版保護對象類*/
class GuardObject<T> {private T response;private boolean done = false;private final int id;public GuardObject(int id) {this.id = id;}public int getId() {return id;}/*** 獲取結果(帶超時)*/public T get(long timeout) throws InterruptedException, TimeoutException {synchronized (this) {long start = System.currentTimeMillis();long remaining = timeout;while (!done) {if (timeout > 0 && remaining <= 0) {throw new TimeoutException("等待結果超時");}if (timeout == 0) {this.wait();} else {this.wait(remaining);remaining = timeout - (System.currentTimeMillis() - start);}}return response;}}/*** 設置結果*/public void complete(T response) {synchronized (this) {if (done) {throw new IllegalStateException("結果已被設置");}this.response = response;this.done = true;this.notifyAll();}}
完整代碼
package day01.mysynchronized;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;public class Example7 {public static void main(String[] args) throws InterruptedException {// 創建多個收信人for (int i = 0; i < 3; i++) {new People().start();}// 等待收信人創建GuardObjectThread.sleep(1000);// 獲取所有等待收信的IDSet<Integer> ids = MailBox.getGuardObjectIds();// 為每個收信人創建郵遞員int mailCount = 1;for (int id : ids) {new PostMan(id, "信件內容" + mailCount++).start();}}
}/*** 模擬收信人*/
class People extends Thread {private final GuardObject<String> guardObject;public People() {super("People-" + System.currentTimeMillis());this.guardObject = MailBox.createGuardObject();}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "開始收信...");try {// 等待信件,帶超時String response = guardObject.get(5000);System.out.println(Thread.currentThread().getName() + "收到信:" + response);} catch (TimeoutException e) {System.out.println(Thread.currentThread().getName() + "收信超時");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "收信被中斷");Thread.currentThread().interrupt(); // 恢復中斷狀態} finally {// 確保從MailBox中移除GuardObjectMailBox.removeGuardObject(guardObject.getId());}}
}/*** 郵遞員類*/
class PostMan extends Thread {private final int id;private final String content;public PostMan(int id, String content) {super("PostMan-" + id);this.id = id;this.content = content;}@Overridepublic void run() {GuardObject<String> guardObject = MailBox.getGuardObject(id);if (guardObject == null) {System.out.println(Thread.currentThread().getName() + "錯誤:收信人不存在");return;}System.out.println(Thread.currentThread().getName() + "開始發送信件...");try {// 模擬投遞延遲Thread.sleep(1000 + (int)(Math.random() * 2000));// 發送信件guardObject.complete(content);System.out.println(Thread.currentThread().getName() + "已發送信件");} catch (InterruptedException e) {System.out.println(Thread.currentThread().getName() + "發送被中斷");Thread.currentThread().interrupt(); // 恢復中斷狀態} catch (IllegalStateException e) {System.out.println(Thread.currentThread().getName() + "錯誤:" + e.getMessage());}}
}/*** 用于管理多個 GuardObject 的信箱類*/
class MailBox {private static final AtomicInteger idGenerator = new AtomicInteger(1);private static final Map<Integer, GuardObject<?>> map = new ConcurrentHashMap<>();/*** 創建并返回一個泛型 GuardObject*/public static <T> GuardObject<T> createGuardObject() {int id = idGenerator.getAndIncrement();GuardObject<T> guardObject = new GuardObject<>(id);map.put(id, guardObject);return guardObject;}/*** 獲取所有 GuardObject 的 ID*/public static Set<Integer> getGuardObjectIds() {return map.keySet();}/*** 根據id獲取GuardObject*/@SuppressWarnings("unchecked")public static <T> GuardObject<T> getGuardObject(int id) {return (GuardObject<T>) map.get(id);}/*** 移除GuardObject*/public static void removeGuardObject(int id) {map.remove(id);}
}/*** 增強版保護對象類*/
class GuardObject<T> {private T response;private boolean done = false;private final int id;public GuardObject(int id) {this.id = id;}public int getId() {return id;}/*** 獲取結果(帶超時)*/public T get(long timeout) throws InterruptedException, TimeoutException {synchronized (this) {long start = System.currentTimeMillis();long remaining = timeout;while (!done) {if (timeout > 0 && remaining <= 0) {throw new TimeoutException("等待結果超時");}if (timeout == 0) {this.wait();} else {this.wait(remaining);remaining = timeout - (System.currentTimeMillis() - start);}}return response;}}/*** 設置結果*/public void complete(T response) {synchronized (this) {if (done) {throw new IllegalStateException("結果已被設置");}this.response = response;this.done = true;this.notifyAll();}}
}
結果展示
?2 順序控制模式
順序控制是同步模式中的一種重要控制方式,它確保多個操作或任務按照預定的順序執行。
(1 利用wait/notify
package day01.tongbu;public class example1 {static final Object lock = new Object();//用于標記線程2是否已經執行完畢static boolean flag = false;public static void main(String[] args) {Thread t1 = new Thread(() -> {synchronized (lock) {while (!flag) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("t1 線程被喚醒,執行剩下的業務");}});Thread t2 = new Thread(() -> {synchronized (lock) {System.out.println("t2 線程執行完畢");flag = true;lock.notify();}});t1.start();t2.start();try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}}
}
(2 使用ReentrantLock
package day01.tongbu;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class example3 {static boolean flag = false;static final ReentrantLock lock = new ReentrantLock();static final Condition condition = lock.newCondition();public static void main(String[] args) {// 實現先執行線程2,再執行線程1Thread t1 = new Thread(() -> {// 獲取鎖,未獲取將阻塞lock.lock();try {while (!flag) {System.out.println("線程1等待線程2");condition.await();}Thread.sleep(500);System.out.println("線程1開始執行");Thread.sleep(500);System.out.println("線程1完成工作");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});Thread t2 = new Thread(() -> {lock.lock();try {System.out.println("線程2開始執行");Thread.sleep(500);System.out.println("線程2執行完畢");flag = true;condition.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}});t1.start();t2.start();try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}}
}
(3 借助park/unpark
package day01.tongbu;import java.util.concurrent.locks.LockSupport;public class example4 {public static void main(String[] args) {Thread t1 = new Thread(() -> {System.out.println("t1等待....");LockSupport.park();System.out.println("t1運行");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("t1結束");});Thread t2 = new Thread(() -> {System.out.println("t2開始運行");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("t2結束并標記t1");LockSupport.unpark(t1);});t1.start();t2.start();try {t1.join();t2.join();} catch (InterruptedException e) {e.printStackTrace();}}
}
(對比
特性 | synchronized + wait/notify | ReentrantLock + Condition | LockSupport |
---|---|---|---|
實現復雜度 | 低 | 中 | 低 |
性能 | 中等 | 高 | 最高 |
靈活性 | 低 | 高 | 中 |
多條件支持 | ? | ? | ? |
公平鎖選項 | ? | ? | ? |
超時控制 | ? | ? | ? |
喚醒先于阻塞 | ? | ? | ? |
鎖獲取嘗試 | ? | ? | ? |
內存占用 | 低 | 中 | 低 |
適用場景 | 簡單同步 | 復雜同步 | 簡單阻塞/喚醒 |
(4 實現多個線程交替執行
使用ReentractLock
package day01.tongbu;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class example5 {static ReentrantLock lock = new ReentrantLock();static Condition conditionA = lock.newCondition();static Condition conditionB = lock.newCondition();static Condition conditionC = lock.newCondition();static int flag = 1;static final int MAX_LOOP = 5; // 控制打印5次ABCpublic static void main(String[] args) {Thread t1 = new Thread(() -> {lock.lock();try {for (int i = 0; i < MAX_LOOP; i++) {while (flag != 1) {conditionA.await();}System.out.print("A");flag = 2;conditionB.signal();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}});Thread t2 = new Thread(() -> {lock.lock();try {for (int i = 0; i < MAX_LOOP; i++) {while (flag != 2) {conditionB.await();}System.out.print("B");flag = 3;conditionC.signal();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}});Thread t3 = new Thread(() -> {lock.lock();try {for (int i = 0; i < MAX_LOOP; i++) {while (flag != 3) {conditionC.await();}System.out.print("C");flag = 1;conditionA.signal();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}});t1.start();t2.start();t3.start();// 等待所有線程執行完畢try {t1.join();t2.join();t3.join();} catch (InterruptedException e) {e.printStackTrace();}}
}
模塊化
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class example5 {static ReentrantLock lock = new ReentrantLock();static Condition[] conditions = new Condition[3];static int currentFlag = 0; // 0 -> A, 1 -> B, 2 -> Cstatic final int MAX_LOOP = 5;public static void main(String[] args) {// 初始化每個 conditionfor (int i = 0; i < conditions.length; i++) {conditions[i] = lock.newCondition();}// 啟動三個線程Thread t1 = new PrintThread(0, 'A', 1).getThread();Thread t2 = new PrintThread(1, 'B', 2).getThread();Thread t3 = new PrintThread(2, 'C', 0).getThread();t1.start();t2.start();t3.start();// 等待全部完成try {t1.join();t2.join();t3.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("全部打印完成!");}static class PrintThread {private final int id;private final char toPrint;private final int nextId;public PrintThread(int id, char toPrint, int nextId) {this.id = id;this.toPrint = toPrint;this.nextId = nextId;}public Thread getThread() {return new Thread(() -> {lock.lock();try {for (int i = 0; i < MAX_LOOP; i++) {while (currentFlag != id) {conditions[id].await();}System.out.println(toPrint);currentFlag = nextId;conditions[nextId].signal();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}});}}
}
3 Balking模式(猶豫模式)
當某個線程發現另一個線程已經執行了相同的操作時,放棄當前操作并直接返回,避免重復操作導致的資源浪費或狀態不一致。
Balking 模式的核心是 避免重復操作和無效操作,通過狀態檢查確保操作只在合適的狀態下執行。
實現多次訪問不再重復創建
package day01;public class ax5 {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();tpt.start(); // 被balking拒絕Thread.sleep(3500);tpt.stop();// 測試重啟功能Thread.sleep(500);tpt.start(); // 應能重新啟動Thread.sleep(1500);tpt.stop();}
}class TwoPhaseTermination {private Thread monitor;// stop 這個狀態是判斷是否已經停止,防止調用stop時出錯,而state狀態用來判斷當前是否正在運行private volatile boolean stop = false;private static final int NEW = 0;private static final int RUNNING = 1;private volatile int state = NEW;public synchronized void start() {if (state != NEW) {System.out.println(Thread.currentThread().getName() + ": 拒絕重復啟動");return;}state = RUNNING;stop = false;monitor = new Thread(() -> {try {while (!stop) {try {Thread.sleep(1000);System.out.println("執行監控記錄");} catch (InterruptedException e) {System.out.println("睡眠中被中斷,準備退出");}}} finally {synchronized (TwoPhaseTermination.this) {state = NEW; // 關鍵修改:重置為NEW而非TERMINATED}System.out.println("料理后事,釋放資源");}}, "monitor");monitor.start();}public synchronized void stop() {if (state != RUNNING) return;stop = true;if (monitor != null) {monitor.interrupt();}}
}
二 線程管理/生命周期模式
1 兩階段終止模式
兩階段終止模式(Two-Phase Termination Pattern)是一種多線程編程中安全終止線程的設計模式。它的核心思想是:在終止線程前,先發出終止請求(第一階段),然后線程在完成必要清理工作后再真正終止(第二階段)。這種模式避免了直接強制終止線程導致的資源泄露、數據不一致等問題
簡單來說就是在一個線程T1中如何優雅的終止線程T2,給T2一個料理后事的機會
在某些線程正在運行時如何正確終止線程,如果強制stop會帶來相對應的問題:
- 1 資源未釋放:線程可能持有鎖或者打開文件或者網絡連接未關閉
- 2 數據不一致:線程可能正在修改共享數據,突然終止會導致數據損壞。
- 3 不可控性:無法保證線程在安全點終止。
(1 使用interrupt
需要借助interrupt進行打斷,存在一個中斷標志位,可以在這個中斷循環之外將其他需要善后的地方再進行操作,可實現優雅終止。在下面代碼實現的過程當中需要注意是在睡眠當中還是在運行當中被打斷。
代碼當中先開啟工作線程,線程兩個判斷,如果isInterrupted==true就是執行了interrupt()將線程打斷(模擬將后事料理),而另一種情況就是沒有被打斷就休眠1s(模擬工作過程),主線程先休眠再打斷工作線程。
package day01;public class ax5 {public static void main(String[] args) {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();try {Thread.sleep(3500);} catch (InterruptedException e) {System.out.println("main thread stop");}tpt.stop();}
}class TwoPhaseTermination {private Thread monitor;public void start() {monitor = new Thread(() -> {while (true) {Thread current = Thread.currentThread();if (current.isInterrupted()) {System.out.println("打斷線程,料理后事");break;} else {try {Thread.sleep(1000);System.out.println("執行監控記錄");} catch (InterruptedException e) {current.interrupt();// 重置標志位位}}}});monitor.start();}public void stop() {monitor.interrupt();}
}
補充:
interrupted() | 靜態方法 | 當前線程(調用?Thread.interrupted() ) |
isInterrupted() | 實例方法 | 指定線程對象(如?thread.isInterrupted() ) |
interrupted() | - 當前線程需要檢查中斷狀態并主動清除標志(如處理完中斷后)。 |
- 需要確保中斷狀態不會被后續邏輯誤判(如循環中檢查中斷狀態)。 | |
isInterrupted() | - 檢查其他線程的中斷狀態(如監控子線程是否被中斷)。 |
- 需要多次檢查中斷狀態且保留標志位(如在循環中持續監控)。 |
(2 使用volatile改進兩階段終止模式
設置一個volatile共享變量,實現線程之間共享
package day01;public class ax5 {public static void main(String[] args) {TwoPhaseTermination tpt = new TwoPhaseTermination();tpt.start();try {Thread.sleep(3500);} catch (InterruptedException e) {System.out.println("main thread stop");}tpt.stop();}
}class TwoPhaseTermination {private Thread monitor;// 保證共享變量在多線程之間的可見性private volatile boolean flag = false;public void start() {monitor = new Thread(() -> {while (true) {Thread current = Thread.currentThread();if (flag) {System.out.println("打斷線程,料理后事");break;} else {try {Thread.sleep(1000);System.out.println("執行監控記錄");} catch (InterruptedException e) {// 重置標志位位current.interrupt();}}}});monitor.start();}public void stop() {flag = true;// 打斷監控線程(中斷在睡眠期間的線程,不加的話需要等待睡眠結束)monitor.interrupt();}
}
三 異步模式
1 生產者消費者模式
消費者與生產者之間的協調關系,為了解決生產過剩的情況或者說消費過多,這里的緩沖區相當于就是一個物品的儲物箱,達到最大說明生產者生產太多了,達到最小說明消費者消費太多了。當達到最大就讓生產者進入等待隊列,喚醒消費者,反之就讓消費者進入等待隊列,喚醒生產者。
借助ReentrantLock與條件變量
package day01.lockax;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class Conditional {private final ReentrantLock lock = new ReentrantLock();private final Condition notEmpty = lock.newCondition(); // 消費者等待條件private final Condition notFull = lock.newCondition(); // 生產者等待條件private final int[] buffer = new int[5]; // 緩沖區private int count = 0; // 當前緩沖區元素數量// 生產者方法:向緩沖區添加元素public void produce(int item) throws InterruptedException {lock.lock();try {// 如果緩沖區已滿,等待while (count == buffer.length) {System.out.println("緩沖區已滿,生產者等待...");notFull.await(); // 釋放鎖并等待}// 添加元素到緩沖區buffer[count++] = item;System.out.println("生產者生產了: " + item + ",當前緩沖區大小: " + count);// 喚醒消費者notEmpty.signal();} finally {lock.unlock();}}// 消費者方法:從緩沖區取出元素public void consume() throws InterruptedException {lock.lock();try {// 如果緩沖區為空,等待while (count == 0) {System.out.println("緩沖區為空,消費者等待...");notEmpty.await(); // 釋放鎖并等待}// 從緩沖區取出元素int item = buffer[--count];System.out.println("消費者消費了: " + item + ",當前緩沖區大小: " + count);// 喚醒生產者notFull.signal();} finally {lock.unlock();}}// 測試生產者-消費者模型public static void main(String[] args) {Conditional example = new Conditional();// 啟動生產者線程Thread producer = new Thread(() -> {for (int i = 1; i <= 10; i++) {try {example.produce(i);Thread.sleep(500); // 模擬生產耗時} catch (InterruptedException e) {e.printStackTrace();}}});// 啟動消費者線程Thread consumer = new Thread(() -> {for (int i = 1; i <= 10; i++) {try {example.consume();Thread.sleep(800); // 模擬消費耗時} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();consumer.start();}
}
2 工作線程(線程池)
核心:不同的任務類型使用不同的線程池。
代碼展示:
package threadPool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;public class RestaurantThreadPoolExample {// 封裝服務員任務邏輯private static void waiterTask(ExecutorService cookerPool, int taskId) {System.out.println("服務員[" + taskId + "] 處理點餐");Future<String> dishFuture = cookerPool.submit(() -> {// 模擬烹飪時間Thread.sleep(500);return "菜品[" + taskId + "] 完成 - 由廚師" + Thread.currentThread().getName();});try {// 模擬處理其他事務的時間Thread.sleep(300);System.out.println("服務員[" + taskId + "] 取菜: " + dishFuture.get());} catch (Exception e) {System.err.println("任務[" + taskId + "] 出錯: " + e.getMessage());}}public static void main(String[] args) {// 創建有意義的線程池名稱final int NUM_WAITERS = 5;final int NUM_COOKERS = 2;ExecutorService waitersPool = Executors.newFixedThreadPool(NUM_WAITERS);ExecutorService cookerPool = Executors.newFixedThreadPool(NUM_COOKERS);// 提交所有服務員任務for (int i = 1; i <= NUM_WAITERS; i++) {final int taskId = i;waitersPool.execute(() -> waiterTask(cookerPool, taskId));}// 優雅關閉線程池waitersPool.shutdown();try {// 等待服務員任務完成(包含它們提交的廚師任務)if (!waitersPool.awaitTermination(5, TimeUnit.SECONDS)) {System.err.println("警告: 服務員線程池未及時關閉");}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {cookerPool.shutdown();try {// 確保廚師任務完成if (!cookerPool.awaitTermination(2, TimeUnit.SECONDS)) {System.err.println("警告: 廚師線程池未及時關閉,強制終止剩余任務");cookerPool.shutdownNow();}} catch (InterruptedException e) {cookerPool.shutdownNow();Thread.currentThread().interrupt();}}System.out.println("====== 所有任務執行完成 ======");}
}