1. 生產者發送消息源碼分析
public class SimpleProducer {public static void main(String[] args) {Properties pros=new Properties();pros.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
// pros.put("bootstrap.servers","192.168.8.147:9092");pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 發出去就確認 | 1 leader 落盤就確認| all(-1) 所有Follower同步完才確認pros.put("acks","1");// 異常自動重試次數pros.put("retries",3);// 多少條數據發送一次,默認16Kpros.put("batch.size",16384);// 批量發送的等待時間pros.put("linger.ms",5);// 客戶端緩沖區大小,默認32M,滿了也會觸發消息發送pros.put("buffer.memory",33554432);// 獲取元數據時生產者的阻塞時間,超時后拋出異常pros.put("max.block.ms",3000);// 創建Sender線程Producer<String,String> producer = new KafkaProducer<String,String>(pros);for (int i =0 ;i<1000000;i++) {producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));// System.out.println("發送:"+i);}//producer.send(new ProducerRecord<String,String>("mytopic","1","1"));//producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}
a. 首先我們是創建一些kafka的連接配置以及參數配置,然后先new出來一個生產者,創建一個sender線程,由下圖源碼可以看出,我們在new生產者的時候,kafak會幫我們船艦一個sender線程,并進行了命名和啟動
?b.隨后我們的main線程中,進行批量send發送,那么接下來我們看下send方法
可以看到,在send方法中,還有一個interceptors做了一個攔截器的處理,?那么攔截器應該怎么使用的呢?
我們只需要實現ProducerInterceptor中的onsend方法,并且在kafka send消息前進行配置就可以了
public class ChargingInterceptor implements ProducerInterceptor<String, String> {// 發送消息的時候觸發@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {System.out.println("1分錢1條消息,不管那么多反正先扣錢");return record;
}
帶有攔截器的kafka demo
public static void main(String[] args) {Properties props=new Properties();props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
// props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 發出去就確認 | 1 leader 落盤就確認| all 所有Follower同步完才確認props.put("acks","1");// 異常自動重試次數props.put("retries",3);// 多少條數據發送一次,默認16Kprops.put("batch.size",16384);// 批量發送的等待時間props.put("linger.ms",5);// 客戶端緩沖區大小,默認32M,滿了也會觸發消息發送props.put("buffer.memory",33554432);// 獲取元數據時生產者的阻塞時間,超時后拋出異常props.put("max.block.ms",3000);// 添加攔截器List<String> interceptors = new ArrayList<>();interceptors.add("com.zsc.mq.kafka.javaapi.interceptor.ChargingInterceptor");// 這個鍵就是攔截器的配置,因為攔截器是個list,因此可以實現多個攔截器props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);Producer<String,String> producer = new KafkaProducer<String,String>(props);producer.send(new ProducerRecord<String,String>("mytopic","1","1"));producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}}
?c.? send方法走完攔截器后,我們進入到dosend方法中,接著看
?
可以看到,kafka對我們的消息進行了一個序列化,那么序列化方式就是在我們初始配置參數的時候進行配置的,可以指定不同的序列化方式,并且也可以自定義序列化方式,實現序列化接口,增加到配置類中即可
d. 看完序列化,我們的消息發送接著往下面走, 進入到分區器流程
?
?
?由上面可知,我們的分區器如果指定了分區就會走我們指定的分區;消息沒有指定分區但是自定義了消息分區器,就會走到消息分區器中,自定義消息器代碼如下(實現partitioer接口即可):
public class SimplePartitioner implements Partitioner {public SimplePartitioner() {}@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String k = (String) key;System.out.println(k);if (Integer.parseInt(k) % 2 == 0){return 0;}else{return 1;}// return Integer.parseInt(k)%2;}@Overridepublic void close() {}
?還有第三個情況,既沒有指定也沒有自定義分區器。那么key不為空,那就是走hash取模算法;key也會空的話,就是采用粘連策略(根據topic來確定在哪里存儲)
e. 當我們消息的分區器走完之后,就進入到我們的累加器,在上篇博客MQ之初識kafka-CSDN博客我們介紹組件的時候就提到過,kafka為了提升高吞吐,查詢效率快,消息并不是堆積在一起的,而是一批一批去放的,因此經過一個累加器。
可以看到 按批次添加到累加器中,那么添加到累加器之后,是怎么觸發流程的呢?
f . 順著源碼再往下看,可以看到一個判斷,當累加器的批次滿了的話或者是剛創建的批次,就會去喚醒sender線程,向Broker中發送消息。
生產者發送消息的整個流程圖如下所示:
?
2. ACK應答機制與ISR機制
2.1 服務器端響應策略的必要性
如圖所示,我們正常的執行流程是生產者producer向leader中發送消息,然后leader同步到兩個follower副本中,那么當發送消息的過程中服務異常的話,我們的leader就接收不到消息了,因此需要一個應答機制來保證我們能夠接收到消息,如果leader沒有接收到消息,就觸發重發機制,讓producer重新發送消息給leader
?2.2 ACK應答機制
kafka中提供了三種可靠性級別,可以根據對可靠性和延遲性的要求進行選擇
1.acks = 0? producer 不等待 broker動作、直接返回ack2.acks = 1(默認)?producer等待broker 動作、 leader 落盤成功、返回 ack3.acks=-1(all)? producer 等待 broker 動作、 leader&follower 全部落盤成功、返回 ack
?props.put("ack","0");? 不等待ACK
這種情況是說我們的producer發送消息給leader,leader異步返回ack給peoducer告訴消息已經發送成功了,這種正常存儲的情況下肯定是沒有問題的。但是如果還沒有同步副本的情況下,我們的leader此時掛掉了,而producer已經收到了應答,因此不會再重發消息。當再次重啟leader所在的服務器時,數據就丟失了
props.put("ack","1");? Leader落盤、返回ACK(默認)
這種下,我們peoducer確定了leader已經落盤了,但是如果極端情況下,leader還沒有同步副本給follower,那么此時leader服務器掛了,數據是不是也就丟失了,因為也還沒有進行備份
?props.put("ack","-1");? Leader和全部Follower落盤、返回ACK
這種情況是我們的producer等待leader和follower全部落盤成功后,進行ack響應,這種策略的可靠性最高,但是吞吐量是最低的,因此要根據具體業務具體配置。那么這種策略是不是就沒有什么問題了呢?當然也有,比如當leader和follower都落盤后,再返回應答信號時,leader掛了,那么peoducer沒有收到消息,就會任務leader沒有接收到消息,還是會對消息進行重發,那這個問題怎么解決呢? 可以用消息冪等性(在第三章進行贅述說明)
?應答異常
如上圖,當一個flower掛了的情況下,是不是我們的leader就沒法同步了,沒法同步,就會造成整個鏈路的阻塞,peoducer沒收到應答信息還啥也不知道,又往leader發消息,如果這樣持續下去,服務是不是就該崩了,因此引入了一個ISR機制。
2.3 ISR機制
?ISR是一組動態維護的同步副本集合,它的作用就是把leader和follower同時放到一個ISR隊列中,比如上面的P0_R0掛掉了,同步不積極,那么就把它移除ISR隊列,默認為30s,可以經過replica.lag.time.max.ms進行配置,當ISR中的隊列都同步完了的話,就返回ACK應答信號
AR = ISR+OSR
3. 消息冪等性
發送消息情形-1: 正常發送
發送消息情形-2:消息發送失敗,觸發消息重發,造成消息重發寫入
?
?發送消息情形-3:消息發送失敗,觸發消息重發,消息不重復寫入
如上圖所示,是怎么保證消息不被重復寫入的呢?利用冪等性,在發送消息的時候新增兩個參數PID與Sequence Number分別代表生產者ID和消息的編碼,那么Broker存儲的時候也會多加一點空間存儲這兩個值,當ack應答異常時,再次重發消息到隊列中時,就會進行一次判斷a.如果PID和sequence Number都相等,則消息寫入隊列失敗,b.如果Sequence Number為1 則順序寫入?c.如果Sequence Number為2,則拋出異常,表示數據有丟失
冪等性生產者發送消息流程總結:
1 、 Producer 端發送消息(消息本身、 PID 、 Sequence Number )2 、 Broker 端接收到消息(將消息和 PID 、 Sequence Number 一起保存)3 、若 ACK 響應失敗,生產者重試,再次發送消息
?kafka是在Broker端完成的去重處理
4. Kafka生產者事務
生產者的冪等性只能保證在單分區單會話的場景下有效,因此對于多分區來說,kafka事務就提供了對多個分區寫入操作的原子性。但是kafka事務的前提是開啟冪等性。
kafka事務API的相關方法
initTransactions() 初始化事務beginTransaction() 開啟事務commitTransaction() 提交事務abortTransaction() 中止事務sendOffsetsToTransaction()
事務的一個demo
public static void main(String[] args) {Properties props=new Properties();//props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 發出去就確認 | 1 leader 落盤就確認| all或-1 所有Follower同步完才確認props.put("acks","all");// 異常自動重試次數props.put("retries",3);// 多少條數據發送一次,默認16Kprops.put("batch.size",16384);// 批量發送的等待時間props.put("linger.ms",5);// 客戶端緩沖區大小,默認32M,滿了也會觸發消息發送props.put("buffer.memory",33554432);// 獲取元數據時生產者的阻塞時間,超時后拋出異常props.put("max.block.ms",3000);props.put("enable.idempotence",true);// 事務ID,唯一props.put("transactional.id", UUID.randomUUID().toString());Producer<String,String> producer = new KafkaProducer<String,String>(props);// 初始化事務producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
// Integer i = 1/0;producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));// 提交事務producer.commitTransaction();} catch (KafkaException e) {// 中止事務producer.abortTransaction();}producer.close();}
}
kafka事務操作的基本流程
最后標記消費狀態后,就可以進行消費了
?kafka的事務細節流程:
5. 總結
? ? ? ? 本文主要是介紹了kafka生產者端的一些原理,先是從源碼出發,介紹了生產者發送消息到Broker經歷的一系列過程:先是創建了一個sender線程,然后在發送消息的過程中一次經過攔截器、累加器、分區器最后根據分區的批量消息是否新建或者滿了來觸發sender線程發送到Broker服務器中。隨后我們介紹了,peoducer跟broker服務器之間的交互采用的是應答機制,在這里有3種配置,可根據業務需要來具體配置,當配置-1的時候,我們分析了為什么會出現重發消息的問題,通過冪等性來保證,follower從節點掛了的情況下,應答異常,采用ISR隊列機制進行避免。但是冪等性只能保證單分區單會話的場景,而針對多分區的情況下,kafka主要是采用分布式事務來解決,利用分布式ID,事務coordinattor和事務日志分二PC提交,并且對事務的狀態進行存儲標記,當事務的狀態更改為可消費的時候,才會進行消費。