如何實現一個消息隊列
拆解分析主流的幾種消息隊列
1、基本架構
生產者Producer、消費者Consumer、Broker:生產者發送消息,消費者接受消息,Broker是服務端,處理消息的存儲、備份、刪除和消費關系的維護。
主題和分區:主題(topic)消息分類的標識,分區是主題的物理分割,有助于提高消息隊列的吞吐量。
1.1 kafka:生產者將消息發布到kafka集群(broker)的一個或多個主題(topic),每個topic包含一個或多個分區(partition),消費者從kafka集群中的一個或多個主題消費消息,并將消費的偏移量(offset:分區中每條消息的位置信息,是一個單調遞增且不變的值)提交回kafka以保證消息的順序性和一致性。
kafka集群中,每個分區可以有多個副本,這些副本中包含一個Leader和多個Follower,只有Leader可以處理生產者和消費者的請求,Follower用于數據備份和容錯,當Leader發生故障時,Follower提升為Leader。另外還有一個Zookeeper作為注冊中心,協調服務,維護集群的狀態和元數據信息。
1.2 RocketMQ:除了生產者Producer、消費者Consumer、Broker集群外,有NameServer(名稱服務),負責維護Broker的元數據信息,Producerhr和Comsumer啟動時需要連接到NameServer獲取Broker的地址信息。每個Topic中可以有多個Queue(消息隊列),Producer將消息發送到指定的Queue,Consumer從指定的Queue中l拉取消息。
1.3 RabbitMQ:生產者將消息發布到RabbitMQ的交換器(Exchange),交換器將消息路由到和它綁定(Binding)的隊列(Queue),消費者從隊列中獲取消息。RabbitMQ的Broker就是一個個VHost(可以理解為操作系統的命名空間,里面對各資源進行隔離分組),每個VHost擁有自己的交換器、隊列、綁定和權限設置,相互獨立。
2、基本功能
2.1 消息存儲:一般采用內存或者磁盤,內存讀寫快但可能丟數據;磁盤可以持久化消息但是讀寫速度相對慢一些。
2.2 消息傳遞協議:使用成熟的RPC框架(Dubbo或者Thrift)實現生產者和消費者與Broker之間的通信。
2.3 消息持久化和確認機制:一般做法是將消息存儲在磁盤中,并且在消費者確認消費完成后再刪除消息。
2。4 消息的分發方式:點對點或廣播,點對對是每個消費者只會接收自己訂閱的消息,廣播是每個消費者都會接收到所有消息。
2.5 消息的傳遞方式:輪詢、長連接、長輪詢。一般都是支持推拉結合,或者基于拉實現推。
推消息就是消費者和中間件之間建立TCP長連接或者注冊一個回調,當服務端數據發生變化,立即通過這個長連接或者回調將數據推送給消費者。這樣的話好處就是能保證消息的實時性,但是一旦生產消息過快消息就會堆積在消費者端。
拉消息就是消費者輪詢檢查數據是否有變化,有變化的話就把數據拉過來。好處是消費者可以控制消息的數量和速度,缺陷就是消費者需要不斷輪詢,消息中間件也會因此有一定的壓力。
另外有些生產環境下,不同環境的通信可能是單向的,此時就只能消費者采取拉的方式,因為長連接是雙向通信。
實際使用時,很多中間件是結合使用長連接和輪詢,又稱長輪詢,就是消費者向消息中間件發送一個長輪詢請求,消息中間件如果有消息就直接返回,如果沒有消息不會立即斷開,等待一段時間,在超時時間到達之前有新小心就返回,否則就斷開連接等待下一次長輪詢。比如Kafka和RocketMQ。
3、消息的可靠性
其實主要就是保證消息不丟失,一般做法就是主從復制、集群模式或者分布式架構。
Kafka如何保證消息不丟失
發送端:發送消息時建議使用producer.send(msg,callback)方法。
Producer設置中acks=-1,表示Leader會等待消息被成功寫進所有的ISR副本才認為producer請求成功。retries設置大于0;
Broker端:設置unclean.leader.election.enable = false,表示是否可以把非ISR集合中的副本選舉為Leader副本,如果一個Broker落后原先的Leader太多,那么一旦它成為新的Leader則必然會丟失消息,所以這個參數設置為false。設置 min.insync.replicas > 1,控制的是消息至少要被寫入到多少個副本才算是“已提交”。另外推薦設置成 replication.factor = min.insync.replicas + 1。
消費端:enable.auto.commit=false,采用手動提交位移的方式。
如上操作之后其實我們還是沒法保證消息100%不丟失,首先生產者發送消息后如果kafka掛了,消息還沒寫進日志(同步到磁盤),那么消息會丟失。后續重試時如果生產者也掛了,那就沒人知道這條消息失敗了,也就沒有重試了。其次,Kafka雖然引入了副本的機制,但是如果發生同步延遲,還沒同步主副本就都掛了,那么消息也可能就丟失了。
RocketMQ如何保證消息不丟失
發送端:
同步發送消息的話將保存機制改成同步刷盤,因為Broker默認是先將消息保存在內存中,內存存儲成功就返回結果給生產者,然后通過異步刷盤將消息存儲到磁盤上,這時候如果機器掛了那么消息就可能丟失。
flushDiskType = SYNC_FLUSH
異步發送消息的話就需要生產者重寫SendCallback的onSuccess和onException方法,用于Broker回調,方法中實現消息的確認和重發。
除此之外,RocketMQ集群部署通常采用的一主多從,并且采用主從同步方式做數據復制。Master在將數據同步到Slave節點后,再返回給生產者確認結果。
brokerRole = SYNC_MASTER
消費端:在業務邏輯的最后加上 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS。
RabbitMQ如何保證消息不丟失
首先,在消息從生產者到交換機Exchange和Exchange到Queue的過程中,為了保證消息發送成功,有兩種方案:
一種是confirm機制,一種是事務機制。
confirm機制即注冊回調來監聽,開啟Publisher Confirm(確保消息被Exchange成功接收和處理)和Publisher Return(處理消息在無法路由到隊列時的異常)。
事務機制主要了解三個方法就行:
toSelect:將當前channel設置成transaction模式。
toCommit:提交事務。
toRollback:回滾事務。
其次,RabbitMQ接收到消息之后,消息也是先暫存內存,所以為了避免消息丟失,需要考慮的就是一個可靠持久化機制。
隊列和交換機的持久化:設置durable參數為true來創建持久化交換機、持久化隊列以及持久化綁定關系。
持久化消息:設置消息的deliveryMode為2來創建持久化消息,RabbitMQ才會將消息寫入磁盤。
消費端同樣的有相應的確認機制,消費者處理消息成功之后可以向MQ發送ack回執,MQ收到ack后才會刪除消息。處理消息有異常則返回nack回執,MQ收到后可以重發消息,如果一直收不到返回則也會重試。
消息丟失解決方案
kafka、RocketMQ和RabbitMQ單靠自己其實都無法100%保證消息不丟失,針對消息可能的丟失我們可以引入一些其他機制,比如分布式事務、本地消息表等。
分布式事務:
就是保證數據的一致性(所有參與者在一次寫操作過程中要么都成功要么都失敗),可分強一致性和最終一致性。
強一致性引入一個協調者,方案包括基于XA的二階段(2PC)及三階段提交(3PC)。2PC可以理解為第一階段是協調者先詢問參與者是可以發起事務提交操作,若參與者可以執行事務提交,那么就是進行事務操作,只是執行完沒有還沒commit或者rollback,如果參與者成功執行事務操作就返回YES,沒成功就返回NO。第二階段就是協調者接收到所有參與者的YES的反饋后,就給參與者發送commit請求,如果有反饋為NO,就發送rollback請求。然后參與者將ACK結果返回給協調者。
2PC最關鍵的一個問題就是在第二階段,如果參與者和協調者都掛了,那么就可能出現數據不一致的問題。因此引入3PC(CanCommit,PreCommit,doCommit),就是將2PC的第一階段中的事務操作也分離出來。3PC的問題就是如果由于網絡原因,參與者在等待超時后就會commit,這樣可能就與其他接收到abort命令執行回滾的參與者不一致了。
最終一致性:方案是基于可靠消息的最終一致性(本地消息表、事務消息)、最大努力通知以及TCC。
基于本地消息表實現分布式事務,這個方案主要思路其實就是將分布式事務拆為本地事務和消息事務。參與者A發送消息前先創建一個本地消息,在參與者A的DB中寫入本地業務數據和本地消息數據,兩者在一個事務中,這樣業務成功則本地消息也一定寫入了。然后參與者A基于本地消息調用MQ發送遠程消息,參與者B接收后做業務處理且成功之后再聯動修改本地消息的狀態。這個流程中如果參與者A消息發送MQ失敗,那么就可以通過定時任務掃描本地消息數據,對未成功的消息進行重新投遞。如果是MQ發送消息失敗,那么MQ的重試機制也就派上用場了。如果是最終修改本地消息狀態失敗,那么起碼現在分布式系統中的業務數據是一致了,只是本地消息的狀態不對,這種情況可以借助定時任務重新投消息,下游冪等消費再重新更改消息狀態,或者本系統通過定時任務主動去查詢下游系統的狀態,如果已經成功則直接修改消息狀態。
基于事務消息實現分布式事務,參考RocketMQ的事務消息實現,參與者A先向RocketMQ Broker發送一條half消息(半消息),半消息存儲在Broker的事務消息日志中,半消息發送成功后參與者A執行本地事務,如果A執行本地事務成功則通知RocketMQ Broker提交事務消息,消息狀態從prepared改為committed,消費者可以接收消息。如果本地事務失敗則A通知RocketMQ Broker回滾事務消息,消息從事務日志刪除。這個過程中如果RocketMQ Broker沒有接收到A執行本地事務的結果那么就會進行回查,A自查后返回自查結果,如果在規定時間沒有結果那么消息就變味unknow狀態,此時A如果有了結果還可以向MQ發送commit或者rollback,但是如果一直沒有結果,過期時間一到MQ就自動回滾事務消息,將其從事務消息日志中刪除。
TCC就是Try-Confirm-Cancel,將分布式事務分解為若干小事務,每個事務都有Try、Confirm和Cancel三個操作。try階段參與者執行本地事務,并對全局事務預留資源,返回執行標識。所有參與者都返回成功則協調者通知所有參與者提交事務,即confirm階段,參與者在本地提交事務,并釋放全局事務資源。如果任一參與者try階段返回失敗則協調者通知所有參與者回滾。這里面就會有兩個問題:空回滾和事務懸掛。空回滾就是try沒成功也要執行回滾,注意處理邏輯。事務懸掛就是由于網絡原因可能某個節點的try還沒收到,而其他節點觸發了cancel,然后這個節點先收到cancel進行了空回滾之后又收到了try并執行了,那么這個節點的try占用的資源就沒法釋放。解決方案就是引入一張分布式事務記錄表,每個參與者都可以在本地事務的執行過程中同時記錄一次分布式事務的操作記錄。
除上述方案還有分布式事務的組件,如Seata。Seata包含三部分:Transaction Coordinator(TC),Transaction Manager?,Resource Mabager(RM),TC維護整個事務的全局狀態,負責通知RM進行提交或回滾;TM可視為微服務中的聚合服務,開啟一個全局事務或者提交或回滾一個全局事務;RM可對應微服務架構中的某個微服務,對應一個事務分支,負責執行事務分支的操作。TM接收到用戶請求后調TC開啟全局事務并從TC獲得一個XID;TM通過RPC/Restful調用各RM并把XID傳遞過去;各RM接收到XID,在TC注冊事務分支;TM根據所有調用全部完成后的狀態確定是Commit還是Rollbask,將結果通知TC。TC協調各RM進行Commit或者Rollbask。
4、消息的高性能
性能這塊可以參考kafka的設計,引入一些批量操作、順序寫入和零拷貝之類的技術。
消息發送
批量發送、異步發送、消息壓縮、并行發送(數據分布在不同的分區,生產者并行發送消息)。
消息存儲
1、零拷貝:一次IO流程可以簡單概括有磁盤數據copy到內核緩沖區(頁緩存),內核態中的數據copy到用戶態中,用戶態數據copy到內核態中(socket緩存),內核態緩沖區數據copy到網卡中。零拷貝就是通過各種技術來減少數據copy的次數或者說減少CPU參與數據拷貝的次數。
實現方式有mmap、sendfile、dma、directI/O等/
2、磁盤順序寫入
3、頁緩存
4、系數索引:kafka存儲消息是通過分段的日志文件,每個分段有自己的索引文件。
5、分區和副本:kafka采用分區和副本的機制,可以將數據分散到多個節點上進行處理。具體可以了解下ISR機制,即同步副本。kafka中每個主題可以有多個副本,ISR是與主副本保持同步的副本集合。當消息寫入Kafka的分區時,首先會寫入Leader,然后Leader將消息復制給ISR中的所有副本,只有當ISR中的所有副本都成功接收到并確認了消息之后,主副本才會認為消息已成功提交。
消息消費
消費者群組、并行消費、批量拉取
5、擴展功能
順序消息
kafka順序消息
kafka的一個topic下有多個partition,當生產者向某個partiton發送消息時,消息被追加到該partiton的日志文件中,并且分配一個唯一的offset,文件讀寫是有序的。當消費者從該分區消費消息時,會從該分區最早的offset開始讀取消息。所以同一個partiton下的消息是有序的。
所以想要實現消息順序消費,那么一個topic下只創建一個partition,或者消息被發送到同一個partiton。
要想實現消息發送到同一個partition,可以了解下DefaultPartitoner這個類,實現方式有三種:
一是在key為null的話直接指定partiton。
二是指定key,這樣同樣的ykey經過hash之后還是會指向同一個partiton編號。
三是自己寫一個分區器類,實現Partitoner接口,重寫partition方法,在生產者的配置中指定使用自己寫的分區器類。
RocketMQ的順序消息
RocketMQ是基于隊列的順序消費,同一個隊列的消息可以做到有序。
生產者需要同步發送消息,并且在send方法中傳入一個MessageQueueSelector,這個MessageQueueSelector中需要實現一個select方法,用來定義要把消息發送到哪個MessageQueue。
消息有序進入同一個隊列之后,要保證順序消費,需要加三把鎖,先鎖定Broker上的MessageQueue,確保消息只會投遞給一個消費者,對本地的MessageQueue加鎖,確保只有一個線程能處理這個消息隊列,對存儲消息的ProcessQueue加鎖,確保在重平衡的過程中不會出現重復消費。值得注意的是,多次加鎖雖然能做到順序消費,但這無疑會降低系統的吞吐量,可能會導致消息阻塞。
延遲消息
RabbitMQ延遲消息
死信隊列:給消息設置一個TTL,到期后消息進入死信隊列,監聽死信隊列消費消息。存在問題是可能造成對頭阻塞。因為RabbitMQ只定期掃描隊頭消息是否過期,如果隊頭消息沒過期,隊列中的消息即使過期了也不會進入死信隊列,一直被阻塞。另外這個方案實現也比較麻煩。
RabbitMQ插件: 對版本有要求,3.6.12版本開始支持的。基于rebbitmq_delayded_message_exchange插件,消息不是在隊列中,而是一個基于Erlang開發的Mnesia數據庫中,通過一個定時器去查詢需要被投遞的消息,投遞到x-delayed-message交換機中。這個插件支持的最大延遲時間有限。
RocketMQ延遲消息
基于Timer定時器:先將消息存處在內存上,叨叨指定時間后再寫入磁盤。
基于時間輪(5.0版本):將消息按照過期時間放置在不同的槽位,到達過期時間就將該槽位的所有消息投遞給消費者。
事務消息
參考上文解決消息丟失中的RocketMQ的事務消息。
重復消費
kafka如何防止重復消費
1、kafka中消費者必須至少加入一個消費者組,同組消費者共享消費者的負載,因此只要有一個消費者在消費某條消息,其他消費者就不會接收這個消息。
2、手動提交位移控制+處理結果去重。
3、客戶端做冪等控制:一鎖二判三更新之類。
4、Kafka的Exactly-Once消費語義(生產者開啟冪等+事務 或者 消費者端精確控制)
RabbitMQ如何防止重復消費
根據發送消息時設置的唯一標識在消費者端做冪等控制。
消息堆積
一般是因為客戶端本地消費過程中消費時間過長或者消費并發小。
如上解決方案有:
增加消費者數量,提升消費速度(引入線程池,本地存儲消息即返回成功后續慢慢消費等)、清理過期消息(評估過期消息和一些一直無法成功的消息是否可清理),調整一些關于參數比如隊列數、消息拉取間隔時間等(具體根據MQ類型修改調試)。