go-zero(十七)結合DTM :實現分布式事務

1. 基礎概念介紹

1.1 什么是分布式事務

在微服務架構中,一個業務操作常常需要調用多個服務來完成。例如,在電商系統中下單時,需要同時操作訂單服務和庫存服務。這種跨服務的操作就需要分布式事務來保證數據一致性。

分布式事務面臨以下挑戰:

  • 多個服務間如何協調
  • 服務失敗如何回滾
  • 如何保證最終一致性
  • 網絡故障、服務不可用等問題處理

1.2 什么是DTM

DTM (Distributed Transaction Manager) 是一個開源的分布式事務管理器,它支持多種事務模式,如TCC、SAGA、XA等。DTM的特點包括:

  • 簡單易用的接口
  • 支持多種事務模式
  • 支持多種編程語言
  • 高可用、高性能設計

DTM讓開發者可以像使用本地事務一樣簡單地使用分布式事務,大大降低了開發難度。

1.3 分布式事務模式簡介

分布式事務有多種實現模式,本教程將介紹兩種常用模式:

SAGA模式
將一個大事務拆分為多個小事務,每個小事務都有對應的補償操作。如果某個小事務失敗,則執行已完成事務的補償操作,實現回滾。

TCC模式
將一個事務分為三個階段:Try(嘗試)、Confirm(確認)、Cancel(取消)。Try階段進行資源檢查和預留,Confirm階段確認執行,Cancel階段在Try失敗后進行取消操作。

這兩種模式各有優缺點,適用于不同的業務場景。

1.4分布式事務模式對比

讓我們了解一下DTM支持的幾種事務模式:

事務模式使用場景優點缺點
SAGA長事務、低一致性要求實現簡單,可靠性高最終一致性,補償邏輯復雜
TCC對一致性要求高強一致性,性能好實現復雜,需要多個接口
XA標準事務處理強一致性,使用簡單性能較差,鎖定資源時間長
2階段(2pc)消息可靠消息投遞簡單易用,適合異步最終一致性

2. 項目概述

2.1 業務場景

本教程將實現一個簡化的電商下單場景,涉及兩個核心服務:

  1. 訂單服務:負責創建和管理訂單
  2. 庫存服務:負責管理商品庫存

業務流程如下:

  • 用戶下單時,需要創建訂單并扣減庫存
  • 如果任一操作失敗,需要回滾所有操作,保證數據一致性

我們將使用兩種不同的分布式事務模式來實現這個場景:

  • 使用TCC模式處理訂單服務
  • 使用SAGA模式處理庫存服務

3. 環境準備

3.1 前置環境

首先,要先把以下環境部署好:

  1. Go(1.16+)
  2. Go-Zero
  3. MySQL(5.7+):用于存儲數據和DTM事務信息
  4. etcd:用于服務發現

3.2DTM服務部署

Docker快速啟動
DTM 服務可以通過 Docker 快速啟動:

# 拉取 DTM 鏡像
docker pull yedf/dtm:latest# 啟動 DTM 服務
docker run -it --name dtm -p 36789:36789 -p 36790:36790 -e STORE_DRIVER=mysql -e STORE_HOST=mysql:3306 -e STORE_USER=root -e STORE_PASSWORD=123456 -e STORE_PORT=3306 yedf/dtm:latest

注意:如果環境都是使用docker部署,網絡要使用同一個。

本地部署
或者拉取源碼啟動:

git clone https://github.com/dtm-labs/dtm

源碼拉取成功后,我們需要修改conf.sample.yml文件,把服務注冊到ETCD,把下面的設置取消注釋:

 MicroService: # gRPC/HTTP based microservice configDriver: 'dtm-driver-gozero' # name of the driver to handle register/discoverTarget: 'etcd://localhost:2379/dtmservice' # register dtm server to this urlEndPoint: 'localhost:36790'Store:Driver: 'mysql'Host: 'localhost'User: 'root'Password: '123456'Port: 3306DB: 'dtm'  

然后使用這個配置啟動服務

go run mian.go -c conf.sample.yml

部署完成后,可以通過瀏覽器訪問 http://localhost:36789 ,來查看管理頁面 ,這個頁面可以很直觀的查看到,分布式事務是否成功,以及使用的哪些事務模式。

4.使用SAGA模式

我們先從簡單的SAGA模式開始,實現庫存服務的扣減和補償操作。

4.1 SAGA模式介紹

SAGA模式是一種分布式事務解決方案,將一個大事務拆分為多個小事務,每個小事務都有對應的補償操作。

SAGA的流程如下:

  1. 執行一系列正向操作(如扣減庫存)
  2. 如果任一操作失敗,則執行已完成操作的補償操作(如增加庫存)

