【Shashlik.EventBus】.NET 事件總線,分布式事務最終一致性簡介

分布式事務、CAP定理、事件總線,在當前微服務、分布式、集群大行其道的架構前提下,是不可逃避的幾個關鍵字,在此不會過多闡述相關的理論知識。Shashlik.EventBus就是一個基于.NET6的開源事件總線解決方案,同時也是分布式事務最終一致性、延遲事件解決方案。Shashlik.EventBus采用的是異步確保的思路(本地消息表),將消息數據與業務數據在同一事務中進行提交或回滾,以此來保證消息數據的可靠性。其設計目標是高性能、簡單、易用、易擴展,為拋棄歷史包袱,僅支持NET6,采用最寬松的 MIT 開源協議。

?https://github.com/dotnet-shashlik/shashlik.eventbus

各位爺高興了給個star唄。

5b8f39c44a253261525473d4dd5435c0.png

如圖所示,消息數據需要和業務數據在同一的事務中進行提交或者回滾,最后Shashlik.EventBus會檢查消息數據是否已提交,如果已提交才會執行真正的消息發送。所以要求事務的隔離級別最低為讀已提交(RC)。

關于消息冪等

Shashlik.EventBus不能保證業務消息的冪等性,為了保證消息的可靠傳輸,EventBus以及消息中間件對消息QOS處理等級必須為at least once (至少到達一次),一般消息中間件都需要開啟消息持久化避免消息丟失。簡而言之就是一個事件處理類可能處理多次同一個事件,事件消息的冪等性應該由業務方進行處理。比如用戶訂單付款完成為一個事件,付款完成后需要修改訂單狀態為待發貨,也就是在付款完成事件處理類中可能收到多次這個訂單的付款完成事件,那么業務的冪等性處理就可以使用鎖,判斷訂單狀態,如果訂單狀態已經為待發貨,則直接返回并忽略本次事件響應。

延遲事件

Shashlik.EventBus支持基于本地的延遲事件機制,考慮到不是所有的消息中間件都支持延遲功能,且為了最大程度保證消息的可靠性,最后采用了System.Timers.Timer來執行延遲功能。

延遲事件同樣適用于分布式事務最終一致性,但如果延遲事件處理類處理異常由重試器介入處理后,那么最終的延遲執行時間和期望的延遲時間就會產生較大的差異,是否忽略這里的時間差需要由具體的業務來決定。比如訂單30分鐘未付款需要關閉訂單,30分鐘后關閉訂單出現了異常,最后由重試器到了40分鐘后才關閉,也不影響訂單,那么認為這個時間差可以容忍。又比如雙11啦,發布一個延遲事件,晚上12點叫醒我起來買買買,只有1分鐘時間,過了就買不到了,那么這種情況可以在事件處理類中,自行根據當前時間、事件發送時間、延遲執行時間等要素,自行決定業務如何處理。

延遲事件和普通事件在事件定義和事件處理類聲明和處理時沒有任何區別,僅僅是在發布事件時需要指定延遲時間。

上代碼

需求:一個新用戶注冊以后有以下需求:

  1. 發送歡迎注冊短信;

  2. 發放新用戶優惠券;

  3. 30分鐘后推送新用戶優惠活動信息。

1. 服務配置,這里以MySql + RabbitMQ為例:

