【Seata源碼學習 】篇四 TM事務管理器是如何開啟全局事務

TM發送 單個或批量 消息

以發送GlobalBeginRequest消息為例

TM在執行攔截器鏈路前將向TC發送GlobalBeginRequest 消息

io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

   @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}return response.getXid();}

注意 消息TYPE_CODE 為 MessageType.TYPE_GLOBAL_BEGIN 值為 1

package io.seata.core.protocol.transaction;import io.seata.core.protocol.MessageType;
import io.seata.core.rpc.RpcContext;/*** The type Global begin request.** @author slievrly*/
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {private int timeout = 60000;private String transactionName;/*** Gets timeout.** @return the timeout*/public int getTimeout() {return timeout;}/*** Sets timeout.** @param timeout the timeout*/public void setTimeout(int timeout) {this.timeout = timeout;}/*** Gets transaction name.** @return the transaction name*/public String getTransactionName() {return transactionName;}/*** Sets transaction name.** @param transactionName the transaction name*/public void setTransactionName(String transactionName) {this.transactionName = transactionName;}@Overridepublic short getTypeCode() {return MessageType.TYPE_GLOBAL_BEGIN;}@Overridepublic AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this, rpcContext);}@Overridepublic String toString() {StringBuilder result = new StringBuilder();result.append("timeout=");result.append(timeout);result.append(",");result.append("transactionName=");result.append(transactionName);return result.toString();}
}

io.seata.tm.DefaultTransactionManager#syncCall

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);}}

io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)

@Overridepublic Object sendSyncRequest(Object msg) throws TimeoutException {// 獲取seata服務端地址String serverAddress = loadBalance(getTransactionServiceGroup(), msg);// TM RPC服務調用默認超時時間30slong timeoutMillis = this.getRpcRequestTimeout();// 將GlobalBeginRequest 封裝為 RpcMessageRpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);// send batch message// put message into basketMap, @see MergedSendRunnable// 是否開啟seata客戶端批量發送消息 1.5默認關閉if (this.isEnableClientBatchSendRequest()) {// send batch message is sync request, needs to create messageFuture and put it in futures.// 批量發送消息需要將消息封裝為 MessageFuture 對象 并添加到 futures Map集合中MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// put message into basketMap// 獲取當前服務端地址對應的消息隊列BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,key -> new LinkedBlockingQueue<>());// 將當前消息添加到隊列中 一般不會添加失敗 LinkedBlockingQueue 是無界隊列if (!basket.offer(rpcMessage)) {LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",serverAddress, rpcMessage);return null;}if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}// 如果當前沒有在發送隊列消息 給mergeLock對象上鎖成功 則喚醒所有等待發送消息的線程// isSending 被volatile 修飾 保證可見性和有序性 但是不保證原子性if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();}}try {// MessageFuture 封裝了 CompletableFuture 對象,此時會超時阻塞當前線程,超時時間30秒// 等待 CompletableFuture.complete 完成獲取結果return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {// 如果有異常拋出LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}} else {// 如果沒有開啟客戶端批量發送消息 先獲取channelChannel channel = clientChannelManager.acquireChannel(serverAddress);// 同步發送消息 并將RPCMessage 封裝為 MessageFuture,并設置超時時間 放入 futures Map集合中// 由父類AbstractNettyRemoting的周期線程每隔3秒檢查一次消息是否超時// 發送消息時會添加 ChannelFutureListener 監聽器,如果消息成功,則調用 CompletableFuture.complete 設置結果,// 并將當前消息id對應的MessageFuture 從futures 中移除return super.sendSync(channel, rpcMessage, timeoutMillis);}}

單個發送消息

如果是發送單個消息,則直接調用AbstractNettyRemoting.sendSync 向TC端發送消息

io.seata.core.rpc.netty.AbstractNettyRemoting#sendSync

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {if (timeoutMillis <= 0) {throw new FrameworkException("timeout should more than 0ms");}if (channel == null) {LOGGER.warn("sendSync nothing, caused by null channel.");return null;}MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);// 設置超時時間 用于檢測是否超時 System.currentTimeMillis() - start > timeoutmessageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);channelWritableCheck(channel, rpcMessage.getBody());String remoteAddr = ChannelUtil.getAddressFromChannel(channel);doBeforeRpcHooks(remoteAddr, rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {//根據消息id從futures中移除,不再進行消息超時檢測MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());if (messageFuture1 != null) {//設置結果messageFuture1.setResultMessage(future.cause());}//銷毀連接destroyChannel(future.channel());}});try {// 超時阻塞等待獲取結果Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);doAfterRpcHooks(remoteAddr, rpcMessage, result);return result;} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}}

messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS) 會阻塞等待30s獲取消息,超時則拋出異常 TimeoutException

