Broker提交或回滾事務消息
當生產者本地事務處理完成并且Broker回查事務消息后,不管執行Commit還是Rollback,都會根據用戶本地事務的執行結果發送一個End_transaction的RPC請求給Broker,Broker端處理該請求的類是EndTransactionProcessor
第一步,End_Transaction請求校驗,主要檢查項如下
1.Broker角色檢查。Slave Broker不處理事務消息
2.事務消息類型檢查。EndTransactionProcessor只處理
Commit或Rollback類型的事務消息,其余消息不處理,這里區分了事務回查
第二步,進行Commit或Rollback。
根據生產者請求頭中的參數判斷,是Commit請求還是Rollback請求,然后分別進行處理
commitMessage()
提交Half消息/這是事務消息服務接口中的一個方法,根據消息位點查詢了Half消息,并將Half消息返回
checkPrepareMessage()
Half消息數據校驗。校驗內容包括發送消息的生產者組與當前執行Commit/Rollback的生產者是否一致,當前Half消息是否與請求Commit/Rollback的消息是否是同一條消息
endMessageTransaction()
消息對象類型轉化,將MessageExt對象轉化為MessageExtBrokerInner對象,并且還原消息之前的Topic和ConsumeQueue等信息
sendFinalMessage()
將還原后的事務消息最終發送到CommitLog中,一旦發送成功,消費者就可以正常拉取消息并消費
deletePrepareMessage()
在sendFinalMessage()執行成功后,刪除Half消息。其實RocketMQ是不能真正刪除消息的,其實質是順序寫磁盤,相當于做了一個"假刪除"。"假刪除"通過putOpMessage()方法將消息保存到TransactionMessageUtil.
buildOpTopic()的Topic中,并且做上標記TransactionalMessageUtil.REMOVETAG,表示消息已刪除
- 如果消息被標記為已刪除,則調用addRemoveTagInTransactionOp()方法,利用標記為已刪除的OP消息構造Message消息對象,并且調用存儲方法保存消息
- TransactionalMessageUtil.buildOpTopic()方法跟保存Half消息時的邏輯類似
- Half消息保存在名為MixAll.RMQ_SYS_TRANS_HALF_TOPIC的Topic中,執行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC
對象中,以便Broker判斷是否需要回查生產者事務的執行狀態。
- 調用存儲層方法,真正地將OP消息保存到了CommitLog中
Rollback實現邏輯
Rollback并沒有真正刪除消息,而是標記Half消息為刪除,在Broker回查時機會跳過不檢查
rollbackMessage()
該方法與CommitMessage()方法一樣,都是查詢Half消息并返回消息對象。
checkPrepareMessage()
消息校驗,與Commit調用的是同一個方法
deletePrepareMessage()
刪除Half消息,與Commit調用的是同一個方法