Kafka之Producer原理

1. 生產者發送消息源碼分析

public class  SimpleProducer {public static void main(String[] args) {Properties pros=new Properties();pros.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        pros.put("bootstrap.servers","192.168.8.147:9092");pros.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");pros.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 發出去就確認 | 1 leader 落盤就確認| all(-1) 所有Follower同步完才確認pros.put("acks","1");// 異常自動重試次數pros.put("retries",3);// 多少條數據發送一次,默認16Kpros.put("batch.size",16384);// 批量發送的等待時間pros.put("linger.ms",5);// 客戶端緩沖區大小,默認32M,滿了也會觸發消息發送pros.put("buffer.memory",33554432);// 獲取元數據時生產者的阻塞時間,超時后拋出異常pros.put("max.block.ms",3000);// 創建Sender線程Producer<String,String> producer = new KafkaProducer<String,String>(pros);for (int i =0 ;i<1000000;i++) {producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));// System.out.println("發送:"+i);}//producer.send(new ProducerRecord<String,String>("mytopic","1","1"));//producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}

a. 首先我們是創建一些kafka的連接配置以及參數配置,然后先new出來一個生產者,創建一個sender線程,由下圖源碼可以看出,我們在new生產者的時候,kafak會幫我們船艦一個sender線程,并進行了命名和啟動

?b.隨后我們的main線程中,進行批量send發送,那么接下來我們看下send方法

可以看到,在send方法中,還有一個interceptors做了一個攔截器的處理,?那么攔截器應該怎么使用的呢?

我們只需要實現ProducerInterceptor中的onsend方法,并且在kafka send消息前進行配置就可以了

public class ChargingInterceptor implements ProducerInterceptor<String, String> {// 發送消息的時候觸發@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {System.out.println("1分錢1條消息,不管那么多反正先扣錢");return record;
}

帶有攔截器的kafka demo

