ABP VNext + Cosmos DB Change Feed:搭建實時數據變更流服務 🚀
📚 目錄
- ABP VNext + Cosmos DB Change Feed:搭建實時數據變更流服務 🚀
- TL;DR ?🚀
- 1. 環境與依賴 🏗?
- 2. 服務注冊與依賴注入 🔌
- 3. 封裝 Change Feed 為 IHostedService 🔧
- 3.1 HostedService 生命周期流程圖
- 3.2 `ChangeFeedHostedService` 實現
- 4. 事務與冪等 🛡?
- 5. 發布到事件總線 📡
- MassTransit 示例
- 6. 容錯與監控 🛠?📊
- 7. 橫向擴展 🌐
- 參考文檔 📖
TL;DR ?🚀
- 全托管 DI:CosmosClient 由容器單例管理,HostedService 構造注入,優雅釋放。
- 作用域與事務:回調內創建新 Scope,結合
IUnitOfWorkManager
實現事務一致性🛡?。 - Exactly-Once:通過(DocumentId, ETag)唯一索引 + 手動 Checkpoint,確保不漏不重?。
- 容錯重試:Polly 指數退避重試與熔斷,處理啟動與回調中的網絡抖動🔄。
- 監控可擴展:日志、指標、Dead-Letter 容錯,中控告警 + 多實例自動分片,助力彈性伸縮📊。
1. 環境與依賴 🏗?
-
.NET 平臺:.NET 6 + / ABP VNext 6.x
-
Azure 資源:Cosmos DB Core API(Source 容器 + Lease 容器)
-
主要 NuGet 包
dotnet add package Microsoft.Azure.Cosmos dotnet add package Volo.Abp.EventBus.MassTransit dotnet add package Streamiz.Kafka.Net.Stream # 可選 dotnet add package Volo.Abp.EntityFrameworkCore dotnet add package Polly
-
appsettings.json 配置
{"Cosmos": {"ConnectionString": "<your-connection-string>","Database": "MyAppDb","SourceContainer": "Docs","LeaseContainer": "Leases"},"RabbitMq": { "Host": "rabbitmq://localhost" },"Kafka": { "BootstrapServers": "localhost:9092" } }
2. 服務注冊與依賴注入 🔌
在 MyAppModule
的 ConfigureServices
中:
public override void ConfigureServices(ServiceConfigurationContext context)
{var configuration = context.Services.GetConfiguration();// CosmosClient 單例托管context.Services.AddSingleton(sp =>new CosmosClient(configuration["Cosmos:ConnectionString"]));// Polly 重試策略:3 次指數退避context.Services.AddSingleton(sp => Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),onRetry: (ex, ts, retryCount, ctx) =>{var logger = sp.GetRequiredService<ILogger<ChangeFeedHostedService>>();logger.LogWarning(ex, "?? ChangeFeed 啟動重試,第 {RetryCount} 次", retryCount);}));// 注冊 HostedServicecontext.Services.AddHostedService<ChangeFeedHostedService>();
}
💡 Tip:將 Cosmos、RabbitMQ、Kafka 等配置抽象到 SettingDefinition
,支持動態變更。
3. 封裝 Change Feed 為 IHostedService 🔧
3.1 HostedService 生命周期流程圖
?? “觸發 StartAsync”更準確地反映了 ASP.NET Core Host 的啟動流程。
3.2 ChangeFeedHostedService
實現
public class ChangeFeedHostedService : IHostedService, IDisposable
{private readonly CosmosClient _cosmosClient;private readonly IConfiguration _config;private readonly ILogger<ChangeFeedHostedService> _logger;private readonly IAsyncPolicy _retryPolicy;private readonly IServiceProvider _serviceProvider;private ChangeFeedProcessor _processor;public ChangeFeedHostedService(CosmosClient cosmosClient,IConfiguration config,ILogger<ChangeFeedHostedService> logger,IAsyncPolicy retryPolicy,IServiceProvider serviceProvider){_cosmosClient = cosmosClient;_config = config;_logger = logger;_retryPolicy = retryPolicy;_serviceProvider = serviceProvider;}public async Task StartAsync(CancellationToken ct){await _retryPolicy.ExecuteAsync(async () =>{_logger.LogInformation("🔄 ChangeFeedHostedService 正在啟動...");var dbName = _config["Cosmos:Database"];var src = _cosmosClient.GetContainer(dbName, _config["Cosmos:SourceContainer"]);var lease = _cosmosClient.GetContainer(dbName, _config["Cosmos:LeaseContainer"]);_processor = src.GetChangeFeedProcessorBuilder<MyDocument>("abp-processor", HandleChangesAsync).WithInstanceName(Environment.MachineName).WithLeaseContainer(lease).WithStartTime(DateTime.MinValue.ToUniversalTime()).Build();await _processor.StartAsync(ct);_logger.LogInformation("? ChangeFeedProcessor 已啟動");});}public async Task StopAsync(CancellationToken ct){if (_processor != null){_logger.LogInformation("🛑 ChangeFeedProcessor 正在停止...");await _processor.StopAsync(ct);_logger.LogInformation("? ChangeFeedProcessor 已停止");}}public void Dispose() => _processor = null;private async Task HandleChangesAsync(IReadOnlyCollection<MyDocument> docs,CancellationToken ct){if (docs == null || docs.Count == 0) return;_logger.LogInformation("📥 收到 {Count} 條文檔變更", docs.Count);// 創建新的 DI Scopeusing var scope = _serviceProvider.CreateScope();var uowManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>();var eventBus = scope.ServiceProvider.GetRequiredService<IDistributedEventBus>();var auditRepo = scope.ServiceProvider.GetRequiredService<IRepository<AuditEntry, Guid>>();// 開始事務using var uow = await uowManager.BeginAsync();foreach (var doc in docs){try{// 發布領域事件await eventBus.PublishAsync(new DocumentChangedEvent(doc.Id, doc), ct);// 審計寫入,唯一索引保證冪等var entry = new AuditEntry{DocumentId = doc.Id,ETag = doc.ETag,Operation = doc.Operation,Timestamp = DateTime.UtcNow,Payload = JsonConvert.SerializeObject(doc)};await auditRepo.InsertAsync(entry, autoSave: true);}catch (DbUpdateException dbEx)when (dbEx.InnerException?.Message.Contains("UNIQUE") ?? false){_logger.LogWarning("?? 文檔 {DocumentId}@{ETag} 唯一索引沖突,跳過", doc.Id, doc.ETag);}catch (Exception ex){_logger.LogError(ex, "🔥 寫審計失敗,寫入 Dead-Letter 容器");await WriteToDeadLetterAsync(doc, ex, ct);// 回滾本次事務await uow.RollbackAsync();// 跳過到下一文檔continue;}}// 提交事務await uow.CompleteAsync();// 手動 Checkpointawait _processor.CheckpointAsync(ct);_logger.LogInformation("🗸 Checkpoint 完成,位置已記錄");}private Task WriteToDeadLetterAsync(MyDocument doc, Exception ex, CancellationToken ct){// TODO: 實現將失敗批次寫入 Dead-Letter 容器或隊列,用于離線補償return Task.CompletedTask;}
}
4. 事務與冪等 🛡?
💡 Tip:在 AuditEntry
上建立 (DocumentId, ETag)
唯一索引,捕獲 DbUpdateException
后跳過重復。
5. 發布到事件總線 📡
MassTransit 示例
services.AddMassTransit(cfg =>
{cfg.AddConsumer<DocumentChangedConsumer>();cfg.UsingRabbitMq((ctx, rc) =>{rc.Host(Configuration["RabbitMq:Host"]);rc.ReceiveEndpoint("change-feed-queue", e =>e.ConfigureConsumer<DocumentChangedConsumer>(ctx));});
});
public class DocumentChangedConsumer : IConsumer<DocumentChangedEvent>
{public async Task Consume(ConsumeContext<DocumentChangedEvent> ctx){// 下游業務邏輯…}
}
6. 容錯與監控 🛠?📊
- Polly 重試:啟動與回調均受重試策略保護🔁。
- Dead-Letter 容錯:異常時寫入專用容器/隊列,離線補償。
- 日志:
ILogger<ChangeFeedHostedService>
記錄啟動/停止、批次數量、Checkpoint、異常詳情。 - 監控指標:集成 Application Insights 或 Prometheus,暴露 Lease 分片數、消費延遲、批量大小、錯誤率等。
7. 橫向擴展 🌐
- 多實例分片:同一 ProcessorName 啟動 N 實例,Cosmos DB 自動均衡 Lease 分片。
- 彈性伸縮:結合監控告警,自動擴縮 Kubernetes Deployment 或 VMSS,實現高峰應對。
參考文檔 📖
- Azure Cosmos DB Change Feed 官方文檔
- ABP 事件總線指南
- MassTransit 文檔