SAGA適用于可以通過補償操作回滾的業務場景。

4.2 數據庫設計

為了演示分布式事務,我們需要創建三個數據庫:訂單數據庫、庫存數據庫和DTM數據庫。

訂單數據庫:

CREATE DATABASE IF NOT EXISTS dtm_order;
USE dtm_order;CREATE TABLE `orders` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`user_id` bigint(20) NOT NULL COMMENT '用戶ID',`product_id` bigint(20) NOT NULL COMMENT '產品ID',`amount` int(11) NOT NULL COMMENT '數量',`money` decimal(10,2) NOT NULL COMMENT '金額',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '狀態: 0-創建 1-完成',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

庫存數據庫:

CREATE DATABASE IF NOT EXISTS dtm_stock;
USE dtm_stock;CREATE TABLE `stock` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`product_id` bigint(20) NOT NULL COMMENT '產品ID',`amount` int(11) NOT NULL COMMENT '庫存數量',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uniq_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;-- 插入測試數據
INSERT INTO `stock` (`product_id`, `amount`) VALUES (1, 100);
INSERT INTO `stock` (`product_id`, `amount`) VALUES (2, 200);

DTM數據庫:

CREATE DATABASE IF NOT EXISTS dtm;

DTM會自動創建所需的表,無需手動創建。

注意: 創建的數據庫要和dtm配置文件中指定的數據庫要一致

在這里插入圖片描述

4.3 庫存服務實現

首先,創建庫存服務的Proto定義文件stock/stock.proto:

syntax = "proto3";package stock;
option go_package="./stock";// 庫存服務
service Stock {// 扣減庫存rpc DeductStock(DeductStockReq) returns (DeductStockResp);// 補償庫存rpc CompensateStock(CompensateStockReq) returns (CompensateStockResp);
}// 扣減庫存請求
message DeductStockReq {int64 productId = 1;  // 產品IDint32 amount = 2;     // 數量
}// 扣減庫存響應
message DeductStockResp {bool success = 1;     // 是否成功string message = 2;   // 消息
}// 補償庫存請求
message CompensateStockReq {int64 productId = 1;  // 產品IDint32 amount = 2;     // 數量
}// 補償庫存響應
message CompensateStockResp {bool success = 1;     // 是否成功string message = 2;   // 消息
}

切換到 rpc/stock/目錄下,使用goctl生成代碼:

cd stock
goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.

修改庫存服務的配置文件stock/etc/stock.yaml:

Name: stock.rpc
ListenOn: 0.0.0.0:9002
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(localhost:3306)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://localhost:2379/dtmservice

修改配置結構stock/internal/config/config.go:

package configimport ("github.com/zeromicro/go-zero/zrpc"
)type Config struct {zrpc.RpcServerConf//增加數據庫字段DB struct {DataSource string}DTM string // DTM服務地址
}

修改服務上下文stock/internal/svc/service_context.go:

package svcimport ("github.com/Mikaelemmmm/gozerodtm/stock/internal/config""github.com/Mikaelemmmm/gozerodtm/stock/internal/model""github.com/zeromicro/go-zero/core/stores/sqlx"
)type ServiceContext struct {Config     config.ConfigStockModel model.StockModel  // 庫存模型
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:     c,StockModel: model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)),  // 初始化庫存模型}
}

切換到 stock/model 目錄下,使用goctl工具生成model代碼:

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .

生成代碼后,到stock/model/stockmodel.go,增加DeductStockAddStock方法:

package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣減庫存DeductStock(productId int64, amount int32) error//AddStock 增加庫存(用于補償)AddStock(productId int64, amount int32) error}// DeductStock 扣減庫存
func (m *customStockModel) DeductStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)result, err := m.conn.Exec(query, amount, productId, amount)if err != nil {return err}affected, err := result.RowsAffected()if err != nil {return err}if affected == 0 {return fmt.Errorf("庫存不足")}return nil
}// AddStock 增加庫存(用于補償)
func (m *customStockModel) AddStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)_, err := m.conn.Exec(query, amount, productId)return err
}

實現扣減庫存邏輯stock/internal/logic/deduct_stock_logic.go:

package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣減庫存
func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this lineerr := l.svcCtx.StockModel.DeductStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("扣減庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回錯誤}return &stock.DeductStockResp{Success: true,Message: "扣減庫存成功",}, nil
}

需要注意的是, 如果事務執行失敗,返回的是RPC的狀態碼 codes.Aborted 和 事務結果dtmcli.ResultFailure ,如果直接返回錯誤的,DTM是不會進行事務補償的。

