kafka理論學習匯總

基礎知識


基本簡介

Kafka 是一個分布式流式處理平臺,是一種分布式的,基于發布/訂閱的消息系統。

Kafka特點:

  1. 1. 同時為發布和訂閱提供高吞吐量
    Kafka 的設計目標是以時間復雜度為 O(1) 的方式提供消息持久化能力,即使對 TB 級以上數據也能保證常數時間的訪問性能。即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條消息的傳輸。

  2. 2. 消息持久化
    將消息持久化到磁盤,因此可用于批量消費,例如 ETL 以及實時應用程序。通過將數據持久化到硬盤以及 replication 防止數據丟失。

  3. 3. 分布式
    支持 Server 間的消息分區及分布式消費,同時保證每個 partition 內的消息順序傳輸。這樣易于向外擴展,所有的 producer、broker 和 consumer 都會有多個,均為分布式的。無需停機即可擴展機器。

  4. 4. 消費消息采用 pull 模式
    消息被處理的狀態是在 consumer 端維護,而不是由 server 端維護,broker 無狀態,consumer 自己保存 offset。

  5. 5. 支持 online 和 offline 的場景
    同時支持離線數據處理和實時數據處理。

基礎概念


在這里插入圖片描述

1. Broker

Kafka 集群中的一臺或多臺服務器統稱為 Broker,多個 Kafka Broker 組成一個 Kafka Cluster。

2. Producer

消息和數據的生產者,可以理解為往 Kafka 發消息的客戶端

3. Consumer

消息和數據的消費者,可以理解為從 Kafka 取消息的客戶端

4. Topic

每條發布到 Kafka 的消息都有一個類別,即為 Topic 。

Producer 將消息發送到指定的Topic,Consumer 通過訂閱特定的Topic來消費消息。

物理上不同 Topic 的消息分開存儲。一個 Topic 的消息可以保存于一個或多個broker上,但用戶只需指定消息的 Topic 即可生產或消費數據。

5. Partition

是Topic 物理上的分組,一個 Topic 可以分為多個 Partition ,同一 Topic 下的 Partition 可以分布在不同的 Broker 上,這也說明一個 Topic 可以橫跨多個 Broker 。

每個 Partition 是一個有序的隊列,Partition 中的每條消息都會被分配一個有序的 id(offset),所有的partition當中的數據全部合并起來,就是一個topic當中的所有的數據。

每一個分區內的數據是有序的,但全局的數據不能保證是有序的。(有序是指生產什么樣順序,消費時也是什么樣的順序)

生產者生產的每條消息只會被發送到一個分區中,也就是說如果向一個雙分區的主題發送一條消息,這條消息要么在分區 0 中,要么在分區 1 中。

6. Consumer Group

每個 Consumer 屬于一個特定的 Consumer Group,若不指定則屬于默認的 Group。 這是 Kafka 用來實現一個 Topic 消息的廣播(發給所有的 Consumer )和單播(發給任意一個 Consumer )的手段。

一個 Topic 可以有多個 Consumer Group。Topic 的消息會復制(不是真的復制,是概念上的)到所有的 Consumer Group,但每個 Consumer Group 只會把消息發給該 Consumer Group 中的一個 Consumer。

如果要實現廣播,只要讓每個 Consumer 有一個獨立的 Consumer Group 即可。如果要實現單播只要所有的 Consumer 在同一個 Consumer Group 。

用 Consumer Group 還可以將 Consumer 進行自由的分組而不需要多次發送消息到不同的 Topic 。

從分區的角度看,每個分區只能由同一個消費組內的一個消費者來消費,可以由不同的消費組來消費,partition數量決定了每個consumer group中并發消費者的最大數量。

消費某一主題的一個消費組下的消費者數量,應該小于等于該主題下的分區數。

如:某一個主題有4個分區,那么消費組中的消費者應該小于等于4,而且最好與分區數成整數倍 1 2 4 這樣。同一個分區下的數據,在同一時刻,不能由同一個消費組的不同消費者消費。

7. 備份機制

就是把相同的數據拷貝到多臺機器上,而這些相同的數據拷貝被稱為副本。

Kafka定義了兩類副本:領導者副本和追隨者副本,前者對外提供服務,即與客戶端程序進行交互;而后者從 leader 副本中拉取消息進行同步,不與外界進行交互。

8. segment文件

一個partition由多個segment文件組成,每個segment文件,包含多類文件,比較主要的有兩部分,分別是 .log 文件和 .index 文件。

其中 .log 文件包含了我們發送的數據存儲,而.index 文件,記錄的是我們.log文件的數據索引值,以便于我們加快數據的查詢速度,.index文件中元數據指向對應 .log 文件中message的物理偏移地址。

index文件采用了稀疏存儲的方式,對每隔一定字節的數據建立一條索引。 這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內存中。 但缺點是沒有建立索引的Message也不能一次定位到其在數據文件的位置,從而需要做一次順序掃描。

生產者分區以及策略


Kafka的消息組織方式實際上是三級結構:主題-分區-消息。

主題下的每條消息只會保存在某一個分區中,不同的分區能夠被放置到不同的機器上。而數據的讀寫操作也都是針對分區這個粒度而進行的,這樣每個節點的機器都能獨立地執行各自分區的讀寫請求處理,并且,我們還可以通過添加新的節點機器來增加整體系統的吞吐量。

其實分區的作用就是提供負載均衡的能力,或者說對數據進行分區的主要原因,就是為了實現系統的高伸縮性(Scalability)。

分區策略

自定義分區策略

在編寫生產者程序時,可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner接口。

這個接口只定義了兩個方法:partition()close(),通常只需要實現partition()方法。

同時設置partitioner.class參數為實現類的Full Qualified Name,那么生產者程序就會按照自定義的代碼邏輯對消息進行分區

輪詢策略

輪詢策略有非常優秀的負載均衡表現,它總是能保證消息最大限度地被平均分配到所有分區上,是最常用的分區策略之一。

如果一個主題下有3個分區,那么第一條消息被發送到分區0,第二條被發送到分區1,第三條被發送到分區2,以此類推。

隨機策略

所謂隨機就是將消息放置到任意一個分區上。

如果要實現隨機策略版的partition方法,需要兩行代碼:

List partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,然后隨機地返回一個小于它的正整數。

按消息鍵保序策略

Kafka允許為每條消息定義消息鍵,簡稱為Key。

Key可以是一個有著明確業務含義的字符串,比如客戶代碼、部門編號或是業務ID等;也可以用來表征消息元數據。

一旦消息被定義了Key,就可以保證同一個Key的所有消息都進入到相同的分區里面,由于每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略。

實現這個策略的partition方法只需要下面兩行代碼:

List partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

Kafka默認分區策略實際上同時實現了兩種策略:如果指定了Key,那么默認實現按消息鍵保序策略;如果沒有指定Key,則使用輪詢策略。

基于地理位置的分區策略

這種策略一般只針對那些大規模的Kafka集群,特別是跨城市、跨國家甚至是跨大洲的集群。

可以根據Broker所在的IP地址實現定制化的分區策略。比如下面這段代碼:

List partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

生產者壓縮算法


在Kafka中,壓縮可能發生在兩個地方:生產者端和Broker端。

生產者程序中配置compression.type參數即表示啟用指定類型的壓縮算法。

比如下面這段程序代碼展示了如何構建一個開啟GZIP的Producer對象:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("acks", "all"); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 開啟GZIP壓縮 
props.put("compression.type", "gzip"); 
Producer producer = new KafkaProducer<>(props);

