1. 基礎概念介紹
1.1 什么是分布式事務
在微服務架構中,一個業務操作常常需要調用多個服務來完成。例如,在電商系統中下單時,需要同時操作訂單服務和庫存服務。這種跨服務的操作就需要分布式事務來保證數據一致性。
分布式事務面臨以下挑戰:
- 多個服務間如何協調
- 服務失敗如何回滾
- 如何保證最終一致性
- 網絡故障、服務不可用等問題處理
1.2 什么是DTM
DTM (Distributed Transaction Manager) 是一個開源的分布式事務管理器,它支持多種事務模式,如TCC、SAGA、XA等。DTM的特點包括:
- 簡單易用的接口
- 支持多種事務模式
- 支持多種編程語言
- 高可用、高性能設計
DTM讓開發者可以像使用本地事務一樣簡單地使用分布式事務,大大降低了開發難度。
1.3 分布式事務模式簡介
分布式事務有多種實現模式,本教程將介紹兩種常用模式:
SAGA模式:
將一個大事務拆分為多個小事務,每個小事務都有對應的補償操作。如果某個小事務失敗,則執行已完成事務的補償操作,實現回滾。
TCC模式:
將一個事務分為三個階段:Try(嘗試)、Confirm(確認)、Cancel(取消)。Try階段進行資源檢查和預留,Confirm階段確認執行,Cancel階段在Try失敗后進行取消操作。
這兩種模式各有優缺點,適用于不同的業務場景。
1.4分布式事務模式對比
讓我們了解一下DTM支持的幾種事務模式:
事務模式 | 使用場景 | 優點 | 缺點 |
---|---|---|---|
SAGA | 長事務、低一致性要求 | 實現簡單,可靠性高 | 最終一致性,補償邏輯復雜 |
TCC | 對一致性要求高 | 強一致性,性能好 | 實現復雜,需要多個接口 |
XA | 標準事務處理 | 強一致性,使用簡單 | 性能較差,鎖定資源時間長 |
2階段(2pc)消息 | 可靠消息投遞 | 簡單易用,適合異步 | 最終一致性 |
2. 項目概述
2.1 業務場景
本教程將實現一個簡化的電商下單場景,涉及兩個核心服務:
- 訂單服務:負責創建和管理訂單
- 庫存服務:負責管理商品庫存
業務流程如下:
- 用戶下單時,需要創建訂單并扣減庫存
- 如果任一操作失敗,需要回滾所有操作,保證數據一致性
我們將使用兩種不同的分布式事務模式來實現這個場景:
- 使用TCC模式處理訂單服務
- 使用SAGA模式處理庫存服務
3. 環境準備
3.1 前置環境
首先,要先把以下環境部署好:
- Go(1.16+)
- Go-Zero:
- MySQL(5.7+):用于存儲數據和DTM事務信息
- etcd:用于服務發現
3.2DTM服務部署
Docker快速啟動
DTM 服務可以通過 Docker 快速啟動:
# 拉取 DTM 鏡像
docker pull yedf/dtm:latest# 啟動 DTM 服務
docker run -it --name dtm -p 36789:36789 -p 36790:36790 -e STORE_DRIVER=mysql -e STORE_HOST=mysql:3306 -e STORE_USER=root -e STORE_PASSWORD=123456 -e STORE_PORT=3306 yedf/dtm:latest
注意:如果環境都是使用docker部署,網絡要使用同一個。
本地部署
或者拉取源碼啟動:
git clone https://github.com/dtm-labs/dtm
源碼拉取成功后,我們需要修改conf.sample.yml
文件,把服務注冊到ETCD,把下面的設置取消注釋:
MicroService: # gRPC/HTTP based microservice configDriver: 'dtm-driver-gozero' # name of the driver to handle register/discoverTarget: 'etcd://localhost:2379/dtmservice' # register dtm server to this urlEndPoint: 'localhost:36790'Store:Driver: 'mysql'Host: 'localhost'User: 'root'Password: '123456'Port: 3306DB: 'dtm'
然后使用這個配置啟動服務
go run mian.go -c conf.sample.yml
部署完成后,可以通過瀏覽器訪問 http://localhost:36789 ,來查看管理頁面 ,這個頁面可以很直觀的查看到,分布式事務是否成功,以及使用的哪些事務模式。
4.使用SAGA模式
我們先從簡單的SAGA模式開始,實現庫存服務的扣減和補償操作。
4.1 SAGA模式介紹
SAGA模式是一種分布式事務解決方案,將一個大事務拆分為多個小事務,每個小事務都有對應的補償操作。
SAGA的流程如下:
- 執行一系列正向操作(如扣減庫存)
- 如果任一操作失敗,則執行已完成操作的補償操作(如增加庫存)
SAGA適用于可以通過補償操作回滾的業務場景。
4.2 數據庫設計
為了演示分布式事務,我們需要創建三個數據庫:訂單數據庫、庫存數據庫和DTM數據庫。
訂單數據庫:
CREATE DATABASE IF NOT EXISTS dtm_order;
USE dtm_order;CREATE TABLE `orders` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`user_id` bigint(20) NOT NULL COMMENT '用戶ID',`product_id` bigint(20) NOT NULL COMMENT '產品ID',`amount` int(11) NOT NULL COMMENT '數量',`money` decimal(10,2) NOT NULL COMMENT '金額',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '狀態: 0-創建 1-完成',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
庫存數據庫:
CREATE DATABASE IF NOT EXISTS dtm_stock;
USE dtm_stock;CREATE TABLE `stock` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`product_id` bigint(20) NOT NULL COMMENT '產品ID',`amount` int(11) NOT NULL COMMENT '庫存數量',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uniq_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;-- 插入測試數據
INSERT INTO `stock` (`product_id`, `amount`) VALUES (1, 100);
INSERT INTO `stock` (`product_id`, `amount`) VALUES (2, 200);
DTM數據庫:
CREATE DATABASE IF NOT EXISTS dtm;
DTM會自動創建所需的表,無需手動創建。
注意: 創建的數據庫要和dtm配置文件中指定的數據庫要一致
4.3 庫存服務實現
首先,創建庫存服務的Proto定義文件stock/stock.proto
:
syntax = "proto3";package stock;
option go_package="./stock";// 庫存服務
service Stock {// 扣減庫存rpc DeductStock(DeductStockReq) returns (DeductStockResp);// 補償庫存rpc CompensateStock(CompensateStockReq) returns (CompensateStockResp);
}// 扣減庫存請求
message DeductStockReq {int64 productId = 1; // 產品IDint32 amount = 2; // 數量
}// 扣減庫存響應
message DeductStockResp {bool success = 1; // 是否成功string message = 2; // 消息
}// 補償庫存請求
message CompensateStockReq {int64 productId = 1; // 產品IDint32 amount = 2; // 數量
}// 補償庫存響應
message CompensateStockResp {bool success = 1; // 是否成功string message = 2; // 消息
}
切換到 rpc/stock/目錄下,使用goctl生成代碼:
cd stock
goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.
修改庫存服務的配置文件stock/etc/stock.yaml
:
Name: stock.rpc
ListenOn: 0.0.0.0:9002
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(localhost:3306)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://localhost:2379/dtmservice
修改配置結構stock/internal/config/config.go
:
package configimport ("github.com/zeromicro/go-zero/zrpc"
)type Config struct {zrpc.RpcServerConf//增加數據庫字段DB struct {DataSource string}DTM string // DTM服務地址
}
修改服務上下文stock/internal/svc/service_context.go
:
package svcimport ("github.com/Mikaelemmmm/gozerodtm/stock/internal/config""github.com/Mikaelemmmm/gozerodtm/stock/internal/model""github.com/zeromicro/go-zero/core/stores/sqlx"
)type ServiceContext struct {Config config.ConfigStockModel model.StockModel // 庫存模型
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,StockModel: model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化庫存模型}
}
切換到 stock/model
目錄下,使用goctl工具生成model代碼:
goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .
生成代碼后,到stock/model/stockmodel.go
,增加DeductStock
和AddStock
方法:
package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣減庫存DeductStock(productId int64, amount int32) error//AddStock 增加庫存(用于補償)AddStock(productId int64, amount int32) error}// DeductStock 扣減庫存
func (m *customStockModel) DeductStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)result, err := m.conn.Exec(query, amount, productId, amount)if err != nil {return err}affected, err := result.RowsAffected()if err != nil {return err}if affected == 0 {return fmt.Errorf("庫存不足")}return nil
}// AddStock 增加庫存(用于補償)
func (m *customStockModel) AddStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)_, err := m.conn.Exec(query, amount, productId)return err
}
實現扣減庫存邏輯stock/internal/logic/deduct_stock_logic.go
:
package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣減庫存
func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this lineerr := l.svcCtx.StockModel.DeductStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("扣減庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回錯誤}return &stock.DeductStockResp{Success: true,Message: "扣減庫存成功",}, nil
}
需要注意的是, 如果事務執行失敗,返回的是RPC的狀態碼 codes.Aborted
和 事務結果dtmcli.ResultFailure
,如果直接返回錯誤的,DTM是不會進行事務補償的。
return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
-
Aborted
:表示操作被中止,通常是由于并發問題,如序列檢查失敗、事務中止等。 -
ResultFailure
(dtmimp.ResultFailure
)表示事務或事務分支執行失敗。當事務或事務分支在執行過程中遇到錯誤,例如數據庫插入失敗、服務調用超時等,會返回失敗結果。這個結果會觸發全局事務管理器進行相應的處理,如回滾事務。
后面會單獨介紹下gRPC狀態碼和dtmcli的事務狀態。
實現補償庫存邏輯stock/internal/logic/compensate_stock_logic.go
:
package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 補償庫存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到補償庫存請求:", in.ProductId, in.Amount)err := l.svcCtx.StockModel.AddStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("補償庫存失敗: %v", err)return &stock.CompensateStockResp{Success: false,Message: err.Error(),}, nil}return &stock.CompensateStockResp{Success: true,Message: "補償庫存成功",}, nil
}
4.4 SAGA調用示例
創建test_saga.go
,演示如何使用SAGA模式調用庫存服務:
package mainimport ("dtm_demo/rpc/stock/stock""log""github.com/dtm-labs/client/dtmgrpc""github.com/zeromicro/go-zero/core/discov""github.com/zeromicro/go-zero/zrpc"
)func main() {// 初始化客戶端stockRpcConf := zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"localhost:2379"},Key: "stock.rpc",},}client := zrpc.MustNewClient(stockRpcConf)client.Conn()// DTM服務地址dtmServer := "etcd://localhost:2379/dtmservice"// 創建請求stockReq := &stock.DeductStockReq{ProductId: 1,Amount: 10, //設置庫存}// 生成全局事務IDgid := dtmgrpc.MustGenGid(dtmServer)// 創建SAGA事務saga := dtmgrpc.NewSagaGrpc(dtmServer, gid)//stockRpcBusiServer 實際上就是自動獲取庫存服務地址 etcd://localhost:2379/stock.rpcstockRpcBusiServer, err := stockRpcConf.BuildTarget()if err != nil {log.Fatal(err)}saga.Add(stockRpcBusiServer+"/stock.Stock/DeductStock", stockRpcBusiServer+"/stock.Stock/CompensateStock", stockReq)// 提交事務err = saga.Submit()if err != nil {log.Fatalf("SAGA事務提交失敗: %v", err)}
}
需要注意的是 /stock.Stock/DeductStock
和/stock.Stock/CompensateStock
這個路徑是區分大小寫的,一定要和goctl工具生成的一致
啟動庫存服務:
go run stock.go
運行測試程序:
go run test_saga.go
如果一切正常,庫存將被成功扣減。如果服務出現異常(例如庫存不足),DTM會自動執行補償操作,增加庫存。
當我把 Amount
設置為 100 ,庫存服務提示庫存不足,并自動進行補償
5. 使用TCC模式
接下來,我們使用更復雜的TCC模式實現訂單服務。
5.1 TCC模式介紹
TCC(Try-Confirm-Cancel)是一種更靈活的分布式事務模式,它將一個事務分為三個階段:
- Try:資源檢查和預留,但不真正執行業務操作
- Confirm:確認執行業務操作,只有當所有Try都成功時執行
- Cancel:取消操作,當任一Try失敗時執行
TCC模式比SAGA模式更為靈活,但也更復雜,需要業務自行實現三個階段的處理邏輯。
5.2 數據庫設計
order
和stock
,仍然使用SAGA演示中的數據庫表,在dtm_stock數據庫中添加庫存凍結表stock_tcc.sql
:
USE dtm_stock;
CREATE TABLE stock_tcc (id bigint(20) NOT NULL AUTO_INCREMENT,product_id bigint(20) NOT NULL COMMENT '產品ID',freeze_amount int(11) NOT NULL COMMENT '凍結數量',transaction_id varchar(64) NOT NULL COMMENT '事務ID',create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY uniq_trans_product (`transaction_id`, `product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
5.3 Tcc庫存服務實現
因為我們現在要實現TCC模式,需要使用到 try , confirm ,cancel 三個方法,所以需要重新定義庫存服務:
syntax = "proto3";package pb;
option go_package="./pb";// 庫存查詢請求
message ProductStockReq {int64 product_id = 1;
}// 庫存查詢響應
message ProductStockResp {int64 product_id = 1;int32 amount = 2;
}// 扣減庫存請求
message DeductStockReq {int64 product_id = 1;int32 amount = 2;
}// 扣減庫存響應
message DeductStockResp {bool success = 1;
}// TCC事務相關
message TccReq {string transaction_id = 1;int64 product_id = 2;int32 amount = 3;float money = 4;
}message TccResp {bool success = 1;
}// 庫存服務
service Stock {// 查詢商品庫存rpc GetProductStock(ProductStockReq) returns (ProductStockResp);// 普通扣減庫存(非TCC模式)//rpc DeductStock(DeductStockReq) returns (DeductStockResp);// TCC模式相關接口rpc TryDeductStock(TccReq) returns (TccResp); // Try階段rpc ConfirmDeductStock(TccReq) returns (TccResp); // Confirm階段rpc CancelDeductStock(TccReq) returns (TccResp); // Cancel階段
}
切換到/stock下,使用goctl生成代碼:
goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.
切換到 stock_svr/internal/model
目錄下,生成model代碼:
goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock_tcc" -dir .
修改stock.yaml
:
Name: stock.rpc
ListenOn: 0.0.0.0:8080
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(127.0.0.1:33069)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai
修改stock_svr/internal/config/config.go
:
type Config struct {zrpc.RpcServerConfDB struct {DataSource string}
}
修改stock_svr/internal/svc/servicecontext.go
:
type ServiceContext struct {Config config.ConfigStockModel model.StockModel // 庫存模型StockTccModel model.StockTccModel
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,StockModel: model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化庫存模型StockTccModel: model.NewStockTccModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化庫存模型}
}
修改stock_svr/internal/model/stocktccmodel.go
增加新方法:
type (StockTccModel interface {stockTccModelwithSession(session sqlx.Session) StockTccModel//增加DeleteByTransProductId 方法DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error}
/***/// DeleteByTransProductId 根據事務ID和商品ID刪除記錄
func (m *customStockTccModel) DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error {query := fmt.Sprintf("delete from %s where transaction_id = ? and product_id = ?", m.table)_, err := m.conn.ExecCtx(ctx, query, transactionId, productId)return err
}
修改stock_svr/internal/logic/trydeductstocklogic.go
實現Try接口:
// TCC模式相關接口
func (l *TryDeductStockLogic) TryDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("嘗試扣減庫存")//查詢商品庫存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil && err != model.ErrNotFound {//return &pb.TccResp{Success: false}, fmt.Errorf("沒有該商品")//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)//!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試//這時候就不要返回codes.Aborted, dtmcli.ResultFailure ,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}// 檢查庫存是否足夠if stockInfo.Amount < int64(in.Amount) {//return &pb.TccResp{Success: false}, fmt.Errorf("庫存不足")//如果庫存不足,直接返回失敗,讓dtm回滾//返回 codes.Aborted, dtmcli.ResultFailure 才能回滾return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}//如果錢小于100,直接返回失敗,讓dtm回滾if in.Money < 100 {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 扣減庫存stockInfo.Amount -= int64(in.Amount)err = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {//return &pb.TccResp{Success: false}, fmt.Errorf("庫存扣除失敗")//如果庫存扣除失敗,直接返回失敗,不要讓它一直重試,讓dtm回滾 ,return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 記錄凍結庫存,用于回滾freeze := &model.StockTcc{ProductId: in.ProductId,FreezeAmount: int64(in.Amount),TransactionId: in.TransactionId,}_, err = l.svcCtx.StockTccModel.Insert(l.ctx, freeze)if err != nil {// 如果凍結記錄失敗,回滾庫存stockInfo.Amount += int64(in.Amount)_ = l.svcCtx.StockModel.Update(l.ctx, stockInfo)//return &pb.TccResp{Success: false}, fmt.Errorf("記錄凍結庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}
修改stock_svr/internal/logic/confirmdeductstocklogic.go
,實現 confirm 接口:
// Confirm階段:確認扣減庫存
func (l *ConfirmDeductStockLogic) ConfirmDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("Confirm階段:確認扣減庫存")err := l.svcCtx.StockTccModel.DeleteByTransProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {logx.Errorf("確認扣減庫存失敗: %v", err)// 這里返回失敗,DTM會進行重試//return &pb.TccResp{Success: false}, errreturn nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}
修改stock_svr/internal/logic/canceldeductstocklogic.go
,實現 cancel方法:
func (l *CancelDeductStockLogic) CancelDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("取消扣減庫存")// 查找凍結記錄freeze, err := l.svcCtx.StockTccModel.FindOneByTransactionIdProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {if err == model.ErrNotFound {// 如果沒有找到凍結記錄,可能是已經處理過了,直接返回成功return &pb.TccResp{Success: true}, nil}/*在 TCC(Try-Confirm-Cancel)事務模式里,Cancel 方法本身的作用就是進行事務回滾,所以一般不需要再在 Cancel 方法里額外執行事務回滾操作*/return &pb.TccResp{Success: false}, fmt.Errorf("查詢凍結記錄失敗: %v", err)//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 恢復庫存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("查詢商品庫存失敗: %v", err)}stockInfo.Amount += freeze.FreezeAmounterr = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("恢復庫存失敗: %v", err)}// 刪除凍結記錄err = l.svcCtx.StockTccModel.Delete(l.ctx, freeze.Id)if err != nil {logx.Errorf("刪除凍結記錄失敗: %v", err)// 雖然刪除凍結記錄失敗,但庫存已經恢復,下次重試時不會重復恢復庫存}return &pb.TccResp{Success: true}, nil
}
注意錯誤返回方法:
- codes.Aborted 一般表示操作被中止,而 dtmcli.ResultFailure 告知 DTM 操作失敗,需要回滾整個事務。當 Confirm 階段出現問題,且希望 DTM 回滾整個事務時,適合用這個返回。
適用場景:
確認操作無法完成,比如數據庫記錄不存在、數據狀態不一致等,這種情況下需要 DTM 觸發 Cancel 階段來回滾事務。
- status.Error(codes.Internal, err.Error())
codes.Internal 表示服務器內部出錯,一般意味著系統出現了臨時故障,DTM 會持續重試該操作,直至成功或者達到最大重試次數。
適用場景:
操作失敗是由臨時問題引起的,像數據庫連接暫時中斷、網絡抖動等,這種情況下希望 DTM 重試操作,直至成功。
5.4 訂單服務Api實現
首先,創建訂單服務的Proto定義文件order/order.api
:
```syntax = "v1"type (CreateOrderReq {UserId int64 `json:"userId"`ProductId int64 `json:"productId"`Amount int `json:"amount"`Money float64 `json:"money"`}CreateOrderResp {OrderId int64 `json:"orderId"`}OrderInfo {Id int64 `json:"id"`UserId int64 `json:"userId"`ProductId int64 `json:"productId"`Amount int `json:"amount"`Money float64 `json:"money"`Status int `json:"status"`CreateTime string `json:"createTime"`}GetOrderReq {OrderId int64 `json:"orderId"`}GetOrderResp {Order OrderInfo `json:"order"`}
)service Order {@handler CreateOrderpost /api/order/create (CreateOrderReq) returns (CreateOrderResp)@handler GetOrderpost /api/order/get (GetOrderReq) returns (GetOrderResp)
}
切換到api/order/下,使用goctl生成代碼:
goctl api go -api .\order.api -dir .
切換到api/order/internal/model
下, 生成model代碼:
goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_order" -table="orders" -dir .
修改order.yaml
:
Name: Order
Host: 0.0.0.0
Port: 8888StockRpcConf:Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(127.0.0.1:33069)/dtm_order?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://127.0.0.1:2379/dtmservice
修改order/internal/config/config.go
:
type Config struct {rest.RestConfStockRpcConf zrpc.RpcClientConfDTM stringDB struct {DataSource string}
}
修改order/internal/svc/servicecontext.go
:
type ServiceContext struct {Config config.ConfigOrderModel model.OrdersModelStockRpc stock.Stock
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,OrderModel: model.NewOrdersModel(sqlx.NewMysql(c.DB.DataSource)),StockRpc: stock.NewStock(zrpc.MustNewClient(c.StockRpcConf)),}
}
修改order/internal/logic/createorderlogic.go
:
func (l *CreateOrderLogic) CreateOrder(req *types.CreateOrderReq) (resp *types.CreateOrderResp, err error) {// todo: add your logic here and delete this line//使用tcc,讓try去檢查并扣減庫存,confirm和cancel去處理庫存的回滾/*// 查詢商品庫存stockResp, err := l.svcCtx.StockRpc.GetProductStock(l.ctx, &pb.ProductStockReq{ProductId: req.ProductId,})if err != nil && !errors.Is(err, model.ErrNotFound) {return nil, fmt.Errorf("查詢商品庫存失敗: %v", err)}fmt.Println(stockResp.Amount)if stockResp.Amount < int32(req.Amount) {return nil, fmt.Errorf("庫存不足")}*/order := &model.Orders{UserId: req.UserId,ProductId: req.ProductId,Amount: int64(req.Amount),Money: req.Money,Status: 0, // 0-創建中}result, err := l.svcCtx.OrderModel.Insert(l.ctx, order)if err != nil {return nil, fmt.Errorf("創建訂單失敗: %v", err)}orderId, err := result.LastInsertId()if err != nil {return nil, fmt.Errorf("獲取訂單ID失敗: %v", err)}// 4. 使用DTM的TCC模式進行分布式事務處理dtmServer := l.svcCtx.Config.DTM // DTM服務地址stockServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()if err != nil {}// 生成全局事務IDgid := dtmgrpc.MustGenGid(dtmServer)// 創建TCC事務err = dtmgrpc.TccGlobalTransaction(dtmServer, gid, func(tcc *dtmgrpc.TccGrpc) error {req := &pb.TccReq{ProductId: req.ProductId,Amount: int32(req.Amount),Money: float32(req.Money),}r := &emptypb.Empty{} // 用于接收響應err := tcc.CallBranch(req,stockServer+"/pb.Stock/TryDeductStock",stockServer+"/pb.Stock/ConfirmDeductStock",stockServer+"/pb.Stock/CancelDeductStock",r)return err})if err != nil {return nil, fmt.Errorf("創建訂單失敗: %v", err)}// 5. 更新訂單狀態為已創建orderUpdate, err := l.svcCtx.OrderModel.FindOne(l.ctx, orderId)if err != nil {logx.Errorf("查詢訂單失敗: %v", err)// 這里不返回錯誤,因為已經扣減了庫存,訂單狀態可以通過其他方式修復} else {orderUpdate.Status = 1 // 1-已完成err = l.svcCtx.OrderModel.Update(l.ctx, orderUpdate)if err != nil {logx.Errorf("更新訂單狀態失敗: %v", err)// 同上,不返回錯誤}}return &types.CreateOrderResp{OrderId: orderId,}, nil
}
5.5 測試TCC
先啟動 stock 服務
go run stock.go
再啟動order api
go run order.go
創建請求:
curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 1 , "money" :200.10}' http://localhost:8888/api/order/create
去stock服務那邊查看日志,可以看到 先啟動try ,然后使用confirm
解下模擬失敗請求,錢設置為99.9:
curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 999, "money" :99.9}' http://localhost:8888/api/order/create
事務啟動cancel 進行回滾
6.DTM屏障 ,Barrier機制
6.1 Barrier機制介紹
DTM的Barrier機制主要解決以下問題:
- 空補償問題:在Saga模式中,如果正向操作還未執行,補償操作就執行了,這是空補償。
- 空回滾問題:在TCC模式中,如果Try操作未執行,Cancel操作就執行了,這是空回滾。
- 懸掛問題:在分布式事務中,可能由于網絡延遲等原因,先收到了Cancel/Confirm操作,后收到Try操作,此時Try操作應該被拒絕,這是懸掛。
- 冪等: 由于任何一個請求都可能出現網絡異常,出現重復請求,所有的分布式事務分支操作,都需要保證冪等性
Barrier的主要原理是在數據庫中添加一個barrier表,并使用數據庫事務保證原子性,記錄每個全局事務、分支事務、操作類型的執行情況,從而防止以上問題的發生。
用Dtm官方提供的dtmcli.barrier.mysql.sql
sql語句來創建dtm_barrier
表
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default '',gid varchar(128) default '',branch_id varchar(128) default '',op varchar(45) default '',barrier_id varchar(45) default '',reason varchar(45) default '' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
6.2 使用Barrier改進服務
我們以之前的sage的例子,做簡單的修改。
需要注意的是,在rpc的業務中,如果使用了barrier的話,那么在model中與db交互時候必須要用事務,并且一定要跟barrier用同一個事務
,到stock/model/stockmodel.go
,修改DeductStock
和AddStock
方法:
package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣減庫存DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error)//AddStock 增加庫存(用于補償)AddStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error)}// DeductStock 扣減庫存
func (m *customStockModel) DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error) {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)return tx.Exec(query, amount, productId, amount)
}// AddStock 增加庫存(用于補償)
func (m *customStockModel) AddStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error){query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)return tx.Exec(query, amount, productId)}
實現扣減庫存邏輯stock/internal/logic/deduct_stock_logic.go
:
package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣減庫存func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this line//barrier防止空補償、空懸掛等具體看dtm官網即可,別忘記加barrier表在當前庫中,因為判斷補償與要執行的sql一起本地事務barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_,err := l.svcCtx.StockModel.DeductStock(tx,in.ProductId, in.Amount)if err != nil {logx.Errorf("扣減庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回錯誤}return nil}); err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.DeductStockResp{Success: true,Message: "扣減庫存成功",}, nil
}
修改補償庫存邏輯stock/internal/logic/compensate_stock_logic.go
:
package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 補償庫存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到補償庫存請求:", in.ProductId, in.Amount)//barrier防止空補償、空懸掛等具體看dtm官網即可,別忘記加barrier表在當前庫中,因為判斷補償與要執行的sql一起本地事務barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_, err := l.svcCtx.StockModel.AddStock(tx, in.ProductId, in.Amount)if err != nil {logx.Errorf("補償庫存失敗: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回錯誤}return nil}); err != nil {//!!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.CompensateStockResp{Success: true,Message: "補償庫存成功",}, nil
}