在 C# 中實現 生產者-消費者模式,通常需要多個線程來處理數據的生產和消費。我們可以使用 Queue<T>
來作為存儲數據的隊列,并使用 Thread
、Mutex
或 Monitor
來確保線程安全。BlockingCollection<T>
是 C# 提供的一個線程安全的集合,可以非常方便地用于實現生產者-消費者模式。
生產者-消費者模式的關鍵點:
- 生產者線程:產生數據并將其放入隊列中。
- 消費者線程:從隊列中取出數據并進行處理。
- 線程同步:使用
BlockingCollection<T>
等線程安全的集合來避免競爭條件,同時確保生產者和消費者之間的協調。
示例:使用 BlockingCollection<T>
C# 提供了 BlockingCollection<T>
類,它可以用來在生產者和消費者線程之間提供同步機制。它是一個線程安全的集合,并支持阻塞操作,因此可以自動協調生產者和消費者的行為。
代碼示例:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;class Program
{// 使用 BlockingCollection 實現線程安全的隊列static BlockingCollection<int> queue = new BlockingCollection<int>(5); // 隊列最大容量為5// 生產者線程static void Producer(){int item = 0;while (true){Thread.Sleep(500); // 模擬生產延遲// 生產數據并加入隊列queue.Add(item);Console.WriteLine("生產者生產數據: " + item);item++;}}// 消費者線程static void Consumer(){while (true){int item = queue.Take(); // 阻塞直到隊列中有數據Console.WriteLine("消費者消費數據: " + item);Thread.Sleep(1000); // 模擬消費延遲}}static void Main(){// 啟動生產者線程Thread producerThread = new Thread(Producer);producerThread.Start();// 啟動消費者線程Thread consumerThread = new Thread(Consumer);consumerThread.Start();// 等待線程結束(實際上,生產者和消費者線程會永遠運行下去)producerThread.Join();consumerThread.Join();}
}
代碼解釋:
BlockingCollection<int> queue
:一個線程安全的隊列,最大容量為 5。BlockingCollection
會在隊列滿時阻塞生產者線程,在隊列為空時阻塞消費者線程。Producer()
:模擬生產者線程,每 500 毫秒生成一個數據并放入隊列中。如果隊列已滿,Add
操作會阻塞生產者線程,直到隊列有空位。Consumer()
:模擬消費者線程,每秒消費一個數據。Take()
會阻塞直到隊列中有數據。Thread.Sleep()
:用來模擬生產和消費的延遲。
BlockingCollection<T>
的關鍵方法:
Add(T item)
:將項目添加到集合中。如果集合已滿,它將阻塞直到有空余空間。Take()
:從集合中移除并返回一個項。如果集合為空,它將阻塞直到有可用項。TryAdd(T item)
:嘗試將項目添加到集合中。如果成功則返回true
,否則返回false
,不會阻塞。TryTake(out T item)
:嘗試從集合中移除并返回一個項。如果集合為空,返回false
。
擴展:多個生產者和多個消費者
BlockingCollection<T>
支持多個生產者和多個消費者,并且可以通過它來輕松實現復雜的生產者-消費者模型。你只需要啟動多個線程來執行生產者和消費者的邏輯即可。
示例:多個生產者和多個消費者
using System;
using System.Collections.Concurrent;
using System.Threading;class Program
{static BlockingCollection<int> queue = new BlockingCollection<int>(5); // 隊列最大容量為5// 生產者線程static void Producer(int id){int item = 0;while (true){Thread.Sleep(500); // 模擬生產延遲// 生產數據并加入隊列queue.Add(item);Console.WriteLine($"生產者 {id} 生產數據: {item}");item++;}}// 消費者線程static void Consumer(int id){while (true){int item = queue.Take(); // 阻塞直到隊列中有數據Console.WriteLine($"消費者 {id} 消費數據: {item}");Thread.Sleep(1000); // 模擬消費延遲}}static void Main(){// 啟動多個生產者線程for (int i = 1; i <= 2; i++){int producerId = i;new Thread(() => Producer(producerId)).Start();}// 啟動多個消費者線程for (int i = 1; i <= 3; i++){int consumerId = i;new Thread(() => Consumer(consumerId)).Start();}// 主線程等待Console.ReadLine();}
}
代碼解釋:
- 多個生產者線程:在
Main()
方法中,啟動了 2 個生產者線程。每個線程調用Producer()
方法,生成不同的數據并將其放入共享隊列。 - 多個消費者線程:啟動了 3 個消費者線程,它們從同一個共享隊列中取出數據進行處理。
運行結果:
生產者 1 生產數據: 0
生產者 2 生產數據: 0
消費者 1 消費數據: 0
生產者 1 生產數據: 1
消費者 2 消費數據: 1
消費者 3 消費數據: 2
...
總結:
BlockingCollection<T>
是 C# 中實現生產者-消費者模式的理想工具。它是線程安全的,支持阻塞操作,且可以容納多個生產者和消費者。- 通過
BlockingCollection<T>
的Add
和Take
方法,生產者和消費者可以安全地進行數據交換而無需擔心并發問題。 - 使用多個生產者和消費者線程時,
BlockingCollection<T>
會自動處理隊列的同步和線程間協調。