1. 生產消息
1.1 生產消息的基本步驟
(一)創建Map類型的配置對象,根據場景增加相應的配置屬性:
參數名 | 參數作用 | 類型 | 默認值 | 推薦值 |
---|---|---|---|---|
bootstrap.servers | 集群地址,格式為:brokerIP1:端口號,brokerIP2:端口號 | 必須 | ||
key.serializer | 對生產數據Key進行序列化的類完整名稱 | 必須 | Kafka提供的字符串序列化類:StringSerializer | |
value.serializer | 對生產數據Value進行序列化的類完整名稱 | 必須 | Kafka提供的字符串序列化類:StringSerializer | |
interceptor.classes | 攔截器類名,多個用逗號隔開 | 可選 | ||
batch.size | 數據批次字節大小。此大小會和數據最大估計值進行比較,取大值。估值=61+21+(keySize+1+valueSize+1+1) | 可選 | 16K | |
retries | 重試次數 | 可選 | 整型最大值 | 0或整型最大值 |
request.timeout.ms | 請求超時時間 | 可選 | 30s | |
linger.ms | 數據批次在緩沖區中停留時間 | 可選 | ||
acks | 請求應答類型:all(-1), 0, 1 | 可選 | all(-1) | 根據數據場景進行設置 |
retry.backoff.ms | 兩次重試之間的時間間隔 | 可選 | 100ms | |
buffer.memory | 數據收集器緩沖區內存大小 | 可選 | 32M | 64M |
max.in.flight.requests.per.connection | 每個節點連接的最大同時處理請求的數量 | 可選 | 5 | 小于等于5 |
enable.idempotence | 冪等性, | 可選 | true | 根據數據場景進行設置 |
partitioner.ignore.keys | 是否放棄使用數據key選擇分區 | 可選 | false | |
partitioner.class | 分區器類名 | 可選 | null |
(二)創建待發送數據
在 Kafka 中傳遞的數據我們稱之為消息(message)或記錄(record),所以Kafka發送數據前,需要將待發送的數據封裝為指定的數據類型:
相關屬性必須在構建數據模型時指定,其中主題和value的值時必須要傳遞的。如果配置中開啟了自動創建主題,那么 Topic 主題可以不存在。value 就是我們需要真正傳遞的數據了,而 Key 可以用于數據的分區定位。
(三)創建生產者對象,發送生產的數據:
根據前面提供的配置信息創建生產者對象,通過這個生產者對象向 Kafka 服務器節點發送數據,而具體的發送是由生產者對象創建時,內部構件的多個組件實現的,多個組件的關系類似與生產者消費者模式。
(1)數據生產者(KafkaProducer):生產者對象,用于對我們的數據進行必要的轉換和處理,將處理后的數據放入到數據收集器中,類似于生產者消費者模式下的生產者。
- 如果配置攔截器棧(interceptor.classes),那么將數據進行攔截處理。某一個攔截器出現異常并不會影響后續的攔截器處理。
- 因為發送的數據為 KV 數據,所以需要根據配置信息中的序列化對象對數據中 Key 和 Value 分別進行序列化處理。
- 計算數據嗦發送的分區位置。
- 將數據追加到數據收集器中。
(2)數據收集器(RecordAccumulator):用于收集,轉換我們生產的數據,蕾西與生產者消費者模式下的緩沖區。為了優化數據的傳輸,Kafka 并不是生產一條數據就向 Broker 發送一條數據,而是通過合并單條消息,進行批量(批次)發送,提高吞吐量,減少帶寬消耗。
- 默認情況下,一個發送批次的數據容量為 16k,這個可以通過參數 batch.size進行改善。
- 批次是和分區進行綁定的。也就是說發往同一個分區的數據會進行合并,形成一個批次。
- 如果當前批次能容納數據,那么直接將數據追加到批次中即可,如果不能容納數據,那么會產生新的批次放入到當前分區的批次隊列中,這個隊列使用的是 Java 雙端隊列 Deque。舊的批次關閉不再接收新的數據,等待發送。
(3)數據發送器(Sender):線程對象,用于從收集器中獲取數據,向服務節點發送。類似于生產者消費者模式下的消費者。因為是線程對象,所以啟動后會不斷輪詢獲取數據收集器中已經關閉的批次數據。對批次進行整合后再發送到 Broker 節點中
- 因為數據真正發送的地方是 Broker 節點,不是分區。所以需要將從數據收集器中收集到的批次數據按照可用 Broker 節點重新組合成List集合。
- 將組合后的<節點,List<批次>>的數據封裝成客戶端請求(請求鍵為:Produce)發送到網絡客戶端對象的緩沖區,由網絡客戶端對象通過網絡發送給 Broker 節點。
- Broker 節點獲取客戶端請求,并根據請求鍵進行后續的數據處理:向分區中增加數據。
1.2 生產消息的基本代碼
// TODO 配置屬性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置屬性:Kafka服務器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置屬性:Kafka生產的數據為KV對,所以在生產數據進行傳輸前需要分別對K,V進行對應的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// TODO 創建Kafka生產者對象,建立Kafka連接
// 構造對象時,需要傳遞配置參數
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 準備數據,定義泛型
// 構造對象時需要傳遞 【Topic主題名稱】,【Key】,【Value】三個參數
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1"
);
// TODO 生產(發送)數據
producer.send(record);
// TODO 關閉生產者連接
producer.close();
1.3 發送消息
1.3.1 攔截器
生產者 API 在數據準備好發送給 Kafka 服務器之前,允許我們對生產的數據進行統一的處理,比如校驗,整合數據等等。這些處理我們是可以通過 Kafka 提供的攔截器完成。
這里的攔截器是可以配置多個的。執行時,會按照聲明順序執行完一個后,再執行下一個。并且某一個攔截器如果出現異常,只會跳出當前攔截器邏輯,并不會影響后續攔截器的處理。所以開發時,需要將攔截器的這種處理方法考慮進去。
1.3.1.1 增加攔截器類
(1)實現生產者攔截器接口 ProducerInterceptor
/*** TODO 自定義數據攔截器* 1. 實現Kafka提供的生產者接口ProducerInterceptor* 2. 定義數據泛型 <K, V>* 3. 重寫方法* onSend* onAcknowledgement* close* configure*/
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
(2)實現接口中的方法,根據業務功能重寫具體的方法
方法名 | 作用 |
---|---|
onSend | 數據發送前,會執行此方法,進行數據發送前的預處理 |
onAcknowledgement | 數據發送后,獲取應答時,會執行此方法 |
close | 生產者關閉時,會執行此方法,完成一些資源回收和釋放的操作 |
configure | 創建生產者對象的時候,會執行此方法,可以根據場景對生產者對象的配置進行統一修改或轉換。 |
1.3.1.2 配置攔截器
public class ProducerInterceptorTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());KafkaProducer<String, String> producer = null;try {producer = new KafkaProducer<>(configMap);for ( int i = 0; i < 1; i++ ) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);final Future<RecordMetadata> send = producer.send(record);}} catch ( Exception e ) {e.printStackTrace();} finally {if ( producer != null ) {producer.close();}}}
}
1.3.2 回調方法
Kafka 發送數據時,可以同時傳遞回調對象(Callback)用于對數據的發送結果進行對應處理,具體代碼實現采用匿名類或 Lambda 表達式都可以。
public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循環生產數據for ( int i = 0; i < 1; i++ ) {// TODO 創建數據ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 發送數據producer.send(record, new Callback() {// TODO 回調對象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 當數據發送成功后,會回調此方法System.out.println("數據發送成功:" + recordMetadata.timestamp());}});}producer.close();}
}
1.3.3 異步發送
Kafka 發送數據時,底層的實現類似于生產者消費者模式。對應的,底層會由主線程代碼作為生產者向緩沖區中放數據,而數據發送線程會從緩沖區中獲取數據進行發送。Broker 接收到數據后進行后續處理。
如果 Kafka 通過主線程代碼將一條數據放入到緩沖區后,無需等待數據的后續發送過程,就直接發送下一條數據的場合,我們就稱之為異步發送。
public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循環生產數據for ( int i = 0; i < 10; i++ ) {// TODO 創建數據ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 發送數據producer.send(record, new Callback() {// TODO 回調對象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 當數據發送成功后,會回調此方法System.out.println("數據發送成功:" + recordMetadata.timestamp());}});// TODO 發送當前數據System.out.println("發送數據");}producer.close();}
}
1.3.4 同步發送
Kafka 發送數據時,底層的實現類似于生產者消費者模式。對應的,底層會由主線程代碼作為生產者向緩沖區中放數據,而數據發送線程會從緩沖區中獲取數據進行發送。Broker 接收到數據后進行后續處理。
如果 Kafka 通過主線程代碼將一條數據放入到緩沖區后,需等待數據的后續發送操作的應答狀態,才能發送下一條數據的場合,我們就稱之為同步發送。所以這里的所謂同步,就是生產數據的線程需要等待線程的應答(響應)結果。
代碼實現上,采用的是 JDK1.5 增加的JUC 并發編程的 Future 接口的 get 方法實現。
public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循環生產數據for ( int i = 0; i < 10; i++ ) {// TODO 創建數據ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 發送數據producer.send(record, new Callback() {// TODO 回調對象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 當數據發送成功后,會回調此方法System.out.println("數據發送成功:" + recordMetadata.timestamp());}}).get();// TODO 發送當前數據System.out.println("發送數據");}producer.close();}
}