.Net Core微服務入門全紀錄(六)——EventBus-事件總線

系列文章目錄

1、.Net Core微服務入門系列(一)——項目搭建
2、.Net Core微服務入門全紀錄(二)——Consul-服務注冊與發現(上)
3、.Net Core微服務入門全紀錄(三)——Consul-服務注冊與發現(下)
4、.Net Core微服務入門全紀錄(四)——Ocelot-API網關(上)
5、.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)
6、.Net Core微服務入門全紀錄(六)——EventBus-事件總線
7、.Net Core微服務入門全紀錄(八)——Docker Compose與容器網絡


在這里插入圖片描述

文章目錄

  • 系列文章目錄
  • 前言📃
  • 一、EventBus-事件總線
    • 1.1 什么是事件總線?
    • 1.2 為什么要用EventBus
  • 二、CAP使用
    • 2.1 環境準備
    • 2.2 代碼修改
  • 三、運行測試
  • 四、總結


前言📃

關于 微服務 的概念解釋網上有很多, 個人理解微服務是一種系統架構模式,它和語言無關,和框架無關,和工具無關,和服務器環境無關。

微服務思想 是將傳統的單體系統按照業務拆分成多個職責單一、且可獨立運行的接口服務。至于服務如何拆分,沒有明確的定義。幾乎任何后端語言都能做微服務開發。微服務也并不是完美無缺的,微服務架構會帶來更多的問題,增加系統的復雜度,引入更多的技術棧。

上一篇【.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)】中已經完成了 Ocelot + Consul 的搭建,這一篇簡單說一下 EventBus

一、EventBus-事件總線

1.1 什么是事件總線?

🌈事件總線 是對觀察者(發布-訂閱)模式的一種實現。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種 解耦 的目的。

如果沒有接觸過 EventBus ,可能不太好理解。其實 EventBus 在客戶端開發中應用非常廣泛android,ios,web 前端等,用于多個組件(或者界面)之間的相互通信。

1.2 為什么要用EventBus

就拿當前的項目舉例,我們有一個訂單服務,一個產品服務。客戶端有一個下單功能,當用戶下單時,調用訂單服務的下單接口,那么下單接口需要調用產品服務的減庫存接口,這涉及到服務與服務之間的調用。那么服務之間又怎么調用呢?直接 RESTAPI?或者效率更高的gRPC?可能這兩者各有各的使用場景,但是他們都存在一個服務之間的耦合問題,或者難以做到異步調用。

試想一下:假設我們下單時調用訂單服務,訂單服務需要調用產品服務,產品服務又要調用物流服務,物流服務再去調用xx服務 等等。。。如果每個服務處理時間需要2s,不使用異步的話,那這種體驗可想而知。

如果使用 EventBus 的話,那么訂單服務只需要向 EventBus 發一個“下單事件”就可以了。產品服務會訂閱“下單事件”,當產品服務收到下單事件時,自己去減庫存就好了。這樣就避免了兩個服務之間直接調用的耦合性,并且真正做到了異步調用。

既然涉及到多個服務之間的異步調用,那么就不得不提分布式事務。分布式事務并不是微服務獨有的問題,而是所有的分布式系統都會存在的問題。

關于分布式事務,可以查一下 “CAP原則”“BASE理論” 了解更多。當今的分布式系統更多的會追求事務的最終一致性。

下面使用國人開發的優秀項目 “CAP”,來演示一下 EventBus 的基本使用。之所以使用 “CAP”是因為它既能解決分布式系統的最終一致性,同時又是一個 EventBus,它具備 EventBus 的所有功能!
作者介紹:https://www.cnblogs.com/savorboard/p/cap.html

二、CAP使用

2.1 環境準備

Docker 中準備一下需要的環境,首先是數據庫,數據庫我使用 PostgreSQL,用別的也行。CAP 支持:SqlServer,MySql,PostgreSql,MongoDB。

然后是MQ,這里我使用 RabbitMQKafka 也可以。

Docker運行RabbitMQ:

docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management

🔑默認用戶:guest,密碼:guest

環境準備就完成了,Docker 就是這么方便。

2.2 代碼修改

為了模擬以上業務,需要修改大量代碼,下面代碼如有遺漏的直接去github找。

NuGet安裝:

Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL

在這里插入圖片描述

CAP相關:

DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql

在這里插入圖片描述

Order.API/Controllers/OrdersController.cs 增加下單接口:

