1. kafka內核原理
1.1 ISR機制
光是依靠多副本機制能保證Kafka的高可用性,但是能保證數據不丟失嗎?不行,因為如果leader宕機,但是leader的數據還沒同步到follower上去,此時即使選舉了follower作為新的leader,當時剛才的數據已經丟失了。ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的數量,只有處于ISR列表中的follower才可以在leader宕機之后被選舉為新的leader,因為在這個ISR列表里代表他的數據跟leader是同步的。如果要保證寫入kafka的數據不丟失,首先需要保證ISR中至少有一個follower,其次就是在一條數據寫入了leader partition之后,要求必須復制給ISR中所有的follower partition,才能說代表這條數據已提交,絕對不會丟失,這是Kafka給出的承諾
1.2 HW&LEO原理
-
LEO
last end offset,日志末端偏移量,標識當前日志文件中下一條待寫入的消息的offset。舉一個例子,若LEO=10,那么表示在該副本日志上已經保存了10條消息,位移范圍是[0,9]。
-
HW
Highwatermark,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個offset之前的消息。任何一個副本對象的HW值一定不大于其LEO值。小于或等于HW值的所有消息被認為是“已提交的”或“已備份的”。HW它的作用主要是用來判斷副本的備份進度.下圖表示一個日志文件,這個日志文件中只有9條消息,第一條消息的offset(LogStartOffset)為0,最有一條消息的offset為8,offset為9的消息使用虛線表示的,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對消費者而言是不可見的。
leader持有的HW即為分區的HW,同時leader所在broker還保存了所有follower副本的leo(1)關系:leader的leo >= follower的leo >= leader保存的follower的leo >= leader的hw >= follower的hw (2)原理:上面關系反應出各個值的更新邏輯的先后
-
更新LEO的機制
- 注意
- follower副本的LEO保存在2個地方
(1)follower副本所在的broker緩存里。 (2)leader所在broker的緩存里,也就是leader所在broker的緩存上保存了該分區所有副本的LEO。
-
更新LEO的時機
- follower更新LEO
(1)follower的leo更新時間每當follower副本寫入一條消息時,leo值會被更新(2)leader端的follower副本的leo更新時間當follower從leader處fetch消息時,leader獲取follower的fetch請求中offset參數,更新保存在leader端follower的leo。
- leader更新LEO
(1)leader本身的leo的更新時間:leader向log寫消息時
- 注意
-
更新HW的機制
-
follower更新HW
follower更新HW發生在其更新完LEO后,即follower向log寫完數據,它就會嘗試更新HW值。具體算法就是比較當前LEO(已更新)與fetch響應中leader的HW值,取兩者的小者作為新的HW值。
-
leader更新HW
- leader更新HW的時機
(1)producer 向 leader 寫消息時 (2)leader 處理 follower 的 fetch 請求時 (3)某副本成為leader時 (4)broker 崩潰導致副本被踢出ISR時
- leader更新HW的方式
當嘗試確定分區HW時,它會選出所有滿足條件的副本,比較它們的LEO(當然也包括leader自己的LEO),并選擇最小的LEO值作為HW值。這里的滿足條件主要是指副本要滿足以下兩個條件之一:(1)處于ISR中(2)副本LEO落后于leader LEO的時長不大于replica.lag.time.max.ms參數值(默認值是10秒)
-
-
2. producer消息發送原理
2.1 producer核心流程概覽
-
1、ProducerInterceptors是一個攔截器,對發送的數據進行攔截
ps:說實話這個功能其實沒啥用,我們即使真的要過濾,攔截一些消息,也不考慮使用它,我們直接發送數據之前自己用代碼過濾即可
-
2、Serializer 對消息的key和value進行序列化
-
3、通過使用分區器作用在每一條消息上,實現數據分發進行入到topic不同的分區中
-
4、RecordAccumulator收集消息,實現批量發送
按照分區構建不同的隊列,將消息封裝成一個個batch(16kb)
它是一個緩沖區,可以緩存一批數據,把topic的每一個分區數據存在一個隊列中,然后封裝消息成一個一個的batch批次,最后實現數據分批次批量發送。
-
5、Sender線程從RecordAccumulator獲取消息
-
6、構建ClientRequest對象
-
7、將ClientRequest交給 NetWorkClient準備發送
-
8、NetWorkClient 將請求放入到KafkaChannel的緩存
-
9、發送請求到kafka集群
-
10、調用回調函數,接受到響應
3. producer核心參數
- 回顧之前的producer生產者代碼
package com.kaikeba.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 需求:開發kafka生產者代碼*/
public class KafkaProducerStudyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//準備配置屬性Properties props = new Properties();//kafka集群地址props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");//acks它代表消息確認機制 // 1 0 -1 allprops.put("acks", "all");//重試的次數props.put("retries", 0);//批處理數據的大小,每次寫入多少數據到topicprops.put("batch.size", 16384);//可以延長多久發送數據props.put("linger.ms", 1);//緩沖區的大小props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//添加自定義分區函數props.put("partitioner.class","com.kaikeba.partitioner.MyPartitioner");Producer<String, String> producer = new KafkaProducer<String, String>(props);for (int i = 0; i < 100; i++) {// 這是異步發送的模式producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hello-kafka-"+i), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {// 消息發送成功System.out.println("消息發送成功");} else {// 消息發送失敗,需要重新發送}}});// 這是同步發送的模式//producer.send(record).get();// 你要一直等待人家后續一系列的步驟都做完,發送消息之后// 有了消息的回應返回給你,你這個方法才會退出來}producer.close();}}
3.1 常見異常處理
-
不管是異步還是同步,都可能讓你處理異常,常見的異常如下:
1)LeaderNotAvailableException:這個就是如果某臺機器掛了,此時leader副本不可用,會導致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續寫入,此時可以重試發送即可。如果說你平時重啟kafka的broker進程,肯定會導致leader切換,一定會導致你寫入報錯,是LeaderNotAvailableException2)NotControllerException:這個也是同理,如果說Controller所在Broker掛了,那么此時會有問題,需要等待Controller重新選舉,此時也是一樣就是重試即可3)NetworkException:網絡異常,重試即可 我們之前配置了一個參數,retries,他會自動重試的,但是如果重試幾次之后還是不行,就會提供Exception給我們來處理了。
-
retries
- 重新發送數據的次數
-
retry.backoff.ms
- 兩次重試之間的時間間隔
3.2 提升消息吞吐量
-
buffer.memory
- 設置發送消息的緩沖區,默認值是33554432,就是32MB
如果發送消息出去的速度小于寫入消息進去的速度,就會導致緩沖區寫滿,此時生產消息就會阻塞住,所以說這里就應該多做一些壓測,盡可能保證說這塊緩沖區不會被寫滿導致生產行為被阻塞住
-
compression.type
- producer用于壓縮數據的壓縮類型。默認是none表示無壓縮。可以指定gzip、snappy
- 壓縮最好用于批量處理,批量處理消息越多,壓縮性能越好。
-
batch.size
- producer將試圖批處理消息記錄,以減少請求次數。這將改善client與server之間的性能。
- 默認是16384Bytes,即16kB,也就是一個batch滿了16kB就發送出去
如果batch太小,會導致頻繁網絡請求,吞吐量下降;如果batch太大,會導致一條消息需要等待很久才能被發送出去,而且會讓內存緩沖區有很大壓力,過多數據緩沖在內存里。
-
linger.ms
- 這個值默認是0,就是消息必須立即被發送
一般設置一個100毫秒之類的,這樣的話就是說,這個消息被發送出去后進入一個batch,如果100毫秒內,這個batch滿了16kB,自然就會發送出去。但是如果100毫秒內,batch沒滿,那么也必須把消息發送出去了,不能讓消息的發送延遲時間太長,也避免給內存造成過大的一個壓力。
3.3 請求超時
- max.request.size
- 這個參數用來控制發送出去的消息的大小,默認是1048576字節,也就1mb
- 這個一般太小了,很多消息可能都會超過1mb的大小,所以需要自己優化調整,把他設置更大一些(企業一般設置成10M)
- request.timeout.ms
- 這個就是說發送一個請求出去之后,他有一個超時的時間限制,默認是30秒
- 如果30秒都收不到響應,那么就會認為異常,會拋出一個TimeoutException來讓我們進行處理
3.4 ACK參數
acks參數,其實是控制發送出去的消息的持久化機制的。
-
acks=0
- 生產者只管發數據,不管消息是否寫入成功到broker中,數據丟失的風險最高
producer根本不管寫入broker的消息到底成功沒有,發送一條消息出去,立馬就可以發送下一條消息,這是吞吐量最高的方式,但是可能消息都丟失了。 你也不知道的,但是說實話,你如果真是那種實時數據流分析的業務和場景,就是僅僅分析一些數據報表,丟幾條數據影響不大的。會讓你的發送吞吐量會提升很多,你發送弄一個batch出去,不需要等待人家leader寫成功,直接就可以發送下一個batch了,吞吐量很大的,哪怕是偶爾丟一點點數據,實時報表,折線圖,餅圖。
-
acks=1
- 只要leader寫入成功,就認為消息成功了.
默認給這個其實就比較合適的,還是可能會導致數據丟失的,如果剛寫入leader,leader就掛了,此時數據必然丟了,其他的follower沒收到數據副本,變成leader.
-
acks=all,或者 acks=-1
- 這個leader寫入成功以后,必須等待其他ISR中的副本都寫入成功,才可以返回響應說這條消息寫入成功了,此時你會收到一個回調通知.
這種方式數據最安全,但是性能最差。
-
如果要想保證數據不丟失,得如下設置
(1)min.insync.replicas = 2ISR里必須有2個副本,一個leader和一個follower,最最起碼的一個,不能只有一個leader存活,連一個follower都沒有了。(2)acks = -1每次寫成功一定是leader和follower都成功才可以算做成功,這樣leader掛了,follower上是一定有這條數據,不會丟失。(3)retries = Integer.MAX_VALUE無限重試,如果上述兩個條件不滿足,寫入一直失敗,就會無限次重試,保證說數據必須成功的發送給兩個副本,如果做不到,就不停的重試。除非是面向金融級的場景,面向企業大客戶,或者是廣告計費,跟錢的計算相關的場景下,才會通過嚴格配置保證數據絕對不丟失
3.5 重試亂序
- max.in.flight.requests.per.connection
- 每個網絡連接可以忍受 producer端發送給broker 消息然后消息沒有響應的個數
- 消息重試是可能導致消息的亂序的,因為可能排在你后面的消息都發送出去了,你現在收到回調失敗了才在重試,此時消息就會亂序,所以可以使用“max.in.flight.requests.per.connection”參數設置為1,這樣可以保證producer同一時間只能發送一條消息
消息重試是可能導致消息的亂序的,因為可能排在你后面的消息都發送出去了,你現在收到回調失敗了才在重試,此時消息就會亂序,所以可以使用“max.in.flight.requests.per.connection”參數設置為1,這樣可以保證producer同一時間只能發送一條消息
4. broker核心參數
-
server.properties配置文件核心參數
【broker.id】 每個broker都必須自己設置的一個唯一id【log.dirs】 這個極為重要,kafka的所有數據就是寫入這個目錄下的磁盤文件中的,如果說機器上有多塊物理硬盤,那么可以把多個目錄掛載到不同的物理硬盤上,然后這里可以設置多個目錄,這樣kafka可以數據分散到多塊物理硬盤,多個硬盤的磁頭可以并行寫,這樣可以提升吞吐量。【zookeeper.connect】 連接kafka底層的zookeeper集群的【Listeners】 broker監聽客戶端發起請求的端口號,默認是9092【unclean.leader.election.enable】 默認是false,意思就是只能選舉ISR列表里的follower成為新的leader,1.0版本后才設為false,之前都是true,允許非ISR列表的follower選舉為新的leader【delete.topic.enable】 默認true,允許刪除topic【log.retention.hours】 可以設置一下,要保留數據多少個小時(默認168小時),這個就是底層的磁盤文件,默認保留7天的數據,根據自己的需求來就行了
【log.retention.hours】
可以設置一下,要保留數據多少個小時(默認168小時),這個就是底層的磁盤文件,默認保留7天的數據,根據自己的需求來就行了【unclean.leader.election.enable】
默認是false,意思就是只能選舉ISR列表里的follower成為新的leader,1.0版本后才設為false,之前都是true,允許非ISR列表的follower選舉為新的leader
5. consumer消費原理
5.1 Offset管理
? 每個consumer內存里數據結構保存對每個topic的每個分區的消費offset,定期會提交offset,老版本是寫入zk,但是那樣高并發請求zk是不合理的架構設計,zk是做分布式系統的協調的,輕量級的元數據存儲,不能負責高并發讀寫,作為數據存儲。所以后來就是提交offset發送給內部topic:__consumer_offsets,提交過去的時候,key是group.id+topic+分區號,value就是當前offset的值,每隔一段時間,kafka內部會對這個topic進行compact。也就是每個group.id+topic+分區號就保留最新的那條數據即可。而且因為這個 __consumer_offsets可能會接收高并發的請求,所以默認分區50個,這樣如果你的kafka部署了一個大的集群,比如有50臺機器,就可以用50臺機器來抗offset提交的請求壓力,就好很多。
5.2 Coordinator 協調器
-
Coordinator的作用
每個consumer group都會選擇一個broker作為自己的coordinator,他是負責監控這個消費組里的各個消費者的心跳,以及判斷是否宕機,然后開啟rebalance.根據內部的一個選擇機制,會挑選一個對應的Broker,Kafka總會把你的各個消費組均勻分配給各個Broker作為coordinator來進行管理的.consumer group中的每個consumer剛剛啟動就會跟選舉出來的這個consumer group對應的coordinator所在的broker進行通信,然后由coordinator分配分區給你的這個consumer來進行消費。coordinator會盡可能均勻的分配分區給各個consumer來消費。
-
如何選擇哪臺是coordinator
-
consumer-roup-id.hashcode % 分區數 =》分區號,該分區的leader所在的broker就是這個consumer group的coordinator。
首先對消費組的groupId進行hash,接著對consumer_offsets的分區數量取模,默認是50,可以通過offsets.topic.num.partitions來設置,找到你的這個consumer group的offset要提交到consumer_offsets的哪個分區。比如說:groupId,"membership-consumer-group" -> hash值(數字)-> 對50取模 -> 就知道這個consumer group下的所有的消費者提交offset的時候是往哪個分區去提交offset,找到consumer_offsets的一個分區,consumer_offset的分區的副本數量默認來說1,只有一個leader,然后對這個分區找到對應的leader所在的broker,這個broker就是這個consumer group的coordinator了,consumer接著就會維護一個Socket連接跟這個Broker進行通信。
6. consumer消費者Rebalance策略
比如我們消費的一個topic主題有12個分區:p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假設我們的消費者組里面有三個消費者。
6.1 range范圍策略
range策略就是按照partiton的序號范圍p0~3 consumer1p4~7 consumer2p8~11 consumer3
默認就是這個策略
6.2 round-robin輪循策略
consumer1: 0,3,6,9
consumer2: 1,4,7,10
consumer3: 2,5,8,11但是前面的這兩個方案有個問題:假設consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3這樣的話,原本在consumer2上的的p6,p7分區就被分配到了 consumer3上
6.3 sticky黏性策略
最新的一個sticky策略,就是說盡可能保證在rebalance的時候,讓原本屬于這個consumer
的分區還是屬于他們,然后把多余的分區再均勻分配過去,這樣盡可能維持原來的分區分配的策略consumer1: 0-3
consumer2: 4-7
consumer3: 8-11 假設consumer3掛了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11
7. consumer核心參數
【heartbeat.interval.ms】
默認值:3000
consumer心跳時間,必須得保持心跳才能知道consumer是否故障了,然后如果故障之后,就會通過心跳下發rebalance的指令給其他的consumer通知他們進行rebalance的操作【session.timeout.ms】
默認值:10000
kafka多長時間感知不到一個consumer就認為他故障了,默認是10秒【max.poll.interval.ms】
默認值:300000
如果在兩次poll操作之間,超過了這個時間,那么就會認為這個consume處理能力太弱了,會被踢出消費組,分區分配給別人去消費,一遍來說結合你自己的業務處理的性能來設置就可以了【fetch.max.bytes】
默認值:1048576
獲取一條消息最大的字節數,一般建議設置大一些【max.poll.records】
默認值:500條
一次poll返回消息的最大條數,【connections.max.idle.ms】
默認值:540000
consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立socket連接,這個建議設置為-1,不要去回收【auto.offset.reset】earliest當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 latest當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從當前位置開始消費nonetopic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
注:我們生產里面一般設置的是latest【enable.auto.commit】
默認值:true
設置為自動提交offset【auto.commit.interval.ms】
默認值:60 * 1000
每隔多久更新一下偏移量
官網查看kafka參數http://kafka.apache.org/10/documentation.html
8. 指定位置開始消費
1.從0開始消費
TopicPartition partition = new TopicPartition("order", 0);consumer.assign(Arrays.asList(partition));consumer.seekToBeginning(Arrays.asList(partition));while (true) {ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<Integer, String> record : poll) {System.out.println(record.key() + "-------" + record.value());}}
2.從指定位置開始消費
TopicPartition partition = new TopicPartition("order", 0);consumer.assign(Arrays.asList(partition));//從指定位置開始消費consumer.seek(partition, 5310);while (true) {ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<Integer, String> record : poll) {System.out.println(record.key() + "-------" + record.value());}}
9. 總結!!!
9.1、內核原理
ISR機制 :就是跟leader同步的follower的數量,默認再ISR列表中的follower才可以參與leader的選舉,【unclean.leader.election.enable】參數默認是false,修改為true,就可以讓不在ISR列表中的follower參與leader選舉
HW:Highwatermark,標定了一個偏移量值,該偏移量之前的消息對消費者可見,可以消費。
LEO:last end offset,日志末端偏移量,標記當前log文件中下一條待寫入的消息的offset。
HW<=LEO
更新機制
1、leader寫log后更新自己的LEO
2、follower請求從leader拉去數據時帶著自己LEO,leader根據這個跟更新這邊follower副本的LEO
3、比較leader和副本取最小LEO更新作為leader的HW
4、follower得到同步數據包含leader的HW,寫數據后更新自己的LEO,并得到的HW作為自己的HW更新
2、3、4會重復因為有多個follower會請求同步數據
9.2、生產者核心流程
生產者主線程經過消息序列化和分區,發送到一個緩沖區Record Accumulator(32M),這個緩沖區中會依照partition數量來構建多個隊列,并將消息封裝成一個個batch(16kb)【涉及一個參數linger.ms】,發送給sender線程,sender線程將消息封裝成一個request發送給kafka的channel,kafka做出響應。
9.3、生產者核心參數
吞吐量
buffer.memory 默認 32MB 緩沖區Record Accumulator
batch.size 默認 16kb 緩沖區中的隊列中的batch大小
linger.ms 默認是 0
假如設置為100ms,這樣的話就是說,這個消息被發送出去后進入一個batch,如果100毫秒內,這個batch大小達到【batch.size的值】16kB,自然就會發送出去。
但是如果100毫秒內,batch沒滿,那么也必須把消息發送出去了,不能讓消息的發送延遲時間太長,也避免給內存造成過大的一個壓力。
compression.type 默認是none ,可以指定gzip、snappy,用于批量處理,批量處理消息越多,壓縮性能越好。
ACK參數
ack=0 生產者發送消息不做確認,直接視為發送成功
ack=1 只確認leader寫入成功,視為發送成功
ack=all(或-1) 確認leader寫入成功以及ISR中的副本都寫入成功,才視為發送成功
retries參數只能在 ack=1和all的時候起作用
消息重試亂序:
max.in.flight.requests.per.connection 就是生產者給broker發送消息可以接受有多少條消息沒有響應,默認是5
消息重試是可能導致消息的亂序的,因為可能排在你后面的消息都發送出去了,你現在收到回調失敗了才在重試,此時消息就會亂序,所以可以使用“max.in.flight.requests.per.connection”參數設置為1,這樣可以保證producer同一時間只能發送一條消息
9.4、broker節點核心參數
【log.retention.hours】
可以設置一下,要保留數據多少個小時(默認168小時),這個就是底層的磁盤文件,默認保留7天的數據,根據自己的需求來就行了
【unclean.leader.election.enable】
默認是false,意思就是只能選舉ISR列表里的follower成為新的leader,1.0版本后才設為false,之前都是true,允許非ISR列表的follower選舉為新的leader
9.5、消費者offset偏移量管理
【auto.offset.reset】
【enable.auto.commit】
【auto.commit.interval.ms】
0.8前存在zk上,后面存在kafka自身的一個topic【consumer_offsets】里
一般不自動提交offset,
9.6、消費者Coordinator協調器
負責監控這個消費組里的各個消費者的心跳,以及判斷是否宕機,然后開啟rebalance.
consumer-roup-id.hashcode % 分區數 =》分區號,該分區的leader所在的broker就是這個consumer group的coordinator。
9.7、消費者負載均衡rebalance策略
范圍策略,連續平分
輪循策略
黏性策略,在負載均衡的基礎上如果有消費者宕機,保證其他消費者消費分區不變,將宕機的消費者的分區平分