設計一套Kafka到RocketMQ的雙寫+雙讀技術方案,實現無縫遷移!
- 1、背景
- 2、方案
- 3、具體邏輯
1、背景
假設你們公司本來線上的MQ用的主要是Kafka,現在要從Kafka遷移到RocketMQ去,那么這個遷移的過程應該怎么做呢?應該采用什么樣的技術方案來做遷移呢?
2、方案
介紹一個MQ集群遷移過程中的雙寫+雙讀技術方案。
3、具體邏輯
簡單來說,如果你要做MQ集群遷移,是不可能那么的簡單粗暴的,因為你不可能說在某一個時間點突然之間就說把所有的Producer系統都停機,然后更新他的代碼,接著全部重新上線,然后所有Producer系統都把消息寫入到RocketMQ去了
一般來說,首先你要做到雙寫,也就是說,在你所有的Producer系統中,要引入一個雙寫的代碼,讓他同時往Kafka和RocketMQ中去寫入消息,然后多寫幾天,起碼雙寫要持續個1周左右,因為MQ一般都是實時數據,里面數據也就最多保留一周。
當你的雙寫持續一周過后,你會發現你的Kafka和RocketMQ里的數據看起來是幾乎一模一樣了,因為MQ反正也就保留最近幾天的數據,當你雙寫持續超過一周過后,你會發現Kafka和RocketMQ里的數據幾乎一模一樣了。
但是光是雙寫還是不夠的,還需要同時進行雙讀,也就是說在你雙寫的同時,你所有的Consumer系統都需要同時從Kafka和RocketMQ里獲取消息,分別都用一模一樣的邏輯處理一遍。
只不過從Kafka里獲取到的消息還是走核心邏輯去處理,然后可以落入數據庫或者是別的存儲什么的,但是對于RocketMQ里獲取到的消息,你可以用一樣的邏輯處理,但是不能把處理結果具體的落入數據庫之類的地方。
你的Consumer系統在同時從Kafka和RocketMQ進行消息讀取的時候,你需要統計每個MQ當日讀取和處理的消息的數量,這點非常的重要,同時對于RocketMQ讀取到的消息處理之后的結果,可以寫入一個臨時的存儲中。
同時你要觀察一段時間,當你發現持續雙寫和雙讀一段時間之后,如果所有的Consumer系統通過對比發現,從Kafka和RocketMQ讀取和處理的消息數量一致,同時處理之后得到的結果也都是一致的,此時就可以判斷說當前Kafka和RocketMQ里的消息是一致的,而且計算出來的結果也都是一致的。
這個時候就可以實施正式的切換了,你可以停機Producer系統,再重新修改后上線,全部修改為僅僅寫RocketMQ,這個時候他數據不會丟,因為之前已經雙寫了一段時間了,然后所有的Consumer系統可以全部下線后修改代碼再上線,全部基于RocketMQ來獲取消息,計算和處理,結果寫入存儲中。