return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) 
  • Aborted:表示操作被中止,通常是由于并發問題,如序列檢查失敗、事務中止等。

  • ResultFailuredtmimp.ResultFailure)表示事務或事務分支執行失敗。當事務或事務分支在執行過程中遇到錯誤,例如數據庫插入失敗、服務調用超時等,會返回失敗結果。這個結果會觸發全局事務管理器進行相應的處理,如回滾事務。

后面會單獨介紹下gRPC狀態碼和dtmcli的事務狀態。

實現補償庫存邏輯stock/internal/logic/compensate_stock_logic.go:

package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 補償庫存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到補償庫存請求:", in.ProductId, in.Amount)err := l.svcCtx.StockModel.AddStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("補償庫存失敗: %v", err)return &stock.CompensateStockResp{Success: false,Message: err.Error(),}, nil}return &stock.CompensateStockResp{Success: true,Message: "補償庫存成功",}, nil
}

4.4 SAGA調用示例

創建test_saga.go,演示如何使用SAGA模式調用庫存服務:

package mainimport ("dtm_demo/rpc/stock/stock""log""github.com/dtm-labs/client/dtmgrpc""github.com/zeromicro/go-zero/core/discov""github.com/zeromicro/go-zero/zrpc"
)func main() {// 初始化客戶端stockRpcConf := zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"localhost:2379"},Key:   "stock.rpc",},}client := zrpc.MustNewClient(stockRpcConf)client.Conn()// DTM服務地址dtmServer := "etcd://localhost:2379/dtmservice"// 創建請求stockReq := &stock.DeductStockReq{ProductId: 1,Amount:    10,  //設置庫存}// 生成全局事務IDgid := dtmgrpc.MustGenGid(dtmServer)// 創建SAGA事務saga := dtmgrpc.NewSagaGrpc(dtmServer, gid)//stockRpcBusiServer 實際上就是自動獲取庫存服務地址 etcd://localhost:2379/stock.rpcstockRpcBusiServer, err := stockRpcConf.BuildTarget()if err != nil {log.Fatal(err)}saga.Add(stockRpcBusiServer+"/stock.Stock/DeductStock", stockRpcBusiServer+"/stock.Stock/CompensateStock", stockReq)// 提交事務err = saga.Submit()if err != nil {log.Fatalf("SAGA事務提交失敗: %v", err)}
}

需要注意的是 /stock.Stock/DeductStock/stock.Stock/CompensateStock 這個路徑是區分大小寫的,一定要和goctl工具生成的一致

啟動庫存服務:

go run stock.go 

運行測試程序:

go run test_saga.go

如果一切正常,庫存將被成功扣減。如果服務出現異常(例如庫存不足),DTM會自動執行補償操作,增加庫存。

當我把 Amount 設置為 100 ,庫存服務提示庫存不足,并自動進行補償

在這里插入圖片描述

5. 使用TCC模式

接下來,我們使用更復雜的TCC模式實現訂單服務。

5.1 TCC模式介紹

TCC(Try-Confirm-Cancel)是一種更靈活的分布式事務模式,它將一個事務分為三個階段:

  1. Try:資源檢查和預留,但不真正執行業務操作
  2. Confirm:確認執行業務操作,只有當所有Try都成功時執行
  3. Cancel:取消操作,當任一Try失敗時執行

TCC模式比SAGA模式更為靈活,但也更復雜,需要業務自行實現三個階段的處理邏輯。

5.2 數據庫設計

orderstock,仍然使用SAGA演示中的數據庫表,在dtm_stock數據庫中添加庫存凍結表stock_tcc.sql:


