ABP VNext + MassTransit:構建分布式事務與異步消息協作

ABP VNext + MassTransit:構建分布式事務與異步消息協作 🚀


📚 目錄

  • ABP VNext + MassTransit:構建分布式事務與異步消息協作 🚀
    • 📚 1. 背景與動機
    • 🛠? 2. 環境與依賴
    • 🔧 3. 在 ABP 模塊中注冊 MassTransit
      • 3.1 強類型配置綁定
      • 3.2 模塊配置
    • 📝 4. 完整消息契約
    • 🔄 5. Saga 實體 & 狀態機
      • 5.1 `OrderState` with RowVersion
      • 5.2 `OrderSagaDbContext` & `OrderStateMap`
      • 5.3 狀態機 + 時序圖
    • 📤 6. 發布 & 消費示例
      • 6.1 發布
      • 6.2 消費
    • 📦 7. 分布式事務 Outbox 流程
    • 🔍 8. 可觀測性 & 性能監控
    • 🛠? 9. Kafka 兼容示例


? TL;DR

  • 🚀 零侵入 MassTransit:使用 services.AddMassTransit(...) 集成,無需依賴 Volo.Abp.EventBus.MassTransit;若需 ABP 自有事件總線,可安裝 Volo.Abp.EventBus.RabbitMQ
  • 💾 生產級 Saga 持久化:通過 services.AddDbContext<OrderSagaDbContext>(…) + .EntityFrameworkRepository(...) 保證狀態持久化與樂觀并發(需配置 RowVersion
  • 📬 標準化 Outbox:在 DbContext 內調用 builder.ConfigureEventOutbox() 并配置 AbpDistributedEventBusOptions,實現數據庫寫入與消息發布的原子性
  • 🔍 一體化可觀測性:棄用 Prometheus 直出,采用 OpenTelemetry Trace + Metrics,通過 UseOpenTelemetryTracing()AddOpenTelemetryMetrics().AddPrometheusExporter() 深度監控消息流轉

📚 1. 背景與動機

在微服務架構中,“下單 → 支付 → 發貨” 屬于跨服務長流程,既要求 數據一致性,又追求 高可用可觀測

  • 傳統 2PC 性能低、易死鎖;
  • 本地事務+補償模式缺乏集中管理與可視化;
  • Saga 模式通過狀態機、持久化與補償,提供更優的分布式事務解決方案

🛠? 2. 環境與依賴

  • .NET:6 +

  • ABP:VNext 6.x +

  • 中間件:RabbitMQ(默認)或 Kafka

  • 核心 NuGet 包

    dotnet add package MassTransit
    dotnet add package MassTransit.RabbitMQ
    dotnet add package MassTransit.Kafka
    dotnet add package MassTransit.AspNetCore
    
  • appsettings.json

    {"MassTransit": {"UseRabbitMq": true,"RabbitMq": {"Host": "rabbitmq://localhost","Username": "guest","Password": "guest"},"Kafka": {"BootstrapServers": "localhost:9092"}}
    }
    

🔧 3. 在 ABP 模塊中注冊 MassTransit

3.1 強類型配置綁定

public class MassTransitOptions
{public bool UseRabbitMq { get; set; }public RabbitMqOptions RabbitMq   { get; set; } = new();public KafkaOptions    Kafka      { get; set; } = new();
}
services.Configure<MassTransitOptions>(Configuration.GetSection("MassTransit"));
var mtOptions = services.BuildServiceProvider().GetRequiredService<IOptions<MassTransitOptions>>().Value;

3.2 模塊配置

[DependsOn(typeof(AbpAutofacModule))]
public class OrderProcessingModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){var options = context.Services.GetConfiguration().GetSection("MassTransit").Get<MassTransitOptions>();// 先注冊 Saga DbContext(用于遷移)context.Services.AddDbContext<OrderSagaDbContext>(builder =>builder.UseSqlServer(Configuration.GetConnectionString("Default")));context.Services.AddMassTransit(x =>{// —— Saga 持久化 & 樂觀并發 —— x.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository(r =>{r.ExistingDbContext<OrderSagaDbContext>();r.UseSqlServer();r.ConcurrencyMode = ConcurrencyMode.Optimistic;});x.AddConsumer<AcceptOrderConsumer>();if (options.UseRabbitMq){x.UsingRabbitMq((ctx, cfg) =>{var rmq = options.RabbitMq;cfg.Host(rmq.Host, h =>{h.Username(rmq.Username);h.Password(rmq.Password);});cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));cfg.UseCircuitBreaker(cb =>{cb.TrackingPeriod  = TimeSpan.FromMinutes(1);cb.TripThreshold   = 15;cb.ActiveThreshold = 10;cb.ResetInterval   = TimeSpan.FromMinutes(5);});cfg.UseHealthCheck(ctx);cfg.UseOpenTelemetryTracing();  cfg.ConfigureEndpoints(ctx);});}else{x.AddRider(r =>{r.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository();r.UsingKafka((ctx, k) =>{k.Host(options.Kafka.BootstrapServers);k.TopicEndpoint<SubmitOrder>("submit-order-topic","order-group",e => e.ConfigureSaga<OrderState>(ctx));});});}});}
}

