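Integrating DTM for Cross-Language Distributed Transactions V1.0
Introduction
DTM is an open-source distributed transaction manager that solves data consistency problems when updates span databases, services, and language stacks.
Put simply, DTM provides cross-service transactions: a group of services either all succeed or all roll back, which avoids the inconsistency caused by updating only part of the data.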
https://dtm.pub/
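Installing and Running DTM
The installation below can also be packaged in a Dockerfile and deployed together with the dmc service. The Docker installation described on the official site did not work for me.
Install the Go environment
# Download and install Go. Use version 1.20 or later; other versions failed to install
wget -c https://dl.google.com/go/go1.20.1.linux-amd64.tar.gz -O - | sudo tar -xz -C /usr/local
cd /usr/local/
mkdir gopath
vi /etc/profile
# Add the following Go environment settings to /etc/profile
export GOROOT=/usr/local/go
export GOPATH=/usr/local/gopath
export PATH=$PATH:$GOROOT/bin
export GO111MODULE="on" # enable Go modules
export GOPROXY=https://goproxy.cn,direct # module proxy for users in mainland China
# Reload the profile, then check the version
source /etc/profile
go version
Install the DTM service
git clone https://github.com/dtm-labs/dtm && cd dtm
go build
# Run ./dtm; output like the following means it started successfully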
[root@paratera128 dtm]# ./dtm
invalid log level: , switching to default: INFO
{"level":"info","ts":"2023-07-25T18:54:17.816-0700","caller":"entry/main.go:46","msg":"dtm version is: v0.0.0-dev"}
{"level":"info","ts":"2023-07-25T18:54:17.818-0700","caller":"config/config.go:121","msg":"config file: loaded config is: \n{\n \"Store\": {\n \"Driver\": \"boltdb\",\n \"Host\": \"\",\n \"Port\": 0,\n \"User\": \"\",\n \"Password\": \"\",\n \"Db\": \"dtm\",\n \"Schema\": \"public\",\n \"MaxOpenConns\": 500,\n \"MaxIdleConns\": 500,\n \"ConnMaxLifeTime\": 5,\n \"DataExpire\": 604800,\n \"FinishedDataExpire\": 86400,\n \"RedisPrefix\": \"{a}\"\n },\n \"TransCronInterval\": 3,\n \"TimeoutToFail\": 35,\n \"RetryInterval\": 10,\n \"RequestTimeout\": 3,\n \"HTTPPort\": 36789,\n \"GrpcPort\": 36790,\n \"JSONRPCPort\": 36791,\n \"MicroService\": {\n \"Driver\": \"default\",\n \"Target\": \"\",\n \"EndPoint\": \"\"\n },\n \"HTTPMicroService\": {\n \"Driver\": \"default\",\n \"RegistryType\": \"\",\n \"RegistryAddress\": \"\",\n \"RegistryOptions\": \"{}\",\n \"Target\": \"\",\n \"EndPoint\": \"\"\n },\n \"UpdateBranchSync\": 0,\n \"UpdateBranchAsyncGoroutineNum\": 1,\n \"LogLevel\": \"info\",\n \"Log\": {\n \"Outputs\": \"stderr\",\n \"RotationEnable\": 0,\n \"RotationConfigJSON\": \"{}\"\n },\n \"TimeZoneOffset\": \"\",\n \"ConfigUpdateInterval\": 3,\n \"AlertRetryLimit\": 3,\n \"AlertWebHook\": \"\",\n \"AdminBasePath\": \"\"\n}"}
{"level":"info","ts":"2023-07-25T18:54:17.820-0700","caller":"maxprocs/maxprocs.go:47","msg":"maxprocs: Leaving GOMAXPROCS=1: CPU quota undefined"}
{"level":"info","ts":"2023-07-25T18:54:17.827-0700","caller":"dtmsvr/svr.go:32","msg":"start dtmsvr"}
{"level":"info","ts":"2023-07-25T18:54:17.828-0700","caller":"dtmsvr/svr.go:51","msg":"dtmsvr http listen at: 36789"}
{"level":"info","ts":"2023-07-25T18:54:17.830-0700","caller":"dtmsvr/svr.go:65","msg":"grpc listening at [::]:36790"}
{"level":"info","ts":"2023-07-25T18:54:17.931-0700","caller":"dtmsvr/svr.go:80","msg":"RegisterService: default"}
{"level":"info","ts":"2023-07-25T18:54:17.932-0700","caller":"dtm/main.go:96","msg":"admin is proxied to admin.dtm.pub"}
{"level":"info","ts":"2023-07-25T18:54:17.933-0700","caller":"dtm/main.go:98","msg":"admin is running at: http://localhost:36789"}
Open http://localhost:36789 to see the status and process data of every transaction.
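Data storage
By default, a deployed DTM stores its data in a file named dtm.bolt. You can also edit conf.yml to store the data in a relational database, Redis, and so on.
The MySQL scripts are given below. You can build custom monitoring and handling on top of these tables as needed, and DTM provides APIs that services can use to write data to the RM table.
RM table (resource manager)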
CREATE TABLE dtm.barrier (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `trans_type` varchar(45) DEFAULT '' COMMENT 'transaction mode',
  `gid` varchar(128) DEFAULT '' COMMENT 'global transaction id',
  `branch_id` varchar(128) DEFAULT '' COMMENT 'branch transaction id',
  `op` varchar(45) DEFAULT '' COMMENT 'transaction operation',
  `barrier_id` varchar(45) DEFAULT '' COMMENT 'branch transaction level; incremented when the branch contains nested transactions',
  `reason` varchar(45) DEFAULT '' COMMENT 'branch type that inserted this record; same as op on success, opposite of op on failure',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid` (`gid`,`branch_id`,`op`,`barrier_id`),
  KEY `create_time` (`create_time`),
  KEY `update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8;
TM tables (transaction manager)
CREATE TABLE IF NOT EXISTS dtm.trans_global (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `trans_type` varchar(45) NOT NULL COMMENT 'transaction type: saga | xa | tcc | msg',
  `status` varchar(12) NOT NULL COMMENT 'transaction status: prepared | submitted | aborting | succeed | failed',
  `query_prepared` varchar(1024) NOT NULL COMMENT 'url to check for msg|workflow',
  `protocol` varchar(45) NOT NULL COMMENT 'protocol: http | grpc | json-rpc',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `finish_time` datetime DEFAULT NULL,
  `rollback_time` datetime DEFAULT NULL,
  `options` varchar(1024) DEFAULT '' COMMENT 'options for transaction like: TimeoutToFail, RequestTimeout',
  `custom_data` varchar(1024) DEFAULT '' COMMENT 'custom data for transaction',
  `next_cron_interval` int(11) DEFAULT NULL COMMENT 'next cron interval. for use of cron job',
  `next_cron_time` datetime DEFAULT NULL COMMENT 'next time to process this trans. for use of cron job',
  `owner` varchar(128) NOT NULL DEFAULT '' COMMENT 'who is locking this trans',
  `ext_data` TEXT COMMENT 'extra data for this trans. currently used in workflow pattern',
  `result` varchar(1024) DEFAULT '' COMMENT 'result for transaction',
  `rollback_reason` varchar(1024) DEFAULT '' COMMENT 'rollback reason for transaction',
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid` (`gid`),
  KEY `owner` (`owner`),
  KEY `status_next_cron_time` (`status`, `next_cron_time`) COMMENT 'cron job will use this index to query trans'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `gid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `url` varchar(1024) NOT NULL COMMENT 'the url of this op',
  `data` TEXT COMMENT 'request body, deprecated',
  `bin_data` BLOB COMMENT 'request body',
  `branch_id` VARCHAR(128) NOT NULL COMMENT 'transaction branch ID',
  `op` varchar(45) NOT NULL COMMENT 'transaction operation type like: action | compensate | try | confirm | cancel',
  `status` varchar(45) NOT NULL COMMENT 'transaction op status: prepared | succeed | failed',
  `finish_time` datetime DEFAULT NULL,
  `rollback_time` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `gid_uniq` (`gid`, `branch_id`, `op`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS dtm.kv (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `cat` varchar(45) NOT NULL COMMENT 'the category of this data',
  `k` varchar(128) NOT NULL,
  `v` TEXT,
  `version` bigint(22) DEFAULT 1 COMMENT 'version of the value',
  create_time datetime DEFAULT NULL,
  update_time datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_k` (`cat`, `k`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
DTM Getting-Started Example
Maven dependency
<dependency>
    <groupId>io.github.dtm-labs</groupId>
    <artifactId>dtmcli-java</artifactId>
    <version>2.1.4</version>
</dependency>
application.yml
server:
  port: 8081
# DTM server address
dtm:
  ipport: '192.168.137.128:36789'
spring:
  datasource:
    url: jdbc:mysql://182.92.67.160:3316/dtm?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: paratera
    driver-class-name: com.mysql.cj.jdbc.Driver
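Saga Mode
A long transaction is split into multiple branch transactions that are coordinated by the Saga transaction coordinator. If every branch transaction commits successfully, the global transaction completes normally. If any step fails, the compensating operations are invoked one after another in reverse order.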
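To make the ordering concrete, here is a minimal in-memory sketch of that coordination rule. It is not how DTM is implemented: the `SagaSketch` class, the `Step` type, and the `trace` list are hypothetical names introduced only for illustration, and the sketch assumes that only steps whose forward action succeeded are compensated.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

/** In-memory illustration of Saga ordering; not the DTM implementation. */
class SagaSketch {
    /** One branch: a forward action plus the compensation that undoes it. */
    static final class Step {
        final String name;
        final BooleanSupplier action;
        final Runnable compensate;

        Step(String name, BooleanSupplier action, Runnable compensate) {
            this.name = name;
            this.action = action;
            this.compensate = compensate;
        }
    }

    /** Runs steps in order; on the first failure, compensates completed steps newest first. */
    static boolean run(List<Step> steps, List<String> trace) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            trace.add("action:" + step.name);
            if (step.action.getAsBoolean()) {
                completed.push(step); // the most recent success sits on top
                continue;
            }
            // A forward action failed: undo what already succeeded, in reverse order.
            while (!completed.isEmpty()) {
                Step undo = completed.pop();
                trace.add("compensate:" + undo.name);
                undo.compensate.run();
            }
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        boolean ok = run(List.of(
                new Step("TransOut", () -> true, () -> { }),
                new Step("TransIn", () -> false, () -> { })), trace);
        // prints: false [action:TransOut, action:TransIn, compensate:TransOut]
        System.out.println(ok + " " + trace);
    }
}
```

The stack is what produces the reverse order: the last branch to succeed is the first to be compensated. A real coordinator also has to persist this state and retry failed compensations, which the sketch leaves out.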
Sequence diagrams
Successful completion
Failure and rollback
Simulating the forward operations and their compensations
@RequestMapping("TransOut")
public Object TransOut() {
    try {
        logger.info("TransOut");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransOutCompensate")
public Object TransOutCompensate() {
    try {
        logger.info("TransOutCompensate");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransIn")
public Object TransIn() {
    try {
        logger.info("TransIn");
        // int i = 1 / 0; // uncomment to simulate an exception and trigger a rollback
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInCompensate")
public Object TransInCompensate() {
    try {
        logger.info("TransInCompensate");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}
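Executing the global transaction
// If the DTM server and this project run on different machines, use an IP address here
// and make sure it is reachable (for example with ping)
private static final String svc = "http://192.168.110.174:8081/api";

@Value("${dtm.ipport}")
private String ipPort;

@RequestMapping("testSaga")
public String testSaga() {
    DtmClient dtmClient = new DtmClient(ipPort);
    try {
        // Global transaction id; it only needs to be unique
        String customGid = UUID.randomUUID().toString();
        Saga saga = dtmClient.newSaga(customGid)
                // Add the forward and compensating operations to the Saga in business order;
                // the transaction coordinator executes them. The third argument is the branch
                // request payload (the forward operation and its compensation get the same payload).
                .add(svc + "/TransOut", svc + "/TransOutCompensate", null)
                .add(svc + "/TransIn", svc + "/TransInCompensate", null)
                .enableWaitResult();
        saga.submit();
    } catch (Exception e) {
        log.error("saga submit error", e);
        return "fail";
    }
    return "success";
}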
Successful completion:
Failure and rollback:
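TCC Mode
TCC stands for Try, Confirm, and Cancel.
- Try phase: attempt execution; complete all business checks (consistency) and reserve the required business resources (quasi-isolation).
- Confirm phase: entered when the Try of every branch has succeeded. Confirm performs the actual business operation without any further business checks and uses only the resources reserved in the Try phase.
- Cancel phase: entered when the Try of any branch has failed. Cancel releases the resources reserved in the Try phase.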
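The three phases can be sketched with an in-memory account that freezes funds during Try. This is only an illustration of the reservation idea under simplifying assumptions, not DTM's implementation: `TccSketch`, `Account`, and `debitAll` are hypothetical names, and Cancel is called only on branches whose Try succeeded.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of Try/Confirm/Cancel; not the DTM implementation. */
class TccSketch {
    /** An account whose Try phase freezes funds instead of moving them. */
    static final class Account {
        long balance;
        long frozen;

        Account(long balance) {
            this.balance = balance;
        }

        /** Try: run the business check and reserve the amount. */
        boolean tryFreeze(long amount) {
            if (balance - frozen < amount) {
                return false; // not enough unreserved funds
            }
            frozen += amount;
            return true;
        }

        /** Confirm: spend the reserved amount without further checks. */
        void confirm(long amount) {
            frozen -= amount;
            balance -= amount;
        }

        /** Cancel: release the reservation. */
        void cancel(long amount) {
            frozen -= amount;
        }
    }

    /** Debits every account by the amount, or none of them. */
    static boolean debitAll(List<Account> accounts, long amount) {
        List<Account> reserved = new ArrayList<>();
        for (Account account : accounts) {
            if (!account.tryFreeze(amount)) {
                // One Try failed: release every reservation made so far.
                for (Account r : reserved) {
                    r.cancel(amount);
                }
                return false;
            }
            reserved.add(account);
        }
        // Every Try succeeded: confirm all branches.
        for (Account r : reserved) {
            r.confirm(amount);
        }
        return true;
    }

    public static void main(String[] args) {
        Account a = new Account(100);
        Account b = new Account(20);
        System.out.println(debitAll(List.of(a, b), 30)); // prints false: b cannot reserve 30
        System.out.println(a.balance + " " + a.frozen);  // prints 100 0: a's reservation was released
    }
}
```

Because Try only freezes funds, a concurrent reader never sees money that has been deducted and later restored, which is the isolation advantage TCC has over a plain Saga.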
Sequence diagrams
Successful completion
Failure and rollback
Simulating the three-phase transaction
@RequestMapping("TransOutTry")
public Object TransOutTry() {
    try {
        logger.info("TransOutTry");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransOutConfirm")
public Object TransOutConfirm() {
    try {
        logger.info("TransOutConfirm");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransOutCancel")
public Object TransOutCancel() {
    try {
        logger.info("TransOutCancel");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInTry")
public Object TransInTry() {
    try {
        logger.info("TransInTry");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInConfirm")
public Object TransInConfirm() {
    try {
        logger.info("TransInConfirm");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}

@RequestMapping("TransInCancel")
public Object TransInCancel() {
    try {
        logger.info("TransInCancel");
        return DtmResponse.buildDtmResponse(Constants.SUCCESS_RESULT);
    } catch (Exception e) {
        return DtmResponse.buildDtmResponse(Constants.FAILURE_RESULT);
    }
}
Registering and executing the transaction branches
private static final String svc = "http://192.168.110.174:8081/api";

@Value("${dtm.ipport}")
private String ipport;

@RequestMapping("testTcc")
public String testTcc() {
    // Create the DTM client
    DtmClient dtmClient = new DtmClient(ipport);
    // Create the TCC transaction
    String customGid = UUID.randomUUID().toString();
    try {
        dtmClient.tccGlobalTransaction(customGid, tcc -> {
            // The first argument is the request payload; all three phases receive the same payload
            tcc.callBranch(null, svc + "/TransOutTry", svc + "/TransOutConfirm", svc + "/TransOutCancel");
            tcc.callBranch(null, svc + "/TransInTry", svc + "/TransInConfirm", svc + "/TransInCancel");
        });
    } catch (Exception e) {
        log.error("tccGlobalTransaction error", e);
        return "fail";
    }
    return "success";
}
Successful completion:
Failure and rollback:
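Handling operation failures
What happens if a Compensate, Confirm, or Cancel operation fails? From a business point of view these operations are required to succeed eventually, so a failure is caused by either a temporary fault or a program bug. When one of them fails, DTM keeps retrying until it succeeds, with the interval growing as 10 × 2^n (exponential backoff). If the failure comes from an unmet business condition rather than a technical or environmental problem, the operation should not go into retry at all; take this into account when writing the branch code.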
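The retry schedule can be pictured with a few lines of Java. This is a sketch of the arithmetic only, not DTM's scheduler: it assumes the base interval is the `RetryInterval` of 10 shown in the startup log, that the unit is seconds, and that the first retry waits exactly the base interval. `BackoffSketch`, `Outcome`, and `shouldRetry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrates an exponential retry schedule; not DTM's scheduler. */
class BackoffSketch {
    /** Hypothetical classification of a branch call result. */
    enum Outcome { SUCCESS, TRANSIENT_FAILURE, BUSINESS_FAILURE }

    /** Only temporary faults are worth retrying; business rejections are final. */
    static boolean shouldRetry(Outcome outcome) {
        return outcome == Outcome.TRANSIENT_FAILURE;
    }

    /** Delay before the given retry (0-based): base * 2^attempt. */
    static long delaySeconds(long baseSeconds, int attempt) {
        if (attempt < 0 || attempt > 30) {
            throw new IllegalArgumentException("attempt out of range: " + attempt);
        }
        return baseSeconds * (1L << attempt);
    }

    /** The first `retries` delays of the schedule. */
    static List<Long> schedule(long baseSeconds, int retries) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < retries; attempt++) {
            delays.add(delaySeconds(baseSeconds, attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(10, 5)); // prints [10, 20, 40, 80, 160]
    }
}
```

Under these assumptions the tenth retry already waits 5120 seconds, about 85 minutes, so a bug that makes a compensation fail permanently quickly turns into a long-lived stuck transaction.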
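To keep a program bug from making a compensation fail forever, monitor the transactions in the DTM admin console and have operations staff step in manually to force-stop them. This part of DTM is still fairly rough, but it is usable for now.
Alternatively, if the business has no special requirements, you can bluntly set a timeout after which the transaction is rolled back: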
saga.setTimeoutToFail(1800);
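Saga vs. TCC
There are also other modes, such as two-phase commit and XA. Because they hold locks under concurrency, their performance is relatively low, and deadlock handling in particular requires a lot of manual intervention.
| | Saga | TCC |
|---|---|---|
| Difficulty | Simple; only a forward and a compensating branch operation are needed | Complex; Try, Confirm, and Cancel branch operations are needed |
| Consistency | Lower; there is no resource reservation, so dirty reads may occur under high concurrency | Higher; all resources are reserved up front (Try) |
| Concurrent execution | Supports both concurrent and ordered execution | Concurrency not supported |
| Nested transactions | Not supported | Supported |
Starting with the easier option, we will build the upcoming features on Saga mode first.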
Integrating Distributed Transactions into the DMC Platform V1.0
Database design
DROP TABLE IF EXISTS trans_setting;
CREATE TABLE IF NOT EXISTS trans_setting (
  id bigint(22) NOT NULL AUTO_INCREMENT,
  call_url varchar(45) NOT NULL COMMENT 'global transaction execution flag; in principle it only needs to be unique; project name + module name + method name is recommended, e.g. console.user.getUser',
  create_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at datetime ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `call_url` (`call_url`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'global transaction execution table' ROW_FORMAT = Compact;

DROP TABLE IF EXISTS trans_setting_detail;
CREATE TABLE IF NOT EXISTS trans_setting_detail (
  id bigint(22) NOT NULL AUTO_INCREMENT,
  fid bigint(22) NOT NULL COMMENT 'primary key of the global transaction URL table',
  branch_url varchar(100) NOT NULL COMMENT 'branch url, full address: ip, port, RESTful endpoint',
  branch_rb_url varchar(100) NOT NULL COMMENT 'branch rollback url, full address: ip, port, RESTful endpoint',
  create_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at datetime ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `fid_branch_url` (`fid`,`branch_url`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'branch transaction URL table' ROW_FORMAT = Compact;
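CRUD
Omitted…
DTM server address configuration
dtm:
  url: 192.168.137.128:36789
Global transaction execution interface
Exposing it as a RESTful interface lets global transactions work across languages, services, and databases.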
/**
 * Unified scheduling interface for distributed transactions
 *
 * @param callUrl  the configured global transaction execution flag
 * @param postData the request payload for the branch transactions
 * @return "success" or "fail"
 */
@PostMapping("/s-api/globalTransaction")
public String execGlobalTransaction(String callUrl, Object postData) {
    return transactionService.execGlobalTransaction(callUrl, postData);
}

@Value("${dtm.url}")
private String dtmUrl;

public String execGlobalTransaction(String callUrl, Object postData) {
    QueryWrapper<TransSetting> settingQuery = new QueryWrapper<>();
    settingQuery.eq("call_url", callUrl);
    TransSetting transSetting = settingDao.selectOne(settingQuery);
    if (transSetting == null) {
        // No configuration for this callUrl; fail instead of throwing a NullPointerException
        log.error("no trans_setting found for callUrl: " + callUrl);
        return "fail";
    }
    QueryWrapper<TransSettingDetail> detailQuery = new QueryWrapper<>();
    detailQuery.eq("fid", transSetting.getId());
    List<TransSettingDetail> list = settingDetailDao.selectList(detailQuery);
    DtmClient dtmClient = new DtmClient(dtmUrl);
    try {
        // Use callUrl + timestamp + a four-digit random number as the gid; this makes problems easier to trace
        String time = DateUtil.date2String(Calendar.getInstance().getTime(), DateUtil.DATE_TIME);
        int number = new Random().nextInt(9000) + 1000;
        String customGid = callUrl + ":" + time + ":" + number;
        Saga saga = dtmClient.newSaga(customGid);
        // Add every configured branch url to the Saga
        list.forEach(detail -> {
            saga.add(detail.getBranchUrl(), detail.getBranchRbUrl(), postData);
        });
        saga.enableWaitResult();
        saga.submit();
    } catch (Exception e) {
        log.error("saga submit error", e);
        return "fail";
    }
    return "success";
}
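The gid scheme used above (callUrl, timestamp, four-digit random number) can be tried in isolation with the standard library. The sketch below replaces the project's DateUtil helper with java.time; the `yyyyMMddHHmmss` pattern is an assumption, since the format behind DateUtil.DATE_TIME is not shown, and `GidSketch` and `buildGid` are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

/** Stand-alone version of the gid scheme; the timestamp pattern is an assumption. */
class GidSketch {
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Builds callUrl:timestamp:NNNN, where NNNN lies in [1000, 9999]. */
    static String buildGid(String callUrl, LocalDateTime now, Random random) {
        int number = random.nextInt(9000) + 1000;
        return callUrl + ":" + now.format(STAMP) + ":" + number;
    }

    public static void main(String[] args) {
        // The suffix changes on every run because of the clock and the random number.
        System.out.println(buildGid("console.user.getUser", LocalDateTime.now(), new Random()));
    }
}
```

With call_url limited to 45 characters, a gid built this way is at most 65 characters long and fits the varchar(128) gid column. Two calls to the same callUrl within one second can still collide with a probability of 1 in 9000, so appending a UUID is worth considering if that matters.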
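Scheduling test
The DTM console shows that the call succeeded and no rollback was triggered:
Next we deliberately raise an exception in the transfer-in branch and test again:
The call fails and a rollback occurs: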