USE dtm_stock;
CREATE TABLE stock_tcc (id bigint(20) NOT NULL AUTO_INCREMENT,product_id bigint(20) NOT NULL COMMENT '產品ID',freeze_amount int(11) NOT NULL COMMENT '凍結數量',transaction_id varchar(64) NOT NULL COMMENT '事務ID',create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY uniq_trans_product (`transaction_id`, `product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

5.3 Tcc庫存服務實現

因為我們現在要實現TCC模式,需要使用到 try , confirm ,cancel 三個方法,所以需要重新定義庫存服務:

syntax = "proto3";package pb;
option go_package="./pb";// 庫存查詢請求
message ProductStockReq {int64 product_id = 1;
}// 庫存查詢響應
message ProductStockResp {int64 product_id = 1;int32 amount = 2;
}// 扣減庫存請求
message DeductStockReq {int64 product_id = 1;int32 amount = 2;
}// 扣減庫存響應
message DeductStockResp {bool success = 1;
}// TCC事務相關
message TccReq {string transaction_id = 1;int64 product_id = 2;int32 amount = 3;float money = 4;
}message TccResp {bool success = 1;
}// 庫存服務
service Stock {// 查詢商品庫存rpc GetProductStock(ProductStockReq) returns (ProductStockResp);// 普通扣減庫存(非TCC模式)//rpc DeductStock(DeductStockReq) returns (DeductStockResp);// TCC模式相關接口rpc TryDeductStock(TccReq) returns (TccResp);     // Try階段rpc ConfirmDeductStock(TccReq) returns (TccResp); // Confirm階段rpc CancelDeductStock(TccReq) returns (TccResp);  // Cancel階段
}

切換到/stock下,使用goctl生成代碼:

 goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.

切換到 stock_svr/internal/model 目錄下,生成model代碼:

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock_tcc" -dir .

修改stock.yaml:

Name: stock.rpc
ListenOn: 0.0.0.0:8080
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(127.0.0.1:33069)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai

修改stock_svr/internal/config/config.go:

type Config struct {zrpc.RpcServerConfDB struct {DataSource string}
}

修改stock_svr/internal/svc/servicecontext.go:

type ServiceContext struct {Config        config.ConfigStockModel    model.StockModel // 庫存模型StockTccModel model.StockTccModel
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:        c,StockModel:    model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)),    // 初始化庫存模型StockTccModel: model.NewStockTccModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化庫存模型}
}

修改stock_svr/internal/model/stocktccmodel.go 增加新方法:

type (StockTccModel interface {stockTccModelwithSession(session sqlx.Session) StockTccModel//增加DeleteByTransProductId 方法DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error}
/***/// DeleteByTransProductId 根據事務ID和商品ID刪除記錄
func (m *customStockTccModel) DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error {query := fmt.Sprintf("delete from %s where transaction_id = ? and product_id = ?", m.table)_, err := m.conn.ExecCtx(ctx, query, transactionId, productId)return err
}

修改stock_svr/internal/logic/trydeductstocklogic.go 實現Try接口:


// TCC模式相關接口
func (l *TryDeductStockLogic) TryDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("嘗試扣減庫存")//查詢商品庫存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil && err != model.ErrNotFound {//return &pb.TccResp{Success: false}, fmt.Errorf("沒有該商品")//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)//!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試//這時候就不要返回codes.Aborted, dtmcli.ResultFailure ,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}// 檢查庫存是否足夠if stockInfo.Amount < int64(in.Amount) {//return &pb.TccResp{Success: false}, fmt.Errorf("庫存不足")//如果庫存不足,直接返回失敗,讓dtm回滾//返回 codes.Aborted, dtmcli.ResultFailure 才能回滾return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}//如果錢小于100,直接返回失敗,讓dtm回滾if in.Money < 100 {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 扣減庫存stockInfo.Amount -= int64(in.Amount)err = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {//return &pb.TccResp{Success: false}, fmt.Errorf("庫存扣除失敗")//如果庫存扣除失敗,直接返回失敗,不要讓它一直重試,讓dtm回滾 ,return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 記錄凍結庫存,用于回滾freeze := &model.StockTcc{ProductId:     in.ProductId,FreezeAmount:  int64(in.Amount),TransactionId: in.TransactionId,}_, err = l.svcCtx.StockTccModel.Insert(l.ctx, freeze)if err != nil {// 如果凍結記錄失敗,回滾庫存stockInfo.Amount += int64(in.Amount)_ = l.svcCtx.StockModel.Update(l.ctx, stockInfo)//return &pb.TccResp{Success: false}, fmt.Errorf("記錄凍結庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}

修改stock_svr/internal/logic/confirmdeductstocklogic.go,實現 confirm 接口:

// Confirm階段:確認扣減庫存
func (l *ConfirmDeductStockLogic) ConfirmDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("Confirm階段:確認扣減庫存")err := l.svcCtx.StockTccModel.DeleteByTransProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {logx.Errorf("確認扣減庫存失敗: %v", err)// 這里返回失敗,DTM會進行重試//return &pb.TccResp{Success: false}, errreturn nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}

修改stock_svr/internal/logic/canceldeductstocklogic.go,實現 cancel方法:


func (l *CancelDeductStockLogic) CancelDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("取消扣減庫存")// 查找凍結記錄freeze, err := l.svcCtx.StockTccModel.FindOneByTransactionIdProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {if err == model.ErrNotFound {// 如果沒有找到凍結記錄,可能是已經處理過了,直接返回成功return &pb.TccResp{Success: true}, nil}/*在 TCC(Try-Confirm-Cancel)事務模式里,Cancel 方法本身的作用就是進行事務回滾,所以一般不需要再在 Cancel 方法里額外執行事務回滾操作*/return &pb.TccResp{Success: false}, fmt.Errorf("查詢凍結記錄失敗: %v", err)//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 恢復庫存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("查詢商品庫存失敗: %v", err)}stockInfo.Amount += freeze.FreezeAmounterr = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("恢復庫存失敗: %v", err)}// 刪除凍結記錄err = l.svcCtx.StockTccModel.Delete(l.ctx, freeze.Id)if err != nil {logx.Errorf("刪除凍結記錄失敗: %v", err)// 雖然刪除凍結記錄失敗,但庫存已經恢復,下次重試時不會重復恢復庫存}return &pb.TccResp{Success: true}, nil
}

注意錯誤返回方法:

  1. codes.Aborted 一般表示操作被中止,而 dtmcli.ResultFailure 告知 DTM 操作失敗,需要回滾整個事務。當 Confirm 階段出現問題,且希望 DTM 回滾整個事務時,適合用這個返回。

適用場景:
確認操作無法完成,比如數據庫記錄不存在、數據狀態不一致等,這種情況下需要 DTM 觸發 Cancel 階段來回滾事務。

  1. status.Error(codes.Internal, err.Error())
    codes.Internal 表示服務器內部出錯,一般意味著系統出現了臨時故障,DTM 會持續重試該操作,直至成功或者達到最大重試次數。

適用場景:
操作失敗是由臨時問題引起的,像數據庫連接暫時中斷、網絡抖動等,這種情況下希望 DTM 重試操作,直至成功。

5.4 訂單服務Api實現

首先,創建訂單服務的Proto定義文件order/order.api:

```syntax = "v1"type (CreateOrderReq {UserId    int64 `json:"userId"`ProductId int64 `json:"productId"`Amount    int   `json:"amount"`Money      float64 `json:"money"`}CreateOrderResp {OrderId int64 `json:"orderId"`}OrderInfo {Id         int64   `json:"id"`UserId     int64   `json:"userId"`ProductId  int64   `json:"productId"`Amount     int     `json:"amount"`Money      float64 `json:"money"`Status     int     `json:"status"`CreateTime string  `json:"createTime"`}GetOrderReq {OrderId int64 `json:"orderId"`}GetOrderResp {Order OrderInfo `json:"order"`}
)service Order {@handler CreateOrderpost /api/order/create (CreateOrderReq) returns (CreateOrderResp)@handler GetOrderpost /api/order/get (GetOrderReq) returns (GetOrderResp)
}

切換到api/order/下,使用goctl生成代碼:

goctl api go -api .\order.api -dir .    

切換到api/order/internal/model下, 生成model代碼:

goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_order" -table="orders" -dir .

修改order.yaml:

Name: Order
Host: 0.0.0.0
Port: 8888StockRpcConf:Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource:  root:123456@tcp(127.0.0.1:33069)/dtm_order?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://127.0.0.1:2379/dtmservice

修改order/internal/config/config.go:


type Config struct {rest.RestConfStockRpcConf zrpc.RpcClientConfDTM stringDB  struct {DataSource string}
}

修改order/internal/svc/servicecontext.go:


type ServiceContext struct {Config config.ConfigOrderModel model.OrdersModelStockRpc   stock.Stock
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:     c,OrderModel: model.NewOrdersModel(sqlx.NewMysql(c.DB.DataSource)),StockRpc:   stock.NewStock(zrpc.MustNewClient(c.StockRpcConf)),}
}

修改order/internal/logic/createorderlogic.go :


func (l *CreateOrderLogic) CreateOrder(req *types.CreateOrderReq) (resp *types.CreateOrderResp, err error) {// todo: add your logic here and delete this line//使用tcc,讓try去檢查并扣減庫存,confirm和cancel去處理庫存的回滾/*// 查詢商品庫存stockResp, err := l.svcCtx.StockRpc.GetProductStock(l.ctx, &pb.ProductStockReq{ProductId: req.ProductId,})if err != nil && !errors.Is(err, model.ErrNotFound) {return nil, fmt.Errorf("查詢商品庫存失敗: %v", err)}fmt.Println(stockResp.Amount)if stockResp.Amount < int32(req.Amount) {return nil, fmt.Errorf("庫存不足")}*/order := &model.Orders{UserId:    req.UserId,ProductId: req.ProductId,Amount:    int64(req.Amount),Money:     req.Money,Status:    0, // 0-創建中}result, err := l.svcCtx.OrderModel.Insert(l.ctx, order)if err != nil {return nil, fmt.Errorf("創建訂單失敗: %v", err)}orderId, err := result.LastInsertId()if err != nil {return nil, fmt.Errorf("獲取訂單ID失敗: %v", err)}// 4. 使用DTM的TCC模式進行分布式事務處理dtmServer := l.svcCtx.Config.DTM // DTM服務地址stockServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()if err != nil {}// 生成全局事務IDgid := dtmgrpc.MustGenGid(dtmServer)// 創建TCC事務err = dtmgrpc.TccGlobalTransaction(dtmServer, gid, func(tcc *dtmgrpc.TccGrpc) error {req := &pb.TccReq{ProductId: req.ProductId,Amount:    int32(req.Amount),Money:     float32(req.Money),}r := &emptypb.Empty{} // 用于接收響應err := tcc.CallBranch(req,stockServer+"/pb.Stock/TryDeductStock",stockServer+"/pb.Stock/ConfirmDeductStock",stockServer+"/pb.Stock/CancelDeductStock",r)return err})if err != nil {return nil, fmt.Errorf("創建訂單失敗: %v", err)}// 5. 更新訂單狀態為已創建orderUpdate, err := l.svcCtx.OrderModel.FindOne(l.ctx, orderId)if err != nil {logx.Errorf("查詢訂單失敗: %v", err)// 這里不返回錯誤,因為已經扣減了庫存,訂單狀態可以通過其他方式修復} else {orderUpdate.Status = 1 // 1-已完成err = l.svcCtx.OrderModel.Update(l.ctx, orderUpdate)if err != nil {logx.Errorf("更新訂單狀態失敗: %v", err)// 同上,不返回錯誤}}return &types.CreateOrderResp{OrderId: orderId,}, nil
}

5.5 測試TCC

先啟動 stock 服務

go run stock.go

再啟動order api

go run order.go

創建請求:

curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 1 , "money" :200.10}' http://localhost:8888/api/order/create