這樣Producer啟動后生產的每個消息集合都是經GZIP壓縮過的,故而能很好地節省網絡傳輸帶寬以及Kafka Broker端的磁盤占用。

有兩種例外情況就可能讓Broker重新壓縮消息:

「情況一:Broker端指定了和Producer端不同的壓縮算法。」

在Broker端設置了不同的compression.type值,可能會發生預料之外的壓縮/解壓縮操作,通常表現為Broker端CPU使用率飆升。

「情況二:Broker端發生了消息格式轉換。」

所謂的消息格式轉換主要是為了兼容老版本的消費者程序。

在一個生產環境中,Kafka集群可能同時存在多種版本的消息格式。為了兼容老版本的格式,Broker端會對新版本消息執行向老版本格式的轉換。

這個過程中會涉及消息的解壓縮和重新壓縮。一般情況下這種消息格式轉換對性能是有很大影響的,除了這里的壓縮之外,它還讓Kafka喪失了Zero Copy特性。

「何時解壓縮?」

通常來說解壓縮發生在消費者程序中,也就是說Producer發送壓縮消息到Broker后,Broker照單全收并原樣保存起來。當Consumer程序請求這部分消息時,Broker依然原樣發送出去,當消息到達Consumer端后,由Consumer自行解壓縮還原成之前的消息。

「基本過程:Producer端壓縮、Broker端保持、Consumer端解壓縮。」

注意:除了在Consumer端解壓縮,Broker端也會進行解壓縮。

每個壓縮過的消息集合在Broker端寫入時都要發生解壓縮操作,目的就是為了對消息執行各種驗證。

我們必須承認這種解壓縮對Broker端性能是有一定影響的,特別是對CPU的使用率而言。

**啟用壓縮比較合適的時機:**啟用壓縮的一個條件就是Producer程序運行機器上的CPU資源要很充足。如果環境中帶寬資源有限,那么建議開啟壓縮。

消費者組

Consumer Group是Kafka提供的可擴展且具有容錯性的消費者機制。

組內可以有多個消費者或消費者實例,它們共享一個公共的ID,這個ID被稱為Group ID。組內的所有消費者協調在一起來消費訂閱主題的所有分區。

Consumer Group三個特性:

  • Consumer Group下可以有一個或多個Consumer實例,這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。
  • Group ID是一個字符串,在一個Kafka集群中,它標識唯一的一個Consumer Group。
  • Consumer Group下所有實例訂閱的主題的單個分區,只能分配給組內的某個Consumer實例消費,這個分區當然也可以被其他的Group消費。

當Consumer Group訂閱了多個主題后,組內的每個實例不要求一定要訂閱主題的所有分區,它只會消費部分分區中的消息。

各個Consumer Group之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。

Kafka使用Consumer Group這一種機制,同時實現了傳統消息引擎系統的兩大模型:

  • 如果所有實例都屬于同一個Group,那么它實現的就是消息隊列模型;
  • 如果所有實例分別屬于不同的Group,那么它實現的就是發布/訂閱模型。

理想情況下,Consumer實例的數量應該等于該Group訂閱主題的分區總數。假設一個Consumer Group訂閱了3個主題,分別是A、B、C,它們的分區數依次是1、2、3,那么通常情況下,為該Group設置6個Consumer實例是比較理想的情形,因為它能最大限度地實現高伸縮性。

針對Consumer Group,Kafka管理位移的方式:老版本的Consumer Group把位移保存在ZooKeeper中,在新版本中,采用了將位移保存在Kafka內部主題的方法,這個內部主題就是__consumer_offsets

ConsumerOffsets

Kafka將Consumer的位移數據作為一條條普通的Kafka消息,提交到__consumer_offsets中。__consumer_offsets的主要作用是保存Kafka消費者的位移信息。

__consumer_offsets主題就是普通的Kafka主題,但它的消息格式卻是Kafka自己定義的,用戶不能修改。其有3種消息格式:

  • 用于保存Consumer Group信息的消息。
  • 用于刪除Group過期位移以及刪除Group的消息
  • 保存了位移值。

第2種格式名為tombstone消息,即墓碑消息,也稱delete mark,它的主要特點是它的消息體是null,即空消息體。

一旦某個Consumer Group下的所有Consumer實例都停止了,而且它們的位移數據都已被刪除時,Kafka會向__consumer_offsets主題的對應分區寫入tombstone消息,表明要徹底刪除這個Group的信息。

當Kafka集群中的第一個Consumer程序啟動時,Kafka會自動創建位移主題。默認該主題的分區數是50,副本數是3。

目前Kafka Consumer提交位移的方式有兩種:自動提交位移和手動提交位移。

如果你選擇的是自動提交位移,那么就可能存在一個問題:只要Consumer一直啟動著,它就會無限期地向位移主題寫入消息。

假設Consumer當前消費到了某個主題的最新一條消息,位移是100,之后該主題沒有任何新消息產生,故Consumer無消息可消費了,所以位移永遠保持在100。由于是自動提交位移,位移主題中會不停地寫入位移=100的消息。

Kafka使用Compact策略來刪除__consumer_offsets主題中的過期消息,避免該主題無限期膨脹。Compact的過程就是掃描日志的所有消息,剔除那些過期的消息,然后把剩下的消息整理在一起。

Kafka提供了專門的后臺線程(Log Cleaner)定期地巡檢待Compact的主題,看看是否存在滿足條件的可刪除數據。

位移提交

因為Consumer能夠同時消費多個分區的數據,所以位移的提交實際上是在分區粒度上進行的,即Consumer需要為分配給它的每個分區提交各自的位移數據

位移提交分為自動提交和手動提交;從Consumer端的角度來說,位移提交分為同步提交和異步提交

開啟自動提交位移的方法:Java Consumer默認就是自動提交位移的,控制的參數是enable.auto.commit

如果啟用了自動提交,可以通過參數auto.commit.interval.ms控制提交頻率。它的默認值是5秒,表明Kafka每5秒會為你自動提交一次位移。

Kafka會保證在開始調用poll方法時,提交上次poll返回的所有消息。從順序上來說,poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況。

但自動提交位移的一個問題在于,它可能會出現重復消費。而手動提交的好處就在于更加靈活,你完全能夠把控位移提交的時機和頻率。

開啟手動提交位移的方法就是設置enable.auto.commit為false,還需要調用相應的API手動提交位移。

最簡單的API就是KafkaConsumer#commitSync()。該方法會提交KafkaConsumer#poll()返回的最新位移。它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常信息拋出。實例如下:

while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));process(records); // 處理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 處理提交失敗異常}
}

在調用commitSync()時,Consumer程序會處于阻塞狀態,直到遠端的Broker返回提交結果,這個狀態才會結束。

鑒于這個問題,可以使用異步提交KafkaConsumer#commitAsync(),Kafka提供了回調函數(callback),處理提交之后的邏輯,比如記錄日志或處理異常等。

while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));process(records); // 處理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}

commitAsync的問題在于,出現問題時不會自動重試。如果是手動提交,需要將commitSync和commitAsync組合使用才能到達最理想的效果:

try {while(true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));process(records); // 處理消息consumer.commitAysnc(); // 使用異步提交規避阻塞}
} catch(Exception e) {handle(e); // 處理異常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}

Kafka Consumer API為手動提交提供了這樣的方法: commitSync(Map)commitAsync(Map)。在poll返回大量消息時,可以控制處理完部分消息就提交位移。它們的參數是一個Map對象,鍵就是TopicPartition,即消費的分區,而值是一個OffsetAndMetadata對象,保存的主要是位移數據。

private Map offsets = new HashMap<>();
int count = 0;
……
while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord record: records) {process(record);  // 處理消息offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)if(count % 100 == 0)consumer.commitAsync(offsets, null); // 回調處理邏輯是nullcount++;}
}

