利用 TaskCompletionSource 在 SuperSocket 中實現跨模塊異步處理客戶端消息
在使用 SuperSocket 構建 TCP 服務時,我們經常會遇到這樣的需求:
- 服務端接收到客戶端數據后,需要將數據交給其他模塊處理
- 處理完成后再將結果返回給調用模塊或客戶端
- 希望調用模塊能夠異步等待處理結果,而不是阻塞線程
本文將通過 TaskCompletionSource
來實現這一場景,并結合 SuperSocket 的異步回調機制,講解完整實現方法。
1?? TaskCompletionSource 基礎語法
TaskCompletionSource<T>
(簡稱 TCS)是 .NET 中用于手動控制 Task 完成時機的工具。
1.1 基本概念
- Task:異步操作的表示,通常由方法內部執行完成
- TaskCompletionSource:你可以手動控制 Task 何時完成,以及完成結果是什么
var tcs = new TaskCompletionSource<int>();
Task<int> task = tcs.Task;// 模擬異步事件
Task.Run(async () =>
{await Task.Delay(1000); // 模擬耗時操作tcs.SetResult(42); // 手動完成 Task 并返回結果
});int result = await task;
Console.WriteLine(result); // 輸出 42
核心思想:TaskCompletionSource 就像一個“空盒子”,你自己決定什么時候放入結果并“打開盒子”。
1.2 TCS 常用方法
方法 | 功能 |
---|---|
SetResult(T result) | 設置 Task 成功完成并返回結果 |
TrySetResult(T result) | 安全寫法,如果 Task 已完成不會拋異常 |
SetException(Exception ex) | 設置 Task 異常完成 |
SetCanceled() | 取消 Task |
2?? 在 SuperSocket 中使用 TCS
SuperSocket 的核心是事件驅動,客戶端數據到達時會觸發 UsePackageHandler
回調。我們可以利用 TCS,將“事件”轉換為“可 await 的 Task”,實現異步等待消息。
2.1 定義等待方法
在服務端或調用模塊中定義:
private TaskCompletionSource<int> _tcs;public Task<int> WaitForNextPackageAsync()
{_tcs = new TaskCompletionSource<int>();return _tcs.Task; // 返回可 await 的 Task
}
- 外部模塊調用
await WaitForNextPackageAsync()
時,會掛起等待 - 直到
_tcs.SetResult(result)
被觸發
2.2 接收客戶端消息并觸發 TCS
public event Func<StringPackageInfo, Task<int>> OnPackageReceived;private async ValueTask HandlePackageAsync(IAppSession session, StringPackageInfo package)
{int result = 0;// 調用外部模塊處理消息if (OnPackageReceived != null){result = await OnPackageReceived.Invoke(package);}// 完成 TaskCompletionSource,將結果返回給等待方_tcs?.TrySetResult(result);// 同時可以給客戶端發送響應await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "\r\n"));
}
這里實現了 消息接收與處理邏輯解耦:
- SuperSocket 只負責接收消息
- 外部模塊處理業務邏輯
- 調用模塊異步等待處理結果
2.3 外部模塊處理邏輯示例
mainWindow.OnPackageReceived += async (package) =>
{int result = 0;switch (package.Key.ToUpper()){case "ADD":result = package.Parameters.Select(int.Parse).Sum();break;case "SUB":result = package.Parameters.Select(int.Parse).Aggregate((x, y) => x - y);break;case "MULT":result = package.Parameters.Select(int.Parse).Aggregate((x, y) => x * y);break;}return result; // 返回給 TaskCompletionSource
};
2.4 調用模塊等待結果
int result = await mainWindow.WaitForNextPackageAsync();
Console.WriteLine($"處理結果: {result}");
- 外部模塊就像同步等待一樣獲得了處理結果
- 實際上整個流程是 異步、非阻塞 的
3?? 支持多客戶端或多條命令
如果有多個客戶端或希望同時處理多條消息,可以使用 隊列管理 TCS:
private ConcurrentQueue<TaskCompletionSource<int>> _queue = new();public Task<int> WaitNextAsync()
{var tcs = new TaskCompletionSource<int>();_queue.Enqueue(tcs);return tcs.Task;
}private async ValueTask HandlePackageAsync(IAppSession session, StringPackageInfo package)
{int result = CalculateResult(package);if (_queue.TryDequeue(out var tcs))tcs.TrySetResult(result);await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "\r\n"));
}
- 每條消息對應一個 TCS
- 保證多客戶端/多命令都能異步等待結果
4?? 總結
把回調變成了異步等待,這個真的是太酷啦~~~~~~~
通過 TaskCompletionSource
,我們可以:
- 將事件驅動轉為可 await 的異步操作
- 實現跨模塊異步處理客戶端消息
- 保持服務端與業務邏輯解耦
- 同時支持客戶端響應和模塊異步等待
核心模式:
HandlePackageAsync
觸發 → 外部模塊處理 →TaskCompletionSource.SetResult
→ 調用模塊 await 獲取結果
這種模式非常適合 SuperSocket、SignalR、WebSocket 等異步消息驅動場景。