[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{private readonly ILogger<OrdersController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly OrderContext _context;public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【訂單服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 下單 發布下單事件/// </summary>/// <param name="order"></param>/// <returns></returns>[Route("Create")][HttpPost]public async Task<IActionResult> CreateOrder(Models.Order order){using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)){//業務代碼order.CreateTime = DateTime.Now;_context.Orders.Add(order);var r = await _context.SaveChangesAsync() > 0;if (r){//發布下單事件await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });return Ok();}return BadRequest();}}}

Order.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下單事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 產品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 購買數量/// </summary>public int Count { get; set; }
}

Order.API/Models/Order.cs訂單實體類:

public class Order
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 下單時間/// </summary>[Required]public DateTime CreateTime { get; set; }/// <summary>/// 產品ID/// </summary>[Required]public int ProductID { get; set; }/// <summary>/// 購買數量/// </summary>[Required]public int Count { get; set; }
}

Order.API/Models/OrderContext.cs數據庫Context:

public class OrderContext : DbContext
{public OrderContext(DbContextOptions<OrderContext> options): base(options){}public DbSet<Order> Orders { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){}
}

Order.API/appsettings.json增加數據庫連接字符串:

"ConnectionStrings": {"OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}

Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<OrderContext>();x.UseRabbitMQ("host.docker.internal");});
}

在這里插入圖片描述
以上是訂單服務的修改。

Product.API/Controllers/ProductsController.cs增加減庫存接口:

[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{private readonly ILogger<ProductsController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly ProductContext _context;public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【產品服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 減庫存 訂閱下單事件/// </summary>/// <param name="message"></param>/// <returns></returns>[NonAction][CapSubscribe("order.services.createorder")]public async Task ReduceStock(CreateOrderMessageDto message){//業務代碼var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);product.Stock -= message.Count;await _context.SaveChangesAsync();}}

Product.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下單事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 產品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 購買數量/// </summary>public int Count { get; set; }
}

Product.API/Models/Product.cs產品實體類:

public class Product
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 產品名稱/// </summary>[Required][Column(TypeName = "VARCHAR(16)")]public string Name { get; set; }/// <summary>/// 庫存/// </summary>[Required]public int Stock { get; set; }
}

Product.API/Models/ProductContext.cs數據庫Context:

public class ProductContext : DbContext
{public ProductContext(DbContextOptions<ProductContext> options): base(options){}public DbSet<Product> Products { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){base.OnModelCreating(modelBuilder);//初始化種子數據modelBuilder.Entity<Product>().HasData(new Product{ID = 1,Name = "產品1",Stock = 100},new Product{ID = 2,Name = "產品2",Stock = 100});}
}

Product.API/appsettings.json增加數據庫連接字符串:

"ConnectionStrings": {"ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}

Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:\

public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<ProductContext>();x.UseRabbitMQ("host.docker.internal");});
}

在這里插入圖片描述
以上是產品服務的修改。

訂單服務和產品服務的修改到此就完成了,看著修改很多,其實功能很簡單。就是各自增加了自己的數據庫表,然后訂單服務增加了下單接口,下單接口會發出 “下單事件”。產品服務增加了減庫存接口,減庫存接口會訂閱 “下單事件”。然后客戶端調用下單接口下單時,產品服務會減去相應的庫存,功能就這么簡單。

關于 EF數據庫遷移 之類的基本使用就不介紹了。使用 Docker 重新構建鏡像,運行訂單服務,產品服務:

docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"

最后 Ocelot.APIGateway/ocelot.json 增加一條路由配置:

在這里插入圖片描述

好了,進行到這里,整個環境就有點復雜了。確保我們的PostgreSQL,RabbitMQ,Consul,Gateway,服務實例都正常運行。

服務實例運行成功后,數據庫應該是這樣的:

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
📃產品表種子數據:

在這里插入圖片描述
cap.published 表和 cap.received 表是由 CAP自動生成的,它內部是使用本地消息表+MQ來實現異步確保。

三、運行測試

這次使用 Postman 作為客戶端調用下單接口( 9070 是之前的 Ocelot 網關端口):

在這里插入圖片描述
訂單庫 published 表:

在這里插入圖片描述

訂單庫 order 表:

在這里插入圖片描述

產品庫 received 表:

在這里插入圖片描述

產品庫 product 表:

在這里插入圖片描述

再試一下:

在這里插入圖片描述
在這里插入圖片描述
OK,完成。雖然功能很簡單,但是我們實現了服務的解耦,異步調用,和最終一致性。

四、總結

注意,上面的例子純粹是為了說明 EventBus 的使用,實際中的下單流程絕對不會這么做的!希望大家不要較真。

可能有人會說如果下單成功,但是庫存不足導致減庫存失敗了怎么辦,是不是要回滾訂單表的數據?如果產生這種想法,說明還沒有真正理解最終一致性的思想。