但實際目前一般使用Spring Kafka,在這種Kafka消費者配置中,消息的拉取和提交是逐條進行的:

@KafkaListener(topics = DemoMessage.TOPIC,groupId = "demo-consumer-group-" + DemoMessage.TOPIC)public void onMessage(DemoMessage message, Acknowledgment acknowledgment) {log.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// 提交消費進度acknowledgment.acknowledge();}

消費者策略


Round

默認,也叫輪詢,說的是對于同一組消費者來說,使用輪詢分配的方式,決定消費者消費的分區

Range

消費方式是以分區總數除以消費者總數來決定,一般如果不能整除,往往是從頭開始將剩余的分區分配好

Sticky

Round和Range方式當同組內有新的消費者加入或者舊的消費者退出的時候,會從新開始決定消費者消費方式。

但是Sticky,在同組中有新的新的消費者加入或者舊的消費者退出時,不會直接開始新的分配,而是保留現有消費者原來的消費策略,將退出的消費者所消費的分區平均分配給現有消費者,新增消費者同理,同其他現存消費者的消費策略中分離。

重平衡

Rebalance本質上是一種協議,規定了一個Consumer Group下的所有Consumer如何達成一致,來分配訂閱Topic的每個分區。

比如某個Group下有20個Consumer實例,它訂閱了一個具有100個分區的Topic。正常情況下,Kafka平均會為每個Consumer分配5個分區。這個分配的過程就叫Rebalance。

Rebalance的觸發條件:

  • 組成員數發生變更。比如有新的Consumer實例加入組或者離開組,或是有Consumer實例崩潰被踢出組。
  • 訂閱主題數發生變更。Consumer Group可以使用正則表達式的方式訂閱主題,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明該Group訂閱所有以字母t開頭、字母c結尾的主題,在Consumer Group的運行過程中,你新創建了一個滿足這樣條件的主題,那么該Group就會發生Rebalance。
  • 訂閱主題的分區數發生變更,會觸發訂閱該主題的所有Group開啟Rebalance。

Rebalance發生時,Group下所有的Consumer實例都會協調在一起共同參與。

多副本機制


Kafka 為分區(Partition)引入了多副本(Replica)機制,上節的備份機制中也對此有一些介紹。

副本角色

一個Partition可以有一個 leader 副本和多個 follower副本,這些副本分散保存在不同的Broker上,而一個topic的不同Partition的leader副本也會盡量被分配在不同的broker,針對同一個Partition,在同一個broker節點上不可能出現它的多個副本。

生產者發送的消息會被發送到 leader 副本,隨后 follower 副本從 leader 副本中拉取消息進行同步,也即生產者和消費者只與 leader 副本交互,而同一個Partition下的所有副本保存有相同的消息序列。

也可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。

當 leader 副本發生故障時會從 follower 中選舉出一個 leader,但是 follower 中如果有和 leader 同步程度達不到要求的參加不了 leader 的競選。舊的Leader副本恢復后,只能作為follower副本加入到集群中。

ISR副本集合

ISR中的副本都是與Leader同步的副本,不在ISR中的follower副本就被認為是與Leader不同步的,Leader副本默認在ISR中。

只要一個Follower副本落后Leader副本的時間不連續超過replica.lag.time.max.ms秒,那么Kafka就認為該Follower副本與Leader是同步的,即使此時Follower副本中的消息少于Leader副本。

其中,參數replica.lag.time.max.ms是在Broker端設置的,默認為10s。

Unclean領導者選舉

所有不在ISR中的存活副本都稱為非同步副本,在Kafka中,選舉這種副本的過程稱為Unclean領導者選舉。

開啟Unclean領導者選舉可能會造成數據丟失,優勢在于,它使得分區Leader副本一直存在,不至于停止對外提供服務,因此提升了高可用性。反之,禁止Unclean領導者選舉的好處在于維護了數據的一致性,避免了消息丟失,但犧牲了高可用性。

其中,可以通過Broker端參數unclean.leader.election.enable控制是否允許Unclean領導者選舉。

副本選舉

kafka引入了優先副本的概念,即在副本集合列表中的第一個副本,在理想狀態下就是該分區的leader副本。

例如kafka集群由3臺broker組成,創建了一個topic,設置partition為3,副本數為3,partition0中副本集合列表為 [1,2,0],那么分區0的優先副本為1。

當 leader 副本發生故障時會從 follower 中選舉出一個 leader,當原來leader的節點恢復之后,它只能成為一個follower節點,此時就導致了集群負載不均衡。

為了解決上問題,kafka支持了優先副本選舉,此時,只要再觸發一次優先副本選舉就能保證分區負載均衡。

kafka支持自動優先副本選舉功能,默認每5分鐘觸發一次優先副本選舉操作。

優勢

Kafka 通過給特定 Topic 指定多個 Partition, 而各個 Partition 可以分布在不同的 Broker 上, 這樣便能提供比較好的并發能力(負載均衡)。

Partition 可以指定對應的 Replica 數,提高了消息存儲的安全性和容災能力,不過也相應的增加了所需要的存儲空間。

冪等性producer

在Kafka中,Producer默認不是冪等性的,但0.11.0.0版本引入了冪等性Producer,僅需要設置一個參數即可:

props.put(“enable.idempotence”, ture),
或
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

底層具體的原理很簡單,就是經典的用空間去換時間的優化思路,即在Broker端多保存一些字段。當Producer發送了具有相同字段值的消息后,Broker能夠自動知曉這些消息已經重復了,于是可以在后臺默默地把它們丟棄掉。

冪等性Producer的作用范圍:

  • 它只能保證單分區上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區上不出現重復消息,它無法實現多個分區的冪等性。
  • 它只能實現單會話上的冪等性,不能實現跨會話的冪等性。這里的會話,可以理解為Producer進程的一次運行,當你重啟了Producer進程之后,這種冪等性保證就喪失了。

事務

目前隔離級別主要是read committed。事務型Producer能保證多條消息原子性地寫入到目標分區。

這批消息要么全部寫入成功,要么全部失敗,即使Producer發生了重啟,Kafka依然保證它們發送消息的精確一次處理。

設置事務型Producer的方法也很簡單,滿足兩個要求即可:

  • 和冪等性Producer一樣,開啟enable.idempotence = true
  • 設置Producer端參數transactional.id,最好為其設置一個有意義的名字。

此外,需要在Producer代碼中做一些調整,如這段代碼所示:

producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}

事務型Producer的調用了一些事務API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別對應事務的初始化、事務開始、事務提交以及事務終止。

這段代碼能夠保證Record1和Record2被當作一個事務統一提交到Kafka,要么它們全部提交成功,要么全部寫入失敗。

實際上即使寫入失敗,Kafka也會把它們寫入到底層的日志中,也就是說Consumer還是會看到這些消息。

有一個isolation.level參數,這個參數有兩個取值:

  • read_uncommitted:這是默認值,表明Consumer能夠讀取到Kafka寫入的任何消息,不論事務型Producer提交事務還是終止事務,其寫入的消息都可以讀取,如果你用了事務型Producer,那么對應的Consumer就不要使用這個值。
  • read_committed:表明Consumer只會讀取事務型Producer成功提交事務寫入的消息,它也能看到非事務型Producer寫入的所有消息。

攔截器

Kafka攔截器分為生產者攔截器和消費者攔截器。

  • 生產者攔截器支持在發送消息前以及消息提交成功后編寫特定邏輯;
  • 消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。

