大綱
1.生單鏈路的業務代碼
2.生單鏈路中可能會出現數據不一致的問題
3.Seata AT模式下的分布式事務的原理
4.Seata AT模式下的分布式事務的讀寫隔離原理
5.Seata AT模式下的死鎖問題以及超時機制
6.Seata AT模式下的讀寫隔離機制的影響
7.生單鏈路使用Seata AT模式的具體步驟
8.生單鏈路使用Seata AT模式時的原理流程
9.生單鏈路使用Seata AT模式時的并發問題
10.生單鏈路如何解決庫存全局鎖爭用問題
1.生單鏈路的業務代碼
(1)生成訂單流程
(2)入參檢查與風控檢查
(3)獲取商品信息與計算訂單價格及驗證價格
(4)鎖定優惠券與商品庫存
(5)新增訂單到數據庫
(6)發送延遲消息到MQ
(1)生成訂單流程
@Service
public class OrderServiceImpl implements OrderService {...//提交訂單/生成訂單接口@GlobalTransactional(rollbackFor = Exception.class)@Overridepublic CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {//1.入參檢查checkCreateOrderRequestParam(createOrderRequest);//2.風控檢查checkRisk(createOrderRequest);//3.獲取商品信息List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest);//4.計算訂單價格CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList);//5.驗證訂單實付金額checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO);//6.鎖定優惠券lockUserCoupon(createOrderRequest);//7.鎖定商品庫存lockProductStock(createOrderRequest);//8.生成訂單到數據庫addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);//9.發送訂單延遲消息用于支付超時自動關單sendPayOrderTimeoutDelayMessage(createOrderRequest);//返回訂單信息CreateOrderDTO createOrderDTO = new CreateOrderDTO();createOrderDTO.setOrderId(createOrderRequest.getOrderId());return createOrderDTO;}...
}
(2)入參檢查與風控檢查
@Service
public class OrderServiceImpl implements OrderService {...//檢查創建訂單請求參數private void checkCreateOrderRequestParam(CreateOrderRequest createOrderRequest) {ParamCheckUtil.checkObjectNonNull(createOrderRequest);//訂單ID檢查String orderId = createOrderRequest.getOrderId();ParamCheckUtil.checkStringNonEmpty(orderId, OrderErrorCodeEnum.ORDER_ID_IS_NULL);//業務線標識檢查Integer businessIdentifier = createOrderRequest.getBusinessIdentifier();ParamCheckUtil.checkObjectNonNull(businessIdentifier, OrderErrorCodeEnum.BUSINESS_IDENTIFIER_IS_NULL);if (BusinessIdentifierEnum.getByCode(businessIdentifier) == null) {throw new OrderBizException(OrderErrorCodeEnum.BUSINESS_IDENTIFIER_ERROR);}//用戶ID檢查String userId = createOrderRequest.getUserId();ParamCheckUtil.checkStringNonEmpty(userId, OrderErrorCodeEnum.USER_ID_IS_NULL);//訂單類型檢查Integer orderType = createOrderRequest.getOrderType();ParamCheckUtil.checkObjectNonNull(businessIdentifier, OrderErrorCodeEnum.ORDER_TYPE_IS_NULL);if (OrderTypeEnum.getByCode(orderType) == null) {throw new OrderBizException(OrderErrorCodeEnum.ORDER_TYPE_ERROR);}//賣家ID檢查String sellerId = createOrderRequest.getSellerId();ParamCheckUtil.checkStringNonEmpty(sellerId, OrderErrorCodeEnum.SELLER_ID_IS_NULL);//配送類型檢查Integer deliveryType = createOrderRequest.getDeliveryType();ParamCheckUtil.checkObjectNonNull(deliveryType, OrderErrorCodeEnum.USER_ADDRESS_ERROR);if (DeliveryTypeEnum.getByCode(deliveryType) == null) {throw new OrderBizException(OrderErrorCodeEnum.DELIVERY_TYPE_ERROR);}//地址信息檢查String province = createOrderRequest.getProvince();String city = createOrderRequest.getCity();String area = createOrderRequest.getArea();String streetAddress = createOrderRequest.getStreet();ParamCheckUtil.checkStringNonEmpty(province, OrderErrorCodeEnum.USER_ADDRESS_ERROR);ParamCheckUtil.checkStringNonEmpty(city, OrderErrorCodeEnum.USER_ADDRESS_ERROR);ParamCheckUtil.checkStringNonEmpty(area, OrderErrorCodeEnum.USER_ADDRESS_ERROR);ParamCheckUtil.checkStringNonEmpty(streetAddress, OrderErrorCodeEnum.USER_ADDRESS_ERROR);//區域ID檢查String regionId = createOrderRequest.getRegionId();ParamCheckUtil.checkStringNonEmpty(regionId, OrderErrorCodeEnum.REGION_ID_IS_NULL);//經緯度檢查BigDecimal lon = createOrderRequest.getLon();BigDecimal lat = createOrderRequest.getLat();ParamCheckUtil.checkObjectNonNull(lon, OrderErrorCodeEnum.USER_LOCATION_IS_NULL);ParamCheckUtil.checkObjectNonNull(lat, OrderErrorCodeEnum.USER_LOCATION_IS_NULL);//收貨人信息檢查String receiverName = createOrderRequest.getReceiverName();String receiverPhone = createOrderRequest.getReceiverPhone();ParamCheckUtil.checkStringNonEmpty(receiverName, OrderErrorCodeEnum.ORDER_RECEIVER_IS_NULL);ParamCheckUtil.checkStringNonEmpty(receiverPhone, OrderErrorCodeEnum.ORDER_RECEIVER_IS_NULL);//客戶端設備信息檢查String clientIp = createOrderRequest.getClientIp();ParamCheckUtil.checkStringNonEmpty(clientIp, OrderErrorCodeEnum.CLIENT_IP_IS_NULL);//商品條目信息檢查List<CreateOrderRequest.OrderItemRequest> orderItemRequestList = createOrderRequest.getOrderItemRequestList();ParamCheckUtil.checkCollectionNonEmpty(orderItemRequestList, OrderErrorCodeEnum.ORDER_ITEM_IS_NULL);for (CreateOrderRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {Integer productType = orderItemRequest.getProductType();Integer saleQuantity = orderItemRequest.getSaleQuantity();String skuCode = orderItemRequest.getSkuCode();ParamCheckUtil.checkObjectNonNull(productType, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR);ParamCheckUtil.checkObjectNonNull(saleQuantity, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR);ParamCheckUtil.checkStringNonEmpty(skuCode, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR);}//訂單費用信息檢查List<CreateOrderRequest.OrderAmountRequest> orderAmountRequestList = createOrderRequest.getOrderAmountRequestList();ParamCheckUtil.checkCollectionNonEmpty(orderAmountRequestList, OrderErrorCodeEnum.ORDER_AMOUNT_IS_NULL);for (CreateOrderRequest.OrderAmountRequest orderAmountRequest : orderAmountRequestList) {Integer amountType = orderAmountRequest.getAmountType();ParamCheckUtil.checkObjectNonNull(amountType, OrderErrorCodeEnum.ORDER_AMOUNT_TYPE_IS_NULL);if (AmountTypeEnum.getByCode(amountType) == null) {throw new OrderBizException(OrderErrorCodeEnum.ORDER_AMOUNT_TYPE_PARAM_ERROR);}}Map<Integer, Integer> orderAmountMap = orderAmountRequestList.stream().collect(Collectors.toMap(CreateOrderRequest.OrderAmountRequest::getAmountType, CreateOrderRequest.OrderAmountRequest::getAmount));//訂單支付原價不能為空if (orderAmountMap.get(AmountTypeEnum.ORIGIN_PAY_AMOUNT.getCode()) == null) {throw new OrderBizException(OrderErrorCodeEnum.ORDER_ORIGIN_PAY_AMOUNT_IS_NULL);}//訂單運費不能為空if (orderAmountMap.get(AmountTypeEnum.SHIPPING_AMOUNT.getCode()) == null) {throw new OrderBizException(OrderErrorCodeEnum.ORDER_SHIPPING_AMOUNT_IS_NULL);}//訂單實付金額不能為空if (orderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()) == null) {throw new OrderBizException(OrderErrorCodeEnum.ORDER_REAL_PAY_AMOUNT_IS_NULL);}if (StringUtils.isNotEmpty(createOrderRequest.getCouponId())) {//訂單優惠券抵扣金額不能為空if (orderAmountMap.get(AmountTypeEnum.COUPON_DISCOUNT_AMOUNT.getCode()) == null) {throw new OrderBizException(OrderErrorCodeEnum.ORDER_DISCOUNT_AMOUNT_IS_NULL);}}//訂單支付信息檢查List<CreateOrderRequest.PaymentRequest> paymentRequestList = createOrderRequest.getPaymentRequestList();ParamCheckUtil.checkCollectionNonEmpty(paymentRequestList, OrderErrorCodeEnum.ORDER_PAYMENT_IS_NULL);for (CreateOrderRequest.PaymentRequest paymentRequest : paymentRequestList) {Integer payType = paymentRequest.getPayType();Integer accountType = paymentRequest.getAccountType();if (payType == null || PayTypeEnum.getByCode(payType) == null) {throw new OrderBizException(OrderErrorCodeEnum.PAY_TYPE_PARAM_ERROR);}if (accountType == null || AccountTypeEnum.getByCode(accountType) == null) {throw new OrderBizException(OrderErrorCodeEnum.ACCOUNT_TYPE_PARAM_ERROR);}}}//風控檢查private void checkRisk(CreateOrderRequest createOrderRequest) {//調用風控服務進行風控檢查CheckOrderRiskRequest checkOrderRiskRequest = createOrderRequest.clone(CheckOrderRiskRequest.class);JsonResult<CheckOrderRiskDTO> jsonResult = riskApi.checkOrderRisk(checkOrderRiskRequest);if (!jsonResult.getSuccess()) {throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());}}...
}
(3)獲取商品信息與計算訂單價格及驗證價格
@Service
public class OrderServiceImpl implements OrderService {...//獲取訂單條目商品信息private List<ProductSkuDTO> listProductSkus(CreateOrderRequest createOrderRequest) {List<CreateOrderRequest.OrderItemRequest> orderItemRequestList = createOrderRequest.getOrderItemRequestList();List<ProductSkuDTO> productSkuList = new ArrayList<>();for (CreateOrderRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {String skuCode = orderItemRequest.getSkuCode();ProductSkuQuery productSkuQuery = new ProductSkuQuery();productSkuQuery.setSkuCode(skuCode);productSkuQuery.setSellerId(createOrderRequest.getSellerId());JsonResult<ProductSkuDTO> jsonResult = productApi.getProductSku(productSkuQuery);if (!jsonResult.getSuccess()) {throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());}ProductSkuDTO productSkuDTO = jsonResult.getData();//sku不存在if (productSkuDTO == null) {throw new OrderBizException(OrderErrorCodeEnum.PRODUCT_SKU_CODE_ERROR, skuCode);}productSkuList.add(productSkuDTO);}return productSkuList;}//計算訂單價格,如果使用了優惠券、紅包、積分等,會一并進行扣減//@param createOrderRequest 訂單信息//@param productSkuList 商品信息private CalculateOrderAmountDTO calculateOrderAmount(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList) {CalculateOrderAmountRequest calculateOrderPriceRequest = createOrderRequest.clone(CalculateOrderAmountRequest.class, CloneDirection.FORWARD);//訂單條目補充商品信息Map<String, ProductSkuDTO> productSkuDTOMap = productSkuList.stream().collect(Collectors.toMap(ProductSkuDTO::getSkuCode, Function.identity()));calculateOrderPriceRequest.getOrderItemRequestList().forEach(item -> {String skuCode = item.getSkuCode();ProductSkuDTO productSkuDTO = productSkuDTOMap.get(skuCode);item.setProductId(productSkuDTO.getProductId());item.setSalePrice(productSkuDTO.getSalePrice());});//調用營銷服務計算訂單價格JsonResult<CalculateOrderAmountDTO> jsonResult = marketApi.calculateOrderAmount(calculateOrderPriceRequest);//檢查價格計算結果if (!jsonResult.getSuccess()) {throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());}CalculateOrderAmountDTO calculateOrderAmountDTO = jsonResult.getData();if (calculateOrderAmountDTO == null) {throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR);}//訂單費用信息List<OrderAmountDTO> orderAmountList = ObjectUtil.convertList(calculateOrderAmountDTO.getOrderAmountList(), OrderAmountDTO.class);if (orderAmountList == null || orderAmountList.isEmpty()) {throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR);}//訂單條目費用明細List<OrderAmountDetailDTO> orderItemAmountList = ObjectUtil.convertList(calculateOrderAmountDTO.getOrderAmountDetail(), OrderAmountDetailDTO.class);if (orderItemAmountList == null || orderItemAmountList.isEmpty()) {throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR);}return calculateOrderAmountDTO;}//驗證訂單實付金額private void checkRealPayAmount(CreateOrderRequest createOrderRequest, CalculateOrderAmountDTO calculateOrderAmountDTO) {List<CreateOrderRequest.OrderAmountRequest> originOrderAmountRequestList = createOrderRequest.getOrderAmountRequestList();Map<Integer, CreateOrderRequest.OrderAmountRequest> originOrderAmountMap =originOrderAmountRequestList.stream().collect(Collectors.toMap(CreateOrderRequest.OrderAmountRequest::getAmountType, Function.identity()));//前端給的實付金額Integer originRealPayAmount = originOrderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()).getAmount();List<CalculateOrderAmountDTO.OrderAmountDTO> orderAmountDTOList = calculateOrderAmountDTO.getOrderAmountList();Map<Integer, CalculateOrderAmountDTO.OrderAmountDTO> orderAmountMap =orderAmountDTOList.stream().collect(Collectors.toMap(CalculateOrderAmountDTO.OrderAmountDTO::getAmountType, Function.identity()));//營銷計算出來的實付金額Integer realPayAmount = orderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()).getAmount();if (!originRealPayAmount.equals(realPayAmount)) {//訂單驗價失敗throw new OrderBizException(OrderErrorCodeEnum.ORDER_CHECK_REAL_PAY_AMOUNT_FAIL);}}...
}
(4)鎖定優惠券與商品庫存
@Service
public class OrderServiceImpl implements OrderService {...//鎖定用戶優惠券private void lockUserCoupon(CreateOrderRequest createOrderRequest) {String couponId = createOrderRequest.getCouponId();if (StringUtils.isEmpty(couponId)) {return;}LockUserCouponRequest lockUserCouponRequest = createOrderRequest.clone(LockUserCouponRequest.class);//調用營銷服務鎖定用戶優惠券JsonResult<Boolean> jsonResult = marketApi.lockUserCoupon(lockUserCouponRequest);//檢查鎖定用戶優惠券結果if (!jsonResult.getSuccess()) {throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());}}//鎖定商品庫存private void lockProductStock(CreateOrderRequest createOrderRequest) {String orderId = createOrderRequest.getOrderId();List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = ObjectUtil.convertList(createOrderRequest.getOrderItemRequestList(), LockProductStockRequest.OrderItemRequest.class);LockProductStockRequest lockProductStockRequest = new LockProductStockRequest();lockProductStockRequest.setOrderId(orderId);lockProductStockRequest.setOrderItemRequestList(orderItemRequestList);JsonResult<Boolean> jsonResult = inventoryApi.lockProductStock(lockProductStockRequest);//檢查鎖定商品庫存結果if (!jsonResult.getSuccess()) {throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());}}...
}@DubboService(version = "1.0.0", interfaceClass = MarketApi.class, retries = 0)
public class MarketApiImpl implements MarketApi {...//鎖定用戶優惠券記錄@Overridepublic JsonResult<Boolean> lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) {try {Boolean result = couponService.lockUserCoupon(lockUserCouponRequest);return JsonResult.buildSuccess(result);} catch (MarketBizException e) {log.error("biz error", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error", e);return JsonResult.buildError(e.getMessage());}}...
}@Service
public class CouponServiceImpl implements CouponService {...//鎖定用戶優惠券@Transactional(rollbackFor = Exception.class)@Overridepublic Boolean lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) {//檢查入參checkLockUserCouponRequest(lockUserCouponRequest);String userId = lockUserCouponRequest.getUserId();String couponId = lockUserCouponRequest.getCouponId();CouponDO couponDO = couponDAO.getUserCoupon(userId, couponId);if (couponDO == null) {throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_NULL);}//判斷優惠券是否已經使用了if (CouponUsedStatusEnum.USED.getCode().equals(couponDO.getUsed())) {throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_USED);}couponDO.setUsed(CouponUsedStatusEnum.USED.getCode());couponDO.setUsedTime(new Date());couponDAO.updateById(couponDO);return true;}...
}@DubboService(version = "1.0.0", interfaceClass = InventoryApi.class, retries = 0)
public class InventoryApiImpl implements InventoryApi {@Autowiredprivate InventoryService inventoryService;//鎖定商品庫存@Overridepublic JsonResult<Boolean> lockProductStock(LockProductStockRequest lockProductStockRequest) {try {Boolean result = inventoryService.lockProductStock(lockProductStockRequest);return JsonResult.buildSuccess(result);} catch (InventoryBizException e) {log.error("biz error", e);return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());} catch (Exception e) {log.error("system error", e);return JsonResult.buildError(e.getMessage());}}...
}@Service
public class InventoryServiceImpl implements InventoryService {...//鎖定商品庫存@Transactional(rollbackFor = Exception.class)@Overridepublic Boolean lockProductStock(LockProductStockRequest lockProductStockRequest) {//檢查入參checkLockProductStockRequest(lockProductStockRequest);List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = lockProductStockRequest.getOrderItemRequestList();for (LockProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {String skuCode = orderItemRequest.getSkuCode();ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode);if (productStockDO == null) {throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_ERROR);}Integer saleQuantity = orderItemRequest.getSaleQuantity();//執行庫存扣減,并需要解決防止超賣的問題int nums = productStockDAO.lockProductStock(skuCode, saleQuantity);if (nums <= 0) {throw new InventoryBizException(InventoryErrorCodeEnum.LOCK_PRODUCT_SKU_STOCK_ERROR);}}return true;}...
}
(5)新增訂單到數據庫
@Service
public class OrderServiceImpl implements OrderService {...//新增訂單數據到數據庫private void addNewOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {//封裝新訂單數據NewOrderDataHolder newOrderDataHolder = new NewOrderDataHolder();//生成主訂單FullOrderData fullMasterOrderData = addNewMasterOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);//封裝主訂單數據到NewOrderData對象中newOrderDataHolder.appendOrderData(fullMasterOrderData);//如果存在多種商品類型,需要按商品類型進行拆單Map<Integer, List<ProductSkuDTO>> productTypeMap = productSkuList.stream().collect(Collectors.groupingBy(ProductSkuDTO::getProductType));if (productTypeMap.keySet().size() > 1) {for (Integer productType : productTypeMap.keySet()) {//生成子訂單FullOrderData fullSubOrderData = addNewSubOrder(fullMasterOrderData, productType);//封裝子訂單數據到NewOrderData對象中newOrderDataHolder.appendOrderData(fullSubOrderData);}}//保存訂單到數據庫//訂單信息List<OrderInfoDO> orderInfoDOList = newOrderDataHolder.getOrderInfoDOList();if (!orderInfoDOList.isEmpty()) {orderInfoDAO.saveBatch(orderInfoDOList);}//訂單條目List<OrderItemDO> orderItemDOList = newOrderDataHolder.getOrderItemDOList();if (!orderItemDOList.isEmpty()) {orderItemDAO.saveBatch(orderItemDOList);}//訂單配送信息List<OrderDeliveryDetailDO> orderDeliveryDetailDOList = newOrderDataHolder.getOrderDeliveryDetailDOList();if (!orderDeliveryDetailDOList.isEmpty()) {orderDeliveryDetailDAO.saveBatch(orderDeliveryDetailDOList);}//訂單支付信息List<OrderPaymentDetailDO> orderPaymentDetailDOList = newOrderDataHolder.getOrderPaymentDetailDOList();if (!orderPaymentDetailDOList.isEmpty()) {orderPaymentDetailDAO.saveBatch(orderPaymentDetailDOList);}//訂單費用信息List<OrderAmountDO> orderAmountDOList = newOrderDataHolder.getOrderAmountDOList();if (!orderAmountDOList.isEmpty()) {orderAmountDAO.saveBatch(orderAmountDOList);}//訂單費用明細List<OrderAmountDetailDO> orderAmountDetailDOList = newOrderDataHolder.getOrderAmountDetailDOList();if (!orderAmountDetailDOList.isEmpty()) {orderAmountDetailDAO.saveBatch(orderAmountDetailDOList);}//訂單狀態變更日志信息List<OrderOperateLogDO> orderOperateLogDOList = newOrderDataHolder.getOrderOperateLogDOList();if (!orderOperateLogDOList.isEmpty()) {orderOperateLogDAO.saveBatch(orderOperateLogDOList);}//訂單快照數據List<OrderSnapshotDO> orderSnapshotDOList = newOrderDataHolder.getOrderSnapshotDOList();if (!orderSnapshotDOList.isEmpty()) {orderSnapshotDAO.saveBatch(orderSnapshotDOList);}}//新增主訂單信息訂單private FullOrderData addNewMasterOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {NewOrderBuilder newOrderBuilder = new NewOrderBuilder(createOrderRequest, productSkuList, calculateOrderAmountDTO, orderProperties);FullOrderData fullOrderData = newOrderBuilder.buildOrder().buildOrderItems().buildOrderDeliveryDetail().buildOrderPaymentDetail().buildOrderAmount().buildOrderAmountDetail().buildOperateLog().buildOrderSnapshot().build();//訂單信息OrderInfoDO orderInfoDO = fullOrderData.getOrderInfoDO();//訂單條目信息List<OrderItemDO> orderItemDOList = fullOrderData.getOrderItemDOList();//訂單費用信息List<OrderAmountDO> orderAmountDOList = fullOrderData.getOrderAmountDOList();//補全地址信息OrderDeliveryDetailDO orderDeliveryDetailDO = fullOrderData.getOrderDeliveryDetailDO();String detailAddress = getDetailAddress(orderDeliveryDetailDO);orderDeliveryDetailDO.setDetailAddress(detailAddress);//補全訂單狀態變更日志OrderOperateLogDO orderOperateLogDO = fullOrderData.getOrderOperateLogDO();String remark = "創建訂單操作0-10";orderOperateLogDO.setRemark(remark);//補全訂單商品快照信息List<OrderSnapshotDO> orderSnapshotDOList = fullOrderData.getOrderSnapshotDOList();for (OrderSnapshotDO orderSnapshotDO : orderSnapshotDOList) {//優惠券信息if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_COUPON.getCode())) {...}//訂單費用信息else if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_AMOUNT.getCode())) {orderSnapshotDO.setSnapshotJson(JsonUtil.object2Json(orderAmountDOList));}//訂單條目信息else if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_ITEM.getCode())) {orderSnapshotDO.setSnapshotJson(JsonUtil.object2Json(orderItemDOList));}}return fullOrderData;}...
}
(6)發送延遲消息到MQ
@Service
public class OrderServiceImpl implements OrderService {...//發送支付訂單超時延遲消息,用于支付超時自動關單private void sendPayOrderTimeoutDelayMessage(CreateOrderRequest createOrderRequest) {PayOrderTimeoutDelayMessage message = new PayOrderTimeoutDelayMessage();message.setOrderId(createOrderRequest.getOrderId());message.setBusinessIdentifier(createOrderRequest.getBusinessIdentifier());message.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode());message.setUserId(createOrderRequest.getUserId());message.setOrderType(createOrderRequest.getOrderType());message.setOrderStatus(OrderStatusEnum.CREATED.getCode());String msgJson = JsonUtil.object2Json(message);defaultProducer.sendMessage(RocketMqConstant.PAY_ORDER_TIMEOUT_DELAY_TOPIC, msgJson,RocketDelayedLevel.DELAYED_30m, "支付訂單超時延遲消息");}...
}@Component
public class DefaultProducer {private final DefaultMQProducer producer;@Autowiredpublic DefaultProducer(RocketMQProperties rocketMQProperties) {producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP);producer.setNamesrvAddr(rocketMQProperties.getNameServer());start();}//對象在使用之前必須要調用一次,只能初始化一次public void start() {try {this.producer.start();} catch (MQClientException e) {log.error("producer start error", e);}}...//發送消息public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));try {if (delayTimeLevel > 0) {msg.setDelayTimeLevel(delayTimeLevel);}SendResult send = producer.send(msg);if (SendStatus.SEND_OK == send.getSendStatus()) {log.info("發送MQ消息成功, type:{}, message:{}", type, message);} else {throw new OrderBizException(send.getSendStatus().toString());}} catch (Exception e) {log.error("發送MQ消息失敗:", e);throw new OrderBizException(OrderErrorCodeEnum.SEND_MQ_FAILED);}}...
}
2.生單鏈路中可能會出現數據不一致的問題
在更新優惠券本地事務、更新庫存本地事務、插入訂單數據本地事務中,可能會出現優惠券和庫存都已經更新成功了,但訂單數據卻插入失敗,此時就會出現數據不一致的問題。
3.Seata AT模式下的分布式事務的原理
說明一:需要部署一個Seata Server服務器。
說明二:在各個服務的分支事務的數據庫中,需要新增一張undo_log表,用來記錄各個服務的分支事務失敗時可以執行的回滾SQL。
說明三:當入口服務開啟一個分布式事務時,需要向Seata Server開啟一個全局事務。
說明四:各個服務對其分支事務的執行情況會同步給Seata Server服務器。
說明五:當Seata Server發現某分支事務失敗時,便會通知各服務進行事務回滾。
說明六:當各個服務進行事務回滾時,會從undo_log表查出對應SQL去執行。
4.Seata AT模式下的分布式事務的讀寫隔離原理
為了避免有其他線程修改某數據后又進行回滾,就一定要加本地鎖 + 全局鎖。本地鎖是為了避免當前機器的其他線程對數據進行修改并回滾,全局鎖是為了避免分布式機器的線程對數據進行修改并回滾。
服務A在更新某數據之前,需要先獲取本地鎖。服務A在成功獲取本地鎖之后,需要插入undo log數據。接著,服務A需要向Seata Server服務器獲取全局鎖。服務A在成功獲取全局鎖之后,會提交本地事務并釋放本地鎖。
如果服務A對服務B進行RPC調用并提交其本地事務,則繼續按前面的步驟處理服務B的數據更新。當服務B的本地事務也提交完成后,不需要繼續執行其他分支事務了,服務A便可以提交分布式事務,并釋放全局鎖。
分布式事務的全鏈路在執行完畢前,對應數據的全局鎖是不會釋放的。
5.Seata AT模式下的死鎖問題以及超時機制
Seata的一條事務鏈路里,每一個事務都會按如下順序執行:首先獲取本地鎖更新本地數據,然后插入undo log記錄,接著獲取本地數據對應的全局鎖,最后提交本地事務并釋放本地鎖。
Seata的一條事務鏈路里,一個事務執行完就會繼續執行下一個事務。如果事務鏈路里的所有事務都執行完成了,那么就提交事務,并釋放全局鎖。如果某個事務需要回滾,那么就需要獲取該事務本地數據的本地鎖,然后獲取undo log記錄生成逆向操作的SQL語句來進行補償和更新,補償完畢才能釋放本地數據的全局鎖。
由于Seata AT模式的寫隔離是通過本地數據的全局鎖來實現的,所以寫隔離的過程中,就涉及到了本地數據的本地鎖和全局鎖兩把鎖,這時候就很容易導致出現死鎖的情況。
比如當事務1的分支事務提交數據1的本地事務后,會釋放數據1的本地鎖。此時事務2的分支事務就可以獲取數據1的本地鎖,但要等待獲取事務1釋放數據1的全局鎖后,才能釋放數據1的本地鎖。如果事務1的后續分支事務出現異常需要進行回滾,那么事務1就需要獲取數據1的本地鎖,執行回滾補償處理。事務1執行完分支事務的回滾補償處理后,才能釋放數據1的全局鎖。
于是就出現了這樣的死鎖場景:事務1對數據1的回滾,占用了數據1的全局鎖,需等待獲取數據1的本地鎖。事務2對數據1的更新,占用了數據1的本地鎖,需等待獲取數據1的全局鎖。
Seata為了解決這個問題,會引入等待全局鎖的超時機制。如果事務2在等待數據1的全局鎖時出現超時,就會釋放其占用的本地鎖。從而讓事務1能獲取到數據1的本地鎖,完成其事務操作,而不用一直等待。
6.Seata AT模式下的讀寫隔離機制的影響
由于全局鎖的存在,會嚴重影響Seata AT分布式事務的并發吞吐量。所以除非是金融級別的系統,才會使用像Seata AT模式這么嚴格的事務來保證數據的強一致性。
當然,通常情況下分布式事務基本都是更新不同的數據。只要更新不同的數據,那么Seata AT分布式事務也不會出現全局鎖等待。只有一些特殊情況下,才可能會出現大量分布式事務更新同一條數據。當使用Seata AT分布式事務時,特別注意盡量不要讓全局鎖等待。
如果不使用全局鎖,那么Seata AT模式的分布式事務就會出現寫未提交。就可能出現分支事務更新失敗時無法回滾,因為回滾的數據已被覆蓋。
Seata AT模式的分布式事務默認是讀未提交的,即分布式事務在未提交前,分支事務更新的數據是可被其他事務讀到的。
此外,很多公司都是使用基于RocketMQ的柔性事務來實現分布式事務。
7.生單鏈路使用Seata AT模式的具體步驟
(1)生單鏈路中分布式事務的主要分支事務
(2)訂單系統 + 優惠券系統 + 商品庫存系統都需要在pom.xml文件中引入Seata
(3)訂單系統的生單接口作為分布式事務入口需添加@GlobalTransactional注解開啟全局事務
(4)優惠券系統的鎖定優惠券接口需添加Spring的事務注解@Transactional
(5)商品庫存系統的鎖定庫存接口需添加Spring的事務注解@Transactional
(6)各分支事務操作的數據庫需要添加undo log表
(1)生單鏈路中分布式事務的主要分支事務
分布式事務入口:訂單系統的生單接口
分支事務1:優惠券系統鎖定優惠券
分支事務2:商品庫存系統鎖定商品庫存
分支事務3:訂單系統創建訂單數據
(2)訂單系統 + 優惠券系統 + 商品庫存系統都需要在pom.xml文件中引入Seata
<!-- 引入seata整合分布式事務 -->
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId></exclusion></exclusions>
</dependency><!-- 跟安裝的seata-server需要保持版本一致 -->
<dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.3.0</version>
</dependency>
(3)訂單系統的生單接口作為分布式事務入口需添加@GlobalTransactional注解開啟全局事務
通過添加Seata提供的注解@GlobalTransactional來開啟全局事務。
@Service
public class OrderServiceImpl implements OrderService {...//提交訂單/生成訂單接口//@param createOrderRequest 提交訂單請求入參//@return 訂單號@GlobalTransactional(rollbackFor = Exception.class)@Overridepublic CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {//1.入參檢查checkCreateOrderRequestParam(createOrderRequest);//2.風控檢查checkRisk(createOrderRequest);//3.獲取商品信息List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest);//4.計算訂單價格CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList);//5.驗證訂單實付金額checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO);//6.鎖定優惠券lockUserCoupon(createOrderRequest);//7.鎖定商品庫存lockProductStock(createOrderRequest);//8.生成訂單到數據庫addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);//9.發送訂單延遲消息用于支付超時自動關單sendPayOrderTimeoutDelayMessage(createOrderRequest);//返回訂單信息CreateOrderDTO createOrderDTO = new CreateOrderDTO();createOrderDTO.setOrderId(createOrderRequest.getOrderId());return createOrderDTO;}...
}
(4)優惠券系統的鎖定優惠券接口需添加Spring的事務注解@Transactional
通過添加Spring提供的@Transactional注解來開啟本地事務。Seata會代理Spring的事務,進行本地鎖申請 + undo log寫入 + 全局鎖請求 + 提交/回滾本地事務等操作。
@Service
public class CouponServiceImpl implements CouponService {...//鎖定用戶優惠券@Transactional(rollbackFor = Exception.class)@Overridepublic Boolean lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) {//檢查入參checkLockUserCouponRequest(lockUserCouponRequest);String userId = lockUserCouponRequest.getUserId();String couponId = lockUserCouponRequest.getCouponId();CouponDO couponDO = couponDAO.getUserCoupon(userId, couponId);if (couponDO == null) {throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_NULL);}//判斷優惠券是否已經使用了if (CouponUsedStatusEnum.USED.getCode().equals(couponDO.getUsed())) {throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_USED);}couponDO.setUsed(CouponUsedStatusEnum.USED.getCode());couponDO.setUsedTime(new Date());couponDAO.updateById(couponDO);return true;}...
}
(5)商品庫存系統的鎖定庫存接口需添加Spring的事務注解@Transactional
通過添加Spring提供的@Transactional注解來開啟本地事務。Seata會代理Spring的事務,進行本地鎖申請 + undo log寫入 + 全局鎖請求 + 提交/回滾本地事務等操作。
@Service
public class InventoryServiceImpl implements InventoryService {...//鎖定商品庫存@Transactional(rollbackFor = Exception.class)@Overridepublic Boolean lockProductStock(LockProductStockRequest lockProductStockRequest) {//檢查入參checkLockProductStockRequest(lockProductStockRequest);List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = lockProductStockRequest.getOrderItemRequestList();for (LockProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {String skuCode = orderItemRequest.getSkuCode();ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode);if (productStockDO == null) {throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_ERROR);}Integer saleQuantity = orderItemRequest.getSaleQuantity();//執行庫存扣減,并需要解決防止超賣的問題int nums = productStockDAO.lockProductStock(skuCode, saleQuantity);if (nums <= 0) {throw new InventoryBizException(InventoryErrorCodeEnum.LOCK_PRODUCT_SKU_STOCK_ERROR);}}return true;}...
}
(6)各分支事務操作的數據庫需要添加undo log表
訂單系統的數據庫 + 優惠券系統的數據庫 + 庫存系統的數據庫,都需要添加如下一張undo_log表,提供給Seata使用。
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
8.生單鏈路使用Seata AT模式時的原理流程
(1)undo log的生成
(2)生單鏈路使用Seata AT模式時的原理流程
(1)undo log的生成
首先根據更新字段查詢出前鏡像Before Image,然后進行本地事務的更新,接著根據更新字段查詢出更新后的后鏡像After Image,這樣就可以根據前后鏡像 + 執行SQL語句的基本信息拼成一條undo log。
分布式事務入口會向Seata Server注冊一個全局事務XID,分支事務會向Seata Server注冊一個分支事務Branch ID。
如下是Seata官網提供的一條undo log數據示例:
{"branchId": 641789253,"undoItems": [{"afterImage": {"rows": [{"fields": [{"name": "id","type": 4,"value": 1}, {"name": "name","type": 12,"value": "GTS"}, {"name": "since","type": 12,"value": "2014"}]}],"tableName": "product"},"beforeImage": {"rows": [{"fields": [{"name": "id","type": 4,"value": 1}, {"name": "name","type": 12,"value": "TXC"}, {"name": "since","type": 12,"value": "2014"}]}],"tableName": "product"},"sqlType": "UPDATE"}],"xid": "xid:xxx"
}
(2)生單鏈路使用Seata AT模式時的原理流程
9.生單鏈路使用Seata AT模式時的并發問題
(1)在鎖定優惠券環節不存在并發獲取全局鎖問題
(2)在鎖定庫存環節存在并發獲取全局鎖問題
(3)在生成訂單環節不存在并發獲取全局鎖問題
生單鏈路中的分布式事務環節在于:鎖定優惠券 + 鎖定庫存 + 生成訂單。
(1)在鎖定優惠券環節不存在并發獲取全局鎖問題
每個用戶都會有屬于自己的優惠券。日常情況下,都是不同的用戶使用不同的優惠券購買商品,所以并不會出現并發獲取同一條優惠券數據的全局鎖的情況。
(2)在鎖定庫存環節存在并發獲取全局鎖問題
對于爆品或秒殺,大量用戶可能都會基于某商品進行下單扣減庫存,因此會出現并發獲取同一個SKU數據的全局鎖。
第一個獲取到某SKU數據的全局鎖的事務,在進行生成訂單環節由于需要插入多條SQL,所以可能會比較耗時,從而導致并發等待獲取該SKU數據的全局鎖的其他事務等待時間過長。
(3)在生成訂單環節不存在并發獲取全局鎖問題
生成訂單環節,涉及到多條SQL的插入操作,也存在耗時的風險。
10.生單鏈路如何解決庫存全局鎖爭用問題
(1)鎖定庫存時的全局鎖爭用問題分析
(2)庫存分桶方案+柔性事務方案+Seata事務方案
(1)鎖定庫存時的全局鎖爭用問題分析
一個商品SKU就對應一條庫存數據記錄,如果大量用戶同時購買一個商品SKU,必然導致多個分布式事務都去競爭和等待同一個SKU庫存數據的全局鎖。
(2)庫存分桶方案+柔性事務方案+Seata事務方案
一.庫存分桶方案
一般一個SKU就一條庫存數據,在庫存分桶方案下,一個SKU會有多條庫存數據。比如1萬的庫存可分為1000條庫存數據,每條庫存數據可扣庫存為10。每次扣減庫存時,按照一定的規則和算法,選擇一個庫存分桶去進行扣減。
二.RocketMQ柔性事務方案
通過RocketMQ柔性事務方案來替換Seata剛性事務方案。在互聯網公司里,一般的業務系統,都使用RocketMQ柔性事務。大多情況下,RocketMQ柔性事務都能確保數據是一致的。
剛性事務指的是分支事務出現異常或者失敗,則全局回滾。柔性事務指的是分支事務出現異常或者失敗,則不斷重試直到成功。
使用RocketMQ柔性事務方案,需要確保消息成功被投遞到RocketMQ。
三.使用沒有全局鎖的分布式事務方案,比如TCC
Seata支持AT、TCC、Saga、XA這幾種事務方案。對于生單鏈路的建議是使用混合的分布式事務方案:鎖定營銷使用AT模式 + 鎖定庫存使用TCC模式。