事務消息
事務消息機制。
事務消息的發送和處理總結為四個過程:
1.生產者發送事務消息和執行本地事務
2.Broker存儲事務消息
3.Broker回查事務消息
4.Broker提交或回滾事務消息
生產者發送事務消息和執行本地事務。
發送過程分為兩個階段:
第一階段,發送事務消息
第二階段,發送endTransaction消息
事務消息發送過程的實現類TransactionMQProducer,該類繼承魚DefaultMQProducer,不僅能發送事務消息,還能發送其他消息。雖然4.2.0版本有事務消息代碼,但實際是4.3.0版本才全面支持事務消息。
TransactionMQProducer的核心屬性和方法:
- transactionListener:事務監聽器,主要功能是執行本地事務和執行事務回查。事務監聽器包含executeLocalTransaction()和checkLocalTransaction()兩個方法。executeLocalTransaction()方法執行本地事務,checkLocalTransaction()方法是當生產者由于各種問題導致未發送Commit或Rollback消息給Broker時,Broker回調生產者查詢本地事務專改的處理方法
- executorService:Broker回查請求處理的線程池
- start():事務消息生產者啟動方法,與普通啟動方法不同,增加了this.defaultMQProducerImpl.initTransactionEnv()的調用,即增加了初始化事務消息的環境信息
事務消息的環境初始化主要用于初始化Broker回查請求處理的線程池,
在初始化事務消息生產者時我們可以指定初始化對象,如果不指定初始化對象,那么這里會初始化一個單線程的線程池
- shutdown():關閉生產者,回收生產者資源。該方法時啟動方法的逆過程,功能時關閉生產者、銷毀事務環境。銷毀事務環境是指銷毀事務回查線程池,清楚回查任務隊列
生產者發送事務消息主要分為如下兩個階段:
1.發送Half消息的過程
2.發送Commit或Rollback消息
發送Half消息的過程。
事務消息的發送是通過sendMessageInTransaction()方法來完成的
- 第一步,數據校驗,判斷TransactionListener的值是否為null、消息Topic為空檢查、消息體為空檢查等
- 第二步:消息預處理。預處理的主要功能是在消息擴展字段中設置消息類型。MessageConst.PROPERTY_TRANSACTION_PREPARED表示當前消息是事務Half消息。MessageConst.PROPERTY_PRODUCER_GROUP用于設置發送消息的生產者組名,以及設置事務消息的擴展字段
- 第三步:發送事務消息,調用同步發送消息的方法將事務消息發送出去
發送Commit或Rollback消息
在本地事務處理完成后,根據本地事務的執行結果調用DefaultMQProducerImpl.endTransaction()方法
通知Broker進行Commit或Rollback
當前Half消息發送完成后,會返回生產者消息發送到哪個Broker、消息位點是多少、再根據本地事務的執行
結果封裝EndTransactionRequestHeader對象,最后調用MQClientAPIimpl.endTransactionOneway()方法
通知Broker進行Commit或Rollback
- brokerAddr:存儲當前Half消息的Broker服務器的socket地址
- localTransactionState:本地事務執行結果
- transactionId:事務消息的事務id
- endTranactionOneway():以發送oneway消息的方式發送該RPC請求給Broker.