可以將一組攔截器串連成一個大的攔截器,Kafka會按照添加順序依次執行攔截器邏輯。

Kafka攔截器的設置方法是通過參數配置完成的,生產者和消費者兩端有一個相同的參數interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現類。

Properties props = new Properties(); 
List interceptors = new ArrayList<>(); 
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1 
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2 
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); 

這兩個類以及編寫的所有Producer端攔截器實現類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor接口,里面有兩個核心的方法。

  • onSend:該方法會在消息發送之前被調用。
  • onAcknowledgement:該方法會在消息成功提交或發送失敗之后被調用。onAcknowledgement的調用要早于callback的調用。值得注意的是,這個方法和onSend不是在同一個線程中被調用的,因此如果在這兩個方法中調用了某個共享可變對象,要保證線程安全。

同理,指定消費者攔截器也是同樣的方法,只是具體的實現類要實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口,這里面也有兩個核心方法。

  • onConsume:該方法在消息返回給Consumer程序之前調用。
  • onCommit:Consumer在提交位移之后調用該方法。

一定要注意的是,指定攔截器類時要指定它們的全限定名。

控制器

控制器組件(Controller),它的主要作用是在Apache ZooKeeper的幫助下管理和協調整個Kafka集群。Kafka控制器大量使用ZooKeeper的Watch功能實現對集群的協調管理。

集群中任意一臺Broker都能充當控制器的角色,但是,在運行過程中,只能有一個Broker成為控制器,行使其管理和協調的職責。

Broker在啟動時,會嘗試去ZooKeeper中創建/controller節點。Kafka當前選舉控制器的規則是:第一個成功創建/controller節點的Broker會被指定為控制器

控制器的職責大致可以分為5種:

  • 主題管理(創建、刪除、增加分區),控制器幫助我們完成對Kafka主題的創建、刪除以及分區增加的操作。
  • 分區重分配
  • Preferred領導者選舉,Preferred領導者選舉主要是Kafka為了避免部分Broker負載過重而提供的一種換Leader的方案。
  • 集群成員管理
    包括自動檢測新增Broker、Broker主動關閉及被動宕機。這種自動檢測是依賴于Watch功能和ZooKeeper臨時節點組合實現的。
    比如,控制器組件會利用Watch機制檢查ZooKeeper的/brokers/ids節點下的子節點數量變更。
    目前,當有新Broker啟動后,它會在/brokers下創建專屬的znode節點。一旦創建完畢,ZooKeeper會通過Watch機制將消息通知推送給控制器,這樣,控制器就能自動地感知到這個變化,進而開啟后續的新增Broker作業。
    偵測Broker存活性則是依賴于剛剛提到的另一個機制:臨時節點
    每個Broker啟動后,會在/brokers/ids下創建一個臨時znode。當Broker宕機或主動關閉后,該Broker與ZooKeeper的會話結束,這個znode會被自動刪除。
    同理,ZooKeeper的Watch機制將這一變更推送給控制器,這樣控制器就能知道有Broker關閉或宕機了,從而進行善后。
  • 數據服務,控制器上保存了最全的集群元數據信息,其他所有Broker會定期接收控制器發來的元數據更新請求,從而更新其內存中的緩存數據。

日志存儲

Kafka中的消息是以主題為基本單位進行歸類的,每個主題在邏輯上相互獨立。每個主題又可以分為一個或多個分區,在不考慮副本的情況下,一個分區會對應一個日志。

但隨著時間推移,日志文件會不斷擴大,因此引入了日志分段(LogSegment)的概念,將Log切分為多個LogSegment,便于后續的消息維護和清理工作。

下圖描繪了主題、分區、副本、Log、LogSegment五者之間的關系。
在這里插入圖片描述
在Kafka中,每個Log對象可以劃分為多個LogSegment文件,每個LogSegment文件包括數據文件、索引文件、時間戳文件等。

其中,每個LogSegment中的日志數據文件大小均相等,該日志數據文件的大小可以通過在Kafka Broker的config/server.properties配置文件的中的log.segment.bytes進行設置,默認為1G大小(1073741824字節),在順序寫入消息時如果超出該設定的閾值,將會創建一組新的日志數據和索引文件。

Kafka 如何保證消息的消費順序?


kafka的topic是無序的,但是一個topic包含多個partition,每個partition內部是有序的。

生產者控制

消息在被追加到 Partition(分區)的時候都會分配一個特定的偏移量(offset),Kafka 通過偏移量(offset)來保證消息在分區內的順序性。

所以,1 個 Topic 只對應一個 Partition就可以保證消息消費順序,但是破壞了 Kafka 的設計初衷。

Kafka 中發送 1 條消息的時候,可以指定 topic, partition, key,data(數據) 4 個參數。如果發送消息的時候指定了 Partition 的話,所有消息都會被發送到指定的 Partition。同一個 key 的消息可以保證只發送到同一個 partition,可以采用表/對象的 id 來作為 key 。

因此,對于如何保證 Kafka 中消息消費的順序,有下面兩種方法:

  • (不推薦)1 個 Topic 只對應一個 Partition。
  • 發送消息的時候指定 key/Partition。

消費者控制

對于同一業務進入了同一個消費者組之后,用了多線程來處理消息,會導致消息的亂序。

消費者內部根據線程數量創建等量的內存隊列,對于需要順序的一系列業務數據,根據key或者業務數據,放到同一個內存隊列中,然后線程從對應的內存隊列中取出并操作

通過設置相同key來保證消息有序性,會有一點缺陷:

例如消息發送設置了重試機制,并且異步發送,消息A和B設置相同的key,業務上A先發,B后發,由于網絡或者其他原因A發送失敗,B發送成功;A由于發送失敗就會重試且重試成功,這時候消息順序B在前A在后,與業務發送順序不一致。

如果需要解決這個問題,需要設置參數max.in.flight.requests.per.connection=1,其含義是限制客戶端在單個連接上能夠發送的未響應請求的個數,設置此值是1表示在 broker響應請求之前producer不能再向同一個broker發送請求,這個參數默認值是5。

Kafka 如何保證消息不丟失?


生產者丟失消息的情況

生產者調用send方法發送消息之后,消息可能因為網絡問題并沒有發送過去。所以,不能默認在調用send方法發送消息之后消息發送成功了。

