Go-zero中分布式事務的實現(DTM分布式事務管理器,在一個APi中如何調用兩個不同服務的rpc層,并保證兩個不同服務之間的業務邏輯同時成功)

涉及到的相關技術

? ? ?1.DTM分布式事務管理器,解決跨數據庫、跨服務、跨語言棧更新數據的一致性問題。

? ? ? 2.SAGA事務模式,SAGA事務模式是DTM中常用的一種模式,簡單易上手.(當然還有其它更多的事務模式,這里采用的SAGA只不過是其中一種較為簡單的方法)

? ? ??3.Go-zero框架,ETCD服務注冊...

更多內容移步至:go-zero 縮短從需求到上線的距離?和?介紹 | DTM開源項目文檔

業務場景

? ? ? ? 如果是在單體架構的業務當中,是不需要用到分布式事務的.單體架構中,涉及到需要保證多個事務同時成功的場景,只需要創建一個全局的事務對象 如:tx := db.Begin(),然后統一用這一個tx去管理接下來的業務邏輯即可.

? ? ? ? 不清楚在一個api中如何調用其它服務rpc的可以看看我的另一篇博客中的一種解決辦法:

go-zero標準的項目結構,以及如何使用docker-compose部署道linux服務器上-CSDN博客

? ? ? ? 但是在go-zero框架的這種微服務中,比如說:我在一個用戶服務的api中調用了用戶服務rpc中注冊的業務,并且同時還調用了標簽服務的rpc層中的選擇標簽的業務.?那么,此時我就需要保證用戶的注冊和標簽的選擇這兩個在不同服務下執行的業務邏輯同時成功.(總不能用戶賬號密碼插入到的表中,但是突然斷網了,導致標簽沒有選擇上去吧,這個是不符合我的業務的).

DTM 環境搭建(Windows本地搭建)

????????!!!!!!!!!!!!!!?這個環境請注意,是需要在你本地去搭建的,至于為什么,我會在后面解釋,最重要的先把環境搭建起來吧! 我采用的是docker-compose去搭建.(如果不了解windows電腦如何配置docker環境,可以移步:)

Windows11電腦是如何搭建docker環境的-CSDN博客

? ? ? ? 廢話不多說,首先從搭建環境講起.(我這里采用的是docker-compose搭建我需要的環境)

上圖就是項目的結構

在dtm和etcd的目錄下面各自新建一個Dockerfile文件,Dockerfile都不需要過多的配置,只需要用到最基礎的鏡像即可.在dtm的目錄下還需要新建一個config.yml文件.

DTM下Dokcerfile以及config.yml的編寫

FROM yedf/dtm:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"
# 指定要存儲trans狀態的存儲驅動
# Store:### 默認存儲驅動
#   Driver: 'boltdb'### redis 存儲驅動
#   Driver: 'redis'
#   Host: 'localhost'
#   User: ''
#   Password: ''
#   Port: 6379### mysql 存儲驅動
#   Driver: 'mysql'
#   Host: 'mysql'
#   User: 'root'
#   Password: '123456'
#   Port: 3306### postgres 存儲驅動
#   Driver: 'postgres'
#   Host: 'localhost'
#   User: 'postgres'
#   Password: 'mysecretpassword'
#   Port: '5432'### 以下配置僅適用于 postgres/mysql 驅動
#   MaxOpenConns: 500
#   MaxIdleConns: 500
#   ConnMaxLifeTime: 5
#   TransGlobalTable: 'dtm.trans_global'
#   TransBranchOpTable: 'dtm.trans_branch_op'### 以下配置僅適用于 redis/boltdb 驅動
#   DataExpire: 604800 # Trans 過期時間
#   RedisPrefix: '{}'  # Redis 存儲前綴MicroService:Driver: 'dtm-driver-gozero'           # 要處理注冊/發現的驅動程序的名稱Target: 'etcd://your-ip:2379/dtmservice' # 注冊 dtm 服務的 etcd 地址EndPoint: 'your-ip:36790'# 以下配置的單位為'秒'
# TransCronInterval: 3
# TimeoutToFail: 35
# RetryInterval: 10# 日志等級
# LogLevel: 'info'

ETCD的Dockerfile文件編寫

FROM bitnami/etcd:latestLABEL maintainer="zyf021026 <shichuxin6@163.com>"

使用docker-compose 構建鏡像,啟動容器


