- 核心概念
- 創建Channel
- 無界通道
- 有界通道
- `FullMode`選項
- 生產者-消費者模式
- 生產者寫入數據
- 消費者讀取數據
- 完整示例
- 高級配置
- 優化選項:
- 取消操作:通過 `CancellationToken` 取消讀寫。
- 錯誤處理
- 適用場景
- `Channel`的類型
- 創建`Channel`
- 寫入和讀取消息
- 使用場景
- 示例代碼
- 注意事項
在
C#
中,
System.Threading.Channels
提供了
高效的異步生產-消費模型,適用于多任務間的數據傳遞。以下是其核心概念及使用方法的總結:
核心概念
Channel<T>
:異步消息隊列,支持多生產者和多消費者。
ChannelWriter<T>
:用于異步寫入數據(WriteAsync
),完成后需調用 Complete()
。
ChannelReader<T>
:用于異步讀取數據,支持 ReadAsync
或 ReadAllAsync
遍歷。
創建Channel
無界通道
var channel = Channel.CreateUnbounded<int>();
容量無限,適用于不確定數據量的場景。
有界通道
var options = new BoundedChannelOptions(10)
{FullMode = BoundedChannelFullMode.Wait // 滿時等待
};
var channel = Channel.CreateBounded<int>(options);
FullMode
選項
-
Wait
(默認):寫入時阻塞直到有空間。 -
DropOldest
/DropNewest
:丟棄最舊/最新數據。 -
DropWrite
:丟棄當前寫入的數據。
生產者-消費者模式
生產者寫入數據
async Task Producer(ChannelWriter<int> writer)
{for (int i = 0; i < 10; i++){await writer.WriteAsync(i);await Task.Delay(100);}writer.Complete(); // 標記完成
}
消費者讀取數據
async Task Consumer(ChannelReader<int> reader)
{// 方式1: ReadAllAsync遍歷await foreach (var item in reader.ReadAllAsync()){Console.WriteLine($"Received: {item}");}// 方式2: 手動循環while (await reader.WaitToReadAsync()){while (reader.TryRead(out var item)){Console.WriteLine($"Received: {item}");}}
}
完整示例
using System;
using System.Threading.Channels;
using System.Threading.Tasks;class Program
{static async Task Main(){var channel = Channel.CreateUnbounded<int>();var producer = Producer(channel.Writer);var consumer = Consumer(channel.Reader);await Task.WhenAll(producer, consumer);}static async Task Producer(ChannelWriter<int> writer){try{for (int i = 0; i < 10; i++){await writer.WriteAsync(i);await Task.Delay(100);}}catch (Exception ex){writer.Complete(ex); // 傳遞異常}finally{writer.Complete();}}static async Task Consumer(ChannelReader<int> reader){try{await foreach (var item in reader.ReadAllAsync()){Console.WriteLine($"Processed: {item}");}}catch (Exception ex){Console.WriteLine($"Error: {ex.Message}");}}
}
高級配置
優化選項:
var options = new UnboundedChannelOptions()
{SingleWriter = true, // 單一生產者優化SingleReader = false // 允許多消費者
};
取消操作:通過 CancellationToken
取消讀寫。
await writer.WriteAsync(item, cancellationToken);
錯誤處理
生產者異常時,調用 writer.Complete(ex)
通知消費者。
消費者通過 try-catch
捕獲遍歷時的異常。
適用場景
數據流水線處理。
高吞吐量的異步任務。
多任務間的負載均衡。
在C#
中,System.Threading.Channels
是一個強大的異步通信機制,主要用于實現生產者-消費者模式。它提供了線程安全的通道(Channel
),用于在不同線程之間傳遞數據。以下是關于C# Channel
的詳細介紹:
Channel
的類型
Channel
有兩種類型:
有界通道(Bounded Channel
):具有固定容量,當通道已滿時,可以根據指定的策略處理新消息。
無界通道(Unbounded Channel
):沒有容量限制,適合生產者和消費者速度匹配的場景。
創建Channel
使用Channel.CreateBounded<T>
創建有界通道,需要指定容量和滿時的處理策略(如Wait
、DropNewest
、DropOldest
等)。
使用Channel.CreateUnbounded<T>
創建無界通道。
寫入和讀取消息
生產者通過channel.Writer.WriteAsync()
方法寫入消息。
消費者通過channel.Reader.ReadAsync()
或channel.Reader.WaitToReadAsync()
讀取消息。
使用場景
Channel
主要用于生產者-消費者模式,可以實現高效的異步數據處理。它支持多線程操作,并可以通過SingleReader
和SingleWriter
屬性限制通道的讀寫行為。
示例代碼
以下是一個簡單的生產者-消費者示例:
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{FullMode = BoundedChannelFullMode.Wait
});Task producer = Task.Run(async () =>
{for (int i = 0; i < 10; i++){await channel.Writer.WriteAsync(i);Console.WriteLine($"Produced: {i}");}channel.Writer.Complete();
});Task consumer = Task.Run(async () =>
{while (await channel.Reader.WaitToReadAsync()){if (channel.Reader.TryRead(out var item)){Console.WriteLine($"Consumed: {item}");}}
});await Task.WhenAll(producer, consumer);
注意事項
- 緩沖區溢出:生產者寫入速度過快可能導致緩沖區溢出。
- 正確關閉
Channel
:在數據完全消費后關閉Channel
,避免數據丟失。