準備
innodb存儲引擎開啟支持分布式事務
set global innodb_support_ax=on
分布式的流程
詳細流程:
- XA START ‘a’;
作用:開始一個新的XA事務,并分配一個唯一的事務ID ‘a’。
說明:在這個命令之后,所有后續的SQL操作都會被包含在這個XA事務中,直到遇到XA END命令。 - insert into z(a,b,c) select 100,2,100;
作用:執行一條插入語句,將值(100, 2, 100)插入到表z中。
說明:這條語句是在XA事務上下文中執行的,這意味著如果最終XA事務沒有成功提交,這個插入操作也不會對數據庫產生實際影響。 - XA END ‘a’;
作用:標記XA事務’a’的操作結束。
說明:這并不意味著事務已經完成或提交,它只是表明當前事務不再接受新的操作。在XA END之后,不能再對該事務進行任何修改操作。 - 進行二階段
4. 1 XA PREPARE ‘a’;
作用:準備XA事務’a’,使其進入預備狀態。
說明:這是兩階段提交(2PC, Two-Phase Commit)的第一階段。在這個階段,所有參與的資源管理器會投票決定是否可以安全地提交該事務。如果所有參與者都準備好提交,則可以進入下一階段;如果有任何一個參與者不能準備好,則整個事務會被回滾。
4.2 XA COMMIT ‘a’;
作用:提交XA事務’a’。
說明:這是兩階段提交的第二階段。只有當所有參與的資源管理器都已準備好(通過XA PREPARE),并且沒有 任何錯誤發生時,才會執行此命令。提交后,所有更改將永久保存到數據庫中。
代碼實例
引入依賴:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 請使用最新的版本 --></dependency>
1.事務ID標識類
package org.example.xa;import javax.transaction.xa.Xid;public class MyXid implements Xid {private int formatId;private byte gtrid[];private byte bqual[];public MyXid(int formatId, byte gtrid[], byte bqual[]){this.formatId = formatId;this.gtrid = gtrid;this.bqual = bqual;}@Overridepublic int getFormatId() {return formatId;}@Overridepublic byte[] getGlobalTransactionId() {return gtrid;}@Overridepublic byte[] getBranchQualifier() {return bqual;}
}
2.定義獲取分布式事務數據源類
package org.example.xa;import com.mysql.cj.jdbc.MysqlXADataSource;public class GetDataSource {private String connString;private String user;private String password;public GetDataSource(String connString, String user, String password) {this.connString = connString;this.user = user;this.password = password;}public MysqlXADataSource getDataSource(){MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setURL(connString);mysqlXADataSource.setUser(user);mysqlXADataSource.setPassword(password);return mysqlXADataSource;}
}
- XATest測試類
package org.example.xa;import com.mysql.cj.jdbc.MysqlXADataSource;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;public class XATest {public static void main(String[] args) throws SQLException, XAException {GetDataSource getDataSource1 = new GetDataSource("jdbc:mysql://localhost:3306/test", "root", "123456");MysqlXADataSource dataSource1 = getDataSource1.getDataSource();GetDataSource getDataSource2 = new GetDataSource("jdbc:mysql://localhost:3306/test", "root", "123456");MysqlXADataSource dataSource2 = getDataSource2.getDataSource();//創建分布式事務的資源管理器XAConnection xaConnection1 = dataSource1.getXAConnection();XAResource xaResource1 = xaConnection1.getXAResource();Connection connection1 = xaConnection1.getConnection();Statement statement1 = connection1.createStatement();MyXid myXid1 = new MyXid(100, new byte[]{0x61,0x61,0x61}, new byte[]{0x61,0x61,0x61});//創建分布式事務的資源管理器XAConnection xaConnection2 = dataSource2.getXAConnection();XAResource xaResource2 = xaConnection2.getXAResource();Connection connection2 = xaConnection2.getConnection();Statement statement2 = connection2.createStatement();MyXid myXid2 = new MyXid(100, new byte[]{0x62,0x62,0x62}, new byte[]{0x62,0x62,0x62});// xaResource2.rollback(new MyXid(100, new byte[]{0x11}, new byte[]{0x12}));//1.開始一個新的XA事務 在這個命令之后,所有后續的SQL操作都會被包含在這個XA事務中,直到遇到XA END命令xaResource1.start(myXid1, XAResource.TMNOFLAGS);//2.執行DMLstatement1.execute("insert into z(`a`,`b`,`c`) select 2800,2,2800");//3.分布式事務的end,這并不意味著事務已經完成或提交,它只是表明當前事務不再接受新的操作。在XA END之后,不能再對該事務進行任何修改操作。xaResource1.end(myXid1, XAResource.TMSUCCESS);xaResource2.start(myXid2, XAResource.TMNOFLAGS);statement2.execute("insert into z(`a`,`b`,`c`) select 2900,2,2900");xaResource2.end(myXid2, XAResource.TMSUCCESS);//1.分布式的二階段步驟try{//1.1分布式事務的prepare階段int re1 = xaResource1.prepare(myXid1);int re2 = xaResource2.prepare(myXid2);//1.2分布式事務的commit或者rollback階段if(XAResource.XA_OK != re1 || XAResource.XA_OK != re2){xaResource1.rollback(myXid1);xaResource2.rollback(myXid2);}xaResource1.commit(myXid1, false);xaResource2.commit(myXid2, false);}catch (Throwable throwable){xaResource1.rollback(myXid1);xaResource2.rollback(myXid2);}}
}