涉及到的相關技術
? ? ?1.DTM分布式事務管理器,解決跨數據庫、跨服務、跨語言棧更新數據的一致性問題。
? ? ? 2.SAGA事務模式,SAGA事務模式是DTM中常用的一種模式,簡單易上手.(當然還有其它更多的事務模式,這里采用的SAGA只不過是其中一種較為簡單的方法)
? ? ??3.Go-zero框架,ETCD服務注冊...
更多內容移步至:go-zero 縮短從需求到上線的距離?和?介紹 | DTM開源項目文檔
業務場景
? ? ? ? 如果是在單體架構的業務當中,是不需要用到分布式事務的.單體架構中,涉及到需要保證多個事務同時成功的場景,只需要創建一個全局的事務對象 如:tx := db.Begin(),然后統一用這一個tx去管理接下來的業務邏輯即可.
? ? ? ? 不清楚在一個api中如何調用其它服務rpc的可以看看我的另一篇博客中的一種解決辦法:
go-zero標準的項目結構,以及如何使用docker-compose部署道linux服務器上-CSDN博客
? ? ? ? 但是在go-zero框架的這種微服務中,比如說:我在一個用戶服務的api中調用了用戶服務rpc中注冊的業務,并且同時還調用了標簽服務的rpc層中的選擇標簽的業務.?那么,此時我就需要保證用戶的注冊和標簽的選擇這兩個在不同服務下執行的業務邏輯同時成功.(總不能用戶賬號密碼插入到的表中,但是突然斷網了,導致標簽沒有選擇上去吧,這個是不符合我的業務的).
DTM 環境搭建(Windows本地搭建)
????????!!!!!!!!!!!!!!?這個環境請注意,是需要在你本地去搭建的,至于為什么,我會在后面解釋,最重要的先把環境搭建起來吧! 我采用的是docker-compose去搭建.(如果不了解windows電腦如何配置docker環境,可以移步:)
Windows11電腦是如何搭建docker環境的-CSDN博客
? ? ? ? 廢話不多說,首先從搭建環境講起.(我這里采用的是docker-compose搭建我需要的環境)
上圖就是項目的結構
在dtm和etcd的目錄下面各自新建一個Dockerfile文件,Dockerfile都不需要過多的配置,只需要用到最基礎的鏡像即可.在dtm的目錄下還需要新建一個config.yml文件.
DTM下Dokcerfile以及config.yml的編寫
FROM yedf/dtm:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"
# 指定要存儲trans狀態的存儲驅動
# Store:### 默認存儲驅動
# Driver: 'boltdb'### redis 存儲驅動
# Driver: 'redis'
# Host: 'localhost'
# User: ''
# Password: ''
# Port: 6379### mysql 存儲驅動
# Driver: 'mysql'
# Host: 'mysql'
# User: 'root'
# Password: '123456'
# Port: 3306### postgres 存儲驅動
# Driver: 'postgres'
# Host: 'localhost'
# User: 'postgres'
# Password: 'mysecretpassword'
# Port: '5432'### 以下配置僅適用于 postgres/mysql 驅動
# MaxOpenConns: 500
# MaxIdleConns: 500
# ConnMaxLifeTime: 5
# TransGlobalTable: 'dtm.trans_global'
# TransBranchOpTable: 'dtm.trans_branch_op'### 以下配置僅適用于 redis/boltdb 驅動
# DataExpire: 604800 # Trans 過期時間
# RedisPrefix: '{}' # Redis 存儲前綴MicroService:Driver: 'dtm-driver-gozero' # 要處理注冊/發現的驅動程序的名稱Target: 'etcd://your-ip:2379/dtmservice' # 注冊 dtm 服務的 etcd 地址EndPoint: 'your-ip:36790'# 以下配置的單位為'秒'
# TransCronInterval: 3
# TimeoutToFail: 35
# RetryInterval: 10# 日志等級
# LogLevel: 'info'
ETCD的Dockerfile文件編寫
FROM bitnami/etcd:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"
使用docker-compose 構建鏡像,啟動容器
version: '3'networks:backend:driver: bridge######## 項目依賴的環境,啟動項目之前要先啟動此環境 #######
services:etcd:build:context: etcdenvironment:- TZ=Asia/Shanghai- ALLOW_NONE_AUTHENTICATION=yesports: # 設置端口映射- "2379:2379"networks:- backendrestart: alwaysdtm:build:context: ./dtmenvironment:- TZ=Asia/Shanghaientrypoint:- "/app/dtm/dtm"- "-c=/app/dtm/configs/config.yaml"privileged: truevolumes:- ./dtm/config.yml:/app/dtm/configs/config.yaml # 將 dtm 配置文件掛載到容器里ports:- "36789:36789"- "36790:36790"networks:- backendrestart: alwaysdepends_on:- etcd
? ?在根目錄下面執行docker-compose up -d 將需要的環境搭建起來
執行如下圖中的命令:
? ? ? ? 新建子事務屏障的數據庫(庫名和表名請不要修改) 可以作為獨立的一個數據庫使用,沒有必要把自己的項目數據庫名稱改為dtm_barrier
/*Navicat Premium Data TransferSource Server : LinkSource Server Type : MySQLSource Server Version : 50743Source Host : 39.101.77.206:3306Source Schema : dtm_barrierTarget Server Type : MySQLTarget Server Version : 50743File Encoding : 65001Date: 03/03/2024 13:57:48
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for barrier
-- ----------------------------
DROP TABLE IF EXISTS `barrier`;
CREATE TABLE `barrier` (`id` bigint(22) NOT NULL AUTO_INCREMENT,`trans_type` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`gid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`branch_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`op` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`barrier_id` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`reason` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT 'the branch type who insert this record',`create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `gid`(`gid`, `branch_id`, `op`, `barrier_id`) USING BTREE,INDEX `create_time`(`create_time`) USING BTREE,INDEX `update_time`(`update_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1482 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;
SAGA事務模式的使用
? ? ? ??簡單說明一下,SAGA分布式事務模式,是沒有辦法攜帶返回值的,因此盡量此處要避免需要有返回值的業務場景.
? ? ? ??直接用代碼來展示SAGA事務模式的使用方法吧!
? ? 用戶注冊服務PRC的編寫以及事務失敗補償機制的編寫
這里不再演示proto文件是如何編寫的
用戶注冊服務的rpc
func (l *UserCreateLogic) UserCreate(in *user.UserCreateRequest) (pd *user.UserCreateResponse, endErr error) {// 獲取 RawDB// 注冊db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {// 加密密碼pwd, _ := bcrypt.GetPwd(in.Password)// 插入用戶數據_, err = tx.Exec("INSERT INTO users (id , created_at, updated_at, username, password, avatar, phone) VALUES (?,?, ?, ?, ?, ?, ?)", in.Id, time.Now(), time.Now(), in.Username, pwd, in.Avatar, in.Phone)//返回子事務執行失敗if err != nil {return err}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //如果失敗,不再重試,直接回滾}return &user.UserCreateResponse{}, endErr
}
用戶注冊服務rpc的失敗補償 (如果注冊服務的rpc失敗,就會執行相應的補償方法)
func (l *UserCreateRevertLoginLogic) UserCreateRevertLogin(in *user.UserCreateRequest) (pd *user.UserCreateResponse, err error) {fmt.Println("用戶標簽回滾開始--->")// 獲取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {fmt.Println("注冊事務走入了補償")//刪除插入的標簽數據 和 用戶數據_, err = tx.Exec("DELETE FROM tb_user_tag where user_id = ?", in.Id)_, err = tx.Exec("DELETE FROM users where id = ?", in.Id)//返回子事務執行失敗if err != nil {return err}return nil})if err != nil {fmt.Println("failed---->", err)return nil, err}fmt.Println("刪除成功")fmt.Println("用戶標簽回滾結束--->")return &user.UserCreateResponse{}, nil
}
? ? ? ? 標簽服務Rpc的編寫以及事務失敗補償機制的編寫
標簽服務的rpc
func (l *SignUserChooseTagLogic) SignUserChooseTag(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {// 獲取 RawDB// 注冊賬號時,選擇標簽db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {// 用戶注冊時選擇標簽var exists boolerr = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM tb_user_tag WHERE tag_id = ? and user_id = ?)", in.TagId, in.UserId).Scan(&exists)if err != nil {return err}if exists {return fmt.Errorf("標簽重復選擇")}fmt.Println("開始插入標簽")_, err = tx.Exec("INSERT INTO tb_user_tag (tb_user_tag.created_at , tb_user_tag.updated_at , tag_id, user_id) VALUES (?,?,?, ?)", time.Now(), time.Now(), in.TagId, in.UserId)if err != nil {return fmt.Errorf("標簽選擇失敗")}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //事務失敗不再重試,直接回滾}return &tag.UserChooseTagRequest{}, nil
}
標簽選擇失敗補償的rpc
func (l *SignUserChooseTagRevertLogic) SignUserChooseTagRevert(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {fmt.Println("用戶標簽SignUserChooseTagRevert--->開始")// 獲取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {fmt.Println("注冊時選擇標簽進入了補償")logc.Info(l.ctx)//刪除記錄_, err = tx.Exec("DELETE FROM tb_user_tag where tag_id = ? and user_id = ?", in.TagId, in.UserId)return err})if err != nil {return nil, err}fmt.Println("用戶標簽SignUserChooseTagRevert--->結束")return &tag.UserChooseTagRequest{}, nil
}
至此rpc層的業務邏輯全部編寫完畢,但請一定要注意每一個rpc的返回值,一定要按照?如&tag.UserChooseTagRequest{}返回,不能簡單的返回一個nil值.否則會導致事務一直無法提交
? ? ? ? API層的編寫
func (l *SignUpLogic) SignUp(req *types.UserCreateRequest) (resp *types.UserCreateResponse, err error) {//首先判斷用戶是否存在_, err = l.svcCtx.UserRpc.UserIsExists(l.ctx, &user.UserCreateRequest{Phone: req.Phone,})if err != nil {return nil, err}// 獲取UserRpc 的BuildTargetuserRpcBuildServer, err := l.svcCtx.Config.UserRpc.BuildTarget()if err != nil {return nil, status.Error(100, "用戶注冊異常")}// 獲取TagRpc 的BuildTargettagRpcBuildServer, err := l.svcCtx.Config.TagRpc.BuildTarget()if err != nil {return nil, status.Error(100, "標簽選擇異常")}empty := user.Empty{}//dtm服務的etcd注冊地址var dtmServer = l.svcCtx.Config.Dtm//dtmServer := "etcd://etcd:2379/dtmservice"fmt.Println(dtmServer)// 創建一個gidgid := dtmgrpc.MustGenGid(dtmServer)//創建一個自增idif _, err := l.svcCtx.UserRpc.AddUserId(l.ctx, &empty); err != nil {return nil, fmt.Errorf("CREATE user id error:%v", err)}userID, _ := l.svcCtx.UserRpc.NextUserID(l.ctx, &empty)saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).Add(tagRpcBuildServer+"/tag.TagSign/SignUserChooseTag", tagRpcBuildServer+"/tag.TagSign/SignUserChooseTagRevert", &tag.UserChooseTagRequest{UserId: userID.NextUserId,TagId: req.StartTagId,}).Add(userRpcBuildServer+"/user.UserService/UserCreate", userRpcBuildServer+"/user.UserService/UserCreateRevertLogin", &user.UserCreateRequest{Username: req.Username,Password: req.Password,Avatar: req.Avatar,Phone: req.Phone,Id: userID.NextUserId,})//事務提交if err := saga.Submit(); err != nil {//自增主鍵減少1if _, err := l.svcCtx.UserRpc.DecUserID(l.ctx, &empty); err != nil {logx.Error(err)}logx.Error(err)return nil, fmt.Errorf("saga submit error:%v", err)}return &types.UserCreateResponse{}, nil
}
上面代碼的邏輯,相信如果各位接觸到微服務,一定是可以理解的,由于saga的事務模式沒有返回值,所以我通過redis生成一個自增id來使用,而不再采用mysql的自增主鍵id.
上面代碼中的地址可以在rpc生成的pb.go中找到
自己的理解
? ? ? ? 經歷了長度一周多對分布式事務的研究,寫一點自己的簡單理解吧!(比較淺顯)
????????saga事務模式需要自己寫事務的補償方法,子事務屏障內的事務執行失敗之后,就會執行對應的事務補償方法!即回滾事務.補償方法內寫的便是對這一次執行的插入,修改語句的相反操作.比如我增加某一條數據,補償內就寫上對刪除的操作.
? ? ? ? 感覺和MySQL的Undo log 回滾日志很相似啊!Undo log日志會記錄更新前的數據到日志中,是在一個事務下執行過程中,在還沒有提交之前,如果發生意外,就可以通過這個日志回滾到事務執行之前的數據了.
? ? ? ??