首先下單前肯定會檢查一下庫存數量,既然允許下單那么必然是庫存充足的。這里的事務是指:訂單保存到數據庫,和下單事件保存到 cap.published 表(保存到 cap.published 表理論上就能夠發送到MQ)這兩件事情,要么一同成功,要么一同失敗。如果這個事務成功,那么就可以認為這個業務流程是成功的,至于產品服務的減庫存是否成功那就是產品服務的事情了(理論上也應該是成功的,因為消息已經確保發到了MQ,產品服務必然會收到消息),CAP也提供了失敗重試,和失敗回調機制。

如果非要數據回滾也是能實現的,CAPICapPublisher.Publish 方法提供一個 callbackName參數,當減庫存時,可以觸發這個回調。其本質也是通過發布訂閱完成,這是不推薦的做法,就不詳細說了,有興趣自己研究一下。
另外,CAP 無法保證消息不重復,實際使用中需要自己考慮一下消息的重復過濾和冪等性。


在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66567.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66567.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66567.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C#防止重復提交

C#防止重復提交 文章目錄 C#防止重復提交前言防止重復提交的思路Web API 防止重復提交代碼實現代碼講解使用方法 MVC防止重復提交總結 前言 當用戶在前端進行提交數據時&#xff0c;如果網絡出現卡頓和前端沒有給出響應的話顧客通常都會狂點提交按鈕&#xff0c;這樣就很容易導…

python學opencv|讀取圖像(三十九 )閾值處理Otsu方法

【1】引言 前序學習了5種閾值處理方法&#xff0c;包括(反)閾值處理、(反)零值處理和截斷處理&#xff0c;還學習了一種自適應處理方法&#xff0c;相關文章鏈接為&#xff1a; python學opencv|讀取圖像&#xff08;三十三&#xff09;閾值處理-灰度圖像-CSDN博客 python學o…

嵌入式硬件篇---PID控制

文章目錄 前言第一部分&#xff1a;連續PID1.比例&#xff08;Proportional&#xff0c;P&#xff09;控制2.積分&#xff08;Integral&#xff0c;I&#xff09;控制3.微分&#xff08;Derivative&#xff0c;D&#xff09;控制4.PID的工作原理5..實質6.分析7.各種PID控制器P控…

日志收集Day001

1.ElasticSearch 作用&#xff1a;日志存儲和檢索 2.單點部署Elasticsearch與基礎配置 rpm -ivh elasticsearch-7.17.5-x86_64.rpm 查看配置文件yy /etc/elasticsearch/elasticsearch.yml&#xff08;這里yy做了別名&#xff0c;過濾掉空行和注釋行&#xff09; yy /etc/el…

集合帖:前綴和及差分模板題 ← 一維及二維

【一維前綴和及一維差分知識點】 ● 一維“前綴和數組”預處理過程&#xff1a;cin>>a[i], sum[i]sum[i-1]a[i] 或者 cin>>sum[i], sum[i]sum[i-1] &#xff08;1≤i≤n&#xff09; ● 一維“區間和”計算過程&#xff1a;sum[y]-sum[x-1] &#xff08;y…

《offer 來了:Java 面試核心知識點精講 -- 框架篇》(附資源)

繼上篇文章介紹了《offer 來了&#xff1a;Java 面試核心知識點精講 -- 原理篇》書后&#xff0c;本文章再給大家推薦兄弟篇 《offer來了&#xff1a;Java面試核心知識點精講--框架篇》&#xff0c; 簡直就是為Java開發者量身定制的面試神器。 本書是對Java程序員面試中常見的…

Low-Level 大一統:如何使用Diffusion Models完成視頻超分、去雨、去霧、降噪等所有Low-Level 任務?

Diffusion Models專欄文章匯總:入門與實戰 前言:視頻在傳輸過程中常常因為各種因素(如惡劣天氣、噪聲、壓縮和傳感器分辨率限制)而出現質量下降,這會嚴重影響計算機視覺任務(如目標檢測和視頻監控)的性能。現有的視頻修復方法雖然取得了一些進展,但通常只能針對特定的退…

Video-RAG:一種將視頻RAG新框架

1. 摘要及主要貢獻點 摘要&#xff1a; 檢索增強生成&#xff08;RAG&#xff09;是一種強大的策略&#xff0c;通過檢索與查詢相關的外部知識并將其整合到生成過程中&#xff0c;以解決基礎模型生成事實性錯誤輸出的問題。然而&#xff0c;現有的RAG方法主要集中于文本信息&…

Docker Load后存儲的鏡像及更改鏡像存儲目錄的方法

Docker Load后存儲的鏡像及更改鏡像存儲目錄的方法 Docker Load后存儲的鏡像更改鏡像存儲目錄的方法腳本說明注意事項Docker作為一種開源的應用容器引擎,已經廣泛應用于軟件開發、測試和生產環境中。通過Docker,開發者可以將應用打包成鏡像,輕松地進行分發和運行。而在某些場…

【零基礎入門unity游戲開發——unity通用篇37】鼠標指針(光標)修改隱藏鎖定(基于unity6開發介紹)

