什么是rocketmq事務消息
事務消息是 Apache RocketMQ 提供的一種高級消息類型,支持在分布式場景下保障消息生產和本地事務的最終一致性。
RocketMQ的分布式事務又稱為“半消息事務”。
事務消息處理流程
RocketMQ是靠半消息機制實現分布式事務
事務消息:MQ 提供類似 X/Open XA 的分布事務功能,通過 MQ 事務消息能達到分布式事務的最終一致。
半消息:暫不能投遞的消息,發送方已經將消息成功發送到了 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半消息。
半消息回查:由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,MQ 服務端通過掃描發現某條消息長期處于“半消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。
事務消息交互流程如下圖所示。
1. 生產者將消息發送至Apache RocketMQ服務端。
2. Apache RocketMQ服務端將消息持久化成功之后,向生產者返回Ack確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。
3. 生產者開始執行本地事務邏輯。
4. 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:
??二次確認結果為Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。
??二次確認結果為Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。
5. 在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。?說明?服務端回查的間隔時間和最大回查次數,請參見參數限制。
6. 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
7. 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
事務消息生命周期
事務消息
??初始化:半事務消息被生產者構建并完成初始化,待發送到服務端的狀態。
??事務待提交:半事務消息被發送到服務端,和普通消息不同,并不會直接被服務端持久化,而是會被單獨存儲到事務存儲系統中,等待第二階段本地事務返回執行結果后再提交。此時消息對下游消費者不可見。
??消息回滾:第二階段如果事務執行結果明確為回滾,服務端會將半事務消息回滾,該事務消息流程終止。
??提交待消費:第二階段如果事務執行結果明確為提交,服務端會將半事務消息重新存儲到普通存儲系統中,此時消息對下游消費者可見,等待被消費者獲取并消費。
??消費中:消息被消費者獲取,并按照消費者本地的業務邏輯進行處理的過程。此時服務端會等待消費者完成消費并提交消費結果,如果一定時間后沒有收到消費者的響應,Apache RocketMQ會對消息進行重試處理。
??消費提交:消費者完成消費處理,并向服務端提交消費結果,服務端標記當前消息已經被處理(包括消費成功和失敗)。Apache RocketMQ默認支持保留所有消息,此時消息數據并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。
??消息刪除:Apache RocketMQ按照消息保存機制滾動清理最早的消息數據,將消息從物理文件中刪除。
示例
下面是使用 RocketMQ 實現事務的一個例子:
生產者實現事務監聽器:
首先,需要實現一個 RocketMQ 的事務監聽器接口RocketMQLocalTransactionListener
,這個接口定義了在發送和確認事務消息時的回調方法。您需要根據業務邏輯來實現這些方法。
executeLocalTransaction 方法:
這個方法在發送事務消息時被調用,用于執行本地事務。具體步驟如下:
1. 獲取消息中的事務 ID。
2. 根據事務索引來模擬本地事務執行的狀態。
3.?將事務狀態放入?localTrans
?映射中,以備后續?checkLocalTransaction
?方法使用。
根據您的代碼,executeLocalTransaction
?方法中模擬了三種狀態:
??如果狀態為 0,表示本地事務成功,返回?RocketMQLocalTransactionState.COMMIT
,消息將被提交。
??如果狀態為 1,表示本地事務失敗,返回?RocketMQLocalTransactionState.ROLLBACK
,消息將被回滾。
??如果狀態為 2,表示本地事務狀態未知,返回?RocketMQLocalTransactionState.UNKNOWN
。
checkLocalTransaction 方法:
這個方法在消息的確認狀態時被調用,用于檢查本地事務的狀態。具體步驟如下:
-
獲取消息中的事務 ID。
-
根據之前保存在?
localTrans
?映射中的事務狀態,決定消息的提交、回滾或未知。
checkLocalTransaction
?方法會根據之前在?executeLocalTransaction
?方法中保存的狀態來返回相應的事務狀態。
@RocketMQTransactionListener??
public?class?TransactionListenerImpl?implements?RocketMQLocalTransactionListener?{??private?AtomicInteger?transactionIndex?=?new?AtomicInteger(0);??private?ConcurrentHashMap<String,?Integer>?localTrans?=?new?ConcurrentHashMap<String,?Integer>();??@Override??public?RocketMQLocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{??String?transId?=?(String)?msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);??System.out.printf("####?executeLocalTransaction?is?executed,?msgTransactionId=%s?%n",??transId);??int?value?=?transactionIndex.getAndIncrement();??int?status?=?value?%?3;??localTrans.put(transId,?status);??if?(status?==?0)?{??//?Return?local?transaction?with?success(commit),?in?this?case,??//?this?message?will?not?be?checked?in?checkLocalTransaction()??System.out.printf("?#?COMMIT?#?Simulating?msg?%s?related?local?transaction?exec?succeeded!?###?%n",?msg.getPayload());??return?RocketMQLocalTransactionState.COMMIT;??}??if?(status?==?1)?{??//?Return?local?transaction?with?failure(rollback)?,?in?this?case,??//?this?message?will?not?be?checked?in?checkLocalTransaction()??System.out.printf("?#?ROLLBACK?#?Simulating?%s?related?local?transaction?exec?failed!?%n",?msg.getPayload());??return?RocketMQLocalTransactionState.ROLLBACK;??}??System.out.printf("?#?UNKNOW?#?Simulating?%s?related?local?transaction?exec?UNKNOWN!?\n");??return?RocketMQLocalTransactionState.UNKNOWN;??}??@Override??public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{??String?transId?=?(String)?msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);??RocketMQLocalTransactionState?retState?=?RocketMQLocalTransactionState.COMMIT;??Integer?status?=?localTrans.get(transId);??if?(null?!=?status)?{??switch?(status)?{??case?0:??retState?=?RocketMQLocalTransactionState.COMMIT;??break;??case?1:??retState?=?RocketMQLocalTransactionState.ROLLBACK;??break;??case?2:??retState?=?RocketMQLocalTransactionState.UNKNOWN;??break;??}??}??System.out.printf("------?!!!?checkLocalTransaction?is?executed?once,"?+??"?msgTransactionId=%s,?TransactionState=%s?status=%s?%n",??transId,?retState,?status);??return?retState;??}??
}
消費者
@Service??
@RocketMQMessageListener(topic?=?"${demo.rocketmq.transTopic}",?consumerGroup?=?"string_trans_consumer")??
public?class?StringTransactionalConsumer?implements?RocketMQListener<String>?{??@Override??public?void?onMessage(String?message)?{??System.out.printf("-------?StringTransactionalConsumer?received:?%s?\n",?message);??}??
}
這些步驟基本上涵蓋了使用 RocketMQ 實現事務的主要過程。可以根據具體的業務需求和環境進行調整和配置。
總結
使用半消息實現分布式事務在提供分布式事務支持和保證消息傳遞的原子性方面具有優勢,但需要引入MQ并提供查詢事務接口。在選擇是否使用半消息實現分布式事務時,需要根據具體的業務需求和系統性能要求來進行權衡和選擇。