批量發送消息

 // send batch message is sync request, needs to create messageFuture and put it in futures.// 批量發送消息需要將消息封裝為 MessageFuture 對象 并添加到 futures Map集合中MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// put message into basketMap// 獲取當前服務端地址對應的消息隊列BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,key -> new LinkedBlockingQueue<>());// 將當前消息添加到隊列中 一般不會添加失敗 LinkedBlockingQueue 是無界隊列if (!basket.offer(rpcMessage)) {LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",serverAddress, rpcMessage);return null;}if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}// 如果當前沒有在發送隊列消息 給mergeLock對象上鎖成功 則喚醒所有等待發送消息的線程// isSending 被volatile 修飾 保證可見性和有序性 但是不保證原子性if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();}}try {// MessageFuture 封裝了 CompletableFuture 對象,此時會超時阻塞當前線程,超時時間30秒// 等待 CompletableFuture.complete 完成獲取結果return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {// 如果有異常拋出LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}

如果批量發送消息,則會將消息放到basketMap 集合中,AbstractNettyRemotingClient會在初始化時,啟動最大和核心都是一的單線程池線程池,提交MergedSendRunnable 任務,死循環不斷遍歷basketMap,獲取等待發送的消息隊列,最終由io.seata.core.rpc.netty.AbstractNettyRemoting#sendAsync 發送異步消息。需要注意的是,不管是發送同步消息還是異步消息,TM開啟事務所屬的線程都會因messageFuture.get 超時阻塞,只不過發送和獲取返回消息都變成了異步。