為了確定消息是發送成功,要判斷消息發送的結果。但是要注意的是 生產者使用 send 方法發送消息實際上是異步的操作,可以通過 get()方法獲取調用結果,但是這樣也讓它變為了同步操作,示例代碼如下

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {logger.info("生產者成功發送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendResult.getProducerRecord().value().toString());
}

所以一般不推薦這么做!可以采用為其添加回調函數的形式,示例代碼如下:

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生產者成功發送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生產者發送消失敗,原因:{}", ex.getMessage()));

另外,推薦為 Producer 的retries(重試次數)設置一個比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會設置比較大一點。設置完成之后,當出現網絡問題之后能夠自動重試消息發送,避免消息丟失。

另外,建議還要設置重試間隔,因為間隔太小的話重試的效果就不明顯了。

消費者丟失消息的情況

我們知道消息在被追加到 Partition(分區)的時候都會分配一個特定的偏移量(offset)。偏移量(offset)表示 Consumer 當前消費到的 Partition(分區)的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區內的順序性。

當消費者拉取到了分區的某個消息之后,消費者會自動提交了 offset。自動提交的話會有一個問題,當消費者剛拿到這個消息準備進行真正消費的時候,突然掛掉了,消息實際上并沒有被消費,但是 offset 卻被自動提交了。

可以手動關閉自動提交 offset,每次在真正消費完消息之后再自己手動提交 offset 。 但是,這樣會帶來消息被重新消費的問題。比如剛消費完消息之后,提交 offset前,服務掛掉了,那么這個消息理論上就會被消費兩次,所以還要考慮消費冪等性。

Kafka 弄丟了消息

Kafka 為分區引入了多副本機制。分區中的多個副本之間會有一個 leader 和多個follower。生產者發送的消息會被發送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進行同步。

假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個 leader ,但是 leader 的數據還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。以下是解決方式:

  • 設置 acks = all
    解決辦法就是我們設置 acks = all。acks 的默認值即為 1,代表消息被 leader 副本接收之后就算被成功發送。
    配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息時,生產者才會接收到來自服務器的響應。這種模式是最高級別的,也是最安全的,可以確保不止一個 Broker 接收到了消息。該模式的延遲會很高。
  • 設置 replication.factor >= 3
    為了保證 leader 副本能有 follower 副本能同步消息,一般為 topic 設置 replication.factor >= 3。這樣就可以保證每個分區至少有 3 個副本。
  • 設置 min.insync.replicas > 1
    一般情況下還需要設置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個副本才算是被成功發送。min.insync.replicas 的默認值為 1 ,在實際生產中應盡量避免默認值 1。
  • 設置replication.factor > min.insync.replicas
    假如兩者相等的話,只要是有一個副本掛掉,整個分區就無法正常工作了。一般推薦設置成 replication.factor = min.insync.replicas + 1
  • 設置 unclean.leader.election.enable = false
    它控制的是哪些Broker有資格競選分區的Leader,當 leader 副本發生故障時就不會從 follower 副本中和 leader 同步程度達不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。

Kafka 如何保證消息不重復消費?

kafka 出現消息重復消費的原因:

  • 服務端側已經消費的數據沒有成功提交 offset(根本原因),例如consumer 在消費過程中,應用進程被強制kill掉或發生異常退出。
  • Kafka 側由于服務端處理業務時間長或者網絡鏈接等等原因讓 Kafka 認為服務假死,觸發了分區 rebalance。
    舉例:單次拉取11條消息,每條消息耗時30s,11條消息耗時5分鐘30秒,由于max.poll.interval.ms 默認值5分鐘,所以消費者無法在5分鐘內消費完,consumer會離開組,導致rebalance。
    在消費完11條消息后,consumer會重新連接broker,再次rebalance,因為上次消費的offset未提交,再次拉取的消息是之前消費過的消息,造成重復消費。

解決方案:

  • 消費消息服務做冪等校驗,比如 Redis 的 set、MySQL 的主鍵等天然的冪等功能。這種方法最有效。
  • 提高消費能力,提高單條消息的處理速度;根據實際場景可講max.poll.interval.ms值設置大一點,避免不必要的rebalance;可適當減小max.poll.records的值,默認值是500,可根據實際消息速率適當調小。
  • 將 enable.auto.commit 參數設置為 false,關閉自動提交,開發者在代碼中手動提交 offset。那么這里會有個問題:什么時候提交 offset 合適?
    • 處理完消息再提交:依舊有消息重復消費的風險,和自動提交一樣
    • 拉取到消息即提交:會有消息丟失的風險。允許消息延時的場景會采用這種方式,通過定時任務在業務不繁忙的時候做數據兜底。

Kafka 重試機制


消費失敗會怎么樣?

生產者代碼:

 for (int i = 0; i < 10; i++) {kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i))}

消費者消代碼:

   @KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple")private void customer(String message) throws InterruptedException {log.info("kafka customer:{}",message);Integer n = Integer.parseInt(message);if (n % 5==0){throw new  RuntimeException();}}

在默認配置下,當消費異常會進行重試,重試多次后會跳過當前消息,繼續進行后續消息的消費,不會一直卡在當前消息。

默認會重試多少次?

默認配置下,消費異常會進行重試。看源碼 FailedRecordTracker 類有個 recovered 函數,返回 Boolean 值判斷是否要進行重試,下面是這個函數中判斷是否重試的邏輯:

	@Overridepublic boolean recovered(ConsumerRecord << ? , ? > record, Exception exception,@Nullable MessageListenerContainer container,@Nullable Consumer << ? , ? > consumer) throws InterruptedException {if (this.noRetries) {// 不支持重試attemptRecovery(record, exception, null, consumer);return true;}// 取已經失敗的消費記錄集合Map < TopicPartition, FailedRecord > map = this.failures.get();if (map == null) {this.failures.set(new HashMap < > ());map = this.failures.get();}//  獲取消費記錄所在的Topic和PartitionTopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);// 通知注冊的重試監聽器,消息投遞失敗this.retryListeners.forEach(rl - >rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));// 獲取下一次重試的時間間隔long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();if (nextBackOff != BackOffExecution.STOP) {this.backOffHandler.onNextBackOff(container, exception, nextBackOff);return false;} else {attemptRecovery(record, exception, topicPartition, consumer);map.remove(topicPartition);if (map.isEmpty()) {this.failures.remove();}return true;}}

其中, BackOffExecution.STOP 的值為 -1。

@FunctionalInterface
public interface BackOffExecution {long STOP = -1;long nextBackOff();}

nextBackOff 的值調用 BackOff 類的 nextBackOff() 函數。如果當前執行次數大于最大執行次數則返回 STOP,既超過這個最大執行次數后才會停止重試。

public long nextBackOff() {this.currentAttempts++;if (this.currentAttempts <= getMaxAttempts()) {return getInterval();}else {return STOP;}
}

那么這個 getMaxAttempts 的值又是多少呢?回到最開始,當執行出錯會進入 DefaultErrorHandler 。DefaultErrorHandler 默認的構造函數是:

public DefaultErrorHandler() {this(null, SeekUtils.DEFAULT_BACK_OFF);
}

SeekUtils.DEFAULT_BACK_OFF 定義的是:

public static final int DEFAULT_MAX_FAILURES = 10;public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);

DEFAULT_MAX_FAILURES 的值是 10,currentAttempts 從 0 到 9,所以總共會執行 10 次,每次重試的時間間隔為 0。
最后,簡單總結一下:Kafka 消費者在默認配置下會進行最多 10 次 的重試,每次重試的時間間隔為 0,即立即進行重試。如果在 10 次重試后仍然無法成功消費消息,則不再進行重試,消息將被視為消費失敗

自定義重試次數以及時間間隔

自定義重試次數以及時間間隔,只需要在 DefaultErrorHandler 初始化的時候傳入自定義的 FixedBackOff 即可。重新實現一個 KafkaListenerContainerFactory ,調用 setCommonErrorHandler 設置新的自定義的錯誤處理器就可以實現。

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();// 自定義重試時間間隔以及次數FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));factory.setConsumerFactory(consumerFactory);return factory;
}

在重試失敗后進行告警

自定義重試失敗后邏輯,需要手動實現,以下是一個簡單的例子,重寫 DefaultErrorHandler 的 handleRemaining 函數,加上自定義的告警等操作。

