ABP VNext + Cosmos DB Change Feed:搭建實時數據變更流服務

ABP VNext + Cosmos DB Change Feed:搭建實時數據變更流服務 🚀


📚 目錄

  • ABP VNext + Cosmos DB Change Feed:搭建實時數據變更流服務 🚀
      • TL;DR ?🚀
    • 1. 環境與依賴 🏗?
    • 2. 服務注冊與依賴注入 🔌
    • 3. 封裝 Change Feed 為 IHostedService 🔧
      • 3.1 HostedService 生命周期流程圖
      • 3.2 `ChangeFeedHostedService` 實現
    • 4. 事務與冪等 🛡?
    • 5. 發布到事件總線 📡
      • MassTransit 示例
    • 6. 容錯與監控 🛠?📊
    • 7. 橫向擴展 🌐
    • 參考文檔 📖


TL;DR ?🚀

  • 全托管 DI:CosmosClient 由容器單例管理,HostedService 構造注入,優雅釋放。
  • 作用域與事務:回調內創建新 Scope,結合 IUnitOfWorkManager 實現事務一致性🛡?。
  • Exactly-Once:通過(DocumentId, ETag)唯一索引 + 手動 Checkpoint,確保不漏不重?。
  • 容錯重試:Polly 指數退避重試與熔斷,處理啟動與回調中的網絡抖動🔄。
  • 監控可擴展:日志、指標、Dead-Letter 容錯,中控告警 + 多實例自動分片,助力彈性伸縮📊。

1. 環境與依賴 🏗?

  • .NET 平臺:.NET 6 + / ABP VNext 6.x

  • Azure 資源:Cosmos DB Core API(Source 容器 + Lease 容器)

  • 主要 NuGet 包

    dotnet add package Microsoft.Azure.Cosmos
    dotnet add package Volo.Abp.EventBus.MassTransit
    dotnet add package Streamiz.Kafka.Net.Stream        # 可選
    dotnet add package Volo.Abp.EntityFrameworkCore
    dotnet add package Polly
    
  • appsettings.json 配置

    {"Cosmos": {"ConnectionString": "<your-connection-string>","Database": "MyAppDb","SourceContainer": "Docs","LeaseContainer": "Leases"},"RabbitMq": { "Host": "rabbitmq://localhost" },"Kafka":   { "BootstrapServers": "localhost:9092" }
    }
    

2. 服務注冊與依賴注入 🔌

MyAppModuleConfigureServices 中:

public override void ConfigureServices(ServiceConfigurationContext context)
{var configuration = context.Services.GetConfiguration();// CosmosClient 單例托管context.Services.AddSingleton(sp =>new CosmosClient(configuration["Cosmos:ConnectionString"]));// Polly 重試策略:3 次指數退避context.Services.AddSingleton(sp => Policy.Handle<Exception>().WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),onRetry: (ex, ts, retryCount, ctx) =>{var logger = sp.GetRequiredService<ILogger<ChangeFeedHostedService>>();logger.LogWarning(ex, "?? ChangeFeed 啟動重試,第 {RetryCount} 次", retryCount);}));// 注冊 HostedServicecontext.Services.AddHostedService<ChangeFeedHostedService>();
}

💡 Tip:將 Cosmos、RabbitMQ、Kafka 等配置抽象到 SettingDefinition,支持動態變更。


3. 封裝 Change Feed 為 IHostedService 🔧

3.1 HostedService 生命周期流程圖

應用啟動
DI 容器構建
觸發 IHostedService.StartAsync
啟動 ChangeFeedProcessor
監聽文檔變更
HandleChangesAsync 回調
發布事件 & 寫審計 & Checkpoint
準備下一批

?? “觸發 StartAsync”更準確地反映了 ASP.NET Core Host 的啟動流程。

3.2 ChangeFeedHostedService 實現

public class ChangeFeedHostedService : IHostedService, IDisposable
{private readonly CosmosClient _cosmosClient;private readonly IConfiguration _config;private readonly ILogger<ChangeFeedHostedService> _logger;private readonly IAsyncPolicy _retryPolicy;private readonly IServiceProvider _serviceProvider;private ChangeFeedProcessor _processor;public ChangeFeedHostedService(CosmosClient cosmosClient,IConfiguration config,ILogger<ChangeFeedHostedService> logger,IAsyncPolicy retryPolicy,IServiceProvider serviceProvider){_cosmosClient    = cosmosClient;_config          = config;_logger          = logger;_retryPolicy     = retryPolicy;_serviceProvider = serviceProvider;}public async Task StartAsync(CancellationToken ct){await _retryPolicy.ExecuteAsync(async () =>{_logger.LogInformation("🔄 ChangeFeedHostedService 正在啟動...");var dbName = _config["Cosmos:Database"];var src    = _cosmosClient.GetContainer(dbName, _config["Cosmos:SourceContainer"]);var lease  = _cosmosClient.GetContainer(dbName, _config["Cosmos:LeaseContainer"]);_processor = src.GetChangeFeedProcessorBuilder<MyDocument>("abp-processor", HandleChangesAsync).WithInstanceName(Environment.MachineName).WithLeaseContainer(lease).WithStartTime(DateTime.MinValue.ToUniversalTime()).Build();await _processor.StartAsync(ct);_logger.LogInformation("? ChangeFeedProcessor 已啟動");});}public async Task StopAsync(CancellationToken ct){if (_processor != null){_logger.LogInformation("🛑 ChangeFeedProcessor 正在停止...");await _processor.StopAsync(ct);_logger.LogInformation("? ChangeFeedProcessor 已停止");}}public void Dispose() => _processor = null;private async Task HandleChangesAsync(IReadOnlyCollection<MyDocument> docs,CancellationToken ct){if (docs == null || docs.Count == 0) return;_logger.LogInformation("📥 收到 {Count} 條文檔變更", docs.Count);// 創建新的 DI Scopeusing var scope = _serviceProvider.CreateScope();var uowManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>();var eventBus   = scope.ServiceProvider.GetRequiredService<IDistributedEventBus>();var auditRepo  = scope.ServiceProvider.GetRequiredService<IRepository<AuditEntry, Guid>>();// 開始事務using var uow = await uowManager.BeginAsync();foreach (var doc in docs){try{// 發布領域事件await eventBus.PublishAsync(new DocumentChangedEvent(doc.Id, doc), ct);// 審計寫入,唯一索引保證冪等var entry = new AuditEntry{DocumentId = doc.Id,ETag       = doc.ETag,Operation  = doc.Operation,Timestamp  = DateTime.UtcNow,Payload    = JsonConvert.SerializeObject(doc)};await auditRepo.InsertAsync(entry, autoSave: true);}catch (DbUpdateException dbEx)when (dbEx.InnerException?.Message.Contains("UNIQUE") ?? false){_logger.LogWarning("?? 文檔 {DocumentId}@{ETag} 唯一索引沖突,跳過", doc.Id, doc.ETag);}catch (Exception ex){_logger.LogError(ex, "🔥 寫審計失敗,寫入 Dead-Letter 容器");await WriteToDeadLetterAsync(doc, ex, ct);// 回滾本次事務await uow.RollbackAsync();// 跳過到下一文檔continue;}}// 提交事務await uow.CompleteAsync();// 手動 Checkpointawait _processor.CheckpointAsync(ct);_logger.LogInformation("🗸 Checkpoint 完成,位置已記錄");}private Task WriteToDeadLetterAsync(MyDocument doc, Exception ex, CancellationToken ct){// TODO: 實現將失敗批次寫入 Dead-Letter 容器或隊列,用于離線補償return Task.CompletedTask;}
}

4. 事務與冪等 🛡?

HandleChangesAsync
IUnitOfWorkManager.Begin
Publish Event & Insert Audit
異常?
寫入 Dead-Letter
Rollback UoW
Complete UoW
Checkpoint

💡 Tip:在 AuditEntry 上建立 (DocumentId, ETag) 唯一索引,捕獲 DbUpdateException 后跳過重復。


5. 發布到事件總線 📡

ChangeFeedProcessor
IDistributedEventBus.PublishAsync
MassTransit/RabbitMQ
Streamiz/Kafka
DocumentChangedConsumer
DocumentChangedProcessor

MassTransit 示例

services.AddMassTransit(cfg =>
{cfg.AddConsumer<DocumentChangedConsumer>();cfg.UsingRabbitMq((ctx, rc) =>{rc.Host(Configuration["RabbitMq:Host"]);rc.ReceiveEndpoint("change-feed-queue", e =>e.ConfigureConsumer<DocumentChangedConsumer>(ctx));});
});
public class DocumentChangedConsumer : IConsumer<DocumentChangedEvent>
{public async Task Consume(ConsumeContext<DocumentChangedEvent> ctx){// 下游業務邏輯…}
}

6. 容錯與監控 🛠?📊

  • Polly 重試:啟動與回調均受重試策略保護🔁。
  • Dead-Letter 容錯:異常時寫入專用容器/隊列,離線補償。
  • 日志ILogger<ChangeFeedHostedService> 記錄啟動/停止、批次數量、Checkpoint、異常詳情。
  • 監控指標:集成 Application Insights 或 Prometheus,暴露 Lease 分片數、消費延遲、批量大小、錯誤率等。

7. 橫向擴展 🌐

  • 多實例分片:同一 ProcessorName 啟動 N 實例,Cosmos DB 自動均衡 Lease 分片。
  • 彈性伸縮:結合監控告警,自動擴縮 Kubernetes Deployment 或 VMSS,實現高峰應對。

參考文檔 📖

  • Azure Cosmos DB Change Feed 官方文檔
  • ABP 事件總線指南
  • MassTransit 文檔

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87982.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87982.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87982.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

STM32-定時器

定時器&#xff1a;有4個獨立通道&#xff1a;輸入捕獲&#xff1b;輸出比較PWM生成&#xff1b;單脈沖模式輸出&#xff1b;可通外部信號控制定時器&#xff08;TIMx-ETR&#xff09;&#xff1b;支持針對定時的增量&#xff08;正交&#xff09;編碼器、霍爾傳感器電路通用定…

Windows Server 2019--職業技能大賽B模塊Windows服務器配置樣題

一、賽題說明 &#xff08;一&#xff09;競賽介紹 請詳細閱讀網絡拓撲圖&#xff0c;為所有計算機修改默認防火墻以便允許ICMP和相應的流量&#xff0c;不允許直接關閉主機的防火墻。除了CD-ROM/HDD驅動器&#xff0c;請不要修改虛擬機本身的硬件設置。 &#xff08;二&…

vue3+Echarts實現立體柱狀圖

Echarts柱狀圖中文網&#xff1a;https://echarts.apache.org/examples/zh/index.html#chart-type-bar 效果展示&#xff1a; 主要實現過程是三部分的組合&#xff0c;最上面是一個橢圓&#xff0c;中間是正常的柱子&#xff0c;下方再加上一個橢圓&#xff0c;就出來立體的效…

【UE5】虛幻引擎小百科

一、類名前面的大寫字母的含義是什么UE5常見前綴分類表前綴含義實例用于AActorACharacter&#xff0c;AWeaponBase可放入世界中的對象&#xff08;有位置、可碰撞等&#xff09;UUObject派生類UUserWidget&#xff0c;UWeaponComponent引擎對象、邏輯模塊&#xff0c;不具備Tra…

【Linux系統】vim編輯器 | 編譯器gcc/g++ | make/Makefile

1. vim編輯器一、歷史發展與Vim vs Vi的區別起源與演進Vi&#xff08;1976年&#xff09; &#xff1a;由Bill Joy開發&#xff0c;嵌入BSD Unix系統&#xff0c;是首個面向屏幕的文本編輯器&#xff0c;但功能有限&#xff08;如無多級撤銷&#xff09;。Vim&#xff08;1991年…

國產飛騰主板,賦能網絡安全防御硬手段

? 當前&#xff0c;網絡安全形勢嚴峻&#xff0c;網絡攻擊手段不斷翻新&#xff0c;從數據泄露到電腦中毒&#xff0c;企業、機構乃至國家的數字資產都面臨著巨大風險。在此背景下&#xff0c;國產硬件技術的突破對筑牢網絡安全防線意義重大。 高能計算機基于市場需求&#…

Spring AI 概述與架構設計

目錄一、前言二、簡介三、核心能力概覽四、理解模塊架構圖五、模型適配能力六、最小應用示例七、與傳統 LLM 調用相比八、總結九、參考一、前言 在 AI 正以前所未有的速度“下沉”到各類系統與業務的當下&#xff0c;Spring 官方推出的 Spring AI 項目&#xff0c;為 Java 開發…

UI前端與數字孿生融合新領域:智慧環保的污染源監測與治理

hello寶子們...我們是艾斯視覺擅長ui設計、前端開發、數字孿生、大數據、三維建模、三維動畫10年經驗!希望我的分享能幫助到您!如需幫助可以評論關注私信我們一起探討!致敬感謝感恩!一、引言&#xff1a;數字孿生重構智慧環保的技術范式在環境污染治理壓力持續增大的背景下&…

【go/wails】wails入門系列(一)環境安裝與demo

文章目錄說在前面go安裝nodejs安裝wails創建項目運行說在前面 操作系統&#xff1a;win11go版本&#xff1a;1.24.4nodejs版本&#xff1a;v22.16.0wails版本&#xff1a;v2.10.1 go安裝 官網 這里 下載安裝即可 nodejs 官網 這里 下載安裝即可 安裝wails 設置go國內代理g…

linux qt 使用log4cpp庫

一、日志庫下載 下載地址&#xff1a;https://log4cpp.sourceforge.net/二、日志庫解壓&#xff0c;編譯 1.將文件夾解壓出來2.進入文件夾內部&#xff0c;打開終端3.終端中依次輸入以下命令 mkdir build ./configure --prefix$(pwd)/build make make install 一般來說不會報錯…

探索阿里云Data Integration:數據同步的魔法工具

引言在當今數字化時代&#xff0c;數據已成為企業的核心資產&#xff0c;如同企業發展的 “燃料”&#xff0c;驅動著業務的增長與創新。從用戶行為數據到業務運營數據&#xff0c;從市場趨勢數據到供應鏈數據&#xff0c;每一個數據點都蘊含著巨大的價值&#xff0c;能夠為企業…

【Java面試】Redis的poll函數epoll函數區別?

Redis 在選擇 poll 和 epoll 時主要基于性能需求、連接規模、操作系統支持等因素。以下是具體場景的對比與選擇建議&#xff1a;1. 何時使用 poll 函數&#xff1f;適用場景&#xff1a; 跨平臺兼容性需求&#xff1a;poll 在幾乎所有操作系統&#xff08;如 Windows、BSD、Lin…

RPC--RPCHandler的實現

在RPC框架中&#xff0c;Handler用于接收RpcRequest&#xff0c;經過處理后返回RpcResponseSlf4jpublic class RpcRequestHandler {private final ServiceProvider serviceProvider;//獲取一個單例模式的服務提供類public RpcRequestHandler() {serviceProvider SingletonFact…

C#讀取文件夾和文件列表:全面指南

C#讀取文件夾和文件列表&#xff1a;全面指南 在 C# 開發中&#xff0c;經常需要獲取文件夾中的文件列表或子文件夾結構&#xff0c;例如文件管理器、批量處理工具、備份程序等場景。本文將詳細介紹 C# 中讀取文件夾和文件列表的各種方法&#xff0c;包括基礎操作、遞歸遍歷、過…

從小白到進階:解鎖linux與c語言高級編程知識點嵌入式開發的任督二脈(1)

【硬核揭秘】Linux與C高級編程&#xff1a;從入門到精通&#xff0c;你的全棧之路&#xff01;第一部分&#xff1a;初識Linux與環境搭建&#xff0c;玩轉軟件包管理——嵌入式開發的第一道“坎”嘿&#xff0c;各位C語言的“卷王”們&#xff01;你可能已經習慣了在Windows或m…

.net開源庫SignalR

.NET開源庫SignalR&#xff1a;打造實時Web應用的利器 在當今的Web開發領域&#xff0c;實時性已經成為了許多應用的核心需求。無論是實時聊天、實時數據監控還是實時游戲&#xff0c;都需要服務器能夠及時地將數據推送給客戶端。而.NET開源庫SignalR&#xff0c;正是滿足這一…

SQL Server不同場景批量插入數據的方式詳解

INSERT INTO...VALUES多行語法 該方法適用于單次插入少量數據(通常<1000行),語法簡潔直觀。示例: INSERT INTO Employees (EmployeeID, Name, Department) VALUES (101, Zhang San, IT),(102, Li Si, HR),(103, Wang Wu, Finance)優點:語法簡單易理解,適合開發測試環…

Day08-Flask 或 Django 簡介:構建 Web 應用程序

Flask 或 Django 簡介&#xff1a;構建 Web 應用程序 網絡開發領域提供了豐富的工具和框架&#xff0c;而 Python 作為一門多功能的語言&#xff0c;在構建健壯且可擴展的 Web 應用方面脫穎而出。本課程將作為你使用 Python 進行 Web 開發的入門指南&#xff0c;特別聚焦于兩個…

k8s多集群管理中的聯邦和艦隊如何理解?

在 Kubernetes 多集群管理中&#xff0c;聯邦&#xff08;Federation&#xff09;和艦隊&#xff08;Fleet&#xff09;是兩種不同的方法&#xff0c;用于管理和協調多個 Kubernetes 集群。下面是對這兩種方法的詳細解釋&#xff1a; 聯邦&#xff08;Federation&#xff09; K…

Docker部署MySQL鏡像

1.拉取鏡像 # 拉取指定版本的MySQL鏡像 docker pull mysql:8.02.創建掛載目錄 # 自己創建好如下三個文件夾 路徑任意 [rootiZuf6aigs7rxe6f6oifq7vZ mysql]# ll 總用量 12 drwxr-xr-x 2 root root 4096 7月 7 10:25 config drwxr-xr-x 2 root root 4096 6月 26 16:43 data d…