在這里插入圖片描述

去stock服務那邊查看日志,可以看到 先啟動try ,然后使用confirm
在這里插入圖片描述

解下模擬失敗請求,錢設置為99.9:

curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 999, "money" :99.9}' http://localhost:8888/api/order/create

事務啟動cancel 進行回滾
在這里插入圖片描述

6.DTM屏障 ,Barrier機制

6.1 Barrier機制介紹

DTM的Barrier機制主要解決以下問題:

  1. 空補償問題:在Saga模式中,如果正向操作還未執行,補償操作就執行了,這是空補償。
  2. 空回滾問題:在TCC模式中,如果Try操作未執行,Cancel操作就執行了,這是空回滾。
  3. 懸掛問題:在分布式事務中,可能由于網絡延遲等原因,先收到了Cancel/Confirm操作,后收到Try操作,此時Try操作應該被拒絕,這是懸掛。
  4. 冪等: 由于任何一個請求都可能出現網絡異常,出現重復請求,所有的分布式事務分支操作,都需要保證冪等性

Barrier的主要原理是在數據庫中添加一個barrier表,并使用數據庫事務保證原子性,記錄每個全局事務、分支事務、操作類型的執行情況,從而防止以上問題的發生。

用Dtm官方提供的dtmcli.barrier.mysql.sql sql語句來創建dtm_barrier

