一、Kafka簡介
Kafka是什么
Kafka是一種高吞吐量的分布式發布訂閱消息系統(消息引擎系統),它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,
搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來
解決。 對于像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop
的并行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
其實我們簡單點理解就是系統A發送消息給kafka(消息引擎系統),系統B從kafka中讀取A發送的消息。而kafka就是個中間商。
1.1 Kafka的特性:
-
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
-
可擴展性:kafka集群支持熱擴展
-
持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止數據丟失
-
容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
-
高并發:支持數千個客戶端同時讀寫
1.2 Kafka的使用場景:
Kafaka經常用于削峰、解耦、異步。
-
日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
-
消息系統:解耦生產者和消費者、緩存消息等。
-
用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
-
運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。
-
流式處理:比如spark streaming和storm
-
事件源
1.3 Kakfa的設計思想
Kakfa Broker Leader的選舉:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節點一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時所有的kafka broker又會一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,并選取ISR列表中的一個replica作為partition leader(如果ISR列表中的replica全掛,選一個幸存的replica作為leader; 如果該partition的所有的replica都宕機了,則將新的leader設置為-1,等待恢復,等待ISR中的任一個Replica“活”過來,并且選它作為Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。
二、Kafka架構
三、Kafka中的術語解釋概述
Broker【服務器節點】
Kafka 集群包含一個或多個服務器,服務器節點稱為broker。broker存儲topic的數據。
-
如果某topic有N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。
-
如果某topic有N個partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個broker不存儲該topic的partition數據。
-
如果某topic有N個partition,集群中broker數目少于N個,那么一個broker存儲該topic的一個或多個partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群數據不均衡。
Topic【主題】
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于
一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處)。類似于數據庫的表名。在每個broker上都
可以創建多個topic。
Partition【分區】
-
topic中的數據分割為一個或多個partition。每個topic至少有一個partition。topic的數據數據會寫入到不同的partition。
-
每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。
-
partition中的數據是有序的,不同partition間的數據丟失了數據的順序。
-
如果topic有多個partition,消費數據時就不能保證數據的順序。
-
在需要嚴格保證消息的消費順序的場景下,需要將partition數目設為1。
上面說到數據會寫入到不同的分區,那kafka為什么要做分區呢?相信大家應該也能猜到,分區的主要目的是:
-
方便擴展。因為一個topic可以有多個partition,所以我們可以通過擴展機器去輕松的應對日益增長的數據量。
-
提高并發。以partition為讀寫單位,可以多個消費者同時消費數據,提高了消息的處理效率。
熟悉負載均衡的朋友應該知道,當我們向某個服務器發送請求的時候,服務端可能會對請求做一個負載,將流量分發到不同的服務器,那在kafka中,如果某個topic有多個partition,producer又怎么知道該將數據發往哪個partition呢?
kafka中有幾個原則:
-
partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
-
如果沒有指定partition,但是設置了數據的key,則會根據key的值hash出一個partition。
-
如果既沒指定partition,又沒有設置key,則會輪詢選出一個partition。
保證消息不丟失是一個消息隊列中間件的基本保證,那producer在向kafka寫入消息的時候,怎么保證消息不丟失呢?
那就是通過ACK應答機制!在生產者向隊列寫入數據的時候可以設置參數來確定是否確認kafka接收到數據,這個參數可設置的值為0、1、all。
-
0代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效率最高。
-
1代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
-
all代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保leader發送成功和所有的副本都完成備份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的topic寫數據,能不能寫入成功呢?kafka會自動創建topic,分區和副本的數量根據默認配置都是1。
Producer【生產者】
生產者即數據的發布者,該角色將消息發布到Kafka的topic中。broker接收到生產者發送的消息后,broker將該消息追加到當前用于追加數據的segment文件中。生產者發送的消息,存儲到一個partition中,生產者也可以指定數據存儲的partition。
Consumer【消費者】
消費者可以從broker中讀取數據。消費者可以消費多個topic中的數據。
Consumer Group【消費者組】
每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)。
同一個topic下的每個partition中message只能被組(Consumer group )中的一個consumer消費,如果想讓一個message可以被多個consumer消費的話,那么這些consumer必須在不同的Consumer group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)并發處理message,這是因為多個BET去消費一個Queue中的數據的時候,由于要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業務都需要這個topic的數據,起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
當啟動一個consumer group去消費一個topic的時候,無論topic里面有多少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那么這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等于partition數量,這樣效率是最高的。
- 當consumer group里面的consumer數量小于這個topic下的partition數量的時候,就會出現一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。
- 如果consumer group里面的consumer數量等于這個topic下的partition數量的時候,此時效率是最高的,每個partition都有一個consumer thread去消費。
- 當consumer group里面的consumer數量大于這個topic下的partition數量的時候,就會有consumer thread空閑。
多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以o(1)的方式順序的讀取message去消費,,所以一定會重復消費這批message的,不能向AMQ那樣多個BET作為consumer消費(對message加鎖,消費的時候不能重復消費message)
Leader【領導者】
每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責數據的讀寫的partition。
Follower【跟隨者】
- Follower跟隨Leader,所有寫請求都通過Leader路由,數據變更會廣播給所有Follower,Follower與Leader保持數據同步。
- 如果Leader失效,則從Follower中選舉出一個新的Leader。
- 當Follower與Leader掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)列表中刪除,重新創建一個Follower。
Replica【副本】
每個partition可以在其他的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集群。
存replica副本的方式是按照kafka broker的順序存。
例如有5個kafka broker節點,某個topic有3個partition,每個partition存2個副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大于kafka broker節點的數目,否則報錯。這里的replica數其實就是partition的副本總數,其中包括一個leader,其他的就是copy副本)。這樣如果某個broker宕機,其實整個kafka內數據依然是完整的。但是,replica副本數越高,系統雖然越穩定,但是會帶來資源和性能上的下降;replica副本少的話,也會造成系統丟數據的風險。
-
傳送消息:producer先把message發送到partition leader,再由leader發送給其他partition follower(如果讓producer發送給每個replica那就太慢了)。 再向Producer發送ACK前需要保證有多少個Replica已經收到該消息:根據ack配的個數而定。
-
處理某個Replica不工作的情況:如果這個部工作的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什么問題。如果這個不工作的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工作的partition replca寫message成功,但是會等到time out,然后返回失敗因為某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工作的partition replica從ack列表中移除,以后的producer發送message的時候就不會有這個ack列表下的這個部工作的partition replica了。
-
處理Failed Replica恢復回來的情況:如果這個partition replica之前不在ack列表中,那么啟動后重新受Zookeeper管理即可,之后producer發送message的時候,partition leader會繼續發送message到這個partition follower上。如果這個partition replica之前在ack列表中,此時重啟后,需要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工作的partition replica的時候自動從ack列表中移除的)。
四、Kafka可視化管理工具
【Kafka可視化工具】kafka-manager
kafka-manager安裝及基本使用
【Kafka可視化工具】Offset Explorer
Kafka-Offset Explorer安裝及基本使用