@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {public DelErrorHandler(FixedBackOff backOff) {super(null,backOff);}@Overridepublic void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {super.handleRemaining(thrownException, records, consumer, container);log.info("重試多次失敗");// 自定義操作}
}

DefaultErrorHandler 只是默認的一個錯誤處理器,Spring Kafka 還提供了 CommonErrorHandler 接口。手動實現 CommonErrorHandler 就可以實現更多的自定義操作,有很高的靈活性。例如根據不同的錯誤類型,實現不同的重試邏輯以及業務邏輯等。

重試失敗后的數據如何再次處理?

死信隊列(Dead Letter Queue,簡稱 DLQ) 是消息中間件中的一種特殊隊列。它主要用于處理無法被消費者正確處理的消息,通常是因為消息格式錯誤、處理失敗、消費超時等情況導致的消息被"丟棄"或"死亡"的情況。

當消息進入隊列后,如果超過一定的重試次數仍無法被成功處理,消息可以發送到死信隊列中,而不是被永久性地丟棄。

@RetryableTopic 是 Spring Kafka 中的一個注解,它用于配置某個 Topic 支持消息重試,更推薦使用這個注解來完成重試。

// 重試 5 次,重試間隔 100 毫秒,最大間隔 1 秒
@RetryableTopic(attempts = "5",backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {log.info("kafka customer:{}", message);Integer n = Integer.parseInt(message);if (n % 5 == 0) {throw new RuntimeException();}System.out.println(n);
}

當達到最大重試次數后,如果仍然無法成功處理消息,消息會被發送到對應的死信隊列中。對于死信隊列的處理,既可以用 @DltHandler 處理,也可以使用 @KafkaListener 重新消費。

高性能原因


順序讀寫

kafka的消息是不斷追加到文件中的,這個特性使kafka可以充分利用磁盤的順序讀寫性能。

順序讀寫不需要硬盤磁頭的尋道時間,只需很少的扇區旋轉時間,所以速度遠快于隨機讀寫。

Kafka 可以配置異步刷盤,不開啟同步刷盤,異步刷盤不需要等寫入磁盤后返回消息投遞的 ACK,所以它提高了消息發送的吞吐量,降低了請求的延時。

零拷貝

傳統的 IO 流程,需要先把數據拷貝到內核緩沖區,再從內核緩沖拷貝到用戶空間,應用程序處理完成以后,再拷貝回內核緩沖區。

這個過程中發生了多次數據拷貝,為了減少不必要的拷貝,Kafka 依賴 Linux 內核提供的 Sendfile 系統調用。

在 Sendfile 方法中,數據在內核緩沖區完成輸入和輸出,不需要拷貝到用戶空間處理,這也就避免了重復的數據拷貝。

在具體的操作中,Kafka 把所有的消息都存放在單獨的文件里,在消息投遞時直接通過 Sendfile 方法發送文件,減少了上下文切換,因此大大提高了性能。

MMAP技術

除了 Sendfile 之外,還有一種零拷貝的實現技術,即 Memory Mapped Files。

Kafka 使用 Memory Mapped Files 完成內存映射,Memory Mapped Files 對文件的操作不是 write/read,而是直接對內存地址的操作,如果是調用文件的 read 操作,則把數據先讀取到內核空間中,然后再復制到用戶空間,但 MMAP可以將文件直接映射到用戶態的內存空間,省去了用戶空間到內核空間復制的開銷。

Producer生產的數據持久化到broker,采用mmap文件映射,實現順序的快速寫入。

Consumer從broker讀取數據,采用sendfile,將磁盤文件讀到OS內核緩沖區后,直接轉到socket buffer進行網絡發送。

批量發送讀取

Kafka 的批量包括批量寫入、批量發布等,它在消息投遞時會將消息緩存起來,然后批量發送。

同樣,消費端在消費消息時,也不是一條一條處理的,而是批量進行拉取,提高了消息的處理速度。

數據壓縮

Kafka還支持對消息集合進行壓縮,Producer可以通過GZIP或Snappy格式對消息集合進行壓縮,來減少傳輸的數據量,減輕對網絡傳輸的壓力。

Producer壓縮之后,在Consumer需進行解壓,雖然增加了CPU的工作,但在對大數據處理上,瓶頸在網絡上而不是CPU,所以這個成本很值得。

分區機制

kafka中的topic中的內容可以被分為多partition存在,每個partition又分為多個段segment,所以每次操作都是針對一小部分做操作,增加并行操作的能力。

常用參數


broker端配置

broker.id

每個 kafka broker 都有一個唯一的標識來表示,這個唯一的標識符即是 broker.id,它的默認值是 0。

這個值在 kafka 集群中必須是唯一的,這個值可以任意設定,

port

如果使用配置樣本來啟動 kafka,它會監聽 9092 端口,修改 port 配置參數可以把它設置成任意的端口。

要注意,如果使用 1024 以下的端口,需要使用 root 權限啟動 kakfa。

zookeeper.connect

用于保存 broker 元數據的 Zookeeper 地址是通過 zookeeper.connect 來指定的。

比如可以這么指定 localhost:2181 表示這個 Zookeeper 是運行在本地 2181 端口上的。

也可以通過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個參數值。

該配置參數是用冒號分割的一組 hostname:port/path 列表,其含義如下

  • hostname 是 Zookeeper 服務器的機器名或者 ip 地址。
  • port 是 Zookeeper 客戶端的端口號
  • /path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了 chroot 環境,如果不指定默認使用跟路徑。

如果你有兩套 Kafka 集群,假設分別叫它們 kafka1 和 kafka2,那么兩套集群的zookeeper.connect參數可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2

log.dirs

Kafka 把所有的消息都保存到磁盤上,存放這些日志片段的目錄是通過 log.dirs 來制定的,它是用一組逗號來分割的本地系統路徑,log.dirs 是沒有默認值的,必須手動指定他的默認值

還有一個參數是 log.dir,這個配置是沒有 s 的,默認情況下只用配置 log.dirs 就好了,比如可以通過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個參數的值。

auto.create.topics.enable

默認情況下,kafka 會自動創建主題,auto.create.topics.enable參數建議最好設置成 false,即不允許自動創建 Topic。

主題相關配置

num.partitions

num.partitions 參數指定了新創建的主題需要包含多少個分區,該參數的默認值是 1。

default.replication.factor

表示 kafka保存消息的副本數。

log.retention.ms

Kafka 通常根據時間來決定數據可以保留多久。默認使用log.retention.hours參數來配置時間,默認是 168 個小時,即一周。

除此之外,還有兩個參數log.retention.minuteslog.retentiion.ms

這三個參數作用是一樣的,都是決定消息多久以后被刪除,推薦使用log.retention.ms

message.max.bytes

broker 通過設置message.max.bytes參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,如果生產者嘗試發送的消息超過這個大小,不僅消息不會被接收,還會收到 broker 返回的錯誤消息。

retention.ms

規定了該主題消息被保存的時常,默認是7天,即該主題只能保存7天的消息,一旦設置了這個值,它會覆蓋掉 Broker 端的全局參數值。

常見面試題


Kafka是Push還是Pull模式?

Kafka遵循了一種大部分消息系統共同的傳統的設計:producer將消息推送到broker,consumer從broker拉取消息。

push模式由broker決定消息推送的速率,對于不同消費速率的consumer就不太好處理了。

消息系統都致力于讓consumer以最大的速率最快速的消費消息,push模式下,當broker推送的速率遠大于consumer消費的速率時,consumer處理會有問題。

Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer從Broker Pull消息。

Pull有個缺點是,如果broker沒有可供消費的消息,將導致consumer不斷在循環中輪詢,直到新消息到達。

Kafka如何保證高可用?

面試題:Kafka如何保證高可用?有圖有真相

Kafka的使用場景

異步通信

消息中間件在異步通信中用的最多,很多業務流程中,如果所有步驟都同步進行可能會導致核心流程耗時非常長,更重要的是所有步驟都同步進行一旦非核心步驟失敗會導致核心流程整體失敗,因此在很多業務流程中Kafka就充當了異步通信角色。

日志同步

大規模分布式系統中的機器非常多而且分散在不同機房中,分布式系統帶來的一個明顯問題就是業務日志的查看、追蹤和分析等行為變得十分困難,對于集群規模在百臺以上的系統,查詢線上日志很恐怖。

為了應對這種場景統一日志系統應運而生,日志數據都是海量數據,通常為了不給系統帶來額外負擔一般會采用異步上報,這里Kafka以其高吞吐量在日志處理中得到了很好的應用。

實時計算

隨著據量的增加,離線的計算會越來越慢,難以滿足用戶在某些場景下的實時性要求,因此很多解決方案中引入了實時計算。

很多時候,即使是海量數據,我們也希望即時去查看一些數據指標,實時流計算應運而生。

實時流計算有兩個特點,一個是實時,隨時可以看數據;另一個是流。

kafka的作用

緩沖和削峰:上游數據時有突發流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務就可以按照自己的節奏進行慢慢處理。

解耦和擴展性:消息隊列可以作為一個接口層,解耦重要的業務流程。只需要遵守約定,針對數據編程即可獲取擴展能力。

冗余:可以采用一對多的方式,一個生產者發布消息,可以被多個訂閱topic的服務消費到,供多個毫無關聯的業務使用。

健壯性:消息隊列可以堆積請求,所以消費端業務即使短時間死掉,也不會影響主要業務的正常進行。

異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

Kafka消費過的消息如何再消費

kafka消費消息的offset是定義在zookeeper中的, 如果想重復消費kafka的消息,可以在redis中自己記錄offset的checkpoint點(n個),當想重復消費消息時,通過讀取redis中的checkpoint點進行zookeeper的offset重設,這樣就可以達到重復消費消息的目的了

kafka的數據是放在磁盤上還是內存上,為什么速度會快?

kafka使用的是磁盤存儲。

速度快是因為:

  • 順序寫入:因為硬盤是機械結構,每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是耗時的。所以硬盤 “討厭”隨機I/O, 喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
  • Memory Mapped Files(內存映射文件):64位操作系統中一般可以表示20G的數據文件,它的工作原理是直接利用操作系統的Page來實現文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上。
  • Kafka高效文件存儲設計: Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
    通過索引信息可以快速定位 message和確定response的大小。通過index元數據全部映射到memory(內存映射文件), 可以避免segment file的IO磁盤操作。通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。

注:

  • Kafka解決查詢效率的手段之一是將數據文件分段,比如有100條Message,它們的offset是從0到99。假設將數據文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的數據文件里面,數據文件以該段中小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。
  • 為數據文件建索引,雖然數據文件分段使得可以在一個較小的數據文件中查找對應offset的Message,但是這依然需要順序掃描才能找到對應offset的Message。 為了進一步提高查找的效率,Kafka為每個分段后的數據文件建立了索引文件,文件名與數據文件的名字是一樣的,只是文件擴展名為.index。

Kafka數據怎么保障不丟失

參考上面章節

kafka 重啟是否會導致數據丟失

kafka是將數據寫到磁盤的,一般數據不會丟失。
但是在重啟kafka過程中,如果有消費者消費消息,那么kafka如果來不及提交offset,可能會造成數據的不準確(丟失或者重復消費)。

kafka 宕機了如何解決

kafka 宕機了,首先考慮的問題是所提供的服務是否因為宕機的機器而受到影響,如果服務提供沒問題,事前做好了集群的容災機制,那么就不用擔心了。

想要恢復集群的節點,主要的步驟就是通過日志分析來查看節點宕機的原因,從而解決并重新恢復節點。

為什么Kafka不支持讀寫分離

在 Kafka 中,生產者寫入消息、消費者讀取消息的操作都是與 leader 副本進行交互的,從而實現的是一種主寫主讀的生產消費模型。 Kafka 并不支持主寫從讀,因為主寫從讀有 2 個很明顯的缺點:

  1. 數據一致性問題:數據從主節點轉到從節點必然會有一個延時的時間窗口,這個時間窗口會導致主從節點之間的數據不一致。
  2. 延時問題:類似 Redis 這種組件,數據從寫入主節點到同步至從節點中的過程需要經歷 網絡→主節點內存→網絡→從節點內存 這幾個階段,整個過程會耗費一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經歷 網絡→主節點內存→主節點磁盤→網絡→從節 點內存→從節點磁盤 這幾個階段。對延時敏感的應用而言,主寫從讀的功能并不太適用。

而kafka的主寫主讀的優點就很多了:

  • 可以簡化代碼的實現邏輯,減少出錯的可能;
  • 將負載粒度細化均攤,與主寫從讀相比,不僅負載效能更好,而且對用戶可控;
  • 沒有延時的影響;
  • 在副本穩定的情況下,不會出現數據不一致的情況。

kafka數據分區和消費者的關系

每個分區只能由同一個消費組內的一個消費者(consumer)來消費,可以由不同的消費組的消費者來消費,同組的消費者則起到并發的效果。

kafka的數據offset讀取流程

  1. 連接ZK集群,從ZK中拿到對應topic的partition信息和partition的Leader的相關信息;
  2. 連接到對應Leader對應的broker;
  3. consumer將?自?己保存的offset發送給Leader;
  4. Leader根據offset等信息定位到segment(索引?文件和?日志?文件);
  5. 根據索引?文件中的內容,定位到?日志?文件中該偏移量量對應的開始位置讀取相應?長度的數據并返回給consumer。

kafka內部如何保證順序,結合外部組件如何保證消費者的順序

參考上面章節

Kafka消息數據積壓,Kafka消費能力不足怎么處理

  1. 如果是Kafka消費能力不足,則可以考慮增加Topic的分區數,并且同時提升消費組的消費者數量,消費者數=分區數。(兩者缺一不可)
  2. 如果是下游的數據處理不及時,提高每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間<生產速度),使處理的數據小于生產的數據,也會造成數據積壓。

Kafka單條日志傳輸大小

kafka對于消息體的大小默認為單條最大值是1M但是在我們應用場景中, 常常會出現一條消息大于1M,如果不對kafka進行配置。則會出現生產者無法將消息推送到kafka或消費者無法去消費kafka里面的數據, 這時我們就要以下參數進行配置:

replica.fetch.max.bytes: 1048576  broker可復制的消息的最大字節數, 默認為1M
message.max.bytes: 1000012   kafka 會接收單個消息size的最大限制, 默認為1M左右

注意:message.max.bytes必須小于等于replica.fetch.max.bytes,否則就會導致replica之間數據同步失敗。

參考

四萬字32圖,Kafka知識體系保姆級教程寶典
Kafka常見問題總結
Kafka核心知識點大梳理

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/903398.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/903398.shtml
英文地址,請注明出處:http://en.pswp.cn/news/903398.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【亞馬遜云】AWS Wavelength 從理論講解到實驗演練

&#x1faaa; 本文作者&#xff1a;許業寶 ?? 作者信息&#xff1a; &#x1f31e; VSTECS 云解決方案架構師 &#xff5c; AWS Ambassador &#xff5c; &#x1faaa; AWS Community Builder | 亞馬遜云科技技能云博主 ? 已獲六項 AWS 認證 | CKA、CKS認證 &#xff5c; …

ORACLE DATAGUARD遇到GAP增量恢復方式修復RAC環境備機的實踐

ORACLE DATAGUARD技術是一個常用的數據保護機制&#xff0c;在DATAGUARD運行過程中&#xff0c;遇到異常導致備機不同步&#xff0c;而主庫的歸檔日志也被清理&#xff0c;此時出現GAP&#xff0c;無法同步&#xff1b;就需要人工處理&#xff1b;對于小型數據庫重新全量同步數…

Java24 抗量子加密:后量子時代的安全基石

一、量子計算威脅與 Java 的應對 隨著量子計算機的快速發展&#xff0c;傳統加密算法面臨前所未有的挑戰。Shor 算法可在多項式時間內破解 RSA、ECC 等公鑰加密體系&#xff0c;而 Grover 算法能將對稱加密的暴力破解效率提升至平方根級別。據 NIST 預測&#xff0c;具備實用價…

day005

文章目錄 1. Linux系統核心文件1.1 查看系統版本信息1.1.1 /etc/os-release1.1.2 hostnamectl 1.2 查看主機名并修改1.2.1 hostname1.2.2 cat /etc/hostname1.2.3 hostnamectl 1.3 查看Linux內核版本1.3.1 uname -r1.3.2 hostnamectl 1.4 查看網卡信息并修改1.4.1 nmtui 網絡管…

常用財務分析指標列表

財務分析指標是企業財務管理和決策的重要工具&#xff0c;不同需求人群在各自的場景中運用這些指標來做出決策。企業管理者需要通過財務分析指標來評估企業經營狀況、制定戰略和決策&#xff1b;投資者利用這些指標來評估投資價值和風險&#xff1b;債權人通過財務分析指標來評…

刪除非今天日期文件夾--批處理腳本

echo off setlocal enabledelayedexpansion REM ----- 配置部分 ----- set “target_dirK:\360downloads\Software” set “log_file%temp%\delete_folders.log” REM ----- 管理員權限檢查 ----- NET FILE >NUL 2>&1 || ( echo 需要以管理員權限運行&#xff01; …

QT創建軟件登錄界面(14)

文章目錄 一、本章說明二、登錄界面設計2.1 添加登錄窗口2.2 設置登錄窗口布局2.3 主函數中創建登錄窗口對象2.4 登錄窗口頭文件與c文件2.5 源文件添加三、注意四、源碼項目文件一、本章說明 注:本節為【基于STM的環境監測系統(節點+云服務器存儲+QT界面設計)】項目第14篇文…

小天互連即時通訊音視頻功能

小天互連即時通訊的音視頻功能是核心功能及優勢之一&#xff0c;小天互連即時通訊采用先進的音視頻編解碼技術&#xff0c;即使在網絡環境不好的情況下&#xff0c;也能智能優化保證會議穩定進行。因此可以讓遠程開會也變得和面對面交流一樣的便捷&#xff0c;極大地提升了溝通…

【LInux網絡】數據鏈路層 - 深度理解以太網和APR協議

&#x1f4e2;博客主頁&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;博客倉庫&#xff1a;https://gitee.com/JohnKingW/linux_test/tree/master/lesson &#x1f4e2;歡迎點贊 &#x1f44d; 收藏 ?留言 &#x1f4dd; 如有錯誤敬請指正&#xff01; &…

【零基礎入門】ASP.NET Core快速搭建第一個Web應用

一、為什么選擇ASP.NET Core&#xff1f; 跨平臺支持&#xff1a;可在Windows/macOS/Linux系統運行 高性能&#xff1a;比傳統ASP.NET框架快10倍以上 開源生態&#xff1a;活躍的開發者社區和豐富的NuGet包 云原生支持&#xff1a;完美適配Docker和Kubernetes部署 二、開發…

AT2401C與RFX2401C問題處理資料

1、AT2401C 可以 PIN 對 PIN 替代 RFX2401C 嗎&#xff1f; 答&#xff1a;AT2401C 可以 PIN 對 PIN 替換 RFX2401C&#xff1b;同時 CB2401 也可以 PIN 對 PIN 替換 RFX2401C&#xff1b;我們主要推 AT2401C 這款芯片&#xff0c;如果客戶產 品需要過認證或者應用于音頻產品建…

Redis-緩存應用 本地緩存與分布式緩存的深度解析

Redis緩存場景與策略&#xff1a;本地緩存與分布式緩存的深度解析 在當今高并發、低延遲的互聯網架構中&#xff0c;緩存技術是優化系統性能的核心手段之一。Redis作為分布式緩存的標桿&#xff0c;與本地緩存共同構成了緩存體系的兩大支柱。然而&#xff0c;兩者的適用場景與…

LinuxAgent開源程序是一款智能運維助手,通過接入 DeepSeek API 實現對 Linux 終端的自然語言控制,幫助用戶更高效地進行系統運維工作

一、軟件介紹 文末提供程序和源碼下載 LinuxAgent 開源程序是基于LLM大模型的Linux智能運維助手。通過接入DeepSeek API實現對Linux終端的自然語言控制&#xff0c;幫助用戶更高效地進行系統運維工作。 二、版本特性對比 特性v1.4.1 1.4.1 版v2.0.3 2.0.3 版v2.0.4 2.0.4…

Shadertoy著色器移植到Three.js經驗總結

Shadertoy是一個流行的在線平臺&#xff0c;用于創建和分享WebGL片段著色器。里面有很多令人驚嘆的畫面&#xff0c;甚至3D場景。本人也移植了幾個ShaderToy上的著色器。本文將詳細介紹移植過程中需要注意的關鍵點。 1. 基本結構差異 想要移植ShaderToy的shader到three.js&am…

StarRocks SRCA 考試心得總結

文章目錄 前言0 什么是StarRcoks&#xff1f;1. 關于 SRCA 考試2. 備考資料與學習方式2.1 官方文檔與教程2.2 在線培訓課程2.3 實戰演練 3. 重點考試內容3.1 StarRocks 架構與原理3.2 數據導入與導出3.3 SQL 查詢優化3.4 性能調優 4. 備考建議4.1 多做實操4.2 注重考試中的細節…

什么是 Spring Profiles 以及如何在 Spring Boot 中使用:配置與實踐指南

在現代應用開發中&#xff0c;應用程序通常需要在不同環境&#xff08;如開發、測試、生產&#xff09;中運行&#xff0c;每個環境可能有不同的配置&#xff08;如數據庫、日志級別、消息隊列&#xff09;。Spring Profiles 是 Spring 框架提供的一項功能&#xff0c;用于根據…

Spring Cloud Gateway限流:基于Redis的請求限流實現

文章目錄 引言一、Spring Cloud Gateway限流基礎1.1 限流機制概述1.2 Redis分布式限流原理 二、實現基于Redis的限流方案2.1 環境準備與依賴配置2.2 配置限流策略2.3 自定義限流響應 三、高級應用與最佳實踐3.1 動態限流規則調整3.2 優先級與降級策略3.3 監控與告警 總結 引言 …

keil修改字體無效,修改字體為“微軟雅黑”方法

在網上下載了微軟雅黑字體&#xff0c;微軟雅黑參考下載鏈接 結果在Edit->Configuration中找不到這個字體 這個時候可以在keil的安裝目錄中找到UV4/global.prop文件 用記事本打開它進行編輯&#xff0c;把字體名字改成微軟雅黑 重新打開keil就發現字體成功修改了。 這個…

CSS文字特效實例:猜猜我是誰

CSS文字特效實例&#xff1a;猜猜我是誰 引言 在之前的文章中&#xff0c;我們分別實現了空心文字、文字填充、文字模糊、文字裂開等效果。本文將使用一個小實例&#xff0c;組合使用相關特效&#xff1a;當鼠標懸停在圖片上時&#xff0c;其余圖片模糊&#xff0c;且文字會上…

美團社招一面

美團社招一面 做題 1、面試題 <style> .outer{width: 100px;background: red;height: 100px; }.inner {width: 50px;height: 50px;background: green; }</style> <div class"outer"><div class"inner"></div> </div>…