文章目錄
- 1. Kafka 生產者
- 2. kafaka 命令行操作
- 3. kafka 生產者發送消息流程
- 4. Kafka 生產者的創建
- 5. Kafka 生產者發送消息
- 1. 發送即忘記
- 2. 同步發送
- 3. 異步發送
- 6. Kafka 消息對象 ProducerRecord
1. Kafka 生產者
不管是把Kafka作為消息隊列、消息總線還是數據存儲平臺,總是需要一個可以往Kafka寫入數據的生產者、一個可以從Kafka讀取數據的消費者,或者一個兼具兩種角色的應用程序。
Kafka 生產者是指使用 Apache Kafka 消息系統的應用程序,它們負責將消息發送到 Kafka 集群中的一個或多個主題(topic)。生產者可以將消息發送到指定的主題,也可以根據分區策略將消息發送到多個分區中。生產者可以以異步或同步方式發送消息,并且可以配置消息的可靠性和持久性等屬性。在 Kafka 中,生產者是消息的源頭,它們將消息發送到 Kafka 集群中,供消費者消費。
2. kafaka 命令行操作
① 啟動 Zookeeper 集群:
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-01/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-02/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-03/bin
[root@master01 bin]# ./zkServer.sh start
② 啟動 kafka 集群:
[root@master01 kafka01]# pwd
/root/ch/soft/kafka/kafka01
[root@master01 kafka01]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka02]# pwd
/root/ch/soft/kafka/kafka02
[root@master01 kafka02]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka03]# pwd
/root/ch/soft/kafka/kafka03
[root@master01 kafka03]# bin/kafka-server-start.sh config/server.properties
③ 創建主題 test:
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2 --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
④ 生產者發送消息到主題test:
[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test
>hello
>你好,kafka!
>
⑤ 消費者消費主題test的消息:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
hello
你好,kafka!
3. kafka 生產者發送消息流程
先從創建一個ProducerRecord對象開始,其中需要包含目標主題和要發送的內容。另外,還可以指定鍵、分區、時間戳或標頭。在發送ProducerRecord對象時,生產者需要先把鍵和值對象序列化成字節數組,這樣才能在網絡上傳輸。
接下來,如果沒有顯式地指定分區,那么數據將被傳給分區器。分區器通常會基于ProducerRecord對象的鍵選擇一個分區。選好分區以后,生產者就知道該往哪個主題和分區發送這條消息了。緊接著,該消息會被添加到一個消息批次里,這個批次里的所有消息都將被發送給同一個主題和分區。有一個獨立的線程負責把這些消息批次發送給目標broker。
broker在收到這些消息時會返回一個響應。如果消息寫入成功,就返回一個RecordMetaData對象,其中包含了主題和分區信息,以及消息在分區中的偏移量。如果消息寫入失敗,則會返回一個錯誤。生產者在收到錯誤之后會嘗試重新發送消息,重試幾次之后如果還是失敗,則會放棄重試,并返回錯誤信息。
4. Kafka 生產者的創建
要向Kafka寫入消息,首先需要創建一個生產者對象,并設置一些屬性。Kafka生產者有3個必須設置的屬性。
① bootstrap.servers
broker的地址。可以由多個host:port組成,生產者用它們來建立初始的Kafka集群連接。它不需要包含所有的broker地址,因為生產者在建立初始連接之后可以從給定的broker那里找到其他broker的信息。不過還是建議至少提供兩個broker地址,因為一旦其中一個停機,則生產者仍然可以連接到集群。
② key.serializer
一個類名,用來序列化消息的鍵。broker 希望接收到的消息的鍵和值都是字節數組。生產者可以把任意Java對象作為鍵和值發送給broker,但它需要知道如何把這些Java對象轉換成字節數組。key.serializer 必須被設置為一個實現了 org.apache.kafka.common.serialization.Serializer 接口的類,生產者會用這個類把鍵序列化成字節數組。Kafka客戶端默認提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常見的幾種Java對象類型,就沒有必要實現自己的序列化器。
需要注意的是,必須設置key.serializer這個屬性,盡管你可能只需要將值發送給Kafka。如果只需要發送值,則可以將Void作為鍵的類型,然后將這個屬性設置為VoidSerializer。
③ value.serializer
一個類名,用來序列化消息的值。與設置key.serializer屬性一樣,需要將value.serializer設置成可以序列化消息值對象的類。
public class CustomProducer01 {public static void main(String[] args) {// kafka生產者屬性配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 創建kafka生產者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);}
}
5. Kafka 生產者發送消息
實例化好生產者對象后,接下來就可以開始發送消息了。KafkaProducer 的 send() 方法用于向 Kafka 集群發送消息。該方法的語法如下:
public interface Producer<K, V> extends Closeable {Future<RecordMetadata> send(ProducerRecord<K, V> record);Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
其中,ProducerRecord<K, V> 表示要發送的消息記錄,K 和 V 分別表示鍵和值的類型。send() 方法返回一個 Future 對象,表示異步發送消息的結果。
發送消息主要有以下3種方式:
① 發送并忘記
把消息發送給服務器,但并不關心它是否成功送達。大多數情況下,消息可以成功送達,因為Kafka是高可用的,而且生產者有自動嘗試重發的機制。但是,如果發生了不可重試的錯誤或超時,那么消息將會丟失,應用程序將不會收到任何信息或異常。
② 同步發送
一般來說,生產者是異步的——我們調用send()方法發送消息,它會返回一個Future對象。可以調用get()方法等待Future完成,這樣就可以在發送下一條消息之前知道當前消息是否發送成功。
③ 異步發送
調用send()方法,并指定一個回調函數,當服務器返回響應時,這個函數會被觸發。
1. 發送即忘記
發送即忘記,生產者發送消息后不會等待服務器的響應,直接發送下一條消息。它只管往Kafka中發送消息而并不關心消息是否正確到達。在大多數情況下,這種發送方式沒有什么問題,不過在某些時候(比如發生不可重試異常時)會造成消息的丟失。這種發送方式的性能最高,可靠性也最差。
public class CustomProducer01 {private static final String brokerList "10.65.132.2:9093";private static final String topic = "test";public static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生產者屬性配置Properties properties = initConfig();// kafka生產者發送消息,默認是異步發送方式KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka!");try{// 發送消息kafkaProducer.send(producerRecord);}catch (Exception e){e.printStackTrace();}// 關閉資源kafkaProducer.close();}
}
cmd命令行窗口開啟 kafka 消息者,觀察消費者是否接收到消息:
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
2. 同步發送
同步發送消息很簡單,當Kafka返回錯誤或重試次數達到上限時,生產者可以捕獲到異常。這里需要考慮性能問題。根據Kafka集群繁忙程度的不同,broker可能需要2毫秒或更長的時間來響應請求。如果采用同步發送方式,那么發送線程在這段時間內就只能等待,什么也不做,甚至都不發送其他消息,這將導致糟糕的性能。因此,同步發送方式通常不會被用在生產環境中(但會經常被用在示例代碼中)。
send() 方法本身就是異步的,send() 方法返回的Future對象可以使調用方稍后獲得發送的結果。在執行send() 方法之后可以調用 get() 方法來阻塞等待Kafka的響應,直到消息發送成功,或者發生異常。如果發生異常,那么就需要捕獲異常并交由外層邏輯處理。
Future 接口源碼:
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口是Java中用于表示異步計算結果的接口。它定義了一些方法,用于查詢異步計算是否完成、獲取計算結果等操作。
- cancel方法用于取消異步計算;
- isCancelled方法用于判斷異步計算是否已經被取消;
- isDone方法用于判斷異步計算是否已經完成。
- get方法用于獲取異步計算的結果,如果計算還沒有完成,則該方法會阻塞直到計算完成。如果計算被取消,則該方法會拋出CancellationException異常。如果計算拋出異常,則該方法會拋出ExecutionException異常。
- get(long timeout, TimeUnit unit)方法與get方法類似,但是它會在指定的時間內等待計算完成,如果超時則會拋出TimeoutException異常。
Future 表示一個任務的生命周期,并提供了相應的方法來判斷任務是否已經完成或取消,以及獲取任務的結果和取消任務等。既然KafkaProducer.send() 方法的返回值是一個Future類型的對象,那么完全可以用Java語言層面的技巧來豐富應用的實現,比如使用Future中的 get(long timeout,TimeUnit unit)方法實現可超時的阻塞。
public class CustomProducer01 {private static final String brokerList = "10.65.132.2:9093";private static final String topic = "test";public static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生產者屬性配置Properties properties = initConfig();// kafka生產者發送消息,默認是異步發送方式KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,同步發送!");try{// 發送消息Future<RecordMetadata> future = kafkaProducer.send(producerRecord);// 獲取異步計算的結果,如果計算還沒有完成,則該方法會阻塞直到計算完成RecordMetadata recordMetadata = future.get();System.out.println("metadata.topic() = " + recordMetadata.topic());}catch (Exception e){e.printStackTrace();}// 關閉資源kafkaProducer.close();}
}
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步發送!
調用Future.get()方法等待Kafka響應。如果消息沒有發送成功,那么這個方法將拋出一個異常。如果沒有發生錯誤,那么我們將得到一個RecordMetadata對象,并能從中獲取消息的偏移量和其他元數據。
KafkaProducer一般會出現兩種錯誤。一種是可重試錯誤,這種錯誤可以通過重發消息來解決。例如,對于連接錯誤,只要再次建立連接就可以解決。對于“not leader for partition”(非分區首領)錯誤,只要重新為分區選舉首領就可以解決,此時元數據也會被刷新。可以通過配置啟用KafkaProducer的自動重試機制。如果在多次重試后仍無法解決問題,則應用程序會收到重試異常。另一種錯誤則無法通過重試解決,比如“Message size too large”(消息太大)。對于這種錯誤,KafkaProducer不會進行任何重試,而會立即拋出異常。
3. 異步發送
假設一條消息在應用程序和Kafka集群之間往返需要10毫秒。如果在發送完每條消息后都需要等待響應,那么發送100條消息將需要1秒。如果只發送消息但不需要等待響應,那么發送100條消息所需要的時間就會少很多。大多數時候,并不需要等待響應——盡管Kafka會把消息的目標主題、分區信息和偏移量返回給客戶端,但對客戶端應用程序來說可能不是必需的。不過,當消息發送失敗,需要拋出異常、記錄錯誤日志或者把消息寫入“錯誤消息”文件以便日后分析診斷時,就需要用到這些信息了。為了能夠在異步發送消息時處理異常情況,生產者提供了回調機制。
生產者發送消息后不會等待服務器的響應,而是通過回調函數來處理服務器的響應。回調函數會在 producer 收到 ack 時調用,該方法有兩個參數,分別是元數據信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發送成功,如果 Exception 不為 null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
public class CustomProducer01 {private static final String brokerList = "10.65.132.2:9093";private static final String topic = "test";public static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生產者屬性配置Properties properties = initConfig();// kafka生產者發送消息,默認是異步發送方式KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,異步發送帶返回值!");try{// 發送消息kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {// 說明消息發送成功if(e==null){System.out.println("metadata.topic() = " + recordMetadata.topic());System.out.println("metadata.partition() = " + recordMetadata.partition());}}});}catch (Exception e){e.printStackTrace();}// 關閉資源kafkaProducer.close();}
}
[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步發送!
你好,kafka,異步發送帶回調函數!
Kafka生產者異步發送消息時,可以通過指定回調函數來處理發送結果。當消息發送完成后,回調函數會被調用,以通知應用程序消息發送的結果。具體來說,當生產者成功發送消息時,回調函數會被傳遞一個RecordMetadata對象,該對象包含了發送消息的相關信息,如消息所在的分區、消息在分區中的偏移量等。如果發送消息失敗,則回調函數會被傳遞一個非空的Exception對象,以指示發送失敗的原因。
注意:回調的執行將在生產者主線程中進行,如果有兩條消息被發送給同一個分區,則這可以保證它們的回調是按照發送的順序執行的。這就要求回調的執行要快,避免生產者出現延遲或影響其他消息的發送。不建議在回調中執行阻塞操作,阻塞操作應該被放在其他線程中執行。
6. Kafka 消息對象 ProducerRecord
① ProducerRecord 成員變量:
public class ProducerRecord<K, V> {// 消息要發送到的主題private final String topic;// 消息要發送到的分區號,如果為null,則由Kafka自動選擇分區private final Integer partition;// 消息的鍵private final K key;// 消息的值private final V value;// 消息的時間戳,如果為null,則使用當前時間戳private final Long timestamp;// 消息的頭部信息private final Headers headers;// .....
}
- topic和partition字段分別代表消息要發往的主題和分區號。
- key是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計算分區號進而可以讓消息發往特定的分區。前面提及消息以主題為單位進行歸類,而這個key可以讓消息再進行二次歸類,同一個key的消息會被劃分到同一個分區中。
- value是指消息體,一般不為空,如果為空則表示特定的消息。
- timestamp是指消息的時間戳,它有CreateTime和LogAppendTime兩種類型,前者表示消息創建的時間,后者表示消息追加到日志文件的時間。
② ProducerRecord 構造函數:
public class ProducerRecord<K, V> {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null.");if (timestamp != null && timestamp < 0)throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));if (partition != null && partition < 0)throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));this.topic = topic;this.partition = partition;this.key = key;this.value = value;this.timestamp = timestamp;this.headers = new RecordHeaders(headers);}public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, null);}public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {this(topic, partition, null, key, value, headers);}public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);}
}