public void run() {// 死循環while (true) {//先上鎖synchronized (mergeLock) {// 等待 1s 并釋放當前鎖try {mergeLock.wait(MAX_MERGE_SEND_MILLS);} catch (InterruptedException e) {}}isSending = true;// 遍歷Map集合basketMap.forEach((address, basket) -> {if (basket.isEmpty()) {return;}MergedWarpMessage mergeMessage = new MergedWarpMessage();// 彈出同一個seata服務器地址等待發送的所有消息,合并在一塊發送while (!basket.isEmpty()) {RpcMessage msg = basket.poll();// 獲取消息體 與消息id 封裝為 MergedWarpMessagemergeMessage.msgs.add((AbstractMessage) msg.getBody());mergeMessage.msgIds.add(msg.getId());}if (mergeMessage.msgIds.size() > 1) {printMergeMessageLog(mergeMessage);}Channel sendChannel = null;try {// send batch message is sync request, but there is no need to get the return value.// Since the messageFuture has been created before the message is placed in basketMap,// the return value will be obtained in ClientOnResponseProcessor.// 發送批量消息不會在此處阻塞等待消息的返回  將會采用異步的方式 由 ClientOnResponseProcessor 消息處理器獲取返回消息sendChannel = clientChannelManager.acquireChannel(address);AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {destroyChannel(address, sendChannel);}// fast failfor (Integer msgId : mergeMessage.msgIds) {MessageFuture messageFuture = futures.remove(msgId);if (messageFuture != null) {messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));}}LOGGER.error("client merge call failed: {}", e.getMessage(), e);}});isSending = false;}}

接受異步消息由TM初始化時添加的ClientOnResponseProcessor 進行處理,將會遍歷所有合并的消息,根據消息ID將其從futures中移除,并調用 future.setResultMessage 設置結果,此時TM發送消息時的阻塞狀態將會被喚醒。

io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process

  @Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//如果當前消息屬于合并發送的消息if (rpcMessage.getBody() instanceof MergeResultMessage) {//獲取消息體與消息ID,并將消息id對應的MessageFuture從futures中移除,不再進行超時檢測MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());//遍歷所有的消息for (int i = 0; i < mergeMessage.msgs.size(); i++) {int msgId = mergeMessage.msgIds.get(i);MessageFuture future = futures.remove(msgId);if (future == null) {LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);} else {//在此時設置消息結果 結束阻塞等待future.setResultMessage(results.getMsgs()[i]);}}// 與合并消息的處理是一致的} else if (rpcMessage.getBody() instanceof BatchResultMessage) {try {BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {int msgId = batchResultMessage.getMsgIds().get(i);MessageFuture future = futures.remove(msgId);if (future == null) {LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));} else {future.setResultMessage(batchResultMessage.getResultMessages().get(i));}}} finally {// In order to be compatible with the old version, in the batch sending of version 1.5.0,// batch messages will also be placed in the local cache of mergeMsgMap,// but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMapmergeMsgMap.clear();}} else {MessageFuture messageFuture = futures.remove(rpcMessage.getId());if (messageFuture != null) {messageFuture.setResultMessage(rpcMessage.getBody());} else {if (rpcMessage.getBody() instanceof AbstractResultMessage) {if (transactionMessageHandler != null) {transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);}}}}}

TC 處理 GlobalBeginRequest 消息

NettyChannel消息處理

io.seata.core.rpc.netty.NettyRemotingServer#init

 @Overridepublic void init() {// registry processorregisterProcessor();if (initialized.compareAndSet(false, true)) {super.init();}}

io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor

private void registerProcessor() {// 1. registry on request message processorServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);//處理GlobalBeginRequest消息  ServerOnRequestProcessorsuper.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);// 2. registry on response message processorServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);// 3. registry rm message processorRegRmProcessor regRmProcessor = new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);// 4. registry tm message processorRegTmProcessor regTmProcessor = new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);// 5. registry heartbeat message processorServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}	

進一步跟進查看具體如何處理

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#process

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {// channel是否已經注冊if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {try {if (LOGGER.isInfoEnabled()) {LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());}ctx.disconnect();ctx.close();} catch (Exception exx) {LOGGER.error(exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}

進入io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isDebugEnabled()) {LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());} else {try {BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"+ rpcContext.getTransactionServiceGroup());} catch (InterruptedException e) {LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);}}//GlobalBeginRequest 繼承 AbstractTransactionRequest 繼承 AbstractMessageif (!(message instanceof AbstractMessage)) {return;}// 合并消息處理// the batch send request messageif (message instanceof MergedWarpMessage) {//是否開啟了TC批量響應 默認false  rpcContext 的 version 不為空并且大于等于 1.5.0// 如果滿足 則使用 MergedWarpMessage 來處理請求消息if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;//遍歷處理for (int i = 0; i < msgs.size(); i++) {AbstractMessage msg = msgs.get(i);int msgId = msgIds.get(i);//是否開啟并發處理消息 默認關閉if (PARALLEL_REQUEST_HANDLE) {CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));} else {//單個消息處理handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);}}} else {// results 響應結果集 如果開啟了并發處理消息 需要保證線程安全// completableFutures 并發處理消息List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();List<CompletableFuture<Void>> completableFutures = null;for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {// 默認關閉 沒有開啟并發處理消息 如果開啟了 則使用 completableFutures 來并發處理消息if (PARALLEL_REQUEST_HANDLE) {if (completableFutures == null) {completableFutures = new ArrayList<>();}int finalI = i;// 并發異步處理消息,并將結果添加到results中completableFutures.add(CompletableFuture.runAsync(() -> {results.add(finalI, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(finalI), rpcContext));}));} else {// 處理消息并按順序添加到results集合中results.add(i,handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));}}if (CollectionUtils.isNotEmpty(completableFutures)) {try {CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();} catch (InterruptedException | ExecutionException e) {LOGGER.error("handle request error: {}", e.getMessage(), e);}}MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);}} else {// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#handleRequestsByMergedWarpMessage

private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {return transactionMessageHandler.onRequest(subMessage, rpcContext);
}

io.seata.server.coordinator.DefaultCoordinator#onRequest

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {if (!(request instanceof AbstractTransactionRequestToTC)) {throw new IllegalArgumentException();}AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;transactionRequest.setTCInboundHandler(this);return transactionRequest.handle(context);}

io.seata.core.protocol.transaction.GlobalBeginRequest#handle

public AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this, rpcContext);}

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalBeginRequest, io.seata.core.rpc.RpcContext)

  public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {GlobalBeginResponse response = new GlobalBeginResponse();exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {@Overridepublic void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {try {//真正處理beging消息doGlobalBegin(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore,String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),e);}}}, request, response);return response;}

獲取全局事務XID

io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {// 通過DefaultCore根據應用ID,事務分組名稱,超時時間 創建并開啟一個新的事務 返回全局事務XID 放入GlobalBeginResponse中response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}

全局事務會話持久化

io.seata.server.coordinator.DefaultCore#begin

    @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 創建全局事務對象 由XID.generateXID 生成全局事務XIDGlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,timeout);// 將xid放進ThreadLocal中MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//        SessionHolder.getRootSessionManager() 根據SPI機制去查找
//        SessionHolder 在 io.seata.server.Server.start 啟動時初始化,獲取當前配置的 會話持久機制模式  ,調用  SessionHolder.init(sessionStoreMode)進行初始化
//        此時我們通過 SessionHolder.getRootSessionManager() 將使用seata的SPI機制去 META-INF/services/ 與  META-INF/seata/ 目錄下查找
//        文件名為 io.seata.server.session.SessionManager 的文件 在根據 @LoadLevel注解的name值加載需要的對象
//         例如 如果我們此時的session持久化模式為DB,那么  SessionHolder.getRootSessionManager() 將加載返回 DataBaseSessionManager 對象session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//開啟事務 標記GlobalSession 的事務狀態為 1 ,并記錄開啟時間與激活狀態  調用SessionLifecycleListener監聽器的onBegin方法session.begin();// transaction start event//發送事務開啟事件MetricsPublisher.postSessionDoingEvent(session, false);//返回XIDreturn session.getXid();}

Seata的SPI機制會根據 EnhancedServiceLoader.load(類, 名稱) 方法參數一的類的全限定類名,從META-INF/services/ 與 META-INF/seata/ 路徑下去匹配,然后根據匹配到的類的全限定類名,定位到具體的類,再根據參數二名稱與@LoadLevel注解的name值進行匹配 ,確定要加載的對象,如下所示

image-20231121152441877

加載 DataBaseSessionManager 對象后,添加其到session的生命監聽器列表中,在執行session.begin方法時,調用監聽器的onBegin方法,進而由父類AbstractSessionManager 執行 onBegin方法,并調用DataBaseSessionManager重寫的addGlobalSession方法

io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

    @Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {// 將全局事務會話信息寫入數據庫boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}} else {boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException("addGlobalSession failed.");}}}

io.seata.server.storage.db.store.DataBaseTransactionStoreManager#writeSession

    public boolean writeSession(LogOperation logOperation, SessionStorable session) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {// logStore 封裝了 DataSource   根據不同類型的數據源生成相應的insert語句 持久化到數據庫中return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}}

io.seata.server.storage.db.store.LogStoreDataBaseDAO#insertGlobalTransactionDO

public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {// 生成SQL插入 global_table (默認) 表中,持久化全局事務會話String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();//自動提交conn.setAutoCommit(true);ps = conn.prepareStatement(sql);ps.setString(index++, globalTransactionDO.getXid());ps.setLong(index++, globalTransactionDO.getTransactionId());ps.setInt(index++, globalTransactionDO.getStatus());ps.setString(index++, globalTransactionDO.getApplicationId());ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ?transactionName.substring(0, transactionNameColumnSize) :transactionName;ps.setString(index++, transactionName);ps.setInt(index++, globalTransactionDO.getTimeout());ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}}

最終生成的SQL如下所示

insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name,timeout, begin_time, application_data, gmt_create, gmt_modified)
values (全局事務XID, 事務id, 事務狀態,應用id, 事務分組, 事務名稱, 超時時間, 開始時間, 應用數據, now(), now())

數據庫中的數據

image-20231121214708246

返回GlobalBeginResponse消息

在創建全局事務并開啟后,拿到XID后封裝到GlobalBeginResponse中,最終由remotingServer.sendAsyncResponse將GlobalBeginResponse消息返回給TM

TM處理 GlobalBeginResponse 消息

在seata服務端返回 GlobalBeginResponse 消息 后, TM還是由 io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process 處理接收到的消息,通過調用future.setResultMessage 設置消息結果,并恢復阻塞的TM發送GlobalBeginRequest消息的線程,將結果返回

io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

public void begin(int timeout, String name) throws TransactionException {if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();String currentXid = RootContext.getXID();if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}// 獲取到seata服務端返回到XIDxid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;//將xid綁定到RootContext中RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}}

獲取到XID后,將XID綁定到RootContext中,至此,全局事務的開啟過程也就結束了。

總結流程圖

TM開啟全局事務過程

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/165165.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/165165.shtml
英文地址,請注明出處:http://en.pswp.cn/news/165165.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

操作系統發展過程--單道批處理系統、多道批處理系統、分時系統、實時系統

一、單道批處理系統 計算機早期&#xff0c;為了能提高利用率&#xff0c;需要盡量保持系統的連續運行&#xff0c;即在處理完一個作業之后&#xff0c;緊接著處理下一個作業&#xff0c;以減少機器的空閑等待時間 1.單道批處理系統的處理過程 為了實現對作業的連續處理&…

51單片機應用從零開始(七)·循環語句(if語句,swtich語句)

51單片機應用從零開始&#xff08;一&#xff09;-CSDN博客 51單片機應用從零開始&#xff08;二&#xff09;-CSDN博客 51單片機應用從零開始&#xff08;三&#xff09;-CSDN博客 51單片機應用從零開始&#xff08;四&#xff09;-CSDN博客 51單片機應用從零開始&#xff08;…

數倉成本下降近一半,StarRocks 存算分離助力云覽科技業務出海

成都云覽科技有限公司傾力打造了鳳凰瀏覽器&#xff0c;專注于為海外用戶提供服務&#xff0c;公司致力于構建一個全球性的數字內容連接入口&#xff0c;為用戶帶來更為優質、高效、個性化的瀏覽體驗。 作為數據驅動的高科技公司&#xff0c;從數據中挖掘價值一直是公司核心任務…

【Spring進階系列丨第四篇】學習Spring中的Bean管理(基于xml配置)

前言 在之前的學習中我們知道&#xff0c;容器是一個空間的概念&#xff0c;一般理解為可盛放物體的地方。在Spring容器通常理解為BeanFactory或者ApplicationContext。我們知道spring的IOC容器能夠幫我們創建對象&#xff0c;對象交給spring管理之后我們就不用手動去new對象。…

基于單片機的智能藍牙避障循跡小車

智能小車循跡與避障運動控制系統的設計 摘 要:本設計主要由STC89C52單片機來進行控制&#xff0c;通過輸入輸出兩個端口控制驅動模塊來調節電機的工作狀態。本設計預利用機器視覺&#xff0c;通過識別條帶狀路標實現自主導航且利用超聲波模塊實時檢測距離以實現避障功能&…

vue3 webSocket 封裝及使用

vue3 webSocket 封裝及使用 封裝 import { ref, onUnmounted } from vue; interface SocketOptions {heartbeatInterval?: number;reconnectInterval?: number;maxReconnectAttempts?: number; }class Socket {url: string;ws: WebSocket | null null;opts: SocketOption…

【Docker】從零開始:9.Docker命令:Push推送倉庫(Docker Hub,阿里云)

【Docker】從零開始&#xff1a;9.Docker命令:Push推送倉庫 知識點1.Docker Push有什么作用&#xff1f;2.Docker倉庫有哪幾種2.1 公有倉庫2.2 第三方倉庫2.3 私有倉庫2.4 搭建私有倉庫的方法有哪幾種 3.Docker公有倉庫與私有倉庫的優缺點對比 Docker Push 命令標準語法操作參數…

openEuler 22.03 LTS x86_64 cephadm 部署ceph18.2.0 未完成 筆記

環境 準備三臺虛擬機 10.47.76.94 node-1 10.47.76.95 node-2 10.47.76.96 node-3 下載cephadm [rootnode-1 ~]# yum install cephadm Last metadata expiration check: 0:11:31 ago on Tue 21 Nov 2023 10:00:20 AM CST. Dependencies resolved. Package …

酷開系統 | 酷開科技聚焦價值人群 助力營銷增長

2023年&#xff0c;是消費復蘇回暖的一年&#xff0c;市場中充溢著大量品牌重啟增長的機遇與實例。品牌商期望能夠把握住市場趨勢&#xff0c;通過營銷獲得確定性的業績提升&#xff0c;并在未來收獲長期穩定的增長。作為數字媒介的代表之一&#xff0c;OTT大屏營銷的屬性和價值…

Vue學習之路------指令

Vue指令 vue會根據不同的指令&#xff0c;針對標簽實現不同的功能 指令:帶有v-前綴的特殊標簽屬性 1&#xff1a;v-html&#xff1a;指令 <div v-html"msg"></div> 2&#xff1a;v-show 作用&#xff1a;控制元素顯示隱藏 語法&#xff1a;v-show&quo…

【SpringMVC】 對請求的不同響應

前言 本文學習如何運用不同的注解來返回不同的響應. 1.返回靜態頁面Controller 返回index.html頁面 Controller 和 RestController的區別 controller 只有加上這個注解,Spring才會幫我們管理這個代碼.后續我們訪問時才能訪問到. RestController 等同于 Controller ResponseBo…

UML建模圖文詳解教程01——Enterprise Architect的安裝與使用

版權聲明 本文原創作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl Enterprise Architect概述 官方網站&#xff1a;https://www.sparxsystems.cn/products/ea/&#xff1b;圖示如下&#xff1a; Enterprise Architect是一個全功能的、基于…

B033-Servlet交互 JSP

目錄 ServletServlet的三大職責跳轉&#xff1a;請求轉發和重定向請求轉發重定向匯總請求轉發與重定向的區別用請求轉發和重定向完善登錄 JSP第一個JSP概述注釋設置創建JSP文件默認字符編碼集 JSP的java代碼書寫JSP的原理三大指令九大內置對象改造動態web工程進行示例內置對象名…

2.HTML入門

目錄 一.HTML介紹 二.HTML常用標簽 2.1 標題標簽 2.2 段落標簽 2.3 超鏈接標簽 2.4 圖片標簽 2.5 換行與空格 2.6 布局標簽 2.7 列表標簽 2.8 表單標簽 一.HTML介紹 定義&#xff1a;將內容顯示在網頁&#xff0c;用來描述網頁的一種語言&#xff0c;負責網頁的架構…

Adiponectin 脂聯素 ; T-cadherin +exosome

T-cadherin Adiponectin exosome T-cadherin Adiponectin exosome 代謝綜合征中 外泌體、脂肪組織 和 脂聯素 的器官間通訊-2019.pdf

【華為OD】C卷真題 100%通過:數組去重和排序 C/C++實現

華為OD 數組去重和排序 C源碼實現&#xff0c;100%通過 目錄 題目描述&#xff1a; 示例1 代碼實現&#xff1a; 題目描述&#xff1a; 給定一個亂序的數組&#xff0c;刪除所有的重復元素&#xff0c;使得每個元素只出現一次&#xff0c;并且按照出現的次數從高到低進行排…

C語言之字符串函數

C語言之字符串函數 文章目錄 C語言之字符串函數1. strlen的使用和模擬實現1.1 strlen的使用1.2 strlen的模擬實現 2. strcpy的使用和模擬實現2.1 strcpy的使用2.2 strncpy的使用2.3 strcpy的模擬實現 3. strcat的使用和模擬實現3.1 strcat的使用3.2 strncat3.3 strcat的模擬實現…

C語言--每日五道選擇題--Day23

第一題 1. 已知int i1, j2;&#xff0c;則表達式ij的值為&#xff08; &#xff09; A&#xff1a;1 B&#xff1a;2 C&#xff1a;3 D&#xff1a;4 答案及解析 C 本題考查的是前置和后置的優先級&#xff0c;后置的優先級是高于前置的&#xff0c;所以這個表達式就可以轉變為…

【Spark源碼分析】事件總線機制分析

Spark事件總線機制 采用Spark2.11源碼&#xff0c;以下類或方法被DeveloperApi注解額部分&#xff0c;可能出現不同版本不同實現的情況。 Spark中的事件總線用于接受事件并提交到對應的監聽器中。事件總線在Spark應用啟動時&#xff0c;會在SparkContext中激活spark運行的事件總…

什么是持續集成的自動化測試?

持續集成的自動化測試 如今互聯網軟件的開發、測試和發布&#xff0c;已經形成了一套非常標準的流程&#xff0c;最重要的組成部分就是持續集成&#xff08;Continuous integration&#xff0c;簡稱CI&#xff0c;目前主要的持續集成系統是Jenkins&#xff09;。 那么什么是持…