分布式事務、CAP定理、事件總線,在當前微服務、分布式、集群大行其道的架構前提下,是不可逃避的幾個關鍵字,在此不會過多闡述相關的理論知識。Shashlik.EventBus
就是一個基于.NET6的開源事件總線解決方案,同時也是分布式事務最終一致性、延遲事件解決方案。Shashlik.EventBus
采用的是異步確保的思路(本地消息表),將消息數據與業務數據在同一事務中進行提交或回滾,以此來保證消息數據的可靠性。其設計目標是高性能、簡單、易用、易擴展,為拋棄歷史包袱,僅支持NET6,采用最寬松的 MIT 開源協議。
?https://github.com/dotnet-shashlik/shashlik.eventbus
各位爺高興了給個star唄。
如圖所示,消息數據需要和業務數據在同一的事務中進行提交或者回滾,最后Shashlik.EventBus會檢查消息數據是否已提交,如果已提交才會執行真正的消息發送。所以要求事務的隔離級別最低為讀已提交(RC)。
關于消息冪等
Shashlik.EventBus不能保證業務消息的冪等性,為了保證消息的可靠傳輸,EventBus以及消息中間件對消息QOS處理等級必須為at least once (至少到達一次),一般消息中間件都需要開啟消息持久化避免消息丟失。簡而言之就是一個事件處理類可能處理多次同一個事件,事件消息的冪等性應該由業務方進行處理。比如用戶訂單付款完成為一個事件,付款完成后需要修改訂單狀態為待發貨,也就是在付款完成事件處理類中可能收到多次這個訂單的付款完成事件,那么業務的冪等性處理就可以使用鎖,判斷訂單狀態,如果訂單狀態已經為待發貨,則直接返回并忽略本次事件響應。
延遲事件
Shashlik.EventBus支持基于本地的延遲事件機制,考慮到不是所有的消息中間件都支持延遲功能,且為了最大程度保證消息的可靠性,最后采用了System.Timers.Timer來執行延遲功能。
延遲事件同樣適用于分布式事務最終一致性,但如果延遲事件處理類處理異常由重試器介入處理后,那么最終的延遲執行時間和期望的延遲時間就會產生較大的差異,是否忽略這里的時間差需要由具體的業務來決定。比如訂單30分鐘未付款需要關閉訂單,30分鐘后關閉訂單出現了異常,最后由重試器到了40分鐘后才關閉,也不影響訂單,那么認為這個時間差可以容忍。又比如雙11啦,發布一個延遲事件,晚上12點叫醒我起來買買買,只有1分鐘時間,過了就買不到了,那么這種情況可以在事件處理類中,自行根據當前時間、事件發送時間、延遲執行時間等要素,自行決定業務如何處理。
延遲事件和普通事件在事件定義和事件處理類聲明和處理時沒有任何區別,僅僅是在發布事件時需要指定延遲時間。
上代碼
需求:一個新用戶注冊以后有以下需求:
發送歡迎注冊短信;
發放新用戶優惠券;
30分鐘后推送新用戶優惠活動信息。
1. 服務配置,這里以MySql + RabbitMQ為例:
services.AddEventBus(r =>{// 這些都是缺省配置,可以直接services.AddEventBus()// 運行環境,注冊到MQ的事件名稱和事件處理名稱會帶上此后綴r.Environment = "Production";// 最大失敗重試次數,默認60次r.RetryFailedMax = 60;// 消息重試間隔,默認2分鐘r.RetryInterval = 60 * 2;// 單次重試消息數量限制,默認100r.RetryLimitCount = 100;// 成功的消息過期時間,默認3天,失敗的消息永不過期,必須處理r.SucceedExpireHour = 24 * 3; // 消息處理失敗后,重試器介入時間,默認5分鐘后r.StartRetryAfter = 60 * 5; // 事務提交超時時間,單位秒,默認60秒r.TransactionCommitTimeout = 60;// 重試器執行時消息鎖定時長r.LockTime = 110;})// 使用ef DbContext mysql.AddMySql<DemoDbContext>()// 配置RabbitMQ.AddRabbitMQ(r =>{r.Host = "localhost";r.UserName = "rabbit";r.Password = "123123";});
定義事件
// 新用戶注冊完成事件,實現接口IEventpublic class NewUserEvent : IEvent{public string Id { get;set; }public string Name { get; set; }}// 定義新用戶注冊延遲活動推送事件public class NewUserPromotionEvent : IEvent{public string Id { get;set; }public string Name { get; set; }public string PromotionId { get; set; }}
發布事件
public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 開啟本地事務using var tran = await DbContext.DataBase.BeginTransactionAsync();try{// 創建用戶邏輯處理...// 發布新用戶事件// 通過注入IEventPublisher發布事件,需要傳入事務上下文數據await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name}, DbContext.GetTransactionContext());// 發布延遲事件// 通過ef擴展,直接使用DbContext發布事件,自動使用當前上下文事務await DbContext.PublishEventAsync(new NewUserPromotionEvent{Id = user.Id,Name = input.Name,PromotionId = "1"}, DatetimeOffset.Now.AddMinutes(30));// 提交本地事務await tran.CommitAsync();}catch(Exception ex){// 回滾事務,消息數據也將回滾不會發布await tran.RollbackAsync();}}}
定義事件處理類
// 一個事件可以有多個處理類,可以分布在不同的微服務中// 用于發送短信的事件處理類public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 發送短信...}}// 用于發放消費券的事件處理類public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 業務處理...}}// 用于新用戶延遲活動的事件處理類,將在指定時間執行public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>{public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items){// 業務處理...}}
默認的,發布、聲明到消息中間件的事件、事件處理器名稱生產規則為{Type.Name}.{Options.Environment},在分布式架構下需要,您需要了解這個默認規則,這點不同于CAP框架必須顯示聲明,當然Shashlik.EventBus也可以使用EventBusNameAttribute特性來顯示聲明,詳細說明請上github查看wiki文檔。
XA事務支持(TransactionScope)
雖然盡可能的不要使用TransactionScope,但在某些場景仍然是需要的,Shashlik.EventBus對其提供了事務支持,可以通過XaTransactionContext.Current獲取當前環境的事務上下文,發布事件如下:
public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 開啟事務using var scope = new TransactionScope();try{// 創建用戶邏輯處理...// 發布新用戶事件// 通過注入IEventPublisher發布事件,需要傳入事務上下文數據await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name// 使用 XaTransactionContext.Current}, XaTransactionContext.Current);// 提交事務await scope.Complete();}catch(Exception ex){// 回滾事務,消息數據也將回滾不會發布await tran.RollbackAsync();}}}
擴展
如果默認實現不能滿足你的需求,可以自行實現可擴展接口,并注冊即可。
IMsgIdGenerator:消息Id生成器,是指傳輸的全局唯一id,不是指存儲的id。默認guid
IEventPublisher:事件發布處理器。
IMessageSerializer:消息序列化、反序列化處理類。默認Newtonsoft.Json。
IReceivedMessageRetryProvider:已接收消息重試器。
IPublishedMessageRetryProvider:已發布消息重試器。
IEventHandlerInvoker: 事件處理執行器
IEventNameRuler:事件名稱規則生成(對應消息隊列topic/route)。
IEventHandlerNameRuler:事件處理名稱規則生成(對應消息隊列queue/group)。
IEventHandlerFindProvider:事件處理類查找器
IExpiredMessageProvider:已過期消息刪除處理器。
IMessageListener:消息監聽處理器。
IRetryProvider:重試執行器。
IPublishHandler:消息發布處理器。
IReceivedHandler:消息接收處理器。
IMessageStorageInitializer:存儲介質初始化。
IMessageStorage:消息存儲、讀取等操作。
例:
// 替換默認的IMsgIdGeneratorservice.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();service.AddEventBus().AddMemoryQueue().AddMemoryStorage();
后續計劃
功能
?Dashboard
消息中間件支持
?RabbitMQ
?Kafka
?RocketMQ
?ActiveMQ
?Pulsar
?Redis
存儲支持
?MySql
?PostgreSql
?SqlServer
?Oracle
?MongoDb