 public static void main(String[] args) {Properties props=new Properties();props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");
//        props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 發出去就確認 | 1 leader 落盤就確認| all 所有Follower同步完才確認props.put("acks","1");// 異常自動重試次數props.put("retries",3);// 多少條數據發送一次,默認16Kprops.put("batch.size",16384);// 批量發送的等待時間props.put("linger.ms",5);// 客戶端緩沖區大小,默認32M,滿了也會觸發消息發送props.put("buffer.memory",33554432);// 獲取元數據時生產者的阻塞時間,超時后拋出異常props.put("max.block.ms",3000);// 添加攔截器List<String> interceptors = new ArrayList<>();interceptors.add("com.zsc.mq.kafka.javaapi.interceptor.ChargingInterceptor");// 這個鍵就是攔截器的配置,因為攔截器是個list,因此可以實現多個攔截器props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);Producer<String,String> producer = new KafkaProducer<String,String>(props);producer.send(new ProducerRecord<String,String>("mytopic","1","1"));producer.send(new ProducerRecord<String,String>("mytopic","2","2"));producer.close();}}

?c.? send方法走完攔截器后,我們進入到dosend方法中,接著看

?

可以看到,kafka對我們的消息進行了一個序列化,那么序列化方式就是在我們初始配置參數的時候進行配置的,可以指定不同的序列化方式,并且也可以自定義序列化方式,實現序列化接口,增加到配置類中即可

d. 看完序列化,我們的消息發送接著往下面走, 進入到分區器流程

?

?

?由上面可知,我們的分區器如果指定了分區就會走我們指定的分區;消息沒有指定分區但是自定義了消息分區器,就會走到消息分區器中,自定義消息器代碼如下(實現partitioer接口即可):

public class SimplePartitioner implements Partitioner {public SimplePartitioner() {}@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String k = (String) key;System.out.println(k);if (Integer.parseInt(k) % 2 == 0){return 0;}else{return 1;}// return Integer.parseInt(k)%2;}@Overridepublic void close() {}

?還有第三個情況,既沒有指定也沒有自定義分區器。那么key不為空,那就是走hash取模算法;key也會空的話,就是采用粘連策略(根據topic來確定在哪里存儲)

e. 當我們消息的分區器走完之后,就進入到我們的累加器,在上篇博客MQ之初識kafka-CSDN博客我們介紹組件的時候就提到過,kafka為了提升高吞吐,查詢效率快,消息并不是堆積在一起的,而是一批一批去放的,因此經過一個累加器。

可以看到 按批次添加到累加器中,那么添加到累加器之后,是怎么觸發流程的呢?

f . 順著源碼再往下看,可以看到一個判斷,當累加器的批次滿了的話或者是剛創建的批次,就會去喚醒sender線程,向Broker中發送消息。

生產者發送消息的整個流程圖如下所示:

?

2. ACK應答機制與ISR機制

2.1 服務器端響應策略的必要性

如圖所示,我們正常的執行流程是生產者producer向leader中發送消息,然后leader同步到兩個follower副本中,那么當發送消息的過程中服務異常的話,我們的leader就接收不到消息了,因此需要一個應答機制來保證我們能夠接收到消息,如果leader沒有接收到消息,就觸發重發機制,讓producer重新發送消息給leader

?2.2 ACK應答機制

kafka中提供了三種可靠性級別,可以根據對可靠性和延遲性的要求進行選擇

1.acks = 0? producer 不等待 broker動作、直接返回ack
2.acks = 1(默認)?producer等待broker 動作、 leader 落盤成功、返回 ack
3.acks=-1(all)? producer 等待 broker 動作、 leader&follower 全部落盤成功、返回 ack

?props.put("ack","0");? 不等待ACK

這種情況是說我們的producer發送消息給leader,leader異步返回ack給peoducer告訴消息已經發送成功了,這種正常存儲的情況下肯定是沒有問題的。但是如果還沒有同步副本的情況下,我們的leader此時掛掉了,而producer已經收到了應答,因此不會再重發消息。當再次重啟leader所在的服務器時,數據就丟失了

props.put("ack","1");? Leader落盤、返回ACK(默認)

這種下,我們peoducer確定了leader已經落盤了,但是如果極端情況下,leader還沒有同步副本給follower,那么此時leader服務器掛了,數據是不是也就丟失了,因為也還沒有進行備份

?props.put("ack","-1");? Leader和全部Follower落盤、返回ACK

這種情況是我們的producer等待leader和follower全部落盤成功后,進行ack響應,這種策略的可靠性最高,但是吞吐量是最低的,因此要根據具體業務具體配置。那么這種策略是不是就沒有什么問題了呢?當然也有,比如當leader和follower都落盤后,再返回應答信號時,leader掛了,那么peoducer沒有收到消息,就會任務leader沒有接收到消息,還是會對消息進行重發,那這個問題怎么解決呢? 可以用消息冪等性(在第三章進行贅述說明)

?應答異常

如上圖,當一個flower掛了的情況下,是不是我們的leader就沒法同步了,沒法同步,就會造成整個鏈路的阻塞,peoducer沒收到應答信息還啥也不知道,又往leader發消息,如果這樣持續下去,服務是不是就該崩了,因此引入了一個ISR機制。

2.3 ISR機制

?ISR是一組動態維護的同步副本集合,它的作用就是把leader和follower同時放到一個ISR隊列中,比如上面的P0_R0掛掉了,同步不積極,那么就把它移除ISR隊列,默認為30s,可以經過replica.lag.time.max.ms進行配置,當ISR中的隊列都同步完了的話,就返回ACK應答信號

AR = ISR+OSR

3. 消息冪等性

發送消息情形-1: 正常發送

發送消息情形-2:消息發送失敗,觸發消息重發,造成消息重發寫入

?

?發送消息情形-3:消息發送失敗,觸發消息重發,消息不重復寫入

如上圖所示,是怎么保證消息不被重復寫入的呢?利用冪等性,在發送消息的時候新增兩個參數PID與Sequence Number分別代表生產者ID和消息的編碼,那么Broker存儲的時候也會多加一點空間存儲這兩個值,當ack應答異常時,再次重發消息到隊列中時,就會進行一次判斷a.如果PID和sequence Number都相等,則消息寫入隊列失敗,b.如果Sequence Number為1 則順序寫入?c.如果Sequence Number為2,則拋出異常,表示數據有丟失

冪等性生產者發送消息流程總結:

1 Producer 端發送消息(消息本身、 PID Sequence Number
2 Broker 端接收到消息(將消息和 PID Sequence Number 一起保存)
3 、若 ACK 響應失敗,生產者重試,再次發送消息

?kafka是在Broker端完成的去重處理

4. Kafka生產者事務

生產者的冪等性只能保證在單分區單會話的場景下有效,因此對于多分區來說,kafka事務就提供了對多個分區寫入操作的原子性。但是kafka事務的前提是開啟冪等性。

kafka事務API的相關方法

initTransactions() 初始化事務
beginTransaction() 開啟事務
commitTransaction() 提交事務
abortTransaction() 中止事務
sendOffsetsToTransaction()

事務的一個demo

    public static void main(String[] args) {Properties props=new Properties();//props.put("bootstrap.servers","192.168.8.144:9092,192.168.8.145:9092,192.168.8.146:9092");props.put("bootstrap.servers","192.168.8.147:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 0 發出去就確認 | 1 leader 落盤就確認| all或-1 所有Follower同步完才確認props.put("acks","all");// 異常自動重試次數props.put("retries",3);// 多少條數據發送一次,默認16Kprops.put("batch.size",16384);// 批量發送的等待時間props.put("linger.ms",5);// 客戶端緩沖區大小,默認32M,滿了也會觸發消息發送props.put("buffer.memory",33554432);// 獲取元數據時生產者的阻塞時間,超時后拋出異常props.put("max.block.ms",3000);props.put("enable.idempotence",true);// 事務ID,唯一props.put("transactional.id", UUID.randomUUID().toString());Producer<String,String> producer = new KafkaProducer<String,String>(props);// 初始化事務producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
//            Integer i = 1/0;producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));// 提交事務producer.commitTransaction();} catch (KafkaException e) {// 中止事務producer.abortTransaction();}producer.close();}
}

kafka事務操作的基本流程

最后標記消費狀態后,就可以進行消費了

?kafka的事務細節流程:

5. 總結

? ? ? ? 本文主要是介紹了kafka生產者端的一些原理,先是從源碼出發,介紹了生產者發送消息到Broker經歷的一系列過程:先是創建了一個sender線程,然后在發送消息的過程中一次經過攔截器、累加器、分區器最后根據分區的批量消息是否新建或者滿了來觸發sender線程發送到Broker服務器中。隨后我們介紹了,peoducer跟broker服務器之間的交互采用的是應答機制,在這里有3種配置,可根據業務需要來具體配置,當配置-1的時候,我們分析了為什么會出現重發消息的問題,通過冪等性來保證,follower從節點掛了的情況下,應答異常,采用ISR隊列機制進行避免。但是冪等性只能保證單分區單會話的場景,而針對多分區的情況下,kafka主要是采用分布式事務來解決,利用分布式ID,事務coordinattor和事務日志分二PC提交,并且對事務的狀態進行存儲標記,當事務的狀態更改為可消費的時候,才會進行消費。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/23893.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/23893.shtml
英文地址,請注明出處:http://en.pswp.cn/web/23893.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OceanBase v4.2 特性解析:Lateral Derived Table 優化查詢

前言 從傳統規則來看&#xff0c;內聯視圖通常不允許引用在同一FROM子句中前面定義的表的列。但從OceanBase 4.2.2版本開始&#xff0c;這一限制得到了突破&#xff0c;允許內聯視圖作為Lateral Derived Table來定義&#xff0c;從而允許此類引用。Lateral Derived Table的語法…

26-LINUX--I/O復用-select

一.I/O復用概述 /O復用使得多個程序能夠同時監聽多個文件描述符&#xff0c;對提高程序的性能有很大幫助。以下情況適用于I/O復用技術&#xff1a; ? TCP 服務器同時要處理監聽套接字和連接套接字。 ? 服務器要同時處理 TCP 請求和 UDP 請求。 ? 程序要同時處理多個套接…

Facebook廣告素材如何測試?手把手教你!

廣告素材對Facebook廣告效果的影響是很大的&#xff0c;用對了素材&#xff0c;Facebook廣告的價值就越高。廣告主們通常會先通過廣告測試&#xff0c;根據數據反饋來挑選素材。今天就手把手教你做Facebook素材測試的技巧&#xff0c;讓你更有靈感和思路&#xff01; 創意測試 …

Hudi CLI 安裝配置總結

前言 上篇文章 總結了Spark SQL Rollback, Hudi CLI 也能實現 Rollback,本文總結下 Hudi CLI 安裝配置以及遇到的問題。 官方文檔 https://hudi.apache.org/cn/docs/cli/ 版本 Hudi 0.13.0(發現有bug)、(然后升級)0.14.1Spark 3.2.3打包 mvn clean package -DskipTes…

使用 Django 構建動態網頁

文章目錄 創建 Django 項目和應用程序創建 HTML 模板創建視圖函數配置 URL 路由運行 Django 服務器使用 Django 模板語言 Django 是一個流行的 Python Web 框架&#xff0c;它能夠幫助開發人員快速構建強大的 Web 應用程序。在 Django 中&#xff0c;HTML 是用于呈現網頁內容的…

Spring Boot 復習

2 3 5&#xff08;不考&#xff09; 9 (1)RestController 注解是一個組合注解&#xff0c;等同于Controller 和ResponseBody 兩個注解結合使用的效果。主要作用是將當前類作為控制層的組件添加到 Spring 容器中&#xff0c;同時該類的方法無法返回 JSP 頁面&#xff0c;而且…

Flutter 中的 RenderObjectToWidgetAdapter 小部件:全面指南

Flutter 中的 RenderObjectToWidgetAdapter 小部件&#xff1a;全面指南 Flutter 是一個功能強大的 UI 框架&#xff0c;由 Google 開發&#xff0c;允許開發者使用 Dart 語言構建跨平臺的移動、Web 和桌面應用。在 Flutter 的渲染體系中&#xff0c;RenderObjectToWidgetAdap…

MyBatis面試題系列三

1、#{}和${}的區別是什么&#xff1f; #{}是預編譯處理&#xff0c;${}是字符串替換。 Mybatis 在處理#{}時&#xff0c;會將 sql 中的#{}替換為?號&#xff0c;調用 PreparedStatement 的 set 方法來賦值&#xff1b; Mybatis 在處理${}時&#xff0c;就是把${}替換成變量的值…

SpringBoot項目啟動時“jar中沒有主清單屬性”異常

資料參考 Spring Boot 啟動時 “jar中沒有主清單屬性” 異常 - spring 中文網 (springdoc.cn) 實際解決 更詳細的參考以上&#xff0c;我這邊的話只需要在 pom文件 中加上 spring-boot-maven-plugin 插件就能解決該異常&#xff0c;具體如下&#xff1a; <build><p…

1. 計算機系統概述

1. 計算機系統概述 文章目錄 1. 計算機系統概述1.1 計算機的發展硬件的發展軟件的發展 1.2.1 計算機硬件的基本組成早期馮諾依曼的結構現代計算機的結構 1.2.2 各個硬件的工作原理主存儲器運算器控制器計算機工作過程 1.2.3 計算機系統的多級層次結構1.3 計算機的性能指標存儲器…

GD32如何配置中斷優先級分組以及中斷優先級

使用GD32 MCU的過程中&#xff0c;大家可能會有以下疑問&#xff1a;中斷優先級如何配置和使用&#xff1f; 本文將會為大家解析中斷優先級分組以及中斷優先級的配置使用&#xff1a; 中斷優先級分組配置 一個GD32 MCU系統需要大家明確系統中使用的中斷優先級分組&#xff0…

代駕公司在市場競爭中如何保持優勢?

在競爭激烈的市場中&#xff0c;代駕公司可以通過多種策略保持其競爭優勢&#xff0c;包括利用市場潛力、創新服務模式、提高服務效率以及加強品牌建設等。以下是具體的策略&#xff1a; 利用市場潛力 汽車產業空間巨大&#xff1a;隨著汽車保有量的增加&#xff0c;代駕行業…

掃地機器人:卷價格,不如卷技術

掃地機器人內卷的終點是技術和價值&#xff0c;價格只是附屬品。 一路上漲的價格&#xff0c;一路下跌的銷量 從價格飆升&#xff0c;到重新卷回價格&#xff0c;尷尬的背后是掃地機器人在骨感現實下的無奈抉擇。 根據數據顯示&#xff0c;2020中國掃地機器人線上市場零售均價…

通過可識別性和深度學習重建大腦功能網絡

摘要 本研究提出了一種新的方法來重建代表大腦動力學的功能網絡&#xff0c;該方法基于兩個腦區在同一認知任務中的共同參與會導致其可識別性或其動力學特性降低的觀點。這種可識別性是通過深度學習模型在監督分類任務中獲得的分數來估計的&#xff0c;因此不需要對這種協同參…

零、測試開發前置知識

文章目錄 1、什么是冒煙測試、回歸測試&#xff1f;2、設計測試用例的方法有哪些&#xff1f;3、對于404或500&#xff0c;你會如何分析定位&#xff1f;4、什么是敏捷開發&#xff1f;敏捷開發流程是怎么樣的&#xff1f;5、做接口測試過程中&#xff0c;下游接口需要上游數據…

Flink端到端的精確一次(Exactly-Once)

目錄 狀態一致性 端到端的狀態一致性 端到端精確一次&#xff08;End-To-End Exactly-Once&#xff09; Flink內部的Exactly-Once 輸入端保證 輸出端保證 冪等寫入 事務寫入 Flink和Kafka連接時的精確一次保證 整體介紹 需要的配置 案例 狀態一致性 流式計算本身就…

Java工作學習筆記

1、ConfigurationProperties注解是什么意思&#xff1f; ConfigurationProperties 可以將屬性文件與一個Java類綁定&#xff0c;將屬性文件中的變量值注入到該Java類的成員變量中 示例代碼&#xff1a; /*** SSP配置** author mua*/ Component Data ConfigurationProperties…

如何提高接口響應速度

在非大數據&#xff08;幾萬以上記錄&#xff09;的情況下&#xff0c;影響接口響應速度的因素中最大的是查詢數據庫的次數&#xff0c;其次才是數組遍歷和簡單數據處理&#xff08;如根據已有字段增加新的屬性&#xff0c;或計算值&#xff09;。 一般一次數據庫查詢需要50毫秒…

Java Web應用,IPv6問題解決

在Java Web程序中&#xff0c;如果使用Tomcat并遇到了IPv6相關的問題&#xff0c;可以通過以下幾種方式來解決&#xff1a; 1. 配置Tomcat以使用IPv4 默認情況下&#xff0c;Java可能會優先使用IPv6。如果你希望Tomcat使用IPv4&#xff0c;最簡單的方法是通過設置系統屬性來強…

無線麥克風哪個牌子性價比高?一文告訴你無線領夾麥克風怎么挑選

?當我們談論到演講、表演或者錄制視頻時&#xff0c;一個高質量的無線麥克風能夠使得整個體驗提升至一個全新的水平。它不僅能夠保證聲音的清晰度和真實度&#xff0c;還能夠讓使用者在演講或者表演時更加自信和舒適。基于對市場的深入研究和用戶體驗的考量&#xff0c;我挑選…