【.NET Core】|?總結/Edison Zhou
1補償事務和冪等性
在微服務架構下,我們會采用異步通信來對各個微服務進行解耦,從而我們會用到消息中間件來傳遞各個消息。?
補償事務
某些情況下,消費者需要返回值以告訴發布者執行結果,以便于發布者實施一些動作,通常情況下這屬于補償范圍。
例如,在一個電商程序中,訂單初始狀態為 pending,當商品數量成功扣除時將狀態標記為 succeeded ,否則為 failed。
那么,這樣看來實現邏輯應該是:當訂單微服務提交訂單,并發布了一個已下單的消息至下游微服務比如庫存微服務,當庫存微服務扣減庫存后,無論扣減成功與否,都發送一個回調給訂單微服務告知扣減狀態。
如果我們自己來實現,可能需要較多的工作量,我們可以借助CAP組件來實現,它提供的callback功能可以很方便的做到這一點。
冪等性
所謂冪等性,就是用戶對于同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了副作用。
在采用了消息中間件的分布式系統中,存在3中可能:
Exactly Once(*) (僅有一次)
At Most Once (最多一次)
At Least Once (最少一次)
帶 * 號的也就是Exactly Once在實際場景中,很難達到。
我們都知道,在CAP組件中,采用了數據庫表(準確來說是臨時存儲),也許可以做到At Most Once,但是并沒有提供嚴格保證消息不丟失的相關功能或配置。因此,CAP采用的交付保證是At Least Once,它并沒有實現冪等。
其實,目前業界大多數基于事件驅動的框架都是要求用戶自己來保證冪等性的,比如ENode,RocketMQ等。
綜述,CAP組件可以幫助實現一些比較不嚴格的冪等,但是嚴格的冪等無法做到。這就需要我們自己來處理,通常有兩種方式:
(1)以自然的方式處理冪等消息
比如數據庫提供的 INSERT ON DUPLICATE KEY UPDATE 或者是才去類型的程序判斷行為。
(2)顯示處理冪等消息
這種方式更為常見,在消息傳遞過程中傳遞ID,然后由單獨的消息跟蹤器來處理。比如,我們可以借助Redis來實現這個消息跟蹤器,下面的示例就是基于Redis來顯示處理冪等的。
2基于CAP組件的Sample
這里我們以剛剛提到的電商服務為例,訂單服務負責下單,庫存服務負責扣減庫存,二者通過Kafka進行消息傳遞,通過MongoDB進行持久化數據,CAP作為事件總線。
案例結構圖
訂單下單時會將將初始化狀態為Pending的訂單數據存入MongoDB,然后發送一個訂單已下達的消息至事件總線,下游系統庫存服務訂閱這個消息并消費,也就是扣減庫存。庫存扣減成功后,訂單服務根據扣減狀態將訂單狀態改為Succeeded或Failed。
編寫訂單服務
創建一個ASP.NET 5/6 WebAPI項目,引入以下Package:
PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package?DotNetCore.CAP.Kafka
PM>Install-Package?DotNetCore.CAP.MongoDB
編寫一個Controller用于接收下單請求:
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{private readonly IOrderRepository _orderRepository;private readonly IMapper _mapper;private readonly ICapPublisher _eventPublisher;public OrdersController(IOrderRepository orderRepository, IMapper mapper, ICapPublisher eventPublisher){_orderRepository = orderRepository;_mapper = mapper;_eventPublisher = eventPublisher;}[HttpGet]public async Task<ActionResult<IList<OrderVO>>> GetAllOrders(){var orders = await _orderRepository.GetAllOrders();return Ok(_mapper.Map<IList<OrderVO>>(orders));}[HttpGet("id")]public async Task<ActionResult<OrderVO>> GetOrder(string id){var order = await _orderRepository.GetOrder(id);if (order == null)return NotFound();return Ok(_mapper.Map<OrderVO>(order));}[HttpPost]public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO){var order = _mapper.Map<Order>(orderDTO);// 01.生成訂單初始數據order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();order.CreatedDate = DateTime.Now;order.Status = OrderStatus.Pending;// 02.訂單數據存入MongoDBawait _orderRepository.CreateOrder(order);// 03.發布訂單已生成事件消息await _eventPublisher.PublishAsync(name: EventNameConstants.TOPIC_ORDER_SUBMITTED,contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED);return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));}
}
這里使用了CAP提供的callback機制實現訂單狀態的修改。其原理就是新建了一個Consumer用于接收庫存微服務的新Topic訂閱消費。其中,Topic名字定義在了一個常量中。
public class ProductStockDeductedEventService : IProductStockDeductedEventService, ICapSubscribe
{private readonly IOrderRepository _orderRepository;public ProductStockDeductedEventService(IOrderRepository orderRepository){_orderRepository = orderRepository;}[CapSubscribe(name: EventNameConstants.TOPIC_STOCK_DEDUCTED, Group = EventNameConstants.GROUP_STOCK_DEDUCTED)]public async Task MarkOrderStatus(EventData<ProductStockDeductedEvent> eventData){if (eventData == null || eventData.MessageBody == null)return;var order = await _orderRepository.GetOrder(eventData.MessageBody.OrderId);if (order == null)return;if (eventData.MessageBody.IsSuccess){order.Status = OrderStatus.Succeed;// Todo: 一些額外的邏輯}else{order.Status = OrderStatus.Failed;// Todo: 一些額外的邏輯}await _orderRepository.UpdateOrder(order);}
}
這里回調的消費邏輯很簡單,就是根據庫存扣減的結果更新訂單的狀態。
編寫庫存服務
創建一個ASP.NET 5/6 WebAPI項目,引入以下Package:
PM>Install-Package AutoMapper
PM>Install-Package AutoMapper.Extensions.Microsoft.DependencyInjection
PM>Install-Package DotNetCore.CAP
PM>Install-Package?DotNetCore.CAP.Kafka
PM>Install-Package?DotNetCore.CAP.MongoDB
編寫一個Controller用于接收庫存查詢請求:
public class StocksController : ControllerBase
{private readonly IStockRepository _stockRepository;private readonly IMapper _mapper;private readonly ICapPublisher _eventPublisher;public StocksController(IStockRepository stockRepository, IMapper mapper, ICapPublisher eventPublisher){_stockRepository = stockRepository;_mapper = mapper;_eventPublisher = eventPublisher;}[HttpGet]public async Task<ActionResult<IList<StockVO>>> GetAllStocks(){var stocks = await _stockRepository.GetAllStocks();return Ok(_mapper.Map<IList<StockVO>>(stocks));}[HttpGet("id")]public async Task<ActionResult<StockVO>> GetStock(string id){var stock = await _stockRepository.GetStock(id);if (stock == null)return NotFound();return Ok(_mapper.Map<StockVO>(stock));}[HttpPost]public async Task<ActionResult<StockVO>> CreateStock(StockDTO stockDTO){var stock = _mapper.Map<Stock>(stockDTO);stock.CreatedDate = DateTime.Now;stock.UpdatedDate = stock.CreatedDate;await _stockRepository.CreateStock(stock);return CreatedAtAction(nameof(GetStock), new { id = stock.ProductId }, _mapper.Map<StockVO>(stock));}
}
編寫一個Consumer用于消費訂單下達事件的消息:
public class NewOrderSubmittedEventService : INewOrderSubmittedEventService, ICapSubscribe
{private readonly IStockRepository _stockRepository;private readonly IMsgTracker _msgTracker;public NewOrderSubmittedEventService(IStockRepository stockRepository, IMsgTracker msgTracker){_stockRepository = stockRepository;_msgTracker = msgTracker;}[CapSubscribe(name: EventNameConstants.TOPIC_ORDER_SUBMITTED, Group = EventNameConstants.GROUP_ORDER_SUBMITTED)]public async Task<EventData<ProductStockDeductedEvent>> DeductProductStock(EventData<NewOrderSubmittedEvent> eventData){// 冪等性保障if(await _msgTracker.HasProcessed(eventData.Id))return null;// 產品Id合法性校驗var productStock = await _stockRepository.GetStock(eventData.MessageBody.ProductId);if (productStock == null)return null;// 核心扣減邏輯EventData<ProductStockDeductedEvent> result;if (productStock.StockQuantity - eventData.MessageBody.Quantity >= 0){// 扣減產品實際庫存productStock.StockQuantity -= eventData.MessageBody.Quantity;// 提交至數據庫await _stockRepository.UpdateStock(productStock);result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, true));}else{// Todo: 一些額外的邏輯result = new EventData<ProductStockDeductedEvent>(new ProductStockDeductedEvent(eventData.MessageBody.OrderId, false, "扣減庫存失敗"));}// 冪等性保障await?_msgTracker.MarkAsProcessed(eventData.Id);return result;}
}
在消費邏輯中,會經歷冪等性校驗、合法性校驗、扣減邏輯?和 添加消費記錄。最終,會再次發送一個訂單扣減完成事件,供訂單服務將其作為回調進行消費,也就是更新訂單狀態。
自定義MsgTracker
在上面的示例代碼中,我們自定義了一個MsgTracker消息跟蹤器,它是基于Redis實現的,示例代碼如下:
public class RedisMsgTracker : IMsgTracker
{private const string KEY_PREFIX = "msgtracker:"; // 默認Key前綴private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // 默認緩存時間為3天,單位為秒private readonly IRedisCacheClient _redisCacheClient;public RedisMsgTracker(IRedisCacheClient redisCacheClient){_redisCacheClient = redisCacheClient ?? throw new ArgumentNullException("RedisClient未初始化");}public async Task<bool> HasProcessed(string msgId){var msgRecord = await _redisCacheClient.GetAsync<MsgTrackLog>($"{KEY_PREFIX}{msgId}");if (msgRecord == null)return false;return true;}public async Task MarkAsProcessed(string msgId){var msgRecord = new MsgTrackLog(msgId);await _redisCacheClient.SetAsync($"{KEY_PREFIX}{msgId}", msgRecord, DEFAULT_CACHE_TIME);}
}
在示例代碼中,約定了所有服務發送的消息都是EventData類,它接受一個泛型,定義如下:
public class EventData<T> where T : class
{public string Id { get; set; }public T MessageBody { get; set; }public DateTime CreatedDate { get; set; }public EventData(T messageBody)
{MessageBody = messageBody;CreatedDate = DateTime.Now;Id = SnowflakeGenerator.Instance().GetId().ToString();}
}
其中,它自帶了一個由雪花算法生成的消息Id用于傳遞過程中的唯一性,這個Id也被MsgTracker用于冪等性校驗。
測試驗證
首先,在庫存服務里面先查一下各個商品的庫存:
可以看到商品Id為1003的庫存有5個。
其次,在訂單服務里面新建一個訂單請求,買5個Id為1003的商品:
{"userId": "1002","productId": "1003","quantity": 5
}
提交成功后,查看庫存狀態:
然后再查看訂單狀態:
如果這時再下單Id=1003的商品,訂單狀態變為-1即Failed:
CAP與本地事務集成
在上面的示例代碼中,如果訂單提交MongoDB成功,但是在發布消息的時候失敗了,那么下單邏輯就應該是失敗的。這時,我們希望這兩個操作可以在一個事務里邊進行原子性保障,CAP提供了與本地事務的集成機制,在本地消息表與業務邏輯數據存儲為同一個存儲類型介質下(如本文例子的MongoDB)可以做到事務的集成。
例如,我們將數據持久化和消息發布/消費重構在一個Service類中進行封裝,Controller只需調用即可。
(1)封裝OrderService
public class OrderService : IOrderService
{private readonly ICapPublisher _eventPublisher;private readonly IMongoCollection<Order> _orders;private readonly IMongoClient _client;public OrderService(IOrderDatabaseSettings settings, ICapPublisher eventPublisher){_client = new MongoClient(settings.ConnectionString);_orders = _client.GetDatabase(settings.DatabaseName).GetCollection<Order>(settings.OrderCollectionName);_eventPublisher = eventPublisher;}public async Task<IList<Order>> GetAllOrders(){return await _orders.Find(o => true).ToListAsync();}public async Task<Order> GetOrder(string orderId){return await _orders.Find(o => o.OrderId == orderId).FirstOrDefaultAsync();}public async Task CreateOrder(Order order){// 本地事務集成示例using (var session = _client.StartTransaction(_eventPublisher)){// 01.訂單數據存入MongoDB_orders.InsertOne(order);// 02.發布訂單已生成事件消息_eventPublisher.Publish(name: EventNameConstants.TOPIC_ORDER_SUBMITTED,contentObj: new EventData<NewOrderSubmittedEvent>(new NewOrderSubmittedEvent(order.OrderId, order.ProductId, order.Quantity)),callbackName: EventNameConstants.TOPIC_STOCK_DEDUCTED);// 03.提交事務await session.CommitTransactionAsync();}}public async Task UpdateOrder(Order order){await _orders.ReplaceOneAsync(o => o.OrderId == order.OrderId, order);}
}
(2)Controller修改調用方式
[HttpPost]
public async Task<ActionResult<OrderVO>> CreateOrder(OrderDTO orderDTO)
{var order = _mapper.Map<Order>(orderDTO);// 01.生成訂單初始數據order.OrderId = SnowflakeGenerator.Instance().GetId().ToString();order.CreatedDate = DateTime.Now;order.Status = OrderStatus.Pending;// 02.訂單數據提交await _orderService.CreateOrder(order);return CreatedAtAction(nameof(GetOrder), new { id = order.OrderId }, _mapper.Map<OrderVO>(order));
}
同理,我們也可以將Consumer端的消費邏輯重構為CAP與本地事務集成,這里不再贅述。
本文示例代碼細節:https://github.com/EdisonChou/EDT.EventBus.Sample
End總結
本文介紹了事務補償與冪等性的基本概念,并基于CAP組件給了一個事務補償和冪等性保障的DEMO示例,在實際使用中可能還會借助CAP提供的事務能力將數據持久化和發布消息作為一個事務實現原子性,即CAP與本地事務的集成。
希望本文能夠對你有所幫助!
參考資料
CAP官方文檔,https://cap.dotnetcore.xyz/user-guide/zh/cap
年終總結:Edison的2021年終總結
數字化轉型:我在傳統企業做數字化轉型
C#刷題:C#刷劍指Offer算法題系列文章目錄
.NET面試:.NET開發面試知識體系
.NET大會:2020年中國.NET開發者大會PDF資料