Saga 模式
Saga 最初出現在1987年Hector Garcaa-Molrna & Kenneth Salem發表的一篇名為《Sagas》的論文里。其核心思想是將長事務拆分為多個短事務,借助Saga事務協調器的協調,來保證要么所有操作都成功完成,要么運行相應的補償事務以撤消先前完成的工作,從而維護多個服務之間的數據一致性。舉例而言,假設有個在線購物網站,其后端服務劃分為訂單服務、支付服務和庫存服務。那么一次下訂單的Saga流程如下圖所示:

在Saga模式中本地事務是Saga 參與者執行的工作單元,每個本地事務都會更新數據庫并發布消息或事件以觸發 Saga 中的下一個本地事務。如果本地事務失敗,Saga 會執行一系列補償事務,以撤消先前本地事務所做的更改。 對于Saga模式的實現又分為兩種形式:
協同式:把Saga 的決策和執行順序邏輯分布在Saga的每個參與方中,通過交換事件的方式進行流轉。示例圖如下所示:

編排式:把Saga的決策和執行順序邏輯集中定義在一個Saga 編排器中。Saga 編排器發出命令式消息給各個Saga 參與方,指示這些參與方執行怎樣的操作。

從上圖可以看出,對于協同式Saga 存在一個致命的弊端,那就是存在循環依賴的問題,每個Saga參與方都需要訂閱所有影響它們的事件,耦合性較高,且由于Saga 邏輯分散在各參與方,不便維護。相對而言,編排式Saga 則實現了關注點分離,協調邏輯集中在編排器中定義,Saga 參與者僅需實現供編排器調用的API 即可。 在.NET 中也有開箱即用的開源框架實現了編排式的Saga事務模型,也就是MassTransit Courier
,接下來就來實際探索一番。
MassTransit Courier 簡介
MassTransit Courier 是對Routing Slip(路由單) 模式的實現。該模式用于運行時動態指定消息處理步驟,解決不同消息可能有不同消息處理步驟的問題。實現機制是消息處理流程的開始,創建一個路由單,這個路由單定義消息的處理步驟,并附加到消息中,消息按路由單進行傳輸,每個處理步驟都會查看_路由單_并將消息傳遞到路由單中指定的下一個處理步驟。 在MassTransit Courier中是通過抽象IActivity
和RoutingSlip
來實現了Routing Slip模式。通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivity
和IExecuteActivity
。二者的差別在于IActivity
定義了Execute
和Compensate
兩個方法,而IExecuteActivitiy
僅定義了Execute
方法。其中Execute
代表正向操作,Compensate
代表反向補償操作。用一個簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,使用Courier的實現示意圖如下所示:
基于Courier 實現編排式Saga事務
那具體如何使用MassTransit Courier
來應用編排式Saga 模式呢,接下來就來創建解決方案來實現以上下單流程示例。
創建解決方案
依次創建以下項目,除共享類庫項目外,均安裝MassTransit
和MassTransit.RabbitMQ
NuGet包。
項目 | 項目名 | 項目類型 |
---|---|---|
訂單服務 | MassTransit.CourierDemo.OrderService | ASP.NET Core Web API |
庫存服務 | MassTransit.CourierDemo.InventoryService | Worker Service |
支付服務 | MassTransit.CourierDemo.PaymentService | Worker Service |
共享類庫 | MassTransit.CourierDemo.Shared | Class Library |
三個服務都添加擴展類MassTransitServiceExtensions
,并在Program.cs
類中調用services.AddMassTransitWithRabbitMq();
注冊服務。
using?System.Reflection;
using?MassTransit.CourierDemo.Shared.Models;namespace?MassTransit.CourierDemo.InventoryService;public?static?class?MassTransitServiceExtensions
{public?static?IServiceCollection?AddMassTransitWithRabbitMq(this?IServiceCollection?services){return?services.AddMassTransit(x?=>{x.SetKebabCaseEndpointNameFormatter();//?By?default,?sagas?are?in-memory,?but?should?be?changed?to?a?durable//?saga?repository.x.SetInMemorySagaRepositoryProvider();var?entryAssembly?=?Assembly.GetEntryAssembly();x.AddConsumers(entryAssembly);x.AddSagaStateMachines(entryAssembly);x.AddSagas(entryAssembly);x.AddActivities(entryAssembly);x.UsingRabbitMq((context,?busConfig)?=>{busConfig.Host(host:?"localhost",port:?5672,virtualHost:?"masstransit",configure:?hostConfig?=>{hostConfig.Username("guest");hostConfig.Password("guest");});busConfig.ConfigureEndpoints(context);});});}
}
訂單服務
訂單服務作為下單流程的起點,需要承擔構建RoutingSlip
的職責,因此可以創建一個OrderRoutingSlipBuilder
來構建RoutingSlip
,代碼如下:
using?MassTransit.Courier.Contracts;
using?MassTransit.CourierDemo.Shared.Models;namespace?MassTransit.CourierDemo.OrderService;
public?static?class?OrderRoutingSlipBuilder
{public?static?RoutingSlip?BuildOrderRoutingSlip(CreateOrderDto?createOrderDto){var?createOrderAddress?=?new?Uri("queue:create-order_execute");var?deduceStockAddress?=?new?Uri("queue:deduce-stock_execute");var?payAddress?=?new?Uri("queue:pay-order_execute");????????var?routingSlipBuilder?=?new?RoutingSlipBuilder(Guid.NewGuid());routingSlipBuilder.AddActivity(name:?"order-activity",executeAddress:?createOrderAddress,arguments:?createOrderDto);routingSlipBuilder.AddActivity(name:?"deduce-stock-activity",?executeAddress:?deduceStockAddress);routingSlipBuilder.AddActivity(name:?"pay-activity",?executeAddress:?payAddress);var?routingSlip?=?routingSlipBuilder.Build();return?routingSlip;}
}
從以上代碼可知,構建一個路由單需要以下幾步:
明確業務用例涉及的具體用例,本例中為:
創建訂單:CreateOrder
扣減庫存:DeduceStock
支付訂單:PayOrder
根據用例名,按短橫線隔開命名法(kebab-case)定義用例執行地址,格式為
queue:<usecase>_execute
,本例中為:創建訂單執行地址:queue:create-order_execute
創建訂單執行地址:queue:deduce-stock_execute
創建訂單執行地址:queue:pay-order_execute
創建路由單:
通過
RoutingSlipBuilder(Guid.NewGuid())
創建路由單構建器實例根據業務用例流轉順序,調用
AddActivity()
方法依次添加Activity用來執行用例,因為第一個創建訂單用例需要入口參數,因此傳入了一個CreateOrderDto
DTO(Data Transfer Object)對象調用
Build()
方法創建路由單
對于本例而言,由于下單流程是固定流程,因此以上路由單的構建也是按業務用例進行定義的。而路由單的強大之處在于,可以按需動態組裝。在實際電商場景中,有些訂單是無需執行庫存扣減的,比如充值訂單,對于這種情況,僅需在創建路由單時判斷若為充值訂單則不添加扣減庫存的Activity即可。 對于訂單服務必然要承擔創建訂單的職責,定義CreateOrderActivity
(Activity的命名要與上面定義的用例對應)如下,其中OrderRepository
為一個靜態訂單倉儲類:
public?class?CreateOrderActivity?:?IActivity<CreateOrderDto,?CreateOrderLog>
{private?readonly?ILogger<CreateOrderActivity>?_logger;public?CreateOrderActivity(ILogger<CreateOrderActivity>?logger){_logger?=?logger;}//?訂單創建public?async?Task<ExecutionResult>?Execute(ExecuteContext<CreateOrderDto>?context){var?order?=?await?CreateOrder(context.Arguments);var?log?=?new?CreateOrderLog(order.OrderId,?order.CreatedTime);_logger.LogInformation($"Order?[{order.OrderId}]?created?successfully!");return?context.CompletedWithVariables(log,?new?{order.OrderId});}private?async?Task<Order>?CreateOrder(CreateOrderDto?orderDto){var?shoppingItems?=orderDto.ShoppingCartItems.Select(item?=>?new?ShoppingCartItem(item.SkuId,?item.Price,?item.Qty));var?order?=?new?Order(orderDto.CustomerId).NewOrder(shoppingItems.ToArray());await?OrderRepository.Insert(order);return?order;}//?訂單補償(取消訂單)public?async?Task<CompensationResult>?Compensate(CompensateContext<CreateOrderLog>?context){var?order?=?await?OrderRepository.Get(context.Log.OrderId);order.CancelOrder();var?exception?=?context.Message.ActivityExceptions.FirstOrDefault();_logger.LogWarning($"Order?[{order.OrderId}?has?been?canceled?duo?to?{exception.ExceptionInfo.Message}!");return?context.Compensated();}
}
從以上代碼可知,實現一個Activity,需要以下步驟:
定義實現
IActivity<in TArguments, in TLog>
需要的參數類:TArguments
對應正向執行入口參數,會在Execute
方法中使用,本例中為CreateOrderDto
,用于訂單創建。TLog
對應反向補償參數,會在Compensate
方法中使用,本例中為CreateOrderLog
,用于訂單取消。
實現
IActivity<in TArguments, in TLog>
接口中的Execute
方法:具體用例的實現,本例中對應訂單創建邏輯
創建
TLog
反向補償參數實例,以便業務異常時能夠按需補償返回Activity執行結果,并按需傳遞參數至下一個Activity,本例僅傳遞訂單Id至下一流程。
實現
IActivity<in TArguments, in TLog>
接口中的Compensate
方法:具體反向補償邏輯的實現,本例中對應取消訂單
返回反向補償執行結果
訂單服務的最后一步就是定義WebApi來接收創建訂單請求,為簡要起便創建OrderController
如下:
using?MassTransit.CourierDemo.Shared.Models;
using?Microsoft.AspNetCore.Mvc;namespace?MassTransit.CourierDemo.OrderService.Controllers;[ApiController]
[Route("[controller]")]
public?class?OrderController?:?ControllerBase
{private?readonly?IBus?_bus;public?OrderController(IBus?bus){_bus?=?bus;}[HttpPost]public?async?Task<IActionResult>?CreateOrder(CreateOrderDto?createOrderDto){//?創建訂單路由單var?orderRoutingSlip?=?OrderRoutingSlipBuilder.BuildOrderRoutingSlip(createOrderDto);//?執行訂單流程await?_bus.Execute(orderRoutingSlip);return?Ok();}
}
庫存服務
庫存服務在整個下單流程的職責主要是庫存的扣減和返還,但由于從上游用例僅傳遞了OrderId參數到庫存扣減Activity,因此在庫存服務需要根據OrderId 去請求訂單服務獲取要扣減的庫存項才能執行扣減邏輯。而這可以通過使用MassTransit的Reqeust/Response 模式
來實現,具體步驟如下:
在共享類庫
MassTransit.CourierDemo.Shared
中定義IOrderItemsRequest
和IOrderItemsResponse
:
namespace?MassTransit.CourierDemo.Shared.Models;public?interface?IOrderItemsRequest
{public?string?OrderId?{?get;?}
}
public?interface?IOrderItemsResponse
{public?List<DeduceStockItem>?DeduceStockItems?{?get;?set;?}public?string?OrderId?{?get;?set;?}
}
在訂單服務中實現
IConsumer<IOrderItemsRequest
:
using?MassTransit.CourierDemo.OrderService.Repositories;
using?MassTransit.CourierDemo.Shared.Models;namespace?MassTransit.CourierDemo.OrderService.Consumers;public?class?OrderItemsRequestConsumer?:?IConsumer<IOrderItemsRequest>
{public?async?Task?Consume(ConsumeContext<IOrderItemsRequest>?context){var?order?=?await?OrderRepository.Get(context.Message.OrderId);await?context.RespondAsync<IOrderItemsResponse>(new{order.OrderId,?DeduceStockItems?=?order.OrderItems.Select(item?=>?new?DeduceStockItem(item.SkuId,?item.Qty)).ToList()});}
}
在庫存服務注冊
service.AddMassTransit()
中注冊x.AddRequestClient<IOrderItemsRequest>();
:
using?System.Reflection;
using?MassTransit.CourierDemo.Shared.Models;namespace?MassTransit.CourierDemo.InventoryService;public?static?class?MassTransitServiceExtensions
{public?static?IServiceCollection?AddMassTransitWithRabbitMq(this?IServiceCollection?services){return?services.AddMassTransit(x?=>{//...????????????x.AddRequestClient<IOrderItemsRequest>();//...});}
}
在需要的類中注冊
IRequestClient<OrderItemsRequest>
服務即可。
最終扣減庫存的Activity實現如下:
public?class?DeduceStockActivity?:?IActivity<DeduceOrderStockDto,?DeduceStockLog>
{private?readonly?IRequestClient<IOrderItemsRequest>?_orderItemsRequestClient;private?readonly?ILogger<DeduceStockActivity>?_logger;public?DeduceStockActivity(IRequestClient<IOrderItemsRequest>?orderItemsRequestClient,ILogger<DeduceStockActivity>?logger){_orderItemsRequestClient?=?orderItemsRequestClient;_logger?=?logger;}//?庫存扣減public?async?Task<ExecutionResult>?Execute(ExecuteContext<DeduceOrderStockDto>?context){var?deduceStockDto?=?context.Arguments;var?orderResponse?=await?_orderItemsRequestClient.GetResponse<IOrderItemsResponse>(new?{?deduceStockDto.OrderId?});if?(!CheckStock(orderResponse.Message.DeduceStockItems))return?context.Faulted(new?Exception("insufficient?stock"));DeduceStocks(orderResponse.Message.DeduceStockItems);var?log?=?new?DeduceStockLog(deduceStockDto.OrderId,?orderResponse.Message.DeduceStockItems);_logger.LogInformation($"Inventory?has?been?deducted?for?order?[{deduceStockDto.OrderId}]!");return?context.CompletedWithVariables(log,?new?{?log.OrderId?});}//?庫存檢查private?bool?CheckStock(List<DeduceStockItem>?deduceItems){foreach?(var?stockItem?in?deduceItems){if?(InventoryRepository.GetStock(stockItem.SkuId)?<?stockItem.Qty)?return?false;}return?true;}private?void?DeduceStocks(List<DeduceStockItem>?deduceItems){foreach?(var?stockItem?in?deduceItems){InventoryRepository.TryDeduceStock(stockItem.SkuId,?stockItem.Qty);}}//庫存補償public?Task<CompensationResult>?Compensate(CompensateContext<DeduceStockLog>?context){foreach?(var?deduceStockItem?in?context.Log.DeduceStockItems){InventoryRepository.ReturnStock(deduceStockItem.SkuId,?deduceStockItem.Qty);}_logger.LogWarning($"Inventory?has?been?returned?for?order?[{context.Log.OrderId}]!");return?Task.FromResult(context.Compensated());}
}
支付服務
對于下單流程的支付用例來說,要么成功要么失敗,并不需要像以上兩個服務一樣定義補償邏輯,因此僅需要實現IExecuteActivity<in TArguments>
接口即可,該接口僅定義了Execute
接口方法,具體PayOrderActivity
實現如下:
using?MassTransit.CourierDemo.Shared;
using?MassTransit.CourierDemo.Shared.Models;namespace?MassTransit.CourierDemo.PaymentService.Activities;public?class?PayOrderActivity?:?IExecuteActivity<PayDto>
{private?readonly?IBus?_bus;private?readonly?IRequestClient<IOrderAmountRequest>?_client;private?readonly?ILogger<PayOrderActivity>?_logger;public?PayOrderActivity(IBus?bus,IRequestClient<IOrderAmountRequest>?client,ILogger<PayOrderActivity>?logger){_bus?=?bus;_client?=?client;_logger?=?logger;}public?async?Task<ExecutionResult>?Execute(ExecuteContext<PayDto>?context){var?response?=?await?_client.GetResponse<IOrderAmountResponse>(new?{?context.Arguments.OrderId?});????????//?do?payment...if?(response.Message.Amount?%?2?==?0){_logger.LogInformation($"Order?[{context.Arguments.OrderId}]?paid?successfully!");return?context.Completed();}_logger.LogWarning($"Order?[{context.Arguments.OrderId}]?payment?failed!");return?context.Faulted(new?Exception("Order?payment?failed?due?to?insufficient?account?balance."));}
}
以上代碼中也使用了MassTransit的Reqeust/Response 模式
來獲取訂單要支付的余額,并根據訂單金額是否為偶數來模擬支付失敗。
運行結果
啟動三個項目,并在Swagger中發起訂單創建請求,如下圖所示:

由于訂單總額為奇數,因此支付會失敗,最終控制臺輸出如下圖所示:
打開RabbitMQ后臺,可以看見MassTransit按照約定創建了以下隊列用于服務間的消息傳遞:

但你肯定好奇本文中使用的路由單具體是怎樣實現的?簡單,停掉庫存服務,再發送一個訂單創建請求,然后從隊列獲取未消費的消息即可解開謎底。以下是抓取的一條消息示例:
{"messageId":?"ac5d0000-e330-482a-b7bc-08dada7915ab","requestId":?null,"correlationId":?"ce8af31b-a65c-4dfa-915c-4ae5174820f9","conversationId":?"ac5d0000-e330-482a-28a5-08dada7915ad","initiatorId":?null,"sourceAddress":?"rabbitmq://localhost/masstransit/THINKPAD_MassTransitCourierDemoOrderService_bus_itqoyy8dgbrniyeobdppw6engn?temporary=true","destinationAddress":?"rabbitmq://localhost/masstransit/deduce-stock_execute?bind=true","responseAddress":?null,"faultAddress":?null,"messageType":?["urn:message:MassTransit.Courier.Contracts:RoutingSlip"],"message":?{"trackingNumber":?"ce8af31b-a65c-4dfa-915c-4ae5174820f9","createTimestamp":?"2022-12-10T06:38:01.5452768Z","itinerary":?[{"name":?"deduce-stock-activity","address":?"queue:deduce-stock_execute","arguments":?{}},{"name":?"pay-activity","address":?"queue:pay-order_execute","arguments":?{}}],"activityLogs":?[{"executionId":?"ac5d0000-e330-482a-7cb2-08dada7915bf","name":?"order-activity","timestamp":?"2022-12-10T06:38:01.7115314Z","duration":?"00:00:00.0183136","host":?{"machineName":?"THINKPAD","processName":?"MassTransit.CourierDemo.OrderService","processId":?23980,"assembly":?"MassTransit.CourierDemo.OrderService","assemblyVersion":?"1.0.0.0","frameworkVersion":?"6.0.9","massTransitVersion":?"8.0.7.0","operatingSystemVersion":?"Microsoft?Windows?NT?10.0.19044.0"}}],"compensateLogs":?[{"executionId":?"ac5d0000-e330-482a-7cb2-08dada7915bf","address":?"rabbitmq://localhost/masstransit/create-order_compensate","data":?{"orderId":?"8c47a1db-cde3-43bb-a809-644f36e7ca99","createdTime":?"2022-12-10T14:38:01.7272895+08:00"}}],"variables":?{"orderId":?"8c47a1db-cde3-43bb-a809-644f36e7ca99"},"activityExceptions":?[],"subscriptions":?[]},"expirationTime":?null,"sentTime":?"2022-12-10T06:38:01.774618Z","headers":?{"MT-Forwarder-Address":?"rabbitmq://localhost/masstransit/create-order_execute"}
}
從中可以看到信封中的message.itinerary
定義了消息的行程,從而確保消息按照定義的流程進行流轉。同時通過message.compensateLogs
來指引若失敗將如何回滾。
總結
通過以上示例的講解,相信了解到MassTransit Courier的強大之處。Courier中的RoutingSlip充當著事務編排器的角色,將Saga的決策和執行順序邏輯封裝在消息體內隨著消息進行流轉,從而確保各服務僅需關注自己的業務邏輯,而無需關心事務的流轉,真正實現了關注點分離。