create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default '',gid varchar(128) default '',branch_id varchar(128) default '',op varchar(45) default '',barrier_id varchar(45) default '',reason varchar(45) default '' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

6.2 使用Barrier改進服務

我們以之前的sage的例子,做簡單的修改。

需要注意的是,在rpc的業務中,如果使用了barrier的話,那么在model中與db交互時候必須要用事務,并且一定要跟barrier用同一個事務

,到stock/model/stockmodel.go,修改DeductStockAddStock方法:

package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣減庫存DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error)//AddStock 增加庫存(用于補償)AddStock(tx *sql.Tx, productId int64, amount int32)  (sql.Result, error)}// DeductStock 扣減庫存
func (m *customStockModel) DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error) {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)return tx.Exec(query, amount, productId, amount)
}// AddStock 增加庫存(用于補償)
func (m *customStockModel) AddStock(tx *sql.Tx, productId int64, amount int32)  (sql.Result, error){query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)return tx.Exec(query, amount, productId)}

實現扣減庫存邏輯stock/internal/logic/deduct_stock_logic.go:

package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣減庫存func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this line//barrier防止空補償、空懸掛等具體看dtm官網即可,別忘記加barrier表在當前庫中,因為判斷補償與要執行的sql一起本地事務barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_,err := l.svcCtx.StockModel.DeductStock(tx,in.ProductId, in.Amount)if err != nil {logx.Errorf("扣減庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回錯誤}return nil}); err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.DeductStockResp{Success: true,Message: "扣減庫存成功",}, nil
}

