Kafka[1]是linkedin用于日志處理的分布式消息隊列,linkedin的日志數據容量大,但對可靠性要求不高,其日志數據主要包括用戶行為(登錄、瀏覽、點擊、分享、喜歡)以及系統運行日志(CPU、內存、磁盤、網絡、系統及進程狀態)。
?
當前很多的消息隊列服務提供可靠交付保證,并默認是即時消費(不適合離線)。高可靠交付對linkedin的日志不是必須的,故可通過降低可靠性來提高性能,同時通過構建分布式的集群,允許消息在系統中累積,使得kafka同時支持離線和在線日志處理。
?
注:本文中發布者(publisher)與生產者(producer)可以互換,訂閱者(subscriber)與消費者(consumer)可以互換。
?
Kafka的架構如下圖所示:
Kafka存儲策略
1.??kafka以topic來進行消息管理,每個topic包含多個part(ition),每個part對應一個邏輯log,有多個segment組成。
2.??每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
3.??每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。
4.??發布者發到某個topic的消息會被均勻的分布到多個part上(隨機或根據用戶指定的回調函數進行分布),broker收到發布消息往對應part的最后一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達到一定的大小后將不會再往該segment寫數據,broker會創建新的segment。
發布與訂閱接口

發布消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發布,可以往消息集合中添加多條消息,一次行發布),send消息時,client需指定消息所屬的topic。
訂閱消息時,kafka client需指定topic以及partition num(每個partition對應一個邏輯日志流,如topic代表某個產品線,partition代表產品線的日志按天切分的結果),client訂閱后,就可迭代讀取消息,如果沒有消息,client會阻塞直到有新的消息發布。consumer可以累積確認接收到的消息,當其確認了某個offset的消息,意味著之前的消息也都已成功接收到,此時broker會更新zookeeper上地offset registry(后面會講到)。
?
高效的數據傳輸
1.??發布者每次可發布多條消息(將消息加到一個消息集合中發布), sub每次迭代一條消息。
2.??不創建單獨的cache,使用系統的page cache。發布者順序發布,訂閱者通常比發布者滯后一點點,直接使用linux的page cache效果也比較后,同時減少了cache管理及垃圾收集的開銷。
3.??使用sendfile優化網絡傳輸,減少一次內存拷貝。
?
無狀態broker
1.??Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。
2.??Broker不保存訂閱者的狀態,由訂閱者自己保存。
3.??無狀態導致消息的刪除成為難題(可能刪除的消息正在被訂閱),kafka采用基于時間的SLA(服務水平保證),消息保存一定時間(通常為7天)后會被刪除。
4.??消息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset進行重新讀取消費消息。
?
Consumer group
1.?允許consumer group(包含多個consumer,如一個集群同時消費)對一個topic進行消費,不同的consumer group之間獨立訂閱。
2.?為了對減小一個consumer group中不同consumer之間的分布式協調開銷,指定partition為最小的并行消費單位,即一個group內的consumer只能消費不同的partition。
?
Zookeeper 協調控制
1.?管理broker與consumer的動態加入與離開。
2.?觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一
? ?個consumer group內的多個consumer的訂閱負載平衡。
3.? 維護消費關系及每個partion的消費信息。
Zookeeper上的細節:
1.?每個broker啟動后會在zookeeper上注冊一個臨時的broker registry,包含broker的ip地址和端口號,所存儲的topics和partitions信息。
2.?每個consumer啟動后會在zookeeper上注冊一個臨時的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。
3.?每個consumer group關聯一個臨時的owner registry和一個持久的offset registry。對于被訂閱的每個partition包含一個owner registry,內容為訂閱這個partition的consumer id;同時包含一個offset registry,內容為上一次訂閱的offset。
?
消息交付保證
1.?kafka對消息的重復、丟失、錯誤以及順序型沒有嚴格的要求。
2.?kafka提供at-least-once delivery,即當consumer宕機后,有些消息可能會被重復delivery。
3.?因每個partition只會被consumer group內的一個consumer消費,故kafka保證每個partition內的消息會被順序的訂閱。
4.?Kafka為每條消息為每條消息計算CRC校驗,用于錯誤檢測,crc校驗不通過的消息會直接被丟棄掉。
?
Linkedin的應用環境
如下圖,左邊的應用于日志數據的在線實時處理,右邊的應用于日志數據的離線分析(現將日志pull至hadoop或DWH中)。
?
?
Kafka的性能
?
測試環境: 2 Linux machines, each with 8 2GHz cores,? 16GB? of? memory,? 6? disks? with? RAID? 10.? The? two machines? are? connected? with? a? 1Gb? network? link.? One? of? the machines was used as the broker and the other machine was used as the producer or the consumer.
?
測試評價(by me):(1)環境過于簡單,不足以說明問題。(2)對于producer持續的波動沒有進行分析。(3)只有兩臺機器zookeeper都省了??
?
測試結果:如下圖,完勝其他的message queue,單條消息發送(每條200bytes),能到50000messages/sec,50條batch方式發送,平均為400000messages/sec.
Kafka未來研究方向
1. 數據壓縮(節省網絡帶寬及存儲空間)
2. Broker多副本
3. 流式處理應用
參考資料
【1】??http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
【2】??https://cwiki.apache.org/KAFKA/kafka-papers-and-presentations.data/Kafka-netdb-06-2011.pdf