考慮到每個人基礎可能不一樣,且并不是所有人都有同時做2D、3D開發的需求,所以我把 【零基礎入門unity游戲開發】 分為成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要講解C#的基礎語法,包括變量、數據類型、運算符、流程控制、面向對象等,適合沒有編程基礎的…

親測解決Latex File ended while scanning use of \@@BOOKMARK

這個問題只要刪除了.tex后綴文件外的其他同名文件&#xff0c;再次編譯即可。 環境 Win11 MikTex 問題原因 編譯的時候用了好幾種編譯器&#xff0c;然后編譯出現了錯誤。生成了不能使用的引用。 解決方法 刪除.tex后綴文件外的其他同名文件后&#xff0c;再次編譯。 筆…

Amazon MSK 開啟 Public 訪問 SASL 配置的方法

1. 開啟 MSK Public 1.1 配置 MSK 參數 進入 MSK 控制臺頁面&#xff0c;點擊左側菜單 Cluster configuration。選擇已有配置&#xff0c;或者創建新配置。在配置中添加參數 allow.everyone.if.no.acl.foundfalse修改集群配置&#xff0c;選擇到新添加的配置。 1.2 開啟 Pu…

Windows FileZila Server共享電腦文件夾 映射21端口外網連接

我有這樣一個使用場景&#xff0c;在外部網絡環境下&#xff0c;通過手機便捷地讀取存儲在電腦上的視頻文件。比如在外出旅行、出差&#xff0c;身邊沒有攜帶電腦&#xff0c;僅依靠手機設備&#xff0c;就能隨時獲取電腦里存儲的各類視頻&#xff0c;無論是學習資料視頻、工作…

MySQL 實戰 4 種將數據同步到ES方案

文章目錄 1. 前言2. 數據同步方案 2.1 同步雙寫2.2 異步雙寫2.3 定時更新2.4 基于 Binlog 實時同步 3. 數據遷移工具選型 3.1 Canal3.2 阿里云 DTS3.3 Databus3.4 Databus和Canal對比3.4 其它 4. 后記 上周聽到公司新同事分享 MySQL 同步數據到 ES 的方案&#xff0c;發現很有…

虛幻基礎-1:cpu挑選(14600kf)

能幫到你的話&#xff0c;就給個贊吧 &#x1f618; 文章目錄 ue非常吃cpu拉滿主頻打開項目編寫藍圖運行原因 時間長 關于壓力測試 本文以14600kf為例&#xff0c;雙12購入&#xff0c;7月份產。 ue非常吃cpu 經本人測試&#xff0c;ue是非常吃cpu的。 拉滿主頻 無論任何時間…

QTableWidget的簡單使用

1.最簡單的表格示例&#xff1a; ui->tableWidget->setRowCount(2);// 設置行數ui->tableWidget->setColumnCount(3);// 設置列數&#xff0c;一定要放在設置行表頭之前QStringList rowHeaderList;// 行表頭rowHeaderList << QStringLiteral("姓名"…

深入探究分布式日志系統 Graylog:架構、部署與優化

文章目錄 一、Graylog簡介二、Graylog原理架構三、日志系統對比四、Graylog部署傳統部署MongoDB部署OS或者ES部署Garylog部署容器化部署 五、配置詳情六、優化網絡和 REST APIMongoDB 七、升級八、監控九、常見問題及處理 一、Graylog簡介 Graylog是一個簡單易用、功能較全面的…

2024年我的技術成長之路

2024年我的技術成長之路 大家好&#xff0c;我是小寒。又到年底了&#xff0c;一年過得真快啊&#xff01;趁著這次活動的機會&#xff0c;和大家聊聊我這一年在技術上的收獲和踩過的坑。 說實話&#xff0c;今年工作特別忙&#xff0c;寫博客的時間比去年少了不少。不過還是…

嵌入式硬件篇---基本組合邏輯電路

文章目錄 前言基本邏輯門電路1.與門&#xff08;AND Gate&#xff09;2.或門&#xff08;OR Gate&#xff09;3.非門&#xff08;NOT Gate&#xff09;4.與非門&#xff08;NAND Gate&#xff09;5.或非門&#xff08;NOR Gate&#xff09;6.異或門&#xff08;XOR Gate&#x…

數據庫管理-第285期 Oracle 23ai:深入淺出向量索引(20250117)

數據庫管理285期 20245-01-17 數據庫管理-第285期 Oracle 23ai&#xff1a;深入淺出向量索引&#xff08;20250117&#xff09;1 HNSW事務支持解讀 2 IVF分區支持解讀 3 混合向量索引何時選擇混合向量索引為何選擇混合向量索引 總結 數據庫管理-第285期 Oracle 23ai&#xff1a…