最近在做一個簡單的風控,其中有一塊需求是這樣的,當主請求參數到達后,會根據這些參數,看調起幾個并發規則,這些規則各自有自己的驗證邏輯,每個規則執行時間長短都不確定,當規則 執行完后,返回主請求,主請求根據規則驗證返回結果,從而決定是否立即response請求,但其他后到的規則 ,要繼續完成后面驗證,以得到驗證結果,以備后用。
初步的.net代碼是這樣實現的:
using System.Threading.Channels;var builder = WebApplication.CreateBuilder(args);var app = builder.Build();app.MapGet("/test", async () =>
{Console.WriteLine($"開始時間 {DateTime.Now.ToString("HH:mm:ss")}");var channels = new List<Channel<Parameter>>();var len = 5;//創建Channel,并關聯ReadAsync方法for (int i = 0; i < len; i++){var channel = Channel.CreateUnbounded<Parameter>(new UnboundedChannelOptions() { AllowSynchronousContinuations = true });channels.Add(channel);await Task.Factory.StartNew(async () =>{Console.WriteLine($"Task {i}");await ReadAsync(channel);});}//向Channel發送信息for (int i = 0; i < channels.Count; i++){var channel = channels[i];await channel.Writer.WriteAsync(new Parameter { I = i });Console.WriteLine($"Write {i}");}//讀取返回Channel,如果有返回True,API提前返回,留下的Channel給RemainingReadAsync執行for (int i = 0; i < channels.Count; i++){var channel = channels[i];if (channel != null && await channel.Reader.WaitToReadAsync()){if (channel.Reader.TryRead(out var par)){if(par.Result){Console.ForegroundColor = ConsoleColor.Green;}Console.WriteLine($"I:{par.I},Result:{par.Result},{DateTime.Now.ToString("HH:mm:ss")}");Console.ResetColor();if (par.Result){//把剩會的推到一個線程中執行for (var r = i + 1; r < channels.Count; r++){await Task.Factory.StartNew(async () =>{await RemainingReadAsync(channels[r]);});}return TypedResults.Ok($"完成,有,{DateTime.Now.ToString("HH:mm:ss")}");}}}}return TypedResults.Ok($"完成,沒有,{DateTime.Now.ToString("HH:mm:ss")}");
});app.Run();
//處理剩余Channelr方法
async Task RemainingReadAsync(Channel<Parameter> channel)
{if (channel != null && await channel.Reader.WaitToReadAsync()){if (channel.Reader.TryRead(out var par)){Console.WriteLine($"I:{par.I},Result:{par.Result},{DateTime.Now.ToString("HH:mm:ss")}");}}
}
//讀取Channel的方法
async Task ReadAsync(Channel<Parameter> channel)
{if (channel != null && await channel.Reader.WaitToReadAsync()){if (channel.Reader.TryRead(out var par)){await Task.Delay((par.I + 1) * 1000);if (par.I == 2){par.Result = true;}else{par.Result = false;}await channel.Writer.WriteAsync(par);}}
}//參數
class Parameter
{public int I { get; set; }public bool Result { get; set; }
}
下圖是執行結果,當五個任務并發調起后,并發寫入數據,執行中,第三個滿足條件,于21:59:13返回請求,但后續的兩個,依然保證完成。
這個Demo只是簡單的完成了邏輯,關于性能還有待進一步測試驗證改進。