version: '3'networks:backend:driver: bridge######## 項目依賴的環境,啟動項目之前要先啟動此環境 #######
services:etcd:build:context: etcdenvironment:- TZ=Asia/Shanghai- ALLOW_NONE_AUTHENTICATION=yesports: # 設置端口映射- "2379:2379"networks:- backendrestart: alwaysdtm:build:context: ./dtmenvironment:- TZ=Asia/Shanghaientrypoint:- "/app/dtm/dtm"- "-c=/app/dtm/configs/config.yaml"privileged: truevolumes:- ./dtm/config.yml:/app/dtm/configs/config.yaml # 將 dtm 配置文件掛載到容器里ports:- "36789:36789"- "36790:36790"networks:- backendrestart: alwaysdepends_on:- etcd

? ?在根目錄下面執行docker-compose up -d 將需要的環境搭建起來

執行如下圖中的命令:

? ? ? ? 新建子事務屏障的數據庫(庫名和表名請不要修改) 可以作為獨立的一個數據庫使用,沒有必要把自己的項目數據庫名稱改為dtm_barrier

/*Navicat Premium Data TransferSource Server         : LinkSource Server Type    : MySQLSource Server Version : 50743Source Host           : 39.101.77.206:3306Source Schema         : dtm_barrierTarget Server Type    : MySQLTarget Server Version : 50743File Encoding         : 65001Date: 03/03/2024 13:57:48
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for barrier
-- ----------------------------
DROP TABLE IF EXISTS `barrier`;
CREATE TABLE `barrier`  (`id` bigint(22) NOT NULL AUTO_INCREMENT,`trans_type` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`gid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`branch_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`op` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`barrier_id` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',`reason` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT 'the branch type who insert this record',`create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`) USING BTREE,UNIQUE INDEX `gid`(`gid`, `branch_id`, `op`, `barrier_id`) USING BTREE,INDEX `create_time`(`create_time`) USING BTREE,INDEX `update_time`(`update_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1482 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;

SAGA事務模式的使用

? ? ? ??簡單說明一下,SAGA分布式事務模式,是沒有辦法攜帶返回值的,因此盡量此處要避免需要有返回值的業務場景.

? ? ? ??直接用代碼來展示SAGA事務模式的使用方法吧!

? ? 用戶注冊服務PRC的編寫以及事務失敗補償機制的編寫

這里不再演示proto文件是如何編寫的

用戶注冊服務的rpc

func (l *UserCreateLogic) UserCreate(in *user.UserCreateRequest) (pd *user.UserCreateResponse, endErr error) {// 獲取 RawDB// 注冊db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {// 加密密碼pwd, _ := bcrypt.GetPwd(in.Password)// 插入用戶數據_, err = tx.Exec("INSERT INTO users (id , created_at, updated_at, username, password, avatar, phone) VALUES (?,?, ?, ?, ?, ?, ?)", in.Id, time.Now(), time.Now(), in.Username, pwd, in.Avatar, in.Phone)//返回子事務執行失敗if err != nil {return err}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //如果失敗,不再重試,直接回滾}return &user.UserCreateResponse{}, endErr
}

用戶注冊服務rpc的失敗補償 (如果注冊服務的rpc失敗,就會執行相應的補償方法)

func (l *UserCreateRevertLoginLogic) UserCreateRevertLogin(in *user.UserCreateRequest) (pd *user.UserCreateResponse, err error) {fmt.Println("用戶標簽回滾開始--->")// 獲取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)if err != nil {return nil, status.Error(500, err.Error())}// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) error {fmt.Println("注冊事務走入了補償")//刪除插入的標簽數據 和 用戶數據_, err = tx.Exec("DELETE FROM tb_user_tag where user_id = ?", in.Id)_, err = tx.Exec("DELETE FROM users where id = ?", in.Id)//返回子事務執行失敗if err != nil {return err}return nil})if err != nil {fmt.Println("failed---->", err)return nil, err}fmt.Println("刪除成功")fmt.Println("用戶標簽回滾結束--->")return &user.UserCreateResponse{}, nil
}

? ? ? ? 標簽服務Rpc的編寫以及事務失敗補償機制的編寫

標簽服務的rpc

func (l *SignUserChooseTagLogic) SignUserChooseTag(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {// 獲取 RawDB// 注冊賬號時,選擇標簽db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {// 用戶注冊時選擇標簽var exists boolerr = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM tb_user_tag WHERE tag_id = ? and user_id = ?)", in.TagId, in.UserId).Scan(&exists)if err != nil {return err}if exists {return fmt.Errorf("標簽重復選擇")}fmt.Println("開始插入標簽")_, err = tx.Exec("INSERT INTO tb_user_tag (tb_user_tag.created_at , tb_user_tag.updated_at , tag_id, user_id) VALUES (?,?,?, ?)", time.Now(), time.Now(), in.TagId, in.UserId)if err != nil {return fmt.Errorf("標簽選擇失敗")}return nil})if err != nil {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) //事務失敗不再重試,直接回滾}return &tag.UserChooseTagRequest{}, nil
}

標簽選擇失敗補償的rpc

func (l *SignUserChooseTagRevertLogic) SignUserChooseTagRevert(in *tag.UserChooseTagRequest) (*tag.UserChooseTagRequest, error) {fmt.Println("用戶標簽SignUserChooseTagRevert--->開始")// 獲取 RawDBdb, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()if err != nil {return nil, status.Error(500, err.Error())}// 獲取子事務屏障對象barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)// 開啟子事務屏障err = barrier.CallWithDB(db, func(tx *sql.Tx) (err error) {fmt.Println("注冊時選擇標簽進入了補償")logc.Info(l.ctx)//刪除記錄_, err = tx.Exec("DELETE FROM tb_user_tag where tag_id = ? and user_id = ?", in.TagId, in.UserId)return err})if err != nil {return nil, err}fmt.Println("用戶標簽SignUserChooseTagRevert--->結束")return &tag.UserChooseTagRequest{}, nil
}

至此rpc層的業務邏輯全部編寫完畢,但請一定要注意每一個rpc的返回值,一定要按照?如&tag.UserChooseTagRequest{}返回,不能簡單的返回一個nil值.否則會導致事務一直無法提交

? ? ? ? API層的編寫


func (l *SignUpLogic) SignUp(req *types.UserCreateRequest) (resp *types.UserCreateResponse, err error) {//首先判斷用戶是否存在_, err = l.svcCtx.UserRpc.UserIsExists(l.ctx, &user.UserCreateRequest{Phone: req.Phone,})if err != nil {return nil, err}// 獲取UserRpc 的BuildTargetuserRpcBuildServer, err := l.svcCtx.Config.UserRpc.BuildTarget()if err != nil {return nil, status.Error(100, "用戶注冊異常")}// 獲取TagRpc 的BuildTargettagRpcBuildServer, err := l.svcCtx.Config.TagRpc.BuildTarget()if err != nil {return nil, status.Error(100, "標簽選擇異常")}empty := user.Empty{}//dtm服務的etcd注冊地址var dtmServer = l.svcCtx.Config.Dtm//dtmServer := "etcd://etcd:2379/dtmservice"fmt.Println(dtmServer)// 創建一個gidgid := dtmgrpc.MustGenGid(dtmServer)//創建一個自增idif _, err := l.svcCtx.UserRpc.AddUserId(l.ctx, &empty); err != nil {return nil, fmt.Errorf("CREATE user id error:%v", err)}userID, _ := l.svcCtx.UserRpc.NextUserID(l.ctx, &empty)saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).Add(tagRpcBuildServer+"/tag.TagSign/SignUserChooseTag", tagRpcBuildServer+"/tag.TagSign/SignUserChooseTagRevert", &tag.UserChooseTagRequest{UserId: userID.NextUserId,TagId:  req.StartTagId,}).Add(userRpcBuildServer+"/user.UserService/UserCreate", userRpcBuildServer+"/user.UserService/UserCreateRevertLogin", &user.UserCreateRequest{Username: req.Username,Password: req.Password,Avatar:   req.Avatar,Phone:    req.Phone,Id:       userID.NextUserId,})//事務提交if err := saga.Submit(); err != nil {//自增主鍵減少1if _, err := l.svcCtx.UserRpc.DecUserID(l.ctx, &empty); err != nil {logx.Error(err)}logx.Error(err)return nil, fmt.Errorf("saga submit error:%v", err)}return &types.UserCreateResponse{}, nil
}

上面代碼的邏輯,相信如果各位接觸到微服務,一定是可以理解的,由于saga的事務模式沒有返回值,所以我通過redis生成一個自增id來使用,而不再采用mysql的自增主鍵id.

上面代碼中的地址可以在rpc生成的pb.go中找到

自己的理解

? ? ? ? 經歷了長度一周多對分布式事務的研究,寫一點自己的簡單理解吧!(比較淺顯)

????????saga事務模式需要自己寫事務的補償方法,子事務屏障內的事務執行失敗之后,就會執行對應的事務補償方法!即回滾事務.補償方法內寫的便是對這一次執行的插入,修改語句的相反操作.比如我增加某一條數據,補償內就寫上對刪除的操作.

? ? ? ? 感覺和MySQL的Undo log 回滾日志很相似啊!Undo log日志會記錄更新前的數據到日志中,是在一個事務下執行過程中,在還沒有提交之前,如果發生意外,就可以通過這個日志回滾到事務執行之前的數據了.

? ? ? ??

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/717993.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/717993.shtml
英文地址,請注明出處:http://en.pswp.cn/news/717993.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Windows 2012 設置 nginx 開機自啟動(適用于windows2012/10)

Windows 2012 設置 nginx 開機自啟動&#xff08;適用于windows2012/10&#xff09;https://www.cnblogs.com/xuegqcto/articles/7521483.html 在windows server 2012上安裝nginx&#xff0c;同時配置開機自啟動服務&#xff08;推薦使用“Windows Service Wrapper”工具&…

leetcode 740.刪除并活得點數

這道題和打家劫舍得思路很像。 思路&#xff1a;首先我們看到題目的意思&#xff0c;就是說我們如果選擇了一個數&#xff0c;那么它相鄰的數就會不得選入&#xff0c;也就是刪除。這就是上一個題那個相鄰的家不能偷的問題唄&#xff01; 我們從那個地方轉換一下&#xff0c;…

【Linux】線程概念|線程理解|線程控制

文章目錄 線程概念Linux中線程是否存在的討論線程創建和線程控制線程的終止和等待&#xff08;三種終止方式 pthread_join()的void**retval&#xff09; 線程概念 線程就是進程內部的一個執行流&#xff0c;線程在進程內運行&#xff0c;線程在進程的地址空間內運行&#xff0…

LeetCode-第14題-最長公共前綴

1.題目描述 編寫一個函數來查找字符串數組中的最長公共前綴。 如果不存在公共前綴&#xff0c;返回空字符串 ""。 2.樣例描述 3.思路描述 按字符串數組每個數組的長度&#xff0c;將字符串數組從小到大排序&#xff1b;他們的公共前綴一定小于或等于最長元素長度…

(Aliyun AI ACP 06)視覺智能基礎知識:視覺智能常用模型與算法

文章目錄 阿里云人工智能工程師ACP認證考試知識點輔助閱讀&#xff08;Aliyun AI ACP 06&#xff09;視覺智能基礎知識&#xff1a;視覺智能常用模型與算法視覺智能建模流程圖像預處理技術圖像特征提取算法深度學習模型 阿里云人工智能工程師ACP認證考試知識點輔助閱讀 &#…

2024年智能駕駛年度策略:自動駕駛開始由創造型行業轉向工程型行業

感知模塊技術路徑已趨于收斂&#xff0c;自動駕駛從創造型行業邁向工程型行業。在特斯拉的引領下&#xff0c;國內主機廠2022年以來紛紛跟隨特斯拉相繼提出“重感知、輕地圖”技術方案&#xff0c;全球自動駕駛行業感知模塊技術路徑從百花齊放開始走向收斂。我們認為主機廠智能…

2023.3.3周報

目錄 摘要 一、文獻閱讀 1、題目 2、摘要 3、模型架構 4、文獻解讀 一、Introduction 二、實驗 三、結論 二、PINN 一、PINN比傳統數值方法有哪些優勢 二、PINN方法 三、正問題與反問題 三、PINN實驗 一、數學方程 二、模型搭建 總結 摘要 本周我閱讀了一篇…

Postman上傳文件的操作方法

前言 調用某個接口&#xff0c;測試上傳文件功能。一時間不知如何上傳文件&#xff0c;本文做個操作記錄&#xff0c;期望與你有益。 步驟一、設置Headers key:Content-Type value:multipart/form-data 步驟二、設置Body 選擇form-data key:file下拉框選擇file類型value&…

STM32(8)NVIC編程

中斷源由部分片上外設產生 在misc.h中找&#xff0c;雜項 配置NVIC GPIO和AFIO不能產生中斷源&#xff0c;但能通過EXTI&#xff0c;由EXTI產生中斷源 NVIC不需要開啟時鐘&#xff0c;因為NVIC模塊位于內核內部&#xff0c;芯片一上電就能工作。 中斷響應函數 中斷向量表在啟…

Java:JVM基礎

文章目錄 參考JVM內存區域程序計數器虛擬機棧本地方法棧堆方法區符號引用與直接引用運行時常量池字符串常量池直接內存 參考 JavaGuide JVM內存區域 程序計數器 程序計數器是一塊較小的內存空間&#xff0c;可以看做是當前線程所執行的字節碼的行號指示器&#xff0c;各線程…

Unity 常用的4種燈光、制作鏡子、燈光的調用修改數值、

創建燈光時&#xff0c;一般用4種&#xff1a;定向光、點光源、聚光、區域光、 定向光&#xff1a;太陽 點光源&#xff1a;燈泡 聚光燈&#xff1a;手電筒 區域光&#xff1a;烘焙-貼圖 燈光選擇已烘焙 需要先選擇被烘焙的物體&#xff0c;然后再選擇Contribute GI 等待進…

java中的set

Set Set集合概述和特點 不可以存儲重復元素 沒有索引,不能使用普通for循環遍歷 哈希值 哈希值簡介 是JDK根據對象的地址或者字符串或者數字算出來的int類型的數值 如何獲取哈希值 Object類中的public int hashCode()&#xff1a;返回對象的哈希碼值。 哈希值的特點 同一個…

分布式ID生成算法|雪花算法 Snowflake | Go實現

寫在前面 在分布式領域中&#xff0c;不可避免的需要生成一個全局唯一ID。而在近幾年的發展中有許多分布式ID生成算法&#xff0c;比較經典的就是 Twitter 的雪花算法(Snowflake Algorithm)。當然國內也有美團的基于snowflake改進的Leaf算法。那么今天我們就來介紹一下雪花算法…

計算機視覺基礎知識(二)---數字圖像

像素 像素是分辨率的單位;構成位圖圖像的最基本單元;每個像素都有自己的顏色; 圖像分辨率 單位英寸內的像素點數;單位為PPI(Pixels Per Inch),為像素每英寸;PPI表示每英寸對角線上所擁有的像素數目:,x:長度像素數目,y:寬度像素數目,Z:屏幕大小;屏幕尺寸(大小)指的是對角線長…

GO語言學習筆記(與Java的比較學習)(八)

接口與反射 接口是什么 Go 語言不是一種 “傳統” 的面向對象編程語言&#xff1a;它里面沒有類和繼承的概念。 但是 Go 語言里有非常靈活的 接口 概念&#xff0c;通過它可以實現很多面向對象的特性。接口提供了一種方式來 說明 對象的行為&#xff1a;如果誰能搞定這件事&…

springer模板參考文獻不顯示

Spring期刊模板網站&#xff0c;我的問題是23年12月的版本 https://www.springernature.com/gp/authors/campaigns/latex-author-support/see-where-our-services-will-take-you/18782940 參考文獻顯示問好&#xff0c;在sn-article.tex文件中&#xff0c;這個sn-mathphys-num…

數據結構c版(3)——排序算法

本章我們來學習一下數據結構的排序算法&#xff01; 目錄 1.排序的概念及其運用 1.1排序的概念 1.2 常見的排序算法 2.常見排序算法的實現 2.1 插入排序 2.1.1基本思想&#xff1a; 2.1.2直接插入排序&#xff1a; 2.1.3 希爾排序( 縮小增量排序 ) 2.2 選擇排序 2.2…

rtt的io設備框架面向對象學習-io設備管理層

目錄 1.設備基類2.rtt基類2.1 rtt基類定義2.2 對象容器定義2.3 rtt基類構造函數 3.io設備管理接口4.總結 這層我的理解就是rtt基類和設備基類所在&#xff0c;所以抽離出來好點&#xff0c;不然每個設備類都要重復它。 1.設備基類 /include/rtdef.h中定義了設備基類struct rt_…

記錄踩過的坑-PyTorch

安裝報錯 按PyTorch官網給出的命令 pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 報錯 ERROR: Could not find a version that satisfies the requirement torch (from versions: none) ERROR: No matching distributio…

Redis為什么這么快?

基于內存&#xff1a;Redis 將數據存儲在內存中&#xff0c;內存訪問速度遠高于磁盤訪問速度&#xff0c;因此能夠快速讀寫數據。單線程模型&#xff1a;Redis 使用單線程模型來處理客戶端請求&#xff0c;避免了多線程之間的切換開銷&#xff0c;簡化了并發控制&#xff0c;提…