📝 4. 完整消息契約

public record SubmitOrder    (Guid OrderId, decimal Amount, DateTime Timestamp);
public record AcceptOrder    (Guid OrderId);
public record OrderCompleted (Guid OrderId, DateTime CompletedAt);
public record OrderFaulted   (Guid OrderId, string Reason);

🔄 5. Saga 實體 & 狀態機

5.1 OrderState with RowVersion

public class OrderState : SagaStateMachineInstance
{public Guid     CorrelationId { get; set; }public string   CurrentState  { get; set; } = "";public DateTime? Created      { get; set; }public DateTime? Completed    { get; set; }public byte[]?  RowVersion    { get; set; }  // 樂觀并發標記
}

5.2 OrderSagaDbContext & OrderStateMap

public class OrderSagaDbContext : SagaDbContext
{public OrderSagaDbContext(DbContextOptions<OrderSagaDbContext> options): base(options) { }protected override IEnumerable<ISagaClassMap> Configurations=> new[] { new OrderStateMap() };
}public class OrderStateMap : SagaClassMap<OrderState>
{protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model){entity.Property(x => x.RowVersion).IsRowVersion();}
}

5.3 狀態機 + 時序圖

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{public State Submitted  { get; private set; }public State Completed  { get; private set; }public Event<SubmitOrder>  SubmitOrderEvent  { get; private set; }public Event<AcceptOrder>  AcceptOrderEvent  { get; private set; }public Event<OrderFaulted>  OrderFaultedEvent { get; private set; }public OrderStateMachine(){InstanceState(x => x.CurrentState);Event(() => SubmitOrderEvent,   x => x.CorrelateById(m => m.Message.OrderId));Event(() => AcceptOrderEvent,   x => x.CorrelateById(m => m.Message.OrderId));Event(() => OrderFaultedEvent,  x => x.CorrelateById(m => m.Message.OrderId));Initially(When(SubmitOrderEvent).Then(ctx => ctx.Saga.Created = DateTime.UtcNow).TransitionTo(Submitted));During(Submitted,When(AcceptOrderEvent).ThenAsync(ctx => /* 發貨等業務 */ Task.CompletedTask).PublishAsync(ctx => ctx.Init<OrderCompleted>(new{ ctx.Saga.CorrelationId, CompletedAt = DateTime.UtcNow })).Then(ctx => ctx.Saga.Completed = DateTime.UtcNow).TransitionTo(Completed));DuringAny(When(OrderFaultedEvent).ThenAsync(ctx => { /* 補償邏輯 */ return Task.CompletedTask; }).Finalize());SetCompletedWhenFinalized();}
}
Yes
No
SubmitOrder
State: Submitted
AcceptOrder
處理成功?
Publish OrderCompleted
Set Completed
Finalized
Publish OrderFaulted
Compensation

