🚀 ABP VNext + Dapr Workflows:輕量級分布式工作流
📚 目錄
- 🚀 ABP VNext + Dapr Workflows:輕量級分布式工作流
- 一、引言 ?
- TL;DR 🔥
- 二、環境與依賴 🛠?
- 三、系統架構與流程圖 🏗?
- 四、在 ABP 模塊中注冊 Dapr Workflows 📦
- 五、定義 Workflow 與 Activities 🎯
- 5.1 定義活動(Activity)
- 5.2 定義工作流(Workflow)
- 六、觸發與查詢工作流 🔍
- 6.1 啟動 ABP 應用
- 6.2 發起工作流
- 6.3 暴露查詢端點
- 七、示例演示 🎬
- 八、最佳實踐與優化 💡
一、引言 ?
TL;DR 🔥
- 在 ABP VNext 應用中,只需一行
services.AddDaprWorkflow(...)
即可無侵入集成 Dapr Workflow SDK,開啟長運行分布式工作流編排 🎉 (Dapr Docs) - 通過
state.redis
或 CosmosDB 等可插拔 State Store 實現跨服務狀態持久化與恢復,支持 Saga 補償模式 🔄 (Dapr Docs) - 定義繼承自
Workflow<TInput, TOutput>
的工作流類與WorkflowActivity<TArg, TResult>
的活動類,使用context.CallActivityAsync
保證確定性重放 🛠? (Diagrid) - 演示“下單—保留庫存—扣款—失敗補償”全流程,涵蓋高性能、高可用、易復現實踐 ?
背景
在微服務架構中,分布式事務難以擴展,“最終一致性”與 Saga 模式已成主流。Dapr Workflows 提供代碼化工作流,基于 DurableTask 引擎在 State Store 中持久化狀態,結合補償與定時器,簡化復雜業務的可靠編排。
二、環境與依賴 🛠?
-
.NET 平臺:.NET 9,ABP vNext v9.x
-
Dapr 運行時:Dapr CLI ≥1.10;Workflow Runtime v1.15.4
-
NuGet 包:
dotnet add package Dapr.Workflow --version 1.15.4
-
State Store 組件 (
components/statestore.yaml
):apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: statestore spec:type: state.redisversion: v1metadata:- name: redisHostvalue: "localhost:6379"# 生產環境推薦使用支持事務的后端,如 Azure Cosmos DB 或 SQL Server
(Dapr Docs)
-
基礎設施:Redis / Azure Cosmos DB;Dapr Sidecar
三、系統架構與流程圖 🏗?
- OrderService API 通過 Dapr Sidecar 調用 Workflow 管理 API
- Workflow Runtime 調度活動并將狀態寫入 State Store,支持斷點重放
- Saga 補償:在失敗場景通過補償活動保證最終一致性
四、在 ABP 模塊中注冊 Dapr Workflows 📦
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;public class MyAppModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){// 可選:顯式注冊 DaprClientcontext.Services.AddDaprClient();// 一行集成 Dapr Workflows,自動注冊 Client 與 Workercontext.Services.AddDaprWorkflow(options =>{options.RegisterWorkflow<OrderWorkflow>();options.RegisterActivity<ReserveInventoryActivity>();options.RegisterActivity<ChargePaymentActivity>();options.RegisterActivity<RefundPaymentActivity>();options.RegisterActivity<ReleaseInventoryActivity>();});}
}
AddDaprWorkflow
會自動注冊DaprWorkflowClient
、DaprClient
(若未注冊)及后臺 HostedService,無需額外中間件調用 (Dapr Docs)
五、定義 Workflow 與 Activities 🎯
using Dapr.Workflow;
using Dapr.Workflow.Models;
5.1 定義活動(Activity)
繼承自 WorkflowActivity<TArg, TResult>
并重寫 RunAsync
,實現冪等邏輯:
public record PaymentInput(Guid OrderId, decimal Amount);public class ReserveInventoryActivity : WorkflowActivity<Guid, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,Guid orderId){// 調用庫存服務,保證冪等return Task.FromResult(true);}
}public class ChargePaymentActivity : WorkflowActivity<PaymentInput, bool>
{public override Task<bool> RunAsync(WorkflowActivityContext context,PaymentInput input){// 調用支付服務,保證冪等return Task.FromResult(true);}
}
(Diagrid)
5.2 定義工作流(Workflow)
繼承自 Workflow<OrderDto, object>
,在 RunAsync
中編排活動并處理補償:
public class OrderWorkflow : Workflow<OrderDto, object>
{public override async Task<object> RunAsync(WorkflowContext context,OrderDto order){var logger = context.CreateReplaySafeLogger<OrderWorkflow>();logger.LogInformation("Order {OrderId} 開始", order.Id);try{await context.CallActivityAsync<bool>(nameof(ReserveInventoryActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ChargePaymentActivity),new PaymentInput(order.Id, order.Amount));}catch (Exception ex){logger.LogWarning(ex, "執行失敗,開始補償");await context.CallActivityAsync<bool>(nameof(RefundPaymentActivity),order.Id);await context.CallActivityAsync<bool>(nameof(ReleaseInventoryActivity),order.Id);throw;}logger.LogInformation("Order {OrderId} 完成", order.Id);return null!;}
}
(Diagrid)
六、觸發與查詢工作流 🔍
6.1 啟動 ABP 應用
dapr run \--app-id order-api \--app-port 5000 \--dapr-http-port 3500 \--components-path ./components \dotnet run
6.2 發起工作流
using Dapr.Workflow;public class OrderAppService : ApplicationService
{public async Task<string> CreateOrderAsync(CreateOrderDto dto){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();string instanceId = Guid.NewGuid().ToString();await client.ScheduleNewWorkflowAsync(workflowName: nameof(OrderWorkflow),instanceId: instanceId,input: dto);return instanceId;}// 新增:查詢工作流狀態public async Task<WorkflowState> GetWorkflowStateAsync(string instanceId){var client = ServiceProvider.GetRequiredService<DaprWorkflowClient>();return await client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);}
}
使用
ScheduleNewWorkflowAsync
啟動實例 (Dapr Docs)
6.3 暴露查詢端點
using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;[ApiController]
[Route("api/workflows")]
public class WorkflowController : ControllerBase
{private readonly DaprWorkflowClient _client;public WorkflowController(DaprWorkflowClient client) => _client = client;[HttpGet("{instanceId}")]public async Task<IActionResult> Get(string instanceId){var state = await _client.GetWorkflowStateAsync(instanceId, includeInputsAndOutputs: true);return Ok(state);}
}
curl http://localhost:5000/api/workflows/{instanceId}
- 返回 JSON 包含
RuntimeStatus
、輸入輸出、歷史事件等信息。
七、示例演示 🎬
-
基礎設施
docker run -d --name redis -p 6379:6379 redis dapr init --runtime-version v1.10
-
運行應用并發起訂單
curl -X POST http://localhost:5000/api/orders \-H "Content-Type: application/json" \-d '{"productId":"123","quantity":1,"amount":100}'
-
查詢狀態
curl http://localhost:5000/api/workflows/{instanceId}
-
模擬失敗:在
ChargePaymentActivity
拋出異常,驗證補償活動自動執行 💥
八、最佳實踐與優化 💡
- 冪等性:活動內部調用盡量冪等,防止重試產生副作用。
- 超時與重試:結合 Durable Timers 及
RetryOptions
控制超時與重試。 - 并行與分支:可在工作流中使用
Task.WhenAll(...)
或動態CallActivityAsync
實現并行。 - 版本兼容:升級工作流時,通過前綴或遷移邏輯兼容老實例。
- 生產環境:推薦使用支持事務回滾的 State Store(Cosmos DB、SQL Server)替代 Redis (Dapr Docs)