ABP VNext + CloudEvents:事件驅動微服務互操作性

ABP VNext + CloudEvents:事件驅動微服務互操作性 🚀


📚 目錄

  • ABP VNext + CloudEvents:事件驅動微服務互操作性 🚀
    • 一、引言 ?
      • ?? TL;DR
      • 📚 背景與動機
      • 🏗? 整體架構圖
    • 二、環境準備與依賴安裝 🛠?
      • 2.1 環境要求
      • 2.2 .NET 依賴安裝
      • 2.3 Go 與 Python 安裝
    • 三、CloudEvents 規范概覽 📚
    • 四、gRPC Protobuf 定義 📦
    • 五、在 ABP VNext 中發布 & 消費 CloudEvent 🚀
      • 5.1 Program.cs 完整配置
      • 5.2 發布 CloudEvent
      • 5.3 接收 CloudEvent
    • 六、與 Knative Eventing 集成 🐳
    • 七、與 Azure Event Grid 集成 ??
      • 7.1 獲取密鑰
      • 7.2 發布 CloudEvent
      • 7.3 訂閱端點
    • 八、多語言互操作示例 🌐
      • 8.1 Python Flask 消費
      • 8.2 Go 發布到 Event Grid
    • 九、示例場景 🔄
    • 十、性能、可用性與測試 📈


一、引言 ?

?? TL;DR

  • 🌐 使用 CloudEvents 1.0 統一事件元數據,消除 Knative、Azure Event Grid、Kafka 等平臺差異
  • ?? 在 ABP VNext 中通過 Typed HttpClient、gRPC 客戶端及 Polly 重試快速發布/消費事件
  • 🐍 支持 .NET、Go、Python 多語言互操作,包含完整認證、TLS/證書與錯誤處理
  • 🔄 演示在 Knative EventingAzure Event Grid 間雙向互操作,并接入 OpenTelemetry 全鏈路追蹤

📚 背景與動機

微服務生態中自定義事件格式難以互通;CloudEvents(CNCF 標準)定義了必需字段、JSON/Protobuf 格式與傳輸綁定,極大降低跨平臺、跨語言的集成成本。

🏗? 整體架構圖

Structured/gRPC
HTTP/gRPC
用戶界面
ABP VNext OrderService
Dapr Pub/Sub
Knative Broker
InventoryService
Azure Event Grid
AnalyticsService

二、環境準備與依賴安裝 🛠?

2.1 環境要求

  • Kubernetes v1.25+(含 Knative Eventing v1.10+
  • Azure 訂閱:具備 Event Grid 主題 與訪問密鑰
  • .NET 9 SDK
  • Go 1.20+
  • Python 3.9+

2.2 .NET 依賴安裝

dotnet add package CloudNative.CloudEvents               --version 2.8.0
dotnet add package CloudNative.CloudEvents.Http          --version 2.8.0
dotnet add package CloudNative.CloudEvents.Core          --version 2.8.0
dotnet add package CloudNative.CloudEvents.Protobuf      --version 2.8.0
dotnet add package CloudNative.CloudEvents.SystemTextJson--version 2.8.0
dotnet add package CloudNative.CloudEvents.AspNetCore    --version 2.8.0
dotnet add package Microsoft.Extensions.Http.Polly       --version 8.0.0
dotnet add package Azure.Messaging.EventGrid             --version 5.11.0
dotnet add package Dapr.Client                           --version 1.11.0   # 可選

2.3 Go 與 Python 安裝

go get github.com/cloudevents/sdk-go/v2
pip install cloudevents flask

三、CloudEvents 規范概覽 📚

  • 必需字段specversionidsourcetype

  • 常用字段timedatacontenttypedataschema、擴展屬性

  • 傳輸模式

    • Structured(完整 JSON)
    • Binary(HTTP Header + Body)
    • gRPC(Protobuf)
  • 原生兼容:Knative Broker、Azure Event Grid、Kafka、Dapr Pub/Sub


四、gRPC Protobuf 定義 📦

  1. 從 NuGet 包 CloudNative.CloudEvents.Protobufproto/ 目錄復制官方 cloudevents.proto 到項目 Protos/

  2. Protos/mycompany.events.proto 定義業務契約:

    // Protos/mycompany.events.proto
    syntax = "proto3";
    package mycompany.events;import "cloudevents.proto";service CloudEventService {rpc Send (SendRequest) returns (SendResponse);
    }message SendRequest {io.cloudevents.v1.CloudEvent event = 1;
    }
    message SendResponse {}
    
  3. .csproj 中添加:

    <ItemGroup><Protobuf Include="Protos\cloudevents.proto" GrpcServices="None" /><Protobuf Include="Protos\mycompany.events.proto" GrpcServices="Server;Client" />
    </ItemGroup>
    

五、在 ABP VNext 中發布 & 消費 CloudEvent 🚀

5.1 Program.cs 完整配置

var builder = WebApplication.CreateBuilder(args);// 1. 配置 Authentication & Authorization
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme).AddJwtBearer(options =>{options.Authority = builder.Configuration["Jwt:Authority"];options.Audience = builder.Configuration["Jwt:Audience"];options.TokenValidationParameters = new TokenValidationParameters{ValidateIssuer = true,ValidateAudience = true,ValidateLifetime = true,ValidateIssuerSigningKey = true,IssuerSigningKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(builder.Configuration["Jwt:Key"]))};});
builder.Services.AddAuthorization();// 2. 注冊 Dapr Client(可選)
builder.Services.AddDaprClient();// 3. 控制器 & CloudEvents JSON 格式化
builder.Services.AddControllers().AddCloudEventsJsonFormatters();// 4. Typed HttpClient(Knative Broker)
builder.Services.AddHttpClient("CloudEventClient", client =>
{client.BaseAddress = new Uri("http://broker-ingress.knative-eventing.svc.cluster.local/default/");client.DefaultRequestHeaders.Add("Content-Type", "application/cloudevents+json");
})
.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, _ => TimeSpan.FromMilliseconds(200)));// 5. 注冊 gRPC 客戶端(含自簽名證書示例)
builder.Services.AddGrpcClient<CloudEventService.CloudEventServiceClient>(o =>
{o.Address = new Uri("https://grpc-server:5001");
})
.ConfigurePrimaryHttpMessageHandler(() =>
{var handler = new HttpClientHandler();handler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;return handler;
});// 6. 注冊 Event Grid 客戶端
builder.Services.AddSingleton(sp =>
{var config = sp.GetRequiredService<IConfiguration>();return new EventGridPublisherClient(new Uri(config["EventGrid:Endpoint"]),new AzureKeyCredential(config["EventGrid:Key"]));
});// 7. OpenTelemetry(Tracing + Metrics)
builder.Services.AddOpenTelemetryTracing(b => b.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddGrpcClientInstrumentation().AddJaegerExporter());
builder.Services.AddOpenTelemetryMetrics(m => m.AddPrometheusExporter());var app = builder.Build();
app.UseAuthentication();
app.UseAuthorization();// 暴露 Prometheus /metrics 端點
app.UseOpenTelemetryPrometheusScrapingEndpoint();app.MapControllers();
app.Run();

5.2 發布 CloudEvent

using CloudNative.CloudEvents;
using CloudNative.CloudEvents.Protobuf;
using CloudNative.CloudEvents.Http;
using CloudNative.CloudEvents.SystemTextJson;
using Azure.Messaging.EventGrid;public class OrderService
{private readonly HttpClient _http;private readonly CloudEventService.CloudEventServiceClient _grpc;private readonly EventGridPublisherClient _egClient;private readonly Dapr.Client.DaprClient _dapr;private readonly ILogger<OrderService> _logger;public OrderService(IHttpClientFactory httpFactory,CloudEventService.CloudEventServiceClient grpc,EventGridPublisherClient egClient,Dapr.Client.DaprClient dapr,ILogger<OrderService> logger){_http    = httpFactory.CreateClient("CloudEventClient");_grpc    = grpc;_egClient= egClient;_dapr    = dapr;_logger  = logger;}public async Task PublishAsync(Guid orderId, decimal amount){var ce = new CloudEvent("com.mycompany.order.created", new Uri("urn:abp:orderservice")){Id              = Guid.NewGuid().ToString(),Time            = DateTimeOffset.UtcNow,DataContentType = "application/json",Data            = new { OrderId = orderId, Amount = amount }};ce.DataSchema = new Uri("https://schemas.mycompany.com/order/1.0");ce.Extensions["version"] = "1.0";// 1. HTTP Structuredvar httpContent = new CloudEventContent(ce, ContentMode.Structured, new JsonEventFormatter());var resp = await _http.PostAsync("", httpContent);resp.EnsureSuccessStatusCode();// 2. gRPC Binarytry{var protoEvent = ce.ToProto();await _grpc.SendAsync(new SendRequest { Event = protoEvent });}catch (RpcException ex){_logger.LogError(ex, "gRPC send failed for {EventId}", ce.Id);throw;}// 3. Azure Event Gridawait _egClient.SendCloudEventAsync(ce);// 4. Dapr Pub/Sub(可選)await _dapr.PublishEventAsync("pubsub", "order.created", ce);}
}

5.3 接收 CloudEvent

[ApiController]
[Route("api/events")]
public class EventsController : ControllerBase
{private readonly IOrderAppService _orders;private readonly ILogger<EventsController> _logger;public EventsController(IOrderAppService orders, ILogger<EventsController> logger){_orders = orders;_logger = logger;}[HttpPost][Authorize]public async Task<IActionResult> Receive([FromBody] CloudEvent ce){try{var order = ce.Data.ToObject<OrderCreatedDto>();await _orders.ProcessOrderAsync(order);return Ok();}catch (Exception ex){_logger.LogError(ex, "Processing failed for CloudEvent {EventId}", ce.Id);return StatusCode(500);}}
}

六、與 Knative Eventing 集成 🐳

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:name: default---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:name: order-trigger
spec:broker: defaultfilter:attributes:type: com.mycompany.order.createdsubscriber:uri: http://my-abp-app.default.svc.cluster.local/api/eventsdelivery:retry: 5backoffPolicy: exponentialdeadLetterSink:uri: http://deadletter.default.svc.cluster.local
POST /default
OrderService
Knative Broker
InventoryService
AnalyticsService

七、與 Azure Event Grid 集成 ??

7.1 獲取密鑰

topicKey=$(az eventgrid topic key list \--name myTopic \--resource-group myRg \--query key1 -o tsv)

EventGrid:EndpointEventGrid:Key 寫入 appsettings.json 或環境變量。

7.2 發布 CloudEvent

// egClient 通過 DI 注入
await _egClient.SendCloudEventAsync(ce);

7.3 訂閱端點

[HttpPost("api/eventgrid")]
public IActionResult OnEvent([FromBody] CloudEvent ce)
{_logger.LogInformation("EG Received {EventId}", ce.Id);return Ok();
}

八、多語言互操作示例 🌐

8.1 Python Flask 消費

pip install cloudevents flask
from flask import Flask, request, abort
from cloudevents.http import from_httpapp = Flask(__name__)@app.route("/python-events", methods=["POST"])
def receive():try:ce = from_http(request.headers, request.get_data())print("📥 Received:", ce["id"], ce.data)return "", 200except Exception:abort(400)if __name__ == "__main__":app.run(port=3000)

8.2 Go 發布到 Event Grid

import ("context""log""os"cloudevents "github.com/cloudevents/sdk-go/v2"
)func main() {target := os.Getenv("EVENT_GRID_ENDPOINT")key    := os.Getenv("EVENT_GRID_KEY")c, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(target),cloudevents.WithHeader("aeg-sas-key", key),)if err != nil {log.Fatalf("? client error: %v", err)}e := cloudevents.NewEvent()e.SetSource("urn:go:inventory")e.SetType("com.mycompany.inventory.updated")e.SetData(cloudevents.ApplicationJSON, map[string]int{"productId": 123, "qty": 10})if res := c.Send(context.Background(), e); cloudevents.IsUndelivered(res) {log.Fatalf("? send failed: %v", res)}log.Println("? Event sent")
}

九、示例場景 🔄

用戶界面OrderServiceKnative BrokerInventoryServiceEvent GridAnalyticsServiceDatabaseSubmit OrderPublish order.createdTrigger Inventory ReductionSend to Event GridDistribute eventWrite Reports用戶界面OrderServiceKnative BrokerInventoryServiceEvent GridAnalyticsServiceDatabase

十、性能、可用性與測試 📈

  • HTTP vs gRPC

    • HTTP Structured 易調試;gRPC Binary 延遲更低、吞吐更高
  • 重試 & 死信

    • Knative:retry + deadLetterSink
    • Event Grid:指數退避重試 + 死信存儲
  • Schema 管理

    • 使用 DataSchema 與擴展屬性版本化事件
    • 可結合 Schema Registry(如 Azure Schema Registry)
  • 安全

    • 全鏈路 HTTPS + JWT/SAS 驗證 + 消息簽名
  • 測試示例

    • xUnit 集成測試WebApplicationFactory<Program> 驗證 /api/events
    • k6 性能腳本(HTTP vs gRPC 對比)
// k6 script snippet
import http from 'k6/http';
import grpc from 'k6/net/grpc';const client = new grpc.Client();
client.load(['protos'], 'mycompany.events.proto');
client.connect('grpc-server:5001', { plaintext: true });export default function() {http.post('http://broker-ingress.knative-eventing.svc.cluster.local/default',JSON.stringify({ /* CloudEvent JSON */ }),{ headers: { 'Content-Type': 'application/cloudevents+json' } });client.invoke('mycompany.events.CloudEventService/Send',{ event: {/* proto CloudEvent */} });
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/91852.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/91852.shtml
英文地址,請注明出處:http://en.pswp.cn/web/91852.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

軟件測試測評公司關于HTTP安全頭配置與測試?

瀏覽器和服務器之間那幾行看不見的HTTP安全頭配置&#xff0c;往往是抵御網絡攻擊的關鍵防線。作為軟件測試測評公司&#xff0c;我們發現超過六成的高危漏洞源于安全頭缺失或誤配。別小看這些響應頭&#xff0c;它們能直接掐斷跨站腳本、點擊劫持、數據嗅探的攻擊路徑。五條命…

Mysql集成技術

目錄 mysql的編譯安裝與部署 1.編譯安裝mysql 2.部署mysql mysql主從復制 什么是mysql主從復制&#xff1f; 1.配置master 2.配置slave 3.存在數據時添加slave2 4.GTID模式 什么是GTID模式&#xff1f; 配置GTID 5.延遲復制 6.慢查詢日志 核心作用 開啟慢查詢日志…

《MySQL進階核心技術剖析(一): 存儲引擎》

目錄 一、存儲引擎 1.1 MySQL體系結構 1.2 存儲引擎介紹 1). 建表時指定存儲引擎 2). 查詢當前數據庫支持的存儲引擎 1.3 存儲引擎特點 1.3.1 InnoDB 1.3.2 MyISAM 1.3.3 Memory 1.3.4 區別及特點 1.4 存儲引擎選擇 一、存儲引擎 1.1 MySQL體系結構 1). 連接層 最上…

sqli-labs:Less-26關卡詳細解析

1. 思路&#x1f680; 本關的SQL語句為&#xff1a; $sql"SELECT * FROM users WHERE id$id LIMIT 0,1";注入類型&#xff1a;字符串型&#xff08;單引號包裹&#xff09;、GET操作提示&#xff1a;參數需以閉合關鍵參數&#xff1a;id php輸出語句的部分代碼&am…

Spring Boot 的事務注解 @Transactional 失效的幾種情況

開發中我們經常會用到 Spring Boot 的事務注解&#xff0c;為含有多種操作的方法添加事務&#xff0c;做到如果某一個環節出錯&#xff0c;全部回滾的效果。但是在開發中可能會因為不了解事務機制&#xff0c;而導致我們的方法使用了 Transactional 注解但是沒有生效的情況&…

#C語言——刷題攻略:牛客編程入門訓練(四):運算

&#x1f31f;菜鳥主頁&#xff1a;晨非辰的主頁 &#x1f440;學習專欄&#xff1a;《C語言刷題合集》 &#x1f4aa;學習階段&#xff1a;C語言方向初學者 ?名言欣賞&#xff1a;"代碼行數決定你的下限&#xff0c;算法思維決定你的上限。" 目錄 1. BC25 牛牛買電…

阻抗分析中的軟件解調計算

接上篇 重溫無功功率測量-CSDN博客 已知被測阻抗兩端電壓與流過 通過兩個ADC同步采集到。 激勵頻率10k, 采樣率1M, 每周期100個點 關鍵是:采樣率除以激勵頻率, 得是4的倍數... 所以ADC不能自由運行, 得用一個timer來觸發. 因為要進行同相分量正交分量計算。 1&#xff1a;直…

ubuntu 鏡像克隆

一、克隆 1、準備 一個u盤&#xff08;制作啟動盤&#xff09; 一個移動固態硬盤&#xff08;大于要克隆系統盤的1.2倍&#xff09; 2、使用 rufus生成系統啟動盤 &#xff08;1&#xff09;下載ubuntu iso 桌面版 https://cn.ubuntu.com/download &#xff08;2&#x…

Axure下拉菜單:從基礎交互到高保真元件庫應用

在Web端產品設計中&#xff0c;下拉菜單&#xff08;Dropdown Menu&#xff09; 是用戶與系統交互的核心組件之一&#xff0c;它通過隱藏次要選項、節省頁面空間的方式&#xff0c;提升信息密度與操作效率。無論是基礎下拉菜單、圖標式下拉菜單&#xff0c;還是復雜的多級下拉菜…

復現YOLOV5+訓練指定數據集

一、復現YOLOV5代碼 1.github下載&#xff1a;https://github.com/MIPIT-Team/SSA-YOLO 2.配置環境&#xff1a;創建虛擬環境yolo5 conda create -n yolo5 python3.9 #對應文件夾下pip install -r requirements.txt報錯&#xff1a;ERROR: pips dependency resolver does no…

Agents-SDK智能體開發[4]之集成MCP入門

文章目錄說明一 Agents SDK接入MCP1.1 MCP技術回顧1.2 MCP基礎實踐流程1.2.1 天氣查詢服務器Server創建流程1.2.2 服務器依賴安裝和代碼編寫1.2.3 環境配置文件1.2.4 客戶端代碼編寫1.3 測試運行二 MCPAgents SDK基礎調用2.1 weather_server.py2.2 client_agent.py2.3 運行測試…

Camera相機人臉識別系列專題分析之十九:MTK ISP6S平臺FDNode傳遞三方FFD到APP流程解析

【關注我,后續持續新增專題博文,謝謝!!!】 上一篇我們講了: 這一篇我們開始講: Camera相機人臉識別系列專題分析之十九:MTK平臺FDNode傳遞三方FFD到APP流程解析 目錄 一、背景 二、:OcamMeta傳遞FFD到APP 2.1:OcamMeta 2.2 :OcamMeta::process更新FFD 2.…

【實時Linux實戰系列】構建實時監測與報警系統

在實時系統中&#xff0c;監測與報警系統是確保系統正常運行和及時響應異常情況的關鍵組件。實時監測與報警系統能夠實時收集系統數據&#xff0c;分析關鍵事件&#xff0c;并在檢測到異常時發出警報。這種系統廣泛應用于工業自動化、醫療設備監控、網絡安全等領域。掌握實時監…

PHP入門及數據類型

PHP數據類型 PHP標記 //HTML風格 <?phpecho "hello world"; ?> //簡短風格 <?echo "hello world"; ?>數據類型 PHP 最初源于 Perl 語言&#xff0c;與 Perl 類似&#xff0c;PHP 對數據類型采取較為寬松的態度。PHP 規定&#xff0c;變量數…

沸點 | 嬴圖參加世界人工智能大會

2025 WAIC于 7 月 26 日至 28 日在上海舉行。大會展覽面積突破 7 萬平方米&#xff0c;800 余家企業參展。嬴圖作為圖數據庫領域的領先企業&#xff0c;攜前沿技術與創新應用精彩亮相。?大會期間&#xff0c;嬴圖創始人兼CEO孫宇熙與來自全球的頂尖學者、企業代表共同探討人工…

2. 字符設備驅動

一、設備號 1.1. 什么是設備號 設備號是用來標記一類設備以及區分這類設備中具體個體的一組號碼。 設備號由主設備號和次設備號組成。主設備號的作用為標記一類設備、用于標識設備驅動程序,而次設備號的作用是為了區分這類設備中的具體個體設備及用于標識同一驅動程序下的具…

uboot armv8 啟動流程之 linker script

section 詳細說明.text按如下順序&#xff0c;中斷向量表vectors, 啟動入口代碼start.o,普通text, glue &#xff08;arm thumb2 相互調用時自動生成的代碼&#xff09;*(.vectors)CPUDIR/start.o (.text*)*(.text*)*(.glue*)__image_copy_start 標記為text 段入口&#xff0c;…

xxljob總結

XXL-Job 支持多種任務類型&#xff0c;以下是常見任務類型的示例 Demo&#xff0c;包含核心配置和代碼片段&#xff0c;幫助快速理解用法&#xff1a;一、Bean模式任務&#xff08;最常用&#xff09;通過注解 XxlJob 定義任務方法&#xff0c;直接在 Spring 容器中管理&…

Python包安全工程實踐:構建安全可靠的Python生態系統

在現代計算環境中&#xff0c;性能往往是Python包成功的關鍵因素。本文將深入探討Python包的性能優化技術&#xff0c;包括并發編程模型、性能分析工具、內存優化策略以及原生代碼集成等高級主題&#xff0c;幫助你構建高性能的Python組件。1. 性能分析基礎1.1 性能分析工具矩陣…

kubernetes基礎知識

個人博客站—運維鹿: http://www.kervin24.top CSDN博客—做個超努力的小奚&#xff1a; https://blog.csdn.net/qq_52914969?typeblog一、kubernetes介紹Kubernetes本質是一組服務器集群&#xff0c;它可以在集群的每個節點上運行特定的程序&#xff0c;來對節點中的容器進行…