📤 6. 發布 & 消費示例

6.1 發布

public class OrderAppService : ApplicationService
{private readonly IPublishEndpoint _publish;public OrderAppService(IPublishEndpoint publish) => _publish = publish;public async Task<Guid> CreateOrderAsync(decimal amount){var orderId = Guid.NewGuid();// 本地寫庫…await _publish.Publish(new SubmitOrder(orderId, amount, DateTime.UtcNow));return orderId;}
}

6.2 消費

public class AcceptOrderConsumer : IConsumer<AcceptOrder>
{public async Task Consume(ConsumeContext<AcceptOrder> ctx){// 支付、庫存等業務// 失敗時:await ctx.Publish(new OrderFaulted(ctx.Message.OrderId, "庫存不足"));}
}

📦 7. 分布式事務 Outbox 流程

public class AppDbContext : AbpDbContext<AppDbContext>, IHasEventOutbox
{public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }protected override void OnModelCreating(ModelBuilder builder){base.OnModelCreating(builder);builder.ConfigureEventOutbox();}
}
Configure<AbpDistributedEventBusOptions>(opt =>
{opt.Outboxes.Configure(cfg =>cfg.UseDbContext<AppDbContext>());
});
API: CreateOrder
DB Txn Begin
保存 OutgoingEvent
DB Txn Commit
Outbox Worker 定時/輪詢觸發
MassTransit.Publish
消息下行

🔍 8. 可觀測性 & 性能監控

  • OpenTelemetry Tracecfg.UseOpenTelemetryTracing() 捕獲消息發布、消費、Saga 狀態切換等全鏈路
  • OpenTelemetry Metrics
 services.AddOpenTelemetryMetrics(builder =>builder.AddPrometheusExporter());

Grafana 拉取 /metrics 可視化監控。

  • 并發限流:在接收端口上設置:

    cfg.ReceiveEndpoint("accept-order-queue", e =>
    {e.UseConcurrencyLimit(4);e.ConfigureConsumer<AcceptOrderConsumer>(ctx);
    });
    
  • 批量消費

    x.AddConsumer<BatchOrderConsumer>(cfg =>cfg.Options<BatchOptions>(o => o.MessageLimit = 50));
    x.UsingRabbitMq((_, cfg) =>
    {cfg.ReceiveEndpoint("batch-queue", e =>{e.ConfigureConsumer<BatchOrderConsumer>(context);});
    });
    

