系列文章目錄
1、.Net Core微服務入門系列(一)——項目搭建
2、.Net Core微服務入門全紀錄(二)——Consul-服務注冊與發現(上)
3、.Net Core微服務入門全紀錄(三)——Consul-服務注冊與發現(下)
4、.Net Core微服務入門全紀錄(四)——Ocelot-API網關(上)
5、.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)
6、.Net Core微服務入門全紀錄(六)——EventBus-事件總線
7、.Net Core微服務入門全紀錄(八)——Docker Compose與容器網絡
文章目錄
- 系列文章目錄
- 前言📃
- 一、EventBus-事件總線
- 1.1 什么是事件總線?
- 1.2 為什么要用EventBus
- 二、CAP使用
- 2.1 環境準備
- 2.2 代碼修改
- 三、運行測試
- 四、總結
前言📃
關于 微服務 的概念解釋網上有很多, 個人理解微服務是一種系統架構模式,它和語言無關,和框架無關,和工具無關,和服務器環境無關。
微服務思想 是將傳統的單體系統按照業務拆分成多個職責單一、且可獨立運行的接口服務。至于服務如何拆分,沒有明確的定義。幾乎任何后端語言都能做微服務開發。微服務也并不是完美無缺的,微服務架構會帶來更多的問題,增加系統的復雜度,引入更多的技術棧。
上一篇【.Net Core微服務入門全紀錄(五)——Ocelot-API網關(下)】中已經完成了 Ocelot + Consul
的搭建,這一篇簡單說一下 EventBus。
一、EventBus-事件總線
1.1 什么是事件總線?
🌈事件總線 是對觀察者(發布-訂閱)模式的一種實現。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種 解耦 的目的。
如果沒有接觸過 EventBus
,可能不太好理解。其實 EventBus
在客戶端開發中應用非常廣泛android,ios,web
前端等,用于多個組件(或者界面)之間的相互通信。
1.2 為什么要用EventBus
就拿當前的項目舉例,我們有一個訂單服務,一個產品服務。客戶端有一個下單功能,當用戶下單時,調用訂單服務的下單接口,那么下單接口需要調用產品服務的減庫存接口,這涉及到服務與服務之間的調用。那么服務之間又怎么調用呢?直接 RESTAPI
?或者效率更高的gRPC
?可能這兩者各有各的使用場景,但是他們都存在一個服務之間的耦合問題,或者難以做到異步調用。
試想一下:假設我們下單時調用訂單服務,訂單服務需要調用產品服務,產品服務又要調用物流服務,物流服務再去調用xx服務 等等。。。如果每個服務處理時間需要2s,不使用異步的話,那這種體驗可想而知。
如果使用 EventBus
的話,那么訂單服務只需要向 EventBus
發一個“下單事件”就可以了。產品服務會訂閱“下單事件”,當產品服務收到下單事件時,自己去減庫存就好了。這樣就避免了兩個服務之間直接調用的耦合性,并且真正做到了異步調用。
既然涉及到多個服務之間的異步調用,那么就不得不提分布式事務。分布式事務并不是微服務獨有的問題,而是所有的分布式系統都會存在的問題。
關于分布式事務,可以查一下 “CAP原則”
和 “BASE理論”
了解更多。當今的分布式系統更多的會追求事務的最終一致性。
下面使用國人開發的優秀項目 “CAP”
,來演示一下 EventBus
的基本使用。之所以使用 “CAP”
是因為它既能解決分布式系統的最終一致性,同時又是一個 EventBus
,它具備 EventBus
的所有功能!
作者介紹:https://www.cnblogs.com/savorboard/p/cap.html
二、CAP使用
2.1 環境準備
在 Docker
中準備一下需要的環境,首先是數據庫,數據庫我使用 PostgreSQL
,用別的也行。CAP
支持:SqlServer,MySql,PostgreSql,MongoDB。
然后是MQ,這里我使用 RabbitMQ
,Kafka
也可以。
Docker運行RabbitMQ:
docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management
🔑默認用戶:guest,密碼:guest
環境準備就完成了,Docker
就是這么方便。
2.2 代碼修改
為了模擬以上業務,需要修改大量代碼,下面代碼如有遺漏的直接去github找。
NuGet安裝:
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL
CAP相關:
DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql
Order.API/Controllers/OrdersController.cs 增加下單接口:
[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{private readonly ILogger<OrdersController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly OrderContext _context;public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【訂單服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 下單 發布下單事件/// </summary>/// <param name="order"></param>/// <returns></returns>[Route("Create")][HttpPost]public async Task<IActionResult> CreateOrder(Models.Order order){using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)){//業務代碼order.CreateTime = DateTime.Now;_context.Orders.Add(order);var r = await _context.SaveChangesAsync() > 0;if (r){//發布下單事件await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });return Ok();}return BadRequest();}}}
Order.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary>
/// 下單事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 產品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 購買數量/// </summary>public int Count { get; set; }
}
Order.API/Models/Order.cs訂單實體類:
public class Order
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 下單時間/// </summary>[Required]public DateTime CreateTime { get; set; }/// <summary>/// 產品ID/// </summary>[Required]public int ProductID { get; set; }/// <summary>/// 購買數量/// </summary>[Required]public int Count { get; set; }
}
Order.API/Models/OrderContext.cs數據庫Context:
public class OrderContext : DbContext
{public OrderContext(DbContextOptions<OrderContext> options): base(options){}public DbSet<Order> Orders { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){}
}
Order.API/appsettings.json增加數據庫連接字符串:
"ConnectionStrings": {"OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}
Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<OrderContext>();x.UseRabbitMQ("host.docker.internal");});
}
以上是訂單服務的修改。
Product.API/Controllers/ProductsController.cs增加減庫存接口:
[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{private readonly ILogger<ProductsController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly ProductContext _context;public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【產品服務】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 減庫存 訂閱下單事件/// </summary>/// <param name="message"></param>/// <returns></returns>[NonAction][CapSubscribe("order.services.createorder")]public async Task ReduceStock(CreateOrderMessageDto message){//業務代碼var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);product.Stock -= message.Count;await _context.SaveChangesAsync();}}
Product.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary>
/// 下單事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 產品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 購買數量/// </summary>public int Count { get; set; }
}
Product.API/Models/Product.cs產品實體類:
public class Product
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 產品名稱/// </summary>[Required][Column(TypeName = "VARCHAR(16)")]public string Name { get; set; }/// <summary>/// 庫存/// </summary>[Required]public int Stock { get; set; }
}
Product.API/Models/ProductContext.cs數據庫Context:
public class ProductContext : DbContext
{public ProductContext(DbContextOptions<ProductContext> options): base(options){}public DbSet<Product> Products { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){base.OnModelCreating(modelBuilder);//初始化種子數據modelBuilder.Entity<Product>().HasData(new Product{ID = 1,Name = "產品1",Stock = 100},new Product{ID = 2,Name = "產品2",Stock = 100});}
}
Product.API/appsettings.json增加數據庫連接字符串:
"ConnectionStrings": {"ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}
Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:\
public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<ProductContext>();x.UseRabbitMQ("host.docker.internal");});
}
以上是產品服務的修改。
訂單服務和產品服務的修改到此就完成了,看著修改很多,其實功能很簡單。就是各自增加了自己的數據庫表,然后訂單服務增加了下單接口,下單接口會發出 “下單事件”
。產品服務增加了減庫存接口,減庫存接口會訂閱 “下單事件”
。然后客戶端調用下單接口下單時,產品服務會減去相應的庫存,功能就這么簡單。
關于 EF數據庫遷移
之類的基本使用就不介紹了。使用 Docker
重新構建鏡像,運行訂單服務,產品服務:
docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"
最后 Ocelot.APIGateway/ocelot.json 增加一條路由配置:
好了,進行到這里,整個環境就有點復雜了。確保我們的PostgreSQL,RabbitMQ,Consul,Gateway,服務實例都正常運行。
服務實例運行成功后,數據庫應該是這樣的:
📃產品表種子數據:
cap.published
表和 cap.received
表是由 CAP
自動生成的,它內部是使用本地消息表+MQ來實現異步確保。
三、運行測試
這次使用 Postman
作為客戶端調用下單接口( 9070
是之前的 Ocelot
網關端口):
訂單庫 published
表:
訂單庫 order
表:
產品庫 received
表:
產品庫 product
表:
再試一下:
OK,完成。雖然功能很簡單,但是我們實現了服務的解耦,異步調用,和最終一致性。
四、總結
注意,上面的例子純粹是為了說明 EventBus
的使用,實際中的下單流程絕對不會這么做的!希望大家不要較真。
可能有人會說如果下單成功,但是庫存不足導致減庫存失敗了怎么辦,是不是要回滾訂單表的數據?如果產生這種想法,說明還沒有真正理解最終一致性的思想。
首先下單前肯定會檢查一下庫存數量,既然允許下單那么必然是庫存充足的。這里的事務是指:訂單保存到數據庫,和下單事件保存到 cap.published
表(保存到 cap.published
表理論上就能夠發送到MQ)這兩件事情,要么一同成功,要么一同失敗。如果這個事務成功,那么就可以認為這個業務流程是成功的,至于產品服務的減庫存是否成功那就是產品服務的事情了(理論上也應該是成功的,因為消息已經確保發到了MQ,產品服務必然會收到消息),CAP也提供了失敗重試,和失敗回調機制。
如果非要數據回滾也是能實現的,CAP
的 ICapPublisher.Publish
方法提供一個 callbackName
參數,當減庫存時,可以觸發這個回調。其本質也是通過發布訂閱完成,這是不推薦的做法,就不詳細說了,有興趣自己研究一下。
另外,CAP
無法保證消息不重復,實際使用中需要自己考慮一下消息的重復過濾和冪等性。