修改補償庫存邏輯stock/internal/logic/compensate_stock_logic.go:

package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 補償庫存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到補償庫存請求:", in.ProductId, in.Amount)//barrier防止空補償、空懸掛等具體看dtm官網即可,別忘記加barrier表在當前庫中,因為判斷補償與要執行的sql一起本地事務barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_, err := l.svcCtx.StockModel.AddStock(tx, in.ProductId, in.Amount)if err != nil {logx.Errorf("補償庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回錯誤}return nil}); err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.CompensateStockResp{Success: true,Message: "補償庫存成功",}, nil
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/81089.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/81089.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/81089.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

2025 簡易Scrum指南(簡體中文版)

Scrum是一個輕量級的、以團隊為中心的框架&#xff0c;用于解決復雜的問題并創造價值。Scrum有意保持非完整性&#xff0c;Scrum的設計初衷旨在依靠使用者的集體智慧來不斷演進構建。 Scrum建立在實驗主義和精益思想的基礎上&#xff0c;它賦能團隊靈活巧妙地工作&#xff0c;…

2025最新福昕PDF編輯器,PDF萬能處理工具

軟件介紹 Foxit PDF Editor Pro 2025 中文特別版&#xff08;以前稱為 Foxit PhantomPDF Business&#xff09;是一款專為滿足各種辦公需求而設計的業務就緒的PDF工具包。 軟件特點 1. 強大的PDF編輯能力 創建新文檔&#xff1a;用戶可以從無到有地構建PDF文檔&#xff0c;添…

ollama的若干實踐

1. 本地ollama 1.1 本地安裝ollama 方法 1&#xff1a;手動檢查最新版本并下載 訪問 Ollama 的 GitHub Releases 頁面&#xff1a; 打開 https://github.com/ollama/ollama/releases 查看最新的穩定版本&#xff08;如 v0.7.0 或更高&#xff09; 手動下載最新版本&#xff08…

Spring Security源碼解析

秒懂SpringBoot之全網最易懂的Spring Security教程 SpringBoot整合Spring-Security 認證篇&#xff08;保姆級教程&#xff09; SpringBoot整合Spring Security【超詳細教程】 spring security 超詳細使用教程&#xff08;接入springboot、前后端分離&#xff09; Security 自…

LeetCode 3392.統計符合條件長度為 3 的子數組數目:一次遍歷模擬

【LetMeFly】3392.統計符合條件長度為 3 的子數組數目&#xff1a;一次遍歷模擬 力扣題目鏈接&#xff1a;https://leetcode.cn/problems/count-subarrays-of-length-three-with-a-condition/ 給你一個整數數組 nums &#xff0c;請你返回長度為 3 的 子數組&#xff0c;滿足…

讀論文筆記-CoOp:對CLIP的handcrafted改進

讀論文筆記-Learning to Prompt for Vision-Language Models Problems 現有基于prompt engineering的多模態模型在設計合適的prompt時有很大困難&#xff0c;從而設計了一種更簡單的方法來制作prompt。 Motivations prompt engineering雖然促進了視覺表示的學習&#xff0c…

從零構建 MCP Server 與 Client:打造你的第一個 AI 工具集成應用

目錄 &#x1f680; 從零構建 MCP Server 與 Client&#xff1a;打造你的第一個 AI 工具集成應用 &#x1f9f1; 1. 準備工作 &#x1f6e0;? 2. 構建 MCP Server&#xff08;服務端&#xff09; 2.1 初始化服務器 &#x1f9e9; 3. 添加自定義工具&#xff08;Tools&…

Django 自定義celery-beat調度器,查詢自定義表的Cron表達式進行任務調度

學習目標&#xff1a; 通過自定義的CronScheduler調度器在兼容標準的調度器的情況下&#xff0c;查詢自定義任務表去生成調度任務并分配給celery worker進行執行 不了解Celery框架的小伙伴可以先看一下我的上一篇文章&#xff1a;Celery框架組件分析及使用 學習內容&#xff…

藍橋杯 1. 確定字符串是否包含唯一字符

確定字符串是否包含唯一字符 原題目鏈接 題目描述 實現一個算法來識別一個字符串的字符是否是唯一的&#xff08;忽略字母大小寫&#xff09;。 若唯一&#xff0c;則輸出 YES&#xff0c;否則輸出 NO。 輸入描述 輸入一行字符串&#xff0c;長度不超過 100。 輸出描述 輸…

a-upload組件實現文件的上傳——.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt

實現下面的上傳/下載/刪除功能&#xff1a;要求支持&#xff1a;【.pdf,.ppt,.pptx,.doc,.docx,.xls,.xlsx,.txt】 分析上面的效果圖&#xff0c;分為【上傳】按鈕和【文件列表】功能&#xff1a; 解決步驟1&#xff1a;上傳按鈕 直接上代碼&#xff1a; <a-uploadmultip…

.NET Core 數據庫ORM框架用法簡述

.NET Core ORM框架用法簡述 一、主流.NET Core ORM框架概述 在.NET Core生態系統中&#xff0c;主流的ORM(Object-Relational Mapping)框架包括&#xff1a; ??Entity Framework Core (EF Core)?? - 微軟官方推出的ORM框架??Dapper?? - 輕量級微ORM??Npgsql.Entit…

halcon打開圖形窗口

1、dev_open_window 參數如下&#xff1a; 1&#xff09;Row(輸入參數) y方向上&#xff0c;圖形窗口距離左上角頂端的像素個數 2&#xff09;Column(輸入參數) x方向上&#xff0c;距離左上角左邊的像素個數 3&#xff09;Width(輸入參數) 圖形窗口寬度 4&#xff09;He…

2025東三省D題深圳杯D題數學建模挑戰賽數模思路代碼文章教學

完整內容請看文章最下面的推廣群 一、問題一&#xff1a;混合STR圖譜中貢獻者人數判定 問題解析 給定混合STR圖譜&#xff0c;識別其中的真實貢獻者人數是后續基因型分離與個體識別的前提。圖譜中每個位點最多應出現2n個峰&#xff08;n為人數&#xff09;&#xff0c;但由…

iView Table 組件跨頁選擇功能實現文檔

iView Table 組件跨頁選擇功能實現文檔 功能概述 實現基于 iView Table 組件的多選功能&#xff0c;支持以下特性&#xff1a; ? 跨頁數據持久化選擇? 當前頁全選/取消全選? 自動同步選中狀態顯示? 分頁切換狀態保持? 高性能大數據量支持 實現方案 技術棧 iView UI 4…

家庭服務器IPV6搭建無限郵箱系統指南

qq郵箱操作 // 郵箱配置信息 // 注意&#xff1a;使用QQ郵箱需要先開啟IMAP服務并獲取授權碼 // 設置方法&#xff1a;登錄QQ郵箱 -> 設置 -> 賬戶 -> 開啟IMAP/SMTP服務 -> 生成授權碼 服務器操作 fetchmail 同步QQ郵箱 nginx搭建web顯示本地同步過來的郵箱 ssh…

Tauri v1 與 v2 配置對比

本文檔對比 Tauri v1 和 v2 版本的配置結構和內容差異&#xff0c;幫助開發者了解版本變更并進行遷移。 配置結構變化 v1 配置結構 {"package": { ... },"tauri": { "allowlist": { ... },"bundle": { ... },"security":…

對js的Date二次封裝,繼承了原Date的所有方法,增加了自己擴展的方法,可以實現任意時間往前往后推算多少小時、多少天、多少周、多少月;

封裝js時間工具 概述 該方法繼承了 js 中 Date的所有方法&#xff1b;同時擴展了一部分自用方法&#xff1a; 1、任意時間 往前推多少小時&#xff0c;天&#xff0c;月&#xff0c;周&#xff1b;參數1、2必填&#xff0c;參數3可選beforeDate(num,formatter,dateVal); befo…

TimeDistill:通過跨架構蒸餾的MLP高效長期時間序列預測

原文地址&#xff1a;https://arxiv.org/abs/2502.15016 發表會議&#xff1a;暫定&#xff08;但是Star很高&#xff09; 代碼地址&#xff1a;無 作者&#xff1a;Juntong Ni &#xff08;倪浚桐&#xff09;, Zewen Liu &#xff08;劉澤文&#xff09;, Shiyu Wang&…

DeepSeek最新大模型發布-DeepSeek-Prover-V2-671B

2025 年 4 月 30 日&#xff0c;DeepSeek 開源了新模型 DeepSeek-Prover-V2-671B&#xff0c;該模型聚焦數學定理證明任務&#xff0c;基于混合專家架構&#xff0c;使用 Lean 4 框架進行形式化推理訓練&#xff0c;參數規模達 6710 億&#xff0c;結合強化學習與大規模合成數據…

如何用AI生成假期旅行照?

以下是2025年最新AI生成假期旅行照片的實用工具推薦及使用指南&#xff0c;結合工具特點、研發背景和適用場景進行綜合解析&#xff1a; 一、主流AI旅行照片生成工具推薦與對比 1. 搜狐簡單AI&#xff08;國內工具&#xff09; ? 特點&#xff1a; ? 一鍵優化與背景替換&…