首先介紹一下 Kafka 生產者發送消息的過程:
- Kafka 會將發送消息包裝為 ProducerRecord 對象, ProducerRecord 對象包含了目標主題和要發送的內容,同時還可以指定鍵和分區。在發送 ProducerRecord 對象前,生產者會先把鍵和值對象序列化成字節數組,這樣它們才能夠在網絡上傳輸。
- 接下來,數據被傳給分區器。如果之前已經在 ProducerRecord 對象里指定了分區,那么分區器就不會再做任何事情。如果沒有指定分區 ,那么分區器會根據 ProducerRecord 對象的鍵來選擇一個分區,緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。
- 服務器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka,就返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之后會嘗試重新發送消息,如果達到指定的重試次數后還沒有成功,則直接拋出異常,不再重試。

二、創建生產者
2.1 項目依賴
本項目采用 Maven 構建,想要調用 Kafka 生產者 API,需要導入 kafka-clients 依賴,如下:
org.apache.kafka kafka-clients 2.2.0復制代碼
2.2 創建生產者
創建 Kafka 生產者時,以下三個屬性是必須指定的:
- bootstrap.servers :指定 broker 的地址清單,清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找 broker 的信息。不過建議至少要提供兩個 broker 的信息作為容錯;
- key.serializer :指定鍵的序列化器;
- value.serializer :指定值的序列化器。
創建的示例代碼如下:
public class SimpleProducer { public static void main(String[] args) { String topicName = "Hello-Kafka"; Properties props = new Properties(); props.put("bootstrap.servers