基于CAP組件實現補償事務與冪等性保障

【.NET Core】|?總結/Edison Zhou

1補償事務和冪等性

在微服務架構下,我們會采用異步通信來對各個微服務進行解耦,從而我們會用到消息中間件來傳遞各個消息。?

d6de945864750c23b37fa5432e5ce48e.png

補償事務

某些情況下,消費者需要返回值以告訴發布者執行結果,以便于發布者實施一些動作,通常情況下這屬于補償范圍

例如,在一個電商程序中,訂單初始狀態為 pending,當商品數量成功扣除時將狀態標記為 succeeded ,否則為 failed。

那么,這樣看來實現邏輯應該是:當訂單微服務提交訂單,并發布了一個已下單的消息至下游微服務比如庫存微服務,當庫存微服務扣減庫存后,無論扣減成功與否,都發送一個回調給訂單微服務告知扣減狀態。

如果我們自己來實現,可能需要較多的工作量,我們可以借助CAP組件來實現,它提供的callback功能可以很方便的做到這一點

冪等性

所謂冪等性,就是用戶對于同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了副作用。

在采用了消息中間件的分布式系統中,存在3中可能:

  • Exactly Once(*) (僅有一次)

  • At Most Once (最多一次)

  • At Least Once (最少一次)

帶 * 號的也就是Exactly Once在實際場景中,很難達到

我們都知道,在CAP組件中,采用了數據庫表(準確來說是臨時存儲),也許可以做到At Most Once,但是并沒有提供嚴格保證消息不丟失的相關功能或配置。因此,CAP采用的交付保證是At Least Once,它并沒有實現冪等。

其實,目前業界大多數基于事件驅動的框架都是要求用戶自己來保證冪等性的,比如ENode,RocketMQ等。

綜述,CAP組件可以幫助實現一些比較不嚴格的冪等,但是嚴格的冪等無法做到。這就需要我們自己來處理,通常有兩種方式:

(1)以自然的方式處理冪等消息

比如數據庫提供的 INSERT ON DUPLICATE KEY UPDATE 或者是才去類型的程序判斷行為。

(2)顯示處理冪等消息

這種方式更為常見,在消息傳遞過程中傳遞ID,然后由單獨的消息跟蹤器來處理。比如,我們可以借助Redis來實現這個消息跟蹤器,下面的示例就是基于Redis來顯示處理冪等的。

2基于CAP組件的Sample

這里我們以剛剛提到的電商服務為例,訂單服務負責下單,庫存服務負責扣減庫存,二者通過Kafka進行消息傳遞,通過MongoDB進行持久化數據,CAP作為事件總線。

案例結構圖

訂單下單時會將將初始化狀態為Pending的訂單數據存入MongoDB,然后發送一個訂單已下達的消息至事件總線,下游系統庫存服務訂閱這個消息并消費,也就是扣減庫存。庫存扣減成功后,訂單服務根據扣減狀態將訂單狀態改為Succeeded或Failed。

00ff74e626e150949573e3fa6112c0e0.jpeg

編寫訂單服務

創建一個ASP.NET 5/6 WebAPI項目,引入以下Package:

PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package?DotNetCore.CAP.Kafka
PM>Install-Package?DotNetCore.CAP.MongoDB

編寫一個Controller用于接收下單請求:

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{private readonly IOrderRepository _orderRepository;private readonly IMapper _mapper;private readonly ICapPublisher _eventPublisher;public OrdersController(IOrderRepository orderRepository, IMapper mapper, ICapPublisher eventPublisher){_orderRepository = orderRepository;_mapper = mapper;_eventPublisher = eventPublisher;}[HttpGet]public async Task<ActionResult<IList<OrderVO>>> GetAllOrders(){var orders = await _orderRepository.GetAllOrders();return Ok(_mapper.Map<IList<OrderVO>>(orders));}[HttpGet("id")]public async Task<ActionResult<OrderVO>> GetOrder(string id){var order = await _orderRepository.GetOrder(id);if (order == null)return NotFound();return Ok(_mapper.Map<OrderVO>(order));}[HttpPost]public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO){var order = _mapper.Map<Order>(orderDTO);// 01.生成訂單初始數據order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();order.CreatedDate = DateTime.Now;order.Status = OrderStatus.Pending;// 02.訂單數據存入MongoDBawait _orderRepository.CreateOrder(order);// 03.發布訂單已生成事件消息await _eventPublisher.PublishAsync(name: EventNameConstants.TOPIC_ORDER_SUBMITTED,contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED);return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));}
}

