C++多線程設計(任務的“多對一”、“一對多”、“多對多”情況 該如何設計線程?)
C++書籍中并未找到對多線程設計,有很完整詳細的總結!!C++并發編程書籍中也只是一些理論或則零散的多線程實例。無奈,只能參考Java的相關多線程設計知識總結,重在了解多線程設計模式的不同應用場景和不同需求,如下:
注意:多線程設計模式 實際就是 并發問題的解決方案!!對多線程設計模式的總結,就是對現實中所有并發問題的總結!!
參考原文:并發編程中常見的設計模式-CSDN博客
文章目錄
一、 終止線程的設計模式
1. 簡介
2. Tow-phase Termination(兩階段終止模式)—優雅的停止線程
二、避免共享的設計模式
1. 簡介
2. Immutability模式—想破壞也破壞不了
3. Copy-on-Write模式
4. Thread-Specific Storage模式—沒有共享就沒有傷害
三、多線程版本的if模式
1. 簡介
2. Guarded Suspension模式—等我準備好
3. Balking模式—不需要就算了
四、多線程分工模式
1. 簡介
2. Thread-Per-Message模式—最簡單實用的分工方法
3. Working Thread模式—如何避免重復創建線程
4. 生產者 - 消費者模式—用流水線的思想提高效率
一、 終止線程的設計模式
1. 簡介
思考:在一個線程T1中如何正確安全的終止線程T2
錯誤思路1
使用線程對象的stop()方法停止線程,因為Stop方法會真正殺死線程,如果這時線程鎖住了共享資源,那么當它被殺死后就再也沒有機會釋放鎖,其它線程將永遠無法獲取鎖。
錯誤思路2
使用System.exit(int)方法停止線程,目的僅是停止一個線程,但這種做法會讓整個程序都停止。
2. Tow-phase Termination(兩階段終止模式)—優雅的停止線程
“兩階段停止”(Two-Phase Termination)是一種多線程編程中的設計模式,用于優雅地終止一個線程。這個模式包含兩個階段:
第一階段 - 發出終止請求: 在這個階段,通過設置一個標志(通常是一個volatile變量)來通知線程應該停止。線程在執行任務時會周期性地檢查這個標志,如果發現終止請求,則會進入第二階段。
第二階段 - 執行清理工作: 在這個階段,線程執行必要的清理工作,釋放資源、關閉連接等。這個階段的目的是確保線程在終止前完成所有必要的清理,以確保系統的穩定性。
這個模式的優點在于它提供了一種可控的、安全的線程終止機制,避免了突然中斷線程可能導致的資源泄漏或數據不一致性問題。通過明確地區分終止請求和清理工作兩個階段,可以更好地管理線程的生命周期。
如下面代碼就實現了兩階段終止模式:
private static class MyThread extends Thread {
? ? ? ? private volatile boolean terminated = false;
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? //判斷線程是否被中斷
? ? ? ? ? ? ? ? while (!terminated) {
? ? ? ? ? ? ? ? ? ? // 線程執行的業務邏輯
? ? ? ? ? ? ? ? ? ? System.out.println("Working...");
? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? cleanup();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? //該線程的中斷方法
? ? ? ? public void terminate() {
? ? ? ? ? ? terminated = true;
? ? ? ? ? ? interrupt(); // 中斷線程,喚醒可能在等待狀態的線程
? ? ? ? }
? ? ? ? //中斷后的清理工作
? ? ? ? private void cleanup() {
? ? ? ? ? ? // 執行清理工作,例如關閉資源等
? ? ? ? ? ? System.out.println("Cleanup work...");
? ? ? ? }
? ? }
? ? public static void main(String[] args) {
? ? ? ? //創建一個新的線程
? ? ? ? MyThread myThread = new MyThread();
? ? ? ? myThread.start();
? ? ? ? // 模擬運行一段時間后終止線程
? ? ? ? try {
? ? ? ? ? ? Thread.sleep(5000);
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? // 發出終止請求
? ? ? ? myThread.terminate();
? ? ? ? // 等待線程結束
? ? ? ? try {
? ? ? ? ? ? myThread.join();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? System.out.println("Main thread exiting...");
? ? }
上面我們我們使用系統提供的中斷標志位,而是自己實現的
二、避免共享的設計模式
1. 簡介
Immutability模式,Copy-on-Write模式,Thread-Specific Storage模式本質上是為了避免共享。
使用時需要注意Immutablility模式的屬性不可變性
Copy-on-Write模式需要注意拷貝的性能問題
Thread-Specific Storage模式需要注意異步執行問題
2. Immutability模式—想破壞也破壞不了
“多個線程同時讀寫同一共享變量存在的共享問題”,這里的必要條件之一就是讀寫,如果只有讀,而沒有寫,是沒有并法問題的。解決并法問題,其實最簡單的辦法就是讓共享變量只有讀操作,而沒有寫操作。這個辦法如此重要,以致于被上升到了一種解決并法問題的設計模式:不變性(Immutability)模式。所謂不變性,簡單來講,就是對象一旦創建之后,狀態就不再發生變化。換句話說,就是變量一旦被賦值。就不允許修改了(沒有寫操作);沒有修改操作,也就是保持了不變性。以下是不變性模式的特點:
線程安全性: 不可變對象天生是線程安全的,因為多個線程無法修改它們的狀態。這簡化了并發編程,無需使用鎖或其他同步機制。
簡化代碼: 不可變性避免了對象狀態的變化,減少了代碼的復雜性。由于對象不可變,不需要考慮狀態的一致性和變化。
易于緩存: 不可變對象是可緩存的,因為它們的值永遠不會改變。可以在對象創建時進行緩存,而無需擔心后續的修改。
安全性: 不可變對象不容易受到外部修改的影響,因此更容易設計和維護安全的系統。
如何實現:
將一個類所有的屬性都設置為final,并且只允許存在只讀方法,那么這個類基本上就具備了不可變性了。更嚴格的做法是這個類本身也是final的,也就是不允許繼承。jdk中很多類都具備本可變性,例如經常用到的String和Long、Integer、Double等基礎類型的包裝類都具備不可變性,這些對象的線程安全性是靠不可變性來保證的,它們都嚴格遵守來不可變類的三點要求:類和屬性都是final的,所有方法都是只讀的
使用Immutability模式的注意事項
所有對象的屬性都是final的,并不能保證不可變性:如final修飾的是對象的引用,此時對象的引用目標是不變的,但被引用的對象本身是可以改變的。
不可變對象也需要正確的發布
在使用Immutability模式的時候一定要確定保持不變性的邊界在哪里,是否要求屬性也具備不變性。
public final class ImmutablePerson {
? ? //屬性全部定義為final屬性
? ? private final String name;
? ? private final int age;
? ? public ImmutablePerson(String name, int age) {
? ? ? ? this.name = name;
? ? ? ? this.age = age;
? ? }
? ? //只提供了get方法
? ? public String getName() {
? ? ? ? return name;
? ? }
? ? public int getAge() {
? ? ? ? return age;
? ? }
? ? //當需要修改屬性時,會創建一個新的對象,這就是后面要介紹的Copy-on-Write模式
? ? public ImmutablePerson withAge(int newAge) {
? ? ? ? return new ImmutablePerson(this.name, newAge);
? ? }
? ? // 不提供setter方法,確保對象的不可變性
? ? @Override
? ? public String toString() {
? ? ? ? return "ImmutablePerson{" +
? ? ? ? ? ? ? ? "name='" + name + '\'' +
? ? ? ? ? ? ? ? ", age=" + age +
? ? ? ? ? ? ? ? '}';
? ? }
? ? public static void main(String[] args) {
? ? ? ? ImmutablePerson person = new ImmutablePerson("John", 30);
? ? ? ? System.out.println(person);
? ? ? ? // 通過withAge方法創建新對象,而不是修改現有對象
? ? ? ? ImmutablePerson newPerson = person.withAge(31);
? ? ? ? System.out.println(newPerson);
? ? ? ? // 原始對象仍然保持不可變性
? ? ? ? System.out.println(person);
? ? }
}
3. Copy-on-Write模式
簡介
Java里String在實現replace()方法的時候,并沒有更改value[]數組的內容,而是創建了一個新的字符串,這種方法在解決不可變對象的修改問題時經常用到。它本質上就是Copy-on-Write方法,顧命思議,寫時復制。
不可變對象的寫操作往往都是使用Copy-on-Write 方法解決的,當然Copy-on-Write的應用領域并不局限于 Immutability模式。Copy-on-Write才是最簡單的并發解決方案,很多人都在無意中把它忽視了。它是如此簡單,以至于 Java 中的基本數據類型 String、Integer、Long 等都是基于Copy-on-Write方案實現的。Copy-on-Write缺點就是消耗內存,每次修改都需要復制一個新的對象出來,好在隨著自動垃圾回收(GC)算法的成熟以及硬件的發展,這種內存消耗已經漸漸可以接受了。所以在實際工作中,如果寫操作非常少(讀多寫少的場景),可以嘗試使用 Copy-on-Write。
應用場景
在Java中,CopyOnWriteArrayList 和 CopyOnWriteArraySet這兩個Copy-on-Write
容器,它們背后的設計思想就是Copy-on-Write;通過 Copy-on-Write這兩個容器實現的讀操作是無鎖的,由于無鎖,所以將讀操作的性能發揮到了極致。Copy-on-Write 在操作系統領域也有廣泛的應用。類 Unix 的操作系統中創建進程的 API 是 fork(),傳統的 fork() 函數會創建父進程的一個完整副本,例如父進程的地址空間現在用到 了1G 的內存,那么 fork() 子進程的時候要復制父進程整個進程的地址空間(占有 1G 內存) 給子進程,這個過程是很耗時的。而 Linux 中fork() 子進程的時候,并不復制整個進程的地址空間,而是讓父子進程共享同一個地址空間;只用在父進程或者子進程需要寫入的時候才會復制地址空間,從而使父子進程擁有各自的地址空間。Copy-on-Write 最大的應用領域還是在函數式編程領域。函數式編程的基礎是不可變性 (Immutability),所以函數式編程里面所有的修改操作都需要 Copy-on-Write 來解決。像一些RPC框架還有服務注冊中心,也會利用Copy-on-Write設計思想維護服務路由表。 路由表是典型的讀多寫少,而且路由表對數據的一致性要求并不高,一個服務提供方從上線到 反饋到客戶端的路由表里,即便有 5 秒鐘延遲,很多時候也都是能接受的。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CopyOnWriteExample {
? ? private List<String> data = new ArrayList<>();
? ? public void addData(String value) {
? ? ? ? // 創建數據副本
? ? ? ? List<String> newData = new ArrayList<>(data);
? ? ? ? newData.add(value);
? ? ? ? // 將新副本賦值給原始數據
? ? ? ? data = newData;
? ? }
? ? public void printData() {
? ? ? ? // 讀取數據,無需加鎖
? ? ? ? for (String value : data) {
? ? ? ? ? ? System.out.println(value);
? ? ? ? }
? ? }
? ? public static void main(String[] args) {
? ? ? ? CopyOnWriteExample example = new CopyOnWriteExample();
? ? ? ? // 啟動一個線程添加數據
? ? ? ? Thread writerThread = new Thread(() -> {
? ? ? ? ? ? for (int i = 0; i < 5; i++) {
? ? ? ? ? ? ? ? example.addData("Data " + i);
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? // 啟動一個線程讀取數據
? ? ? ? Thread readerThread = new Thread(() -> {
? ? ? ? ? ? for (int i = 0; i < 3; i++) {
? ? ? ? ? ? ? ? example.printData();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(2000);
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? writerThread.start();
? ? ? ? readerThread.start();
? ? }
}
4. Thread-Specific Storage模式—沒有共享就沒有傷害
Thread-Specific Storage(線程特定存儲),簡稱TSS,是一種設計模式,用于在多線程環境中實現線程私有的數據存儲。該模式允許每個線程都有自己的數據副本,而不會影響其他線程的數據。這對于需要在線程之間隔離數據的場景非常有用。在Java中,ThreadLocal類是實現Thread-Specific Storage模式的一種方式。ThreadLocal提供了一種在每個線程中存儲數據的機制,確保每個線程都能訪問到自己的數據副本。
如果你需要在并發場景中使用一個線程不安全的工具類,最簡單的方案就是避免共享。 避免共享有兩種方案,一種方案是將這個工具類作為局部變量使用,另外一種方案就是線程本地存儲模式。這兩種方案,局部變量方案的缺點是在高并發場景下會頻繁創建對象,而線程本地存儲方案,每個線程只需要創建一個工具類的實例,所以不存在頻繁創建對象的問題。
public class ThreadSpecificStorageExample {
? ? private static ThreadLocal<Integer> threadLocalData = new ThreadLocal<>();
? ? public static void main(String[] args) {
? ? ? ? // 啟動兩個線程,分別操作ThreadLocal中的數據
? ? ? ? Thread thread1 = new Thread(() -> {
? ? ? ? ? ? threadLocalData.set(1);
? ? ? ? ? ? System.out.println("Thread 1: Data = " + threadLocalData.get());
? ? ? ? });
? ? ? ? Thread thread2 = new Thread(() -> {
? ? ? ? ? ? threadLocalData.set(2);
? ? ? ? ? ? System.out.println("Thread 2: Data = " + threadLocalData.get());
? ? ? ? });
? ? ? ? // 啟動線程
? ? ? ? thread1.start();
? ? ? ? thread2.start();
? ? ? ? // 主線程操作ThreadLocal中的數據
? ? ? ? threadLocalData.set(0);
? ? ? ? System.out.println("Main Thread: Data = " + threadLocalData.get());
? ? }
}
三、多線程版本的if模式
1. 簡介
Guarded Suspension模式和Balking模式屬于多線程版本的if模式
Guarded Suspension模式需要注意性能
Balking模式需要注意競態條件
2. Guarded Suspension模式—等我準備好
Guarded Suspension(保護性暫停) 模式是一種多線程設計模式,用于解決在特定條件下才能執行的操作。這個模式的核心思想是,當條件不滿足時,線程暫時掛起等待,直到條件滿足時再繼續執行。這種模式通常用于協調多個線程之間的操作,其中一個線程(稱為 “等待方”)等待另一個線程(稱為 “通知方”)滿足某個條件。Guarded Suspension 模式有助于防止在條件不滿足時執行不必要的操作,提高系統效率。
以下是 Guarded Suspension 模式的一般步驟:
等待條件: “等待方” 在某個條件不滿足時掛起等待,例如使用 wait 方法。
通知: “通知方” 在滿足條件時通知等待方,例如使用 notify 或 notifyAll 方法。
喚醒: “等待方” 在收到通知后被喚醒,檢查條件是否滿足,如果滿足則執行相應操作,否則繼續等待。
場景的使用這種設計模式的場景有如下三種:
synchronized+wait/notify/notifyAll
reentrantLock+Condition(await/singal/dingalAll)
cas+park/unpark
這種設計模式常用于多線程環境下多個線程訪問相同實例資源,從實例資源中獲得資源并處理。實例資源需要管理自身擁有的資源,并對請求線程的請求作出允許與否的判斷。
public class GuardedSuspensionExample {
? ? private boolean condition = false;
? ? public synchronized void waitForCondition() {
? ? ? ? while (!condition) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // 等待條件滿足
? ? ? ? ? ? ? ? wait();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? // 條件滿足后執行操作
? ? ? ? System.out.println("Condition is satisfied!");
? ? }
? ? public synchronized void notifyCondition() {
? ? ? ? // 修改條件為滿足
? ? ? ? condition = true;
? ? ? ? // 通知等待的線程
? ? ? ? notifyAll();
? ? }
? ? public static void main(String[] args) {
? ? ? ? GuardedSuspensionExample example = new GuardedSuspensionExample();
? ? ? ? // 啟動一個線程等待條件滿足
? ? ? ? Thread waitingThread = new Thread(() -> example.waitForCondition());
? ? ? ? // 啟動另一個線程通知條件滿足
? ? ? ? Thread notifyingThread = new Thread(() -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // 模擬一些操作
? ? ? ? ? ? ? ? Thread.sleep(2000);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? example.notifyCondition();
? ? ? ? });
? ? ? ? waitingThread.start();
? ? ? ? notifyingThread.start();
? ? }
}
3. Balking模式—不需要就算了
Balking是“退縮不前”的意思。如果現在不適合執行這個操作,或者沒必要執行這個操作,就停止處理,直接返回。當流程的執行順序依賴于某個共享變量的場景,可以歸納為多線程if模式。Balking 模式常用于一個線程發現另一個線程已經做了某一件相同的事,那么本線程就無需再做了,直接結束返回。Balking模式是一種多個線程執行同一操作A時可以考慮的模式;在某一個線程B被阻塞或者執行其他操作時,其他線程同樣可以完成操作A,而當線程B恢復執行或者要執行操作A時,因A 已被執行,而無需線程B再執行,從而提高了B的執行效率。Balking模式和Guarded Suspension模式一樣,存在守護條件,如果守護條件不滿足,則中斷處理;這與Guarded Suspension模式不同,Guarded Suspension模式在守護條件不滿足的時候會一直等待至可以運行。
這個模式的核心思想是在某個條件未滿足時立即返回,而不是等待條件滿足后再執行操作。
以下是 Balking 模式的一般步驟:
檢查條件: 在執行操作之前,首先檢查某個條件是否滿足。
條件滿足: 如果條件滿足,執行操作。
條件不滿足: 如果條件不滿足,立即返回,避免執行不必要的操作。
常用場景
sychronized輕量級鎖膨脹邏輯, 只需要一個線程膨脹獲取monitor對象
DCL單例實現
服務組件的初始化
public class BalkingExample {
? ? private boolean condition = false;
? ? public synchronized void performOperation() {
? ? ? ? // 檢查條件
? ? ? ? if (condition) {
? ? ? ? ? ? // 條件滿足,執行操作
? ? ? ? ? ? System.out.println("Operation performed!");
? ? ? ? ? ? // 執行完操作后,將條件重置為不滿足
? ? ? ? ? ? condition = false;
? ? ? ? } else {
? ? ? ? ? ? // 條件不滿足,返回
? ? ? ? ? ? System.out.println("Condition not met. Operation skipped.");
? ? ? ? }
? ? }
? ? public synchronized void setCondition() {
? ? ? ? // 修改條件為滿足
? ? ? ? condition = true;
? ? ? ? // 喚醒可能在等待的線程
? ? ? ? notifyAll();
? ? }
? ? public static void main(String[] args) {
? ? ? ? BalkingExample example = new BalkingExample();
? ? ? ? // 啟動一個線程執行操作
? ? ? ? Thread operationThread = new Thread(() -> example.performOperation());
? ? ? ? // 啟動另一個線程設置條件為滿足
? ? ? ? Thread setConditionThread = new Thread(() -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // 模擬一些操作
? ? ? ? ? ? ? ? Thread.sleep(2000);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? example.setCondition();
? ? ? ? });
? ? ? ? operationThread.start();
? ? ? ? setConditionThread.start();
? ? }
}
四、多線程分工模式
1. 簡介
Thread-Per-Message模式、Worker Thread模式和生產者-消費者模式屬于多線程分工模式。
Thread-Per-Message模式需要注意線程的創建、銷毀以及是否會導致OOM
Worker Thread模式需要注意死鎖問題,提交的任務之間不要有依賴
生產者-消費者模式可以直接使用線程池來實現
2. Thread-Per-Message模式—最簡單實用的分工方法
Thread-Per-Message 模式是一種多線程設計模式,它通過為每個消息或任務創建一個新的線程來處理。每個消息都由一個獨立的線程處理,從而實現并發處理的效果。這種模式通常用于處理異步任務,每個任務都在自己的線程中執行,從而不阻塞主線程或其他任務的執行。
以下是 Thread-Per-Message 模式的一般步驟:
消息創建: 每當需要處理一個任務或消息時,創建一個新的線程。
任務處理: 將任務或消息分配給新創建的線程,由該線程負責處理。
線程生命周期: 線程在完成任務后結束,釋放資源。
應用場景
Thread-Per-Message模式的一個最經典的應用場景是網路編程里服務端的實現,服務端為每個客戶端請求創建一個獨立的線程,當線程處理完畢后,自動銷毀,這是一種簡單的并發處理網絡請求的方法。
public class ThreadPerMessageExample {
? ? public static void main(String[] args) {
? ? ? ? for (int i = 0; i < 5; i++) {
? ? ? ? ? ? // 模擬創建并處理消息
? ? ? ? ? ? Message message = new Message("Message " + i);
? ? ? ? ? ? processMessage(message);
? ? ? ? }
? ? }
? ? private static void processMessage(Message message) {
? ? ? ? // 創建新線程處理消息
? ? ? ? new Thread(() -> {
? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " processing " + message.getContent());
? ? ? ? }).start();
? ? }
? ? static class Message {
? ? ? ? private String content;
? ? ? ? public Message(String content) {
? ? ? ? ? ? this.content = content;
? ? ? ? }
? ? ? ? public String getContent() {
? ? ? ? ? ? return content;
? ? ? ? }
? ? }
}
3. Working Thread模式—如何避免重復創建線程
“Worker Thread” 模式是一種并發設計模式,用于處理異步任務。在這個模式中,創建一個固定數量的工作線程(Worker Thread),這些線程負責處理異步任務隊列中的任務。通過使用 Worker Thread 模式,可以實現異步處理,提高系統的響應性,并允許主線程(或其他線程)繼續執行其他任務。
Worker Thread 模式能避免線程頻繁創建、銷毀的問題,而且能夠限制線程的最大數量。Java 語言里可以直接使用線程池來實現 Worker Thread 模式,線程池是一個非常基礎和優秀的工具類,甚至有些大廠的編碼規范都不允許用 new Thread() 來創建線程,必須使用線程池。
Java和GO等其它語言不同,它不會創建輕量級現場,它創建的每一個線程都對應于一個操作系統內核級線程,如果評價創建會導致系統性能過低
public class WorkerThreadExample {
? ? public static void main(String[] args) {
? ? ? ? // 創建固定數量的工作線程池
? ? ? ? ExecutorService executorService = Executors.newFixedThreadPool(3);
? ? ? ? // 提交異步任務
? ? ? ? for (int i = 0; i < 5; i++) {
? ? ? ? ? ? executorService.submit(() -> {
? ? ? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " processing task");
? ? ? ? ? ? ? ? // 模擬任務執行時間
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? }
? ? ? ? // 關閉線程池
? ? ? ? executorService.shutdown();
? ? }
}
4. 生產者 - 消費者模式—用流水線的思想提高效率
生產者 - 消費者模式的核心是一個任務隊列,生產者線程生產任務,并將任務添加到任務隊列中,而消費者線程從任務隊列中獲取任務并執行。生產者-消費者模式是一種經典的多線程設計模式,用于解決生產者和消費者之間的協作問題。在這個模式中,有一個共享的緩沖區(或隊列),生產者將產品放入緩沖區,而消費者從緩沖區取出產品。這有助于實現生產者和消費者的解耦和協同工作。
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample {
? ? private static final int CAPACITY = 5;
? ? private static Queue<Integer> buffer = new LinkedList<>();
? ? public static void main(String[] args) {
? ? ? ? Thread producerThread = new Thread(() -> {
? ? ? ? ? ? for (int i = 0; i < 10; i++) {
? ? ? ? ? ? ? ? produce(i);
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? Thread consumerThread = new Thread(() -> {
? ? ? ? ? ? for (int i = 0; i < 10; i++) {
? ? ? ? ? ? ? ? consume();
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? producerThread.start();
? ? ? ? consumerThread.start();
? ? }
? ? private static void produce(int item) {
? ? ? ? synchronized (buffer) {
? ? ? ? ? ? // 檢查緩沖區是否已滿
? ? ? ? ? ? while (buffer.size() == CAPACITY) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? // 如果滿了,等待消費者取出產品
? ? ? ? ? ? ? ? ? ? buffer.wait();
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // 生產產品并放入緩沖區
? ? ? ? ? ? buffer.offer(item);
? ? ? ? ? ? System.out.println("Produced: " + item);
? ? ? ? ? ? // 通知消費者可以取出產品
? ? ? ? ? ? buffer.notifyAll();
? ? ? ? }
? ? }
? ? private static void consume() {
? ? ? ? synchronized (buffer) {
? ? ? ? ? ? // 檢查緩沖區是否為空
? ? ? ? ? ? while (buffer.isEmpty()) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? // 如果為空,等待生產者生產產品
? ? ? ? ? ? ? ? ? ? buffer.wait();
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // 從緩沖區取出產品并消費
? ? ? ? ? ? int item = buffer.poll();
? ? ? ? ? ? System.out.println("Consumed: " + item);
? ? ? ? ? ? // 通知生產者可以繼續生產產品
? ? ? ? ? ? buffer.notifyAll();
? ? ? ? }
? ? }
}
生產者-消費者隊列的優點
支持異步處理
場景:用戶注冊后,需要發注冊郵件和注冊短信。傳統的做法有兩種 1.串行的方式;2.并行方式
引入消息隊列,將不是必須的業務邏輯異步處理
解耦
場景:用戶下單后,訂單系統需要通知庫存系統扣減庫存。
削峰,消除生產者生產與消費者消費之間速度差異
在計算機當中,創建的線程越多,CPU進行上下文切換的成本就越大,所以我們在編程的時候創建的線程并不是越多越好,而是適量即可,采用生產者和消費者模式就可以很好的支持我們使用適量的線程來完成任務。如果在某一段業務高峰期的時間里生產者“生產”任務的速率很快,而消費者“消費”任務速率很慢,由于中間的任務隊列的存在,也可以起到緩沖的作用,我們在使用MQ中間件的時候,經常說的削峰填谷也就是這個意思。
過飽問題解決方案
在實際項目開發中會有些極端的情況,導致生產者/消費者模式可能出現過飽的問題。單位時間內,生產者的速度大于消費者的速度,導致任務不斷堆積在阻塞隊列中,隊列堆滿只是時間問題。
是不是只要保證消費者的消費速度一直對生產者的生產速度快就可以解決過飽問題?
其實我們只要在業務可以容忍的最長響應時間內,把堆積的任務處理完,那就不算過飽。
什么是業務容忍的最長響應時間? 比如埋點數據統計前一天的數據生成報表,第二天老板要看的,你前一天的數據第二天還沒處理完,那就不行,這樣的系統我們就要保證,消費者在24小時內的消費能力要比生產者高才行。
場景一:消費者每天能處理的量比生產者生產的少;如生產者每天1萬條,消費者每天只能消費 5千條。
解決辦法:消費者加機器
原因:生產者沒法限流,因為要一天內處理完,只能消費者加機器
場景二:消費者每天能處理的量比生產者生產的多。系統高峰期生產者速度太快,把隊列塞爆了
解決辦法:適當加大隊列
原因:消費者一天的消費能力已經高于生產者,那說明一天之內肯定能處理完,保證高峰期別把隊列塞滿就好
場景三:消費者每天能處理的量比生產者生產的多。條件有限或其他原因,隊列沒法設置特別大。系統高峰期生產者速度太快,把隊列塞爆了
解決辦法:生產者限流
原因:消費者一天的消費能力高于生產者,說明一天內能處理完,隊列又太小,那只能限流生產者,讓高峰期塞隊列的速度慢點
?