1 初始化配置
??Kafka 通過 KafkaProducer 構造器初始化生產者客戶端的配置。
??常用的重要配置,詳見官網。
- bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客戶端初始化時會自動發現地址,所以可以不填寫所有地址。
- key.serializer:實現了 Kafka 序列化接口的類,用來序列化 key。
- value.serializer:實現了 Kafka 序列化接口的類,用來序列化 value。
- acks:leader 接收到的 follower 確認的數量需要滿足 acks 的配置。
?0:生產者把消息發送出去就認為發送完成了。
?1:leader 接收到消息后,不用等 follower 的確認,就表示發送完成了。
?all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 確認后,才表示完成了。 - retries:消息發送失敗后的重試次數。如果允許重試,而 max.in.flight.requests.per.connection>1,則可能導致消息亂序,因為如果把兩批消息發送到同一個分區,第一批失敗并重試,而第二批成功了,則第二批消息可能先生成了。
- retry.backoff.ms:消息重試發送的間隔。
- client.id:標識客戶端的 id。
- compression.type:壓縮類型。可選:none、gzip、snappy、lz4。
- buffer.memory:記錄累加器可以使用的最大內存緩沖池大小。
- batch.size:內存緩沖池的緩沖列表大小。當 batch 的大小超過 batch.size 或者時間達到 linger.ms 就會發送 batch。
- transactional.id:事務 ID。
// 基礎配置
Map<String, Object> configs = new HashMap<>();
// Kafka broker 集群
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key 序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value 序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
2 構造消息
??Kafka 提供了6種構造器來構造消息。
- topic:消息主題,必填;
- partition:分區號,非必填。如果為空,會計算 key 的 hash 值,再和該主題的分區總數取余得到分區號;如果 key 也為空,客戶端會生成遞增的隨機整數,再和該主題的分區總數區域得到分區號。
- timestamp:時間戳,非必填。如果為空,默認為 KafkaProducer 構造器初始化的時間。
- key:消息 key,非必填。關系到分區分配,broker 會對帶 key 的消息進行日志壓縮。
- value:消息內容,必填。
- headers:消息頭,非必填。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);
3 發送消息
??支持同步發送和異步發送消息。
??同步發送
producer.send(record).get();
??異步發送
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 回調處理流程}
});