這里使用了CAP提供的callback機制實現訂單狀態的修改。其原理就是新建了一個Consumer用于接收庫存微服務的新Topic訂閱消費。其中,Topic名字定義在了一個常量中。

public class ProductStockDeductedEventService : IProductStockDeductedEventService, ICapSubscribe
{private readonly IOrderRepository _orderRepository;public ProductStockDeductedEventService(IOrderRepository orderRepository){_orderRepository = orderRepository;}[CapSubscribe(name: EventNameConstants.TOPIC_STOCK_DEDUCTED, Group = EventNameConstants.GROUP_STOCK_DEDUCTED)]public async Task MarkOrderStatus(EventData<ProductStockDeductedEvent> eventData){if (eventData == null || eventData.MessageBody == null)return;var order = await _orderRepository.GetOrder(eventData.MessageBody.OrderId);if (order == null)return;if (eventData.MessageBody.IsSuccess){order.Status = OrderStatus.Succeed;// Todo: 一些額外的邏輯}else{order.Status = OrderStatus.Failed;// Todo: 一些額外的邏輯}await _orderRepository.UpdateOrder(order);}
}

這里回調的消費邏輯很簡單,就是根據庫存扣減的結果更新訂單的狀態。

編寫庫存服務

創建一個ASP.NET 5/6 WebAPI項目,引入以下Package:

PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package?DotNetCore.CAP.Kafka
PM>Install-Package?DotNetCore.CAP.MongoDB

編寫一個Controller用于接收庫存查詢請求:

public class StocksController : ControllerBase
{private readonly IStockRepository _stockRepository;private readonly IMapper _mapper;private readonly ICapPublisher _eventPublisher;public StocksController(IStockRepository stockRepository, IMapper mapper, ICapPublisher eventPublisher){_stockRepository = stockRepository;_mapper = mapper;_eventPublisher = eventPublisher;}[HttpGet]public async Task<ActionResult<IList<StockVO>>> GetAllStocks(){var stocks = await _stockRepository.GetAllStocks();return Ok(_mapper.Map<IList<StockVO>>(stocks));}[HttpGet("id")]public async Task<ActionResult<StockVO>> GetStock(string id){var stock = await _stockRepository.GetStock(id);if (stock == null)return NotFound();return Ok(_mapper.Map<StockVO>(stock));}[HttpPost]public async Task<ActionResult<StockVO>> CreateStock(StockDTO stockDTO){var stock = _mapper.Map<Stock>(stockDTO);stock.CreatedDate = DateTime.Now;stock.UpdatedDate = stock.CreatedDate;await _stockRepository.CreateStock(stock);return CreatedAtAction(nameof(GetStock), new { id = stock.ProductId }, _mapper.Map<StockVO>(stock));}
}

編寫一個Consumer用于消費訂單下達事件的消息:

public class NewOrderSubmittedEventService : INewOrderSubmittedEventService, ICapSubscribe
{private readonly IStockRepository _stockRepository;private readonly IMsgTracker _msgTracker;public NewOrderSubmittedEventService(IStockRepository stockRepository, IMsgTracker msgTracker){_stockRepository = stockRepository;_msgTracker = msgTracker;}[CapSubscribe(name: EventNameConstants.TOPIC_ORDER_SUBMITTED, Group = EventNameConstants.GROUP_ORDER_SUBMITTED)]public async Task<EventData<ProductStockDeductedEvent>> DeductProductStock(EventData<NewOrderSubmittedEvent> eventData){// 冪等性保障if(await _msgTracker.HasProcessed(eventData.Id))return null;// 產品Id合法性校驗var productStock = await _stockRepository.GetStock(eventData.MessageBody.ProductId);if (productStock == null)return null;// 核心扣減邏輯EventData<ProductStockDeductedEvent> result;if (productStock.StockQuantity - eventData.MessageBody.Quantity >= 0){// 扣減產品實際庫存productStock.StockQuantity -= eventData.MessageBody.Quantity;// 提交至數據庫await _stockRepository.UpdateStock(productStock);result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, true));}else{// Todo: 一些額外的邏輯result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, false, "扣減庫存失敗"));}// 冪等性保障await?_msgTracker.MarkAsProcessed(eventData.Id);return result;}
}

在消費邏輯中,會經歷冪等性校驗、合法性校驗、扣減邏輯?和 添加消費記錄。最終,會再次發送一個訂單扣減完成事件,供訂單服務將其作為回調進行消費,也就是更新訂單狀態。

自定義MsgTracker

在上面的示例代碼中,我們自定義了一個MsgTracker消息跟蹤器,它是基于Redis實現的,示例代碼如下:

public class RedisMsgTracker : IMsgTracker
{private const string KEY_PREFIX = "msgtracker:"; // 默認Key前綴private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // 默認緩存時間為3天,單位為秒private readonly IRedisCacheClient _redisCacheClient;public RedisMsgTracker(IRedisCacheClient redisCacheClient){_redisCacheClient = redisCacheClient ?? throw new ArgumentNullException("RedisClient未初始化");}public async Task<bool> HasProcessed(string msgId){var msgRecord = await _redisCacheClient.GetAsync<MsgTrackLog>($"{KEY_PREFIX}{msgId}");if (msgRecord == null)return false;return true;}public async Task MarkAsProcessed(string msgId){var msgRecord = new MsgTrackLog(msgId);await _redisCacheClient.SetAsync($"{KEY_PREFIX}{msgId}", msgRecord, DEFAULT_CACHE_TIME);}
}

在示例代碼中,約定了所有服務發送的消息都是EventData類,它接受一個泛型,定義如下:

public class EventData<T> where T : class
{public string Id { get; set; }public T MessageBody { get; set; }public DateTime CreatedDate { get; set; }public EventData(T messageBody)
{MessageBody = messageBody;CreatedDate = DateTime.Now;Id = SnowflakeGenerator.Instance().GetId().ToString();}
}

其中,它自帶了一個由雪花算法生成的消息Id用于傳遞過程中的唯一性,這個Id也被MsgTracker用于冪等性校驗。

測試驗證

首先,在庫存服務里面先查一下各個商品的庫存:

83673435e8629fc8fb98d43780606662.png

可以看到商品Id為1003的庫存有5個。

其次,在訂單服務里面新建一個訂單請求,買5個Id為1003的商品:

{"userId": "1002","productId": "1003","quantity": 5
}

提交成功后,查看庫存狀態:

79f5035950c92af9e174d8b94c577c4d.png

然后再查看訂單狀態:

b65fe2d2d74672835d1e20ed74c3ce03.png

如果這時再下單Id=1003的商品,訂單狀態變為-1即Failed:

e919944d3c151c1fbad24973209c3a0d.png

CAP與本地事務集成

在上面的示例代碼中,如果訂單提交MongoDB成功,但是在發布消息的時候失敗了,那么下單邏輯就應該是失敗的。這時,我們希望這兩個操作可以在一個事務里邊進行原子性保障,CAP提供了與本地事務的集成機制,在本地消息表與業務邏輯數據存儲為同一個存儲類型介質下(如本文例子的MongoDB)可以做到事務的集成。

例如,我們將數據持久化和消息發布/消費重構在一個Service類中進行封裝,Controller只需調用即可。

(1)封裝OrderService

public class OrderService : IOrderService
{private readonly ICapPublisher _eventPublisher;private readonly IMongoCollection<Order> _orders;private readonly IMongoClient _client;public OrderService(IOrderDatabaseSettings settings, ICapPublisher eventPublisher){_client = new MongoClient(settings.ConnectionString);_orders = _client.GetDatabase(settings.DatabaseName).GetCollection<Order>(settings.OrderCollectionName);_eventPublisher = eventPublisher;}public async Task<IList<Order>> GetAllOrders(){return await _orders.Find(o => true).ToListAsync();}public async Task<Order> GetOrder(string orderId){return await _orders.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();}public async Task CreateOrder(Order order){// 本地事務集成示例using (var session = _client.StartTransaction(_eventPublisher)){// 01.訂單數據存入MongoDB_orders.InsertOne(order);// 02.發布訂單已生成事件消息_eventPublisher.Publish(name: EventNameConstants.TOPIC_ORDER_SUBMITTED,contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED);// 03.提交事務await session.CommitTransactionAsync();}}public async Task UpdateOrder(Order order){await _orders.ReplaceOneAsync(o => o.OrderId == order.OrderId, order);}
}

(2)Controller修改調用方式

[HttpPost]
public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
{var order = _mapper.Map<Order>(orderDTO);// 01.生成訂單初始數據order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();order.CreatedDate = DateTime.Now;order.Status = OrderStatus.Pending;// 02.訂單數據提交await _orderService.CreateOrder(order);return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
}

同理,我們也可以將Consumer端的消費邏輯重構為CAP與本地事務集成,這里不再贅述。

本文示例代碼細節:https://github.com/EdisonChou/EDT.EventBus.Sample

End總結

本文介紹了事務補償與冪等性的基本概念,并基于CAP組件給了一個事務補償和冪等性保障的DEMO示例,在實際使用中可能還會借助CAP提供的事務能力將數據持久化和發布消息作為一個事務實現原子性,即CAP與本地事務的集成。

希望本文能夠對你有所幫助!

參考資料

CAP官方文檔,https://cap.dotnetcore.xyz/user-guide/zh/cap

e873cee0bd5a81cd15e3a9b9acd3831c.gif

年終總結:Edison的2021年終總結

數字化轉型:我在傳統企業做數字化轉型

C#刷題:C#刷劍指Offer算法題系列文章目錄

.NET面試:.NET開發面試知識體系

.NET大會:2020年中國.NET開發者大會PDF資料

05695dc2260ec3afedf76afcba41eca7.png

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/283929.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/283929.shtml
英文地址,請注明出處:http://en.pswp.cn/news/283929.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker與k8s

前言 隨著k8s 作為容器編排解決方案變得越來越流行&#xff0c;有些人開始拿 Docker 和 k8s進行對比&#xff0c;不禁問道&#xff1a;Docker 不香嗎&#xff1f; k8s 是kubernets的縮寫&#xff0c;’8‘代表中間的八個字符。 其實 Docker 和 k8s 并非直接的競爭對手&#xff…

Linux下啟動tomcat報java.lang.OutOfMemoryError: PermGen space

2019獨角獸企業重金招聘Python工程師標準>>> 一、錯誤信息 java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav…

Redis安裝[Windows]

一. redis下載地址: https://github.com/ServiceStack/redis-windows/tree/master/downloads 根據需要的下載對應版本*.zip即可.(我這里是win7x64) 二.使用 1. 下載之后解壓到你相應的目錄下: 1 文件介紹&#xff1a; 2 redis-benchmark.exe #基準測試 3 redis-check-aof.e…

簡練軟考知識點整理-項目啟動過程組

啟動過程組包含定義一個新項目或現有項目的一個新階段&#xff0c;授權開始該項目或階段的一組過程。在啟動過程中&#xff0c;定義初步范圍和落實初步財務資源&#xff0c;識別那些將相互作用并影響項目總體結果的內外部干系人&#xff0c;選定項目經理&#xff08;如果尚未安…

在 ASP.NET Core 中上傳文件

簡介文件上傳是指將媒體文件&#xff08;本地文件或網絡文件&#xff09;從客戶端上傳至服務器存儲。ASP.NET Core 支持使用緩沖的模型綁定&#xff08;針對較小文件&#xff09;和無緩沖的流式傳輸&#xff08;針對較大文件&#xff09;上傳一個或多個文件。緩沖和流式傳輸是上…

Paxos算法詳解

Paxos、Raft分布式一致性算法應用場景一文講述了分布式一致性問題與分布式一致性算法的典型應用場景。作為分布式一致性代名詞的Paxos算法號稱是最難理解的算法。本文試圖用通俗易懂的語言講述Paxos算法。 一、Paxos算法背景 Paxos算法是Lamport宗師提出的一種基于消息傳遞的分…

LeetCode 322. Coin Change

原題 You are given coins of different denominations and a total amount of money amount. Write a function to compute the fewest number of coins that you need to make up that amount. If that amount of money cannot be made up by any combination of the coins, …

Teiid:數據虛擬化Data Virtualization平臺

2019獨角獸企業重金招聘Python工程師標準>>> Teiid介紹 http://teiid.jboss.org/ 數據虛擬化的定義 https://en.wikipedia.org/wiki/Data_virtualization http://www.denodo.com/en/data-virtualization/overview 數據虛擬化的文章 Sick of ETL? Database virtuali…

如何仿造一個websocket請求?

之前兩次singnalr、 websocket實時推送相關&#xff1a;? .NET WebSockets 核心原理初體驗[1]? SignalR 從開發到生產部署避坑指南[2]tag&#xff1a;瀏覽器--->nginx--> server其中提到nginx默認不會為客戶端轉發Upgrade、Connection標頭&#xff0c; 因為為了讓被代理…

【轉】為什么自動車完全不可以犯錯誤

為什么自動車完全不可以犯錯誤 有人跟我講&#xff0c;我對Google的自動車要求太苛刻了。人無完人&#xff0c;所以Google的產品也不需要是完美的&#xff0c;只要“夠好用”就有市場。世界上有那么多糟糕的司機&#xff0c;酒后駕車的&#xff0c;開車時發短信的&#xff0c;打…

從“互聯網+教育”到“教育+互聯網”——互聯網文化基因視域下的審思

作者信息 朱敬/廣西師范大學教育學部教授&#xff0c;教育學博士&#xff0c;博士生導師&#xff1b; 蔡建東/河南大學教育學部教授&#xff0c;教育學博士。 本文摘要 近年來國務院與教育部文件逐漸使用“教育互聯網”一詞&#xff0c;從“互聯網教育”到“教育互聯網”&a…

Node.js Stream - 基礎篇

背景 在構建較復雜的系統時&#xff0c;通常將其拆解為功能獨立的若干部分。這些部分的接口遵循一定的規范&#xff0c;通過某種方式相連&#xff0c;以共同完成較復雜的任務。譬如&#xff0c;shell通過管道|連接各部分&#xff0c;其輸入輸出的規范是文本流。 在Node.js中&am…

Axure RP使用攻略--動態面板的用途(8)

寫了幾個Axure教程之后發現&#xff0c;可能教程的起點有些高了&#xff0c;過分的去講效果的實現&#xff0c;而忽略了axure功能以及基礎元件的使用&#xff0c;那么從這個教程開始&#xff0c;把這些逐漸的展開講解。 關于動態面板 動態面板是axure原型制作中使用非常頻繁的一…

ABP 6.0.0-rc.1的新特性

2022-07-26官方發布ABP 6.0.0-rc.1版本&#xff0c;本文挑選了幾個新特性進行了介紹&#xff0c;主要包括LeptonX Lite默認主題、OpenIddict模塊&#xff0c;以及如何將Identity Server遷移到OpenIddict。據ABP官方公眾號介紹&#xff0c;ABP 6.0.0穩定版的計劃發布日期為2022-…

Java并發包--線程池框架

轉載請注明出處&#xff1a;http://www.cnblogs.com/skywang12345/p/3509903.html 線程池架構圖 線程池的架構圖如下&#xff1a; 1. Executor 它是"執行者"接口&#xff0c;它是來執行任務的。準確的說&#xff0c;Executor提供了execute()接口來執行已提交的 Runna…

c 試水解碼jpeg圖片比特流(已成功解碼)

找到一張采用霍夫曼通用DC,AC編碼表的圖片&#xff0c;提取出此圖片的比特流準備對它解碼&#xff0c;再反推怎樣編碼。 下圖是此圖片比特流前100個字節。解碼是每次讀一字節&#xff0c;對這8比特解碼&#xff0c;如8比特不能解碼&#xff0c;再讀入一字節。因為霍夫曼表最多…

Raft算法詳解

Raft算法屬于Multi-Paxos算法&#xff0c;它是在Multi-Paxos思想的基礎上&#xff0c;做了一些簡化和限制&#xff0c;比如增加了日志必須是連續的&#xff0c;只支持領導者、跟隨者和候選人三種狀態&#xff0c;在理解和算法實現上都相對容易許多 從本質上說&#xff0c;Raft算…

淘寶彈性布局方案lib-flexible研究

1. lib-flexible不能與響應式布局兼容 先說說響應式布局的一些基本認識&#xff1a; 響應式布局的表現是&#xff1a;網頁通過css媒介查詢判斷可視區域的寬度&#xff0c;在不同的范圍應用不同的樣式&#xff0c;以便在不同尺寸的設備上呈現最佳的界面效果。典型的例子是&#…

[No0000DB]C# FtpClientHelper Ftp客戶端上傳下載重命名 類封裝

using System; using System.Diagnostics; using System.IO; using System.Text; using Shared;namespace Helpers {public static class FileHelper{#region Methods/// <summary>/// 向文本文件的尾部追加內容/// </summary>/// <param name"filePa…

WPF效果第一百九十四篇之伸縮面板

前面一篇玩耍了一下登錄實現效果;今天在原來的基礎上來玩耍一下伸縮面板的效果;閑話不多扯直接看效果:1、關于前臺簡單布局:2、左側面板伸縮動畫&#xff1a;<Storyboard x:Key"ShowConfigSb"><ThicknessAnimationUsingKeyFrames Storyboard.TargetProperty…