RabbitMQ
stock.#.nyse ,#匹配多個字符,*匹配一個字符。
Confirm Callback 到達exchange的回調。
Return Callback 到達queue失敗的回調。
Kafka
Kafka生產端
分區器:
1.直接指定partition 指定0,1。
2.設置hashkey,計算key的hash值進行取模分區。
3.不設置分區鍵,采用粘性發送,即往某個分區發送至batchSize16K大小后切換分區。
RecordAccumulator(阿q米leite)生產內存池
DQuene每個partition對應一個。
batch.size,默認16k。配合linger.ms 10ms ,就是16k一批次發送,最多延遲10ms。
配合linger.ms默認0ms,不進行延遲,即來一條發一條。
sender 線程默認往每個broker發送5條緩存消息,即沒有接受到broker的應答,也能發5條。
broker 采用0 1 -1 的應答策略,
默認-1 等待所有的主從節點全部落盤,才去應答。
0 不需要任何落盤應答,直接確認。
1 主節點落盤就直接確認應答。
若應答失敗,則會無限重試。
副本分區策略。
創建一個T topic 三分區。
三節點 A B C
0 0’ 0’
1’ 1 1’
2’ 2’ 2
0為主分區lead 0’為副本
消費端
enable.auto.commit 默認true ,設置為false ,開啟手動ack模式。
fetch.min.bytes 默認1B 設置為 128KB
fetch.max.wait.ms 默認500ms
Max.poll.records 最大拉取批次條數,默認500條,設置為1000條。
每個消費者都會和coordinator保持心跳(默認3s),一旦超時
(session.timeout.ms=45s),該消費者會被移除,并觸發再平衡;
或者消費者處理消息的時間過長(max.poll.interval.ms 5分鐘),也
會觸發再平衡
默認采用Range消費者分區策略
10 P 3 C
先平均分配,多的給第一個
123 10 456 789
earliest與latest的區別
earliest:
新消費者會讀取分區中從起始位置到當前最開始偏倚位置的所有消息。
latest:
新消費者會讀取分區中從起始位置到當前所有消費者最后位置。
消息冪等
1.首先是生產端發生至kafka broker
PID 每次重啟kafka則更換,Partition分區號,SeqNumber 區內順序號。
<PID, Partition, SeqNumber>
冪等性只能保證的是在單分區單會話內不重復
enable.idempotence 默認為 true
kafka Brocker不接受已經接受的順序號
2.kafka Brocker發送至 消費者端
kafka在同一消費者組,只會把同一分區的消息發送至一個消費者實例,但是由于消費者組一個實例宕機,觸發再平衡機制,可能會重新發送消息至消費者端,故消費者端也需要根據主鍵冪等判斷。
消息順序
只能保證單分區順序性。
broker接受到亂序的消息之后,會等待順序的消息,并且進行重新排序。比如Brocker 接受到1 2 4,會等到接受到3之后排好序后落盤。
max.in.flight.requests.per.connection 需要設置小于等于5 默認配置就是5
enable.idempotence 默認為 true
文件存儲機制
每個Segment 1 G 大小,每4KB存儲一條索引,指定offset查詢時,先查找索引的兩端范圍,再去查詢log文件位置。
默認7天過期、以每個Segment 最大的消息時間判斷是否過期,5分鐘檢查一次。
改為30天,30分鐘檢查一次,甚至對于重要消息,可以設置永不過期,在根據消費到的offset,進行手動消費。
Kafka為什么快
1.
零拷貝
減少了兩次內核態和用戶態的文件傳輸。
2.順序磁盤寫,減少磁頭尋址時間。
3.采用LZ4,Gzip等壓縮算法,減少包大小。
4.分區并行。
本地消息表
msgid(全消息唯一id)
topic(stock.relase)
status(狀態 1.未消費2.已消費3.異常重試4.錯誤5.已存在)
erromsg錯誤消息內容
e_count(異常重試次數)
e_threshold(錯誤消息閾值)
group_flag(是否是順序消息)
groupid(順序消息組號)
grouporder(順序消息組內順序號)
發送時間
消費時間
無論如何都進行,確認的ack,不然像kafka這種,uack跳過某一條消息,之后有確認了后續的消息,offset直接就移動到了ack的位置,相當于錯誤的那條消息丟了,很嚴重的。
接受消息時都去判斷,msgid是否存在,若存在,則不消費邏輯,直接已存在。若不存在,正常消費,處理完成之后標記為已消費。
若出現了異常(db超市,遠程調用異常等),標記為異常重試。
后臺定時任務,每隔5分鐘,走索引掃描異常重試的消息,二次發送至mq。
當異常重試次數達到設定的錯誤閾值2次時,直接標記改消息錯誤,記錄異常。(此時大概率是代碼邏輯的健壯性不夠導致的,比如非法的參數長度,sql進不去,或者是數組越界,或者入參沒校驗空值,出了空指針)重試再多次都不行。有個錯誤消息的前端頁面,可以直接觀測到。
順序消息,通過是否是順序消息、消息組號和組內順序號去確認,當前的消息是否應該被消費,若是第二條,則查詢庫中第一條的消費狀態,若不存在或者異常為消費,則直接入庫,等待第一條消息消費完,查詢后一條的是否存在進行消費。
當然,本地消費表只針對業務型mq。像rocketmq或者kafka有自己的策略。而且數據的并發肯定更不上mq的上限。