services.AddEventBus(r =>{// 這些都是缺省配置,可以直接services.AddEventBus()// 運行環境,注冊到MQ的事件名稱和事件處理名稱會帶上此后綴r.Environment = "Production";// 最大失敗重試次數,默認60次r.RetryFailedMax = 60;// 消息重試間隔,默認2分鐘r.RetryInterval = 60 * 2;// 單次重試消息數量限制,默認100r.RetryLimitCount = 100;// 成功的消息過期時間,默認3天,失敗的消息永不過期,必須處理r.SucceedExpireHour = 24 * 3;            // 消息處理失敗后,重試器介入時間,默認5分鐘后r.StartRetryAfter = 60 * 5;            // 事務提交超時時間,單位秒,默認60秒r.TransactionCommitTimeout = 60;// 重試器執行時消息鎖定時長r.LockTime = 110;})// 使用ef DbContext mysql.AddMySql<DemoDbContext>()// 配置RabbitMQ.AddRabbitMQ(r =>{r.Host = "localhost";r.UserName = "rabbit";r.Password = "123123";});
  1. 定義事件

// 新用戶注冊完成事件,實現接口IEventpublic class NewUserEvent : IEvent{public string Id { get;set; }public string Name { get; set; }}// 定義新用戶注冊延遲活動推送事件public class NewUserPromotionEvent : IEvent{public string Id { get;set; }public string Name { get; set; }public string PromotionId { get; set; }}
  1. 發布事件

public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 開啟本地事務using var tran = await DbContext.DataBase.BeginTransactionAsync();try{// 創建用戶邏輯處理...// 發布新用戶事件// 通過注入IEventPublisher發布事件,需要傳入事務上下文數據await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name}, DbContext.GetTransactionContext());// 發布延遲事件// 通過ef擴展,直接使用DbContext發布事件,自動使用當前上下文事務await DbContext.PublishEventAsync(new NewUserPromotionEvent{Id = user.Id,Name = input.Name,PromotionId = "1"}, DatetimeOffset.Now.AddMinutes(30));// 提交本地事務await tran.CommitAsync();}catch(Exception ex){// 回滾事務,消息數據也將回滾不會發布await tran.RollbackAsync();}}}
  1. 定義事件處理類

// 一個事件可以有多個處理類,可以分布在不同的微服務中// 用于發送短信的事件處理類public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 發送短信...}}// 用于發放消費券的事件處理類public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>{public async Task Execute(NewUserEvent @event, IDictionary<string, string> items){// 業務處理...}}// 用于新用戶延遲活動的事件處理類,將在指定時間執行public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>{public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items){// 業務處理...}}

默認的,發布、聲明到消息中間件的事件、事件處理器名稱生產規則為{Type.Name}.{Options.Environment},在分布式架構下需要,您需要了解這個默認規則,這點不同于CAP框架必須顯示聲明,當然Shashlik.EventBus也可以使用EventBusNameAttribute特性來顯示聲明,詳細說明請上github查看wiki文檔。

XA事務支持(TransactionScope)

雖然盡可能的不要使用TransactionScope,但在某些場景仍然是需要的,Shashlik.EventBus對其提供了事務支持,可以通過XaTransactionContext.Current獲取當前環境的事務上下文,發布事件如下:

public class UserManager{public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext){EventPublisher = eventPublisher;DbContext = dbContext;}private IEventPublisher EventPublisher { get; }private DemoDbContext DbContext { get; }public async Task CreateUserAsync(UserInput input){// 開啟事務using var scope = new TransactionScope();try{// 創建用戶邏輯處理...// 發布新用戶事件// 通過注入IEventPublisher發布事件,需要傳入事務上下文數據await EventPublisher.PublishAsync(new NewUserEvent{Id = user.Id,Name = input.Name// 使用 XaTransactionContext.Current}, XaTransactionContext.Current);// 提交事務await scope.Complete();}catch(Exception ex){// 回滾事務,消息數據也將回滾不會發布await tran.RollbackAsync();}}}

擴展

如果默認實現不能滿足你的需求,可以自行實現可擴展接口,并注冊即可。

  • IMsgIdGenerator:消息Id生成器,是指傳輸的全局唯一id,不是指存儲的id。默認guid

  • IEventPublisher:事件發布處理器。

  • IMessageSerializer:消息序列化、反序列化處理類。默認Newtonsoft.Json。

  • IReceivedMessageRetryProvider:已接收消息重試器。

  • IPublishedMessageRetryProvider:已發布消息重試器。

  • IEventHandlerInvoker: 事件處理執行器

  • IEventNameRuler:事件名稱規則生成(對應消息隊列topic/route)。

  • IEventHandlerNameRuler:事件處理名稱規則生成(對應消息隊列queue/group)。

  • IEventHandlerFindProvider:事件處理類查找器

  • IExpiredMessageProvider:已過期消息刪除處理器。

  • IMessageListener:消息監聽處理器。

  • IRetryProvider:重試執行器。

  • IPublishHandler:消息發布處理器。

  • IReceivedHandler:消息接收處理器。

  • IMessageStorageInitializer:存儲介質初始化。

  • IMessageStorage:消息存儲、讀取等操作。

例:

// 替換默認的IMsgIdGeneratorservice.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();service.AddEventBus().AddMemoryQueue().AddMemoryStorage();

后續計劃

  • 功能

    • ?Dashboard

  • 消息中間件支持

    • ?RabbitMQ

    • ?Kafka

    • ?RocketMQ

    • ?ActiveMQ

    • ?Pulsar

    • ?Redis

  • 存儲支持

    • ?MySql

    • ?PostgreSql

    • ?SqlServer

    • ?Oracle

    • ?MongoDb

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/282820.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/282820.shtml
英文地址,請注明出處:http://en.pswp.cn/news/282820.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

5個超實用的Visual Studio插件

工欲善其事&#xff0c;必先利其器,整理的一些我必裝的5款Visual Studio插件&#xff0c;希望你們能get到。01 CodeMaidCodeMaid快速整理代碼文件&#xff0c;規范你的代碼&#xff0c;提高代碼閱讀體驗。代碼自動對齊&#xff0c;格式化代碼&#xff08;ps&#xff1a;不用再按…

BZOJ1509: [NOI2003]逃學的小孩(樹的直徑)

Time Limit: 5 Sec Memory Limit: 64 MBSubmit: 1126 Solved: 567[Submit][Status][Discuss]Description Input 第一行是兩個整數N&#xff08;3 ? N ? 200000&#xff09;和M&#xff0c;分別表示居住點總數和街道總數。以下M行&#xff0c;每行給出一條街道的信息。第i1行…

Blazor University (52)依賴注入 —— 擁有多個依賴項:正確的方式

原文鏈接&#xff1a;https://blazor-university.com/dependency-injection/component-scoped-dependencies/owning-multiple-dependencies-the-right-way/擁有多個依賴項&#xff1a;正確的方式在上一節[1]中&#xff0c;我們看到了將多個擁有的依賴項注入組件的錯誤方法。本節…

Gradle 1.12用戶指南翻譯——第五十四章. 構建原生二進制文件

其他章節的翻譯請參見&#xff1a;http://blog.csdn.net/column/details/gradle-translation.html翻譯項目請關注Github上的地址&#xff1a;https://github.com/msdx/gradledoc本文翻譯所在分支&#xff1a;https://github.com/msdx/gradledoc/tree/1.12。直接瀏覽雙語版的文檔…

android 調用c wcf服務,如何使用命名管道從c調用WCF方法?

更新&#xff1a;通過協議here,我無法弄清楚未知的信封記錄.我在網上找不到任何例子.原版的&#xff1a;我有以下WCF服務static void Main(string[] args){var inst new PlusFiver();using (ServiceHost host new ServiceHost(inst,new Uri[] { new Uri("net.pipe://loc…

VK Cup 2015 - Qualification Round 1 A. Reposts(樹)

傳送門 Description One day Polycarp published a funny picture in a social network making a poll about the color of his handle. Many of his friends started reposting Polycarps joke to their news feed. Some of them reposted the reposts and so on. These event…

Lombok@Builder和@NoArgsConstructor沖突

問題 今天在使用lombok簡化model類時。使用Builder建造者模式。報以下異常 解決辦法。 去掉NoArgsConstructor添加AllArgsConstructor源碼分析 下圖是編譯后的源碼 只使用Builder會自動創建全參構造器。而添加上NoArgsConstructor后就不會自動產生全參構造器

現在商業有種競爭叫“跨界打擊”

隨著互聯網的發展&#xff0c;“跨界打擊”的事情可謂是無處不在。行業跨界打擊會搶占某個行業的市場份額&#xff0c;甚至可能淘汰一個行業。跨界打擊者可能是某個行業的新進入者&#xff0c;也可能是現有競爭者&#xff0c;更可能是徹底的替代者或顛覆者。跨界打擊&#xff0…

架構之美閱讀筆記之一

寒假生活開始了&#xff0c;關于軟件架構這部分的學習&#xff0c;我選擇的是《架構之美》這本書。這本出版于2009年的書&#xff0c;由淺入深地講述了從架構的概述&#xff0c;到企業級應用架構&#xff0c;系統架構&#xff0c;最終用戶應用架構&#xff0c;再到語言與架構模…

ntop linux,Linux下開源監控軟件Ntop的性能提升方案

摘要&#xff1a;Ntop是一款Linux下常見的開源監控軟件&#xff0c;它可以監測的數據包括&#xff1a;網絡流量、使用協議、系統負載、端口情況、數據包發送時間等。正常情況下它工作的時候就像一部被動聲納&#xff0c;默默的接收看來自網絡的各種信息&#xff0c;通過對這些數…

Java異常處理教程

異常是在沒有定義正常執行路徑時在Java程序的執行期間可能出現的條件。Java通過將執行操作的代碼與處理錯誤的代碼分離來處理錯誤。 當發生異常時&#xff0c;Java會創建一個包含有關異常的所有信息的對象&#xff0c;并將其傳遞給相應的異常處理代碼。有關異常的信息包括異常的…

性能優化8--內存泄露

一.根源&#xff1a; 內存泄露簡單說就是已經沒有用的資源&#xff0c;但是由于被其他資源引用著無法被GC銷毀。 二.內存泄露常見場景 1.單例導致內存泄露 單例的靜態特性使得它的生命周期同應用的生命周期一樣長&#xff0c;如果一個對象已經沒有用處了&#xff0c;但是單例還…

那些年,登山徒步記錄,立貼

2018年1月-9月份暫無數據。&#xff08;慘無人道&#xff0c;已經喪失了自我。&#xff09; 10月份2017年2月份02月12日 25.00KM 牛木外線3月份暫無數據。 4月份1.04月09日 16.00KM 火鳳線 5月份1.05月06日 20.00KM 漁帽線&#xff08;第一機耕路&#xff09; 6月份1.06月11日 …

記一次 .NET 某打印服務 非托管內存泄漏

一&#xff1a;背景 1. 講故事前段時間有位朋友在微信上找到我&#xff0c;說他的程序出現了內存泄漏&#xff0c;能不能幫他看一下&#xff0c;這個問題還是比較經典的&#xff0c;加上好久沒上非托管方面的東西了&#xff0c;這篇就和大家分享一下&#xff0c;話不多說&#…

android靜態方法如何測試,android – 如何使用mock()和spy()測試靜態方法

通常情況下,如果你最終使用PowerMock,這是一個很好的跡象,表明你最有可能是錯誤的方式.如果不是直接引用畢加索,而是創建一個組件,它的職責是加載圖像,讓我們說類ImageLoader.這會給你什么&#xff1f;>關注點分離&#xff1a;如果明天你決定轉移到Glide,你不應該改變你使用…

mysql經典的8小時問題-wait_timeout

2019獨角獸企業重金招聘Python工程師標準>>> 前段時間 現網突然頻繁報出 連接不上數據庫&#xff0c;偶滴的妖孽&#xff0c;其他地方都是用mysql&#xff0c;也沒遇到這個問題呀。 java.io.EOFExceptionat at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:1913…

Chrome DevTools — Network

記錄網絡請求 默認情況下&#xff0c;只要DevTools在開啟狀態&#xff0c;DevTools會記錄所有的網絡請求&#xff0c;當然&#xff0c;記錄都是在Network面板展示的。 停止記錄網絡請求 點擊Stop recording network log紅色圖標&#xff0c;當它變為灰色時&#xff0c;表示DevT…

Blazor University 中文版網站已上線

在學習 Blazor 的過程中&#xff0c;找到了一個網站 Blazor University&#xff08;https://blazor-university.com&#xff09;。發現網站內容非常詳實&#xff0c;正像首頁所說的&#xff1a;通過瀏覽本網站中的信息&#xff0c;我打算帶您從完全的新手到Blazor的所有方面的專…

android:paddingtop 百分比,相對層中的百分比寬度

相對層中的百分比寬度我正在為登錄進行表單布局。Activity在我的Android應用程序中。下面的圖片是我希望它看起來的樣子&#xff1a;我能夠通過以下方式實現這個布局XML..問題是&#xff0c;這有點麻煩。我不得不對主機EditText的寬度進行硬編碼。具體而言&#xff0c;我必須具…

MySQL 查看表結構簡單命令

一、簡單描述表結構&#xff0c;字段類型 desc tabl_name; 顯示表結構&#xff0c;字段類型&#xff0c;主鍵&#xff0c;是否為空等屬性&#xff0c;但不顯示外鍵。 例如&#xff1a;desc table_name 二、查詢表中列的注釋信息 select * from information_schema.columns wher…