??將外部傳送給過來的數據發送到kafka集群。
1 發送原理
(1)創建main()線程,創建producer對象,調用send方法,經過攔截器(可選)、序列化器、分區器。
(2)分區器將數據發送到分區中,每個分區創建一個隊列(分區是在內存中完成的),內存總大小為32M,每個批次的大小為16K。
(3)sender線程將緩沖隊列中的數據讀取出來發往Kafka集群,根據batch.size和linger.ms拉取數據(即每批次的數據滿了之后或者設置的時間到了之后拉取數據)。
(4)sender線程拉取數據,以每個節點為一組,當第一個請求數據發送到broker1中,broker沒有及時應答,還是能發送第二個請求,最多有5個請求都沒有收到應答就不會再繼續發送請求。
(5)selector打通輸入流和輸出流。
(6)鏈路接通后發送數據。
(7)Kafka集群收到數據后根據副本機制進行副本同步。
(8)Kafka集群收到數據后根據應答機制進行應答。
(9)selector根據Kafka集群反饋的消息進行判斷。
(10)如果成功則刪掉該請求同時在緩沖隊列里清理掉對應的每一個分區的數據;如果失敗則進行重試,重新發送請求,知道成功為止。
2 生產者重要參數列表
3 異步發送API
3.1 普通異步發送
(1)需求:創建Kafka 生產者,采用異步的方式發送到 Kafka Broker
(2)分析:異步發送即將外部的數據發送到緩沖隊列里(不管緩沖隊列中的數據有沒有發送到Kafka集群)。
步驟:
(1)創建kafka工程,在pom.xml中導入依賴:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>
(2)創建類:com.astudy.kafka.producer.CustomProducer
package com.study.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {//0.創建 kafka 生產者的配置對象Properties properties = new Properties();//給 kafka 配置對象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必須):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.調用 send 方法,發送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first","test"+i));}//3.關閉資源kafkaProducer.close();}
}
(3)測試:
在hadoop102上開啟Kafka消費者:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first
在 IDEA 中執行代碼,觀察 hadoop102 控制臺中是否接收到消息:
3.2 帶回調函數的異步發送
分析:
??回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分別是元數據信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發送成功,如果Exception 不為 null,說明消息發送失敗。
package com.study.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//0.創建 kafka 生產者的配置對象Properties properties = new Properties();//給 kafka 配置對象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必須):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.調用 send 方法,發送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first", "test" + i), new Callback() {// 該方法在 Producer 收到 ack 時調用,為異步調用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {// 沒有異常,輸出信息到控制臺System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());}else {// 出現異常打印e.printStackTrace();}}});// 延遲一會會看到數據發往不同分區Thread.sleep(2);}//3.關閉資源kafkaProducer.close();}
}
測試:
在hadoop102上開啟Kafka消費者:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first
在 IDEA 中執行代碼,觀察 hadoop102 控制臺中是否接收到消息:
4 同步發送API
分析:只需在異步發送的基礎上,再調用一下 get()方法即可。
package com.study.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {//0.創建 kafka 生產者的配置對象Properties properties = new Properties();//給 kafka 配置對象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");// key,value 序列化(必須):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//1.創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//2.調用 send 方法,發送消息for (int i = 0; i < 3; i++) {kafkaProducer.send(new ProducerRecord<>("first","test"+i)).get();}//3.關閉資源kafkaProducer.close();}
}
測試:
在hadoop102上開啟Kafka消費者:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first
在 IDEA 中執行代碼,觀察 hadoop102 控制臺中是否接收到消息: