ABP VNext + MassTransit:構建分布式事務與異步消息協作 🚀
📚 目錄
- ABP VNext + MassTransit:構建分布式事務與異步消息協作 🚀
- 📚 1. 背景與動機
- 🛠? 2. 環境與依賴
- 🔧 3. 在 ABP 模塊中注冊 MassTransit
- 3.1 強類型配置綁定
- 3.2 模塊配置
- 📝 4. 完整消息契約
- 🔄 5. Saga 實體 & 狀態機
- 5.1 `OrderState` with RowVersion
- 5.2 `OrderSagaDbContext` & `OrderStateMap`
- 5.3 狀態機 + 時序圖
- 📤 6. 發布 & 消費示例
- 6.1 發布
- 6.2 消費
- 📦 7. 分布式事務 Outbox 流程
- 🔍 8. 可觀測性 & 性能監控
- 🛠? 9. Kafka 兼容示例
? TL;DR
- 🚀 零侵入 MassTransit:使用
services.AddMassTransit(...)
集成,無需依賴Volo.Abp.EventBus.MassTransit
;若需 ABP 自有事件總線,可安裝Volo.Abp.EventBus.RabbitMQ
- 💾 生產級 Saga 持久化:通過
services.AddDbContext<OrderSagaDbContext>(…)
+.EntityFrameworkRepository(...)
保證狀態持久化與樂觀并發(需配置RowVersion
) - 📬 標準化 Outbox:在
DbContext
內調用builder.ConfigureEventOutbox()
并配置AbpDistributedEventBusOptions
,實現數據庫寫入與消息發布的原子性 - 🔍 一體化可觀測性:棄用 Prometheus 直出,采用 OpenTelemetry Trace + Metrics,通過
UseOpenTelemetryTracing()
與AddOpenTelemetryMetrics().AddPrometheusExporter()
深度監控消息流轉
📚 1. 背景與動機
在微服務架構中,“下單 → 支付 → 發貨” 屬于跨服務長流程,既要求 數據一致性,又追求 高可用 與 可觀測。
- 傳統 2PC 性能低、易死鎖;
- 本地事務+補償模式缺乏集中管理與可視化;
- Saga 模式通過狀態機、持久化與補償,提供更優的分布式事務解決方案
🛠? 2. 環境與依賴
-
.NET:6 +
-
ABP:VNext 6.x +
-
中間件:RabbitMQ(默認)或 Kafka
-
核心 NuGet 包:
dotnet add package MassTransit dotnet add package MassTransit.RabbitMQ dotnet add package MassTransit.Kafka dotnet add package MassTransit.AspNetCore
-
appsettings.json
{"MassTransit": {"UseRabbitMq": true,"RabbitMq": {"Host": "rabbitmq://localhost","Username": "guest","Password": "guest"},"Kafka": {"BootstrapServers": "localhost:9092"}} }
🔧 3. 在 ABP 模塊中注冊 MassTransit
3.1 強類型配置綁定
public class MassTransitOptions
{public bool UseRabbitMq { get; set; }public RabbitMqOptions RabbitMq { get; set; } = new();public KafkaOptions Kafka { get; set; } = new();
}
services.Configure<MassTransitOptions>(Configuration.GetSection("MassTransit"));
var mtOptions = services.BuildServiceProvider().GetRequiredService<IOptions<MassTransitOptions>>().Value;
3.2 模塊配置
[DependsOn(typeof(AbpAutofacModule))]
public class OrderProcessingModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){var options = context.Services.GetConfiguration().GetSection("MassTransit").Get<MassTransitOptions>();// 先注冊 Saga DbContext(用于遷移)context.Services.AddDbContext<OrderSagaDbContext>(builder =>builder.UseSqlServer(Configuration.GetConnectionString("Default")));context.Services.AddMassTransit(x =>{// —— Saga 持久化 & 樂觀并發 —— x.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository(r =>{r.ExistingDbContext<OrderSagaDbContext>();r.UseSqlServer();r.ConcurrencyMode = ConcurrencyMode.Optimistic;});x.AddConsumer<AcceptOrderConsumer>();if (options.UseRabbitMq){x.UsingRabbitMq((ctx, cfg) =>{var rmq = options.RabbitMq;cfg.Host(rmq.Host, h =>{h.Username(rmq.Username);h.Password(rmq.Password);});cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));cfg.UseCircuitBreaker(cb =>{cb.TrackingPeriod = TimeSpan.FromMinutes(1);cb.TripThreshold = 15;cb.ActiveThreshold = 10;cb.ResetInterval = TimeSpan.FromMinutes(5);});cfg.UseHealthCheck(ctx);cfg.UseOpenTelemetryTracing(); cfg.ConfigureEndpoints(ctx);});}else{x.AddRider(r =>{r.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository();r.UsingKafka((ctx, k) =>{k.Host(options.Kafka.BootstrapServers);k.TopicEndpoint<SubmitOrder>("submit-order-topic","order-group",e => e.ConfigureSaga<OrderState>(ctx));});});}});}
}
📝 4. 完整消息契約
public record SubmitOrder (Guid OrderId, decimal Amount, DateTime Timestamp);
public record AcceptOrder (Guid OrderId);
public record OrderCompleted (Guid OrderId, DateTime CompletedAt);
public record OrderFaulted (Guid OrderId, string Reason);
🔄 5. Saga 實體 & 狀態機
5.1 OrderState
with RowVersion
public class OrderState : SagaStateMachineInstance
{public Guid CorrelationId { get; set; }public string CurrentState { get; set; } = "";public DateTime? Created { get; set; }public DateTime? Completed { get; set; }public byte[]? RowVersion { get; set; } // 樂觀并發標記
}
5.2 OrderSagaDbContext
& OrderStateMap
public class OrderSagaDbContext : SagaDbContext
{public OrderSagaDbContext(DbContextOptions<OrderSagaDbContext> options): base(options) { }protected override IEnumerable<ISagaClassMap> Configurations=> new[] { new OrderStateMap() };
}public class OrderStateMap : SagaClassMap<OrderState>
{protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model){entity.Property(x => x.RowVersion).IsRowVersion();}
}
5.3 狀態機 + 時序圖
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{public State Submitted { get; private set; }public State Completed { get; private set; }public Event<SubmitOrder> SubmitOrderEvent { get; private set; }public Event<AcceptOrder> AcceptOrderEvent { get; private set; }public Event<OrderFaulted> OrderFaultedEvent { get; private set; }public OrderStateMachine(){InstanceState(x => x.CurrentState);Event(() => SubmitOrderEvent, x => x.CorrelateById(m => m.Message.OrderId));Event(() => AcceptOrderEvent, x => x.CorrelateById(m => m.Message.OrderId));Event(() => OrderFaultedEvent, x => x.CorrelateById(m => m.Message.OrderId));Initially(When(SubmitOrderEvent).Then(ctx => ctx.Saga.Created = DateTime.UtcNow).TransitionTo(Submitted));During(Submitted,When(AcceptOrderEvent).ThenAsync(ctx => /* 發貨等業務 */ Task.CompletedTask).PublishAsync(ctx => ctx.Init<OrderCompleted>(new{ ctx.Saga.CorrelationId, CompletedAt = DateTime.UtcNow })).Then(ctx => ctx.Saga.Completed = DateTime.UtcNow).TransitionTo(Completed));DuringAny(When(OrderFaultedEvent).ThenAsync(ctx => { /* 補償邏輯 */ return Task.CompletedTask; }).Finalize());SetCompletedWhenFinalized();}
}
📤 6. 發布 & 消費示例
6.1 發布
public class OrderAppService : ApplicationService
{private readonly IPublishEndpoint _publish;public OrderAppService(IPublishEndpoint publish) => _publish = publish;public async Task<Guid> CreateOrderAsync(decimal amount){var orderId = Guid.NewGuid();// 本地寫庫…await _publish.Publish(new SubmitOrder(orderId, amount, DateTime.UtcNow));return orderId;}
}
6.2 消費
public class AcceptOrderConsumer : IConsumer<AcceptOrder>
{public async Task Consume(ConsumeContext<AcceptOrder> ctx){// 支付、庫存等業務// 失敗時:await ctx.Publish(new OrderFaulted(ctx.Message.OrderId, "庫存不足"));}
}
📦 7. 分布式事務 Outbox 流程
public class AppDbContext : AbpDbContext<AppDbContext>, IHasEventOutbox
{public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }protected override void OnModelCreating(ModelBuilder builder){base.OnModelCreating(builder);builder.ConfigureEventOutbox();}
}
Configure<AbpDistributedEventBusOptions>(opt =>
{opt.Outboxes.Configure(cfg =>cfg.UseDbContext<AppDbContext>());
});
🔍 8. 可觀測性 & 性能監控
- OpenTelemetry Trace:
cfg.UseOpenTelemetryTracing()
捕獲消息發布、消費、Saga 狀態切換等全鏈路 - OpenTelemetry Metrics:
services.AddOpenTelemetryMetrics(builder =>builder.AddPrometheusExporter());
Grafana 拉取 /metrics
可視化監控。
-
并發限流:在接收端口上設置:
cfg.ReceiveEndpoint("accept-order-queue", e => {e.UseConcurrencyLimit(4);e.ConfigureConsumer<AcceptOrderConsumer>(ctx); });
-
批量消費:
x.AddConsumer<BatchOrderConsumer>(cfg =>cfg.Options<BatchOptions>(o => o.MessageLimit = 50)); x.UsingRabbitMq((_, cfg) => {cfg.ReceiveEndpoint("batch-queue", e =>{e.ConfigureConsumer<BatchOrderConsumer>(context);}); });
進一步提升吞吐。
🛠? 9. Kafka 兼容示例
x.AddRider(r =>
{r.AddSagaStateMachine<OrderStateMachine, OrderState>().EntityFrameworkRepository();r.UsingKafka((ctx, k) =>{k.Host("localhost:9092");k.TopicEndpoint<SubmitOrder>("submit-order-topic","order-group",e => e.ConfigureSaga<OrderState>(ctx));});
});