    進一步提升吞吐。


🛠? 9. Kafka 兼容示例

x.AddRider(r =>
{r.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository();r.UsingKafka((ctx, k) =>{k.Host("localhost:9092");k.TopicEndpoint<SubmitOrder>("submit-order-topic","order-group",e => e.ConfigureSaga<OrderState>(ctx));});
});

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89059.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89059.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89059.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

語義網技術

用通俗語言說語義網技術&#xff0c;以及它和現在互聯網的關系 一、語義網技術&#xff1a;讓網絡“聽懂人話”的智能升級 現有互聯網就像一本巨大的“圖文報紙”&#xff1a;我們人類看文章、圖片能輕松理解意思&#xff0c;但計算機只能識別文字符號&#xff0c;不知道“蘋…

pytorch學習—4.反向傳播(用pytorch算梯度)

2. 線性模型 3.梯度下降算法 4.反向傳播_嗶哩嗶哩_bilibili 4.1 代碼復現 import torch import matplotlib.pyplot as pltx_data=[1.0,2.0,3.0] y_data=[2.0,4.0,6.0]#這里創建了一個PyTorch張量w,初始值為1.0,并且設置requires_grad=True, #這意味著在計算過程中,PyTo…

7類茶葉嫩芽圖像分類數據集

在茶葉育種、溯源管理與自動采摘等智能農業場景中&#xff0c;茶樹品種的識別與分類是一項關鍵任務。不同茶葉品種在嫩芽期表現出顯著的形態差異&#xff0c;例如顏色、葉緣結構、芽頭密度等。因此&#xff0c;基于圖像的茶葉品種分類不僅具備實際應用價值&#xff0c;也為農業…

【Elasticsearch】Linux環境下安裝Elasticsearch

一&#xff0c;前言 Elasticsearch&#xff08;簡稱 ES&#xff09;是一個基于 ??Apache Lucene?? 構建的開源分布式搜索與分析引擎。它支持??實時數據處理??&#xff0c;提供近實時的全文搜索能力&#xff0c;并通過 ??JSON 格式的 RESTful API?? 實現數據索引與檢…

【數據結構--樹于哨兵查找-1】

查找 從前到后- 線性查找 -就是順序查找. 哨兵法查找–節省每次都要判斷是否越界的這一步驟利于節省開銷&#xff0c;從而提升效率。 參考我的程序 #include <stdio.h> #include <stdlib.h> #include <time.h> #include <stdbool.h>#define SIZE …

MyBatis修改(update)操作

1. 三步法口訣 “接口收對象&#xff0c;SQL全賦值&#xff0c;主鍵定目標” 2. 詳細記憶點 | 步驟 | 口訣 | 說明與示例 | |--------------|----------------|----------------------------------------------------------------------------| | 1. 寫接口 | “接口收對象…

Spring Boot 入門學習

一、 Web應用開發概述 什么是Web應用 1. Web應用 &#xff08;Web Application&#xff09;是一種運行在Web服務器上的軟件程序&#xff0c;由用戶通過Web瀏覽器進行訪問和交互。 2.Web應用與傳統的桌面應用不同&#xff0c;它不需要在個人計算機上安裝特定的軟件&#xff0…

深度解讀概率與證據權重 -Probability and the Weighing of Evidence

以下是I.J.古德&#xff08;I.J. Good&#xff09;的經典著作 《概率與證據權衡》&#xff08;Probability and the Weighing of Evidence, 1950&#xff09; 的中文詳細總結&#xff1a; 本文由「大千AI助手」原創發布&#xff0c;專注用真話講AI&#xff0c;回歸技術本質。拒…

跟著AI學習C#之項目實戰-電商平臺 Day6

&#x1f4c5; Day 6&#xff1a;后臺管理系統開發&#xff08;Admin Panel&#xff09; ? 今日目標&#xff1a; 創建管理員頁面布局實現商品管理&#xff08;CRUD&#xff09;實現訂單管理&#xff08;查看、狀態變更&#xff09;添加權限控制&#xff08;僅管理員可訪問&…

使用OpcUaHelper在C# WinForms中連接OPC UA服務器并讀取數據

使用OpcUaHelper在C# WinForms中連接OPC UA服務器并讀取數據 下面是一個完整的示例&#xff0c;展示如何使用OpcUaHelper庫在C# WinForms應用程序中連接OPC UA服務器并讀取數據。 1. 準備工作 首先&#xff0c;確保你已經安裝了OpcUaHelper NuGet包。可以通過NuGet包管理器控…

鴻蒙應用開發中的數據存儲:SQLite與Preferences全面解析

在鴻蒙&#xff08;HarmonyOS&#xff09;應用開發中&#xff0c;數據存儲是構建功能完整、用戶體驗良好的應用程序的關鍵環節。鴻蒙系統提供了多種數據存儲解決方案&#xff0c;其中SQLite數據庫和Preferences&#xff08;偏好設置&#xff09;是最常用的兩種方式。本文將深入…

夏至之日,共赴實時 AI 之約:RTE Open Day@AGI Playground 2025 回顧

每年 RTE 開發者社區的重磅活動—— RTE Open Day &#xff0c;也在六月的 AGI Playground 現場開啟今年的行程。這是 RTE Open Day 第五期現場&#xff0c;這期我們的關鍵詞是 「Real-Time AI」 和 「Voice Agent」&#xff0c;不僅有來自社區的 16 個項目&#xff0c;還有兩場…

Tomcat性能調優指南

文章目錄 一、Tomcat性能調優概述為什么需要調優Tomcat&#xff1f; 二、Tomcat架構與性能關鍵點三、JVM調優1. 內存配置優化2. 垃圾回收優化3. 其他JVM優化參數 四、連接器(Connector)調優1. NIO vs APR/Native2. 高級NIO配置 五、線程池優化六、會話管理優化1. 會話超時配置2…

Swift 小技巧:用單邊區間優雅處理模糊范圍

進入正題之前先科普一下 Swift 區間的知識。 Swift 中的區間有兩種類型&#xff1a;閉區間和半開區間。 閉區間&#xff1a;用 a...b 表示&#xff0c;包含 a 和 b。半開區間&#xff1a;用 a..<b 表示&#xff0c;包含 a 但不包含 b。 舉個例子 想判斷一個數字是否在 0 …

Tang Prime 20K板OV2640例程

準備用Tang Prime 20K開發板進行OV2640攝像頭采集驗證。 Tang Primer 20K是由開源硬件廠商SiPEED矽速科技推出&#xff0c;是一款以 GW2A-LV18PG256C8/I7 為主芯片的核心板&#xff0c;準備了 2 個擴展板&#xff0c;Dock 和 Lite。板卡包含有HDMI輸出&#xff0c;DVP接口&…

基于Anaconda環境開發IntelliJ IDEA實用JSON轉Java實體插件

在軟件開發中&#xff0c;將JSON數據轉換為Java實體類是常見需求。借助Anaconda環境強大的包管理能力與IntelliJ IDEA的插件開發體系&#xff0c;我們可以打造一款高效實用的JSON轉Java實體插件&#xff0c;顯著提升開發效率。下面將從需求分析、技術選型、開發實現到優化部署&…

idea運行到遠程機器 和 idea遠程JVM調試

一、idea運行到遠程機器 適用場景&#xff0c;本地連接不上遠程機器的部分組件&#xff0c;如&#xff1a;redis、數據庫。 缺點&#xff1a;每次修改程序&#xff0c;會復制所有的 依賴和class 啟動比較慢。 工作原理&#xff1a;遠程機器和本機器&#xff0c;都會啟動一個端口…

微信小程序接入騰訊云短信驗證碼流程

以下是針對 AA公司微信小程序接入騰訊云短信驗證碼 的 全流程操作指南&#xff0c;包含資質申請、簽名/模板配置、代碼對接的完整解決方案&#xff1a; 一、資質申請&#xff08;必須通過審核才能發短信&#xff09; 1?? 進入資質管理頁 路徑&#xff1a;騰訊云控制臺 → 短…

阿里云OSS文件上傳完整實現方案

一、前言 阿里云對象存儲服務(OSS)是一種海量、安全、低成本、高可靠的云存儲服務。本文將詳細介紹如何在Spring Boot項目中集成阿里云OSS實現文件上傳功能。 二、準備工作 1. 獲取OSS配置信息 在開始前&#xff0c;您需要準備以下OSS配置信息&#xff1a; endpoint: OSS服…

【軟考--軟件設計師】10.2 關系型數據庫

10 模式分解 分解 模式分解:將一個關系模式分解為多個子模式 模式分解就是模式規范化的工具&#xff0c;模式分解使用無損連接和保持函數依賴來衡量模式分解后是否導致原有模式中部分信息丟失。 無損連接 保持函數依賴 11、事務管理 事務的ACID性質: (1)原子性(Atomicit…