C# Channel實現線程間通信
同步方式實現:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;namespace ConsoleApp1
{public class ChannelDemo{static Channel<Message> channel1 = Channel.CreateUnbounded<Message>();public static void Main2(){sender.Start(1);receive1.Start(2);receive2.Start(3);sender.Join();Thread.Sleep(3000);receive1.Interrupt();receive2.Interrupt();receive1.Join();receive2.Join();Console.ReadKey();}static Thread sender = new Thread(SendMsg);static Thread receive1 = new Thread(ReceiveMsg);static Thread receive2 = new Thread(ReceiveMsg);static void SendMsg(object id){for (int i = 0; i < 20; i++){if (channel1.Writer.TryWrite(new Message((int)id, i.ToString()))){Console.WriteLine($"【線程{id}】發送了【{i}】");}}}static void ReceiveMsg(object id){try{while (true){if (channel1.Reader.TryRead(out Message message)){Console.WriteLine($"【線程{id}】從【線程{message.id}】接收了【{message.content}】");}Thread.Sleep(1);}}catch (ThreadInterruptedException ex){Console.WriteLine($"接收結束");}}}
}
異步方式:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Remoting.Channels;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;namespace ConsoleApp1
{public class ChannelDemo2{static Channel<Message> channel1 = Channel.CreateUnbounded<Message>();public static async void Main2(){await Task.WhenAll(sender, sender2);channel1.Writer.Complete();await Task.WhenAll(receive1, receive2);Console.ReadKey();}static Task sender = SendMsgAsync(channel1.Writer, 1);static Task sender2 = SendMsgAsync(channel1.Writer, 4);static Task receive1 = ReceiveMsgAsync(channel1.Reader, 2);static Task receive2 = ReceiveMsgAsync(channel1.Reader, 3);static async Task SendMsgAsync(ChannelWriter<Message> writer, int id){for (int i = 0; i < 20; i++){await writer.WriteAsync(new Message((int)id, i.ToString()));Console.WriteLine($"【線程{id}】發送了【{i}】");}}static async Task ReceiveMsgAsync(ChannelReader<Message> reader,int id){try{while (!reader.Completion.IsCompleted){Message message = await reader.ReadAsync(); Console.WriteLine($"【線程{id}】從【線程{message.id}】接收了【{message.content}】");}}catch (ChannelClosedException ex){Console.WriteLine($"ChannelClosed 接收結束");}}}
}
在對Channel進行實例化的時候,也可以傳遞一個Options,這里面可以對消息容量,是否多個發送者和接受者進行定義。