Kafka生產者發送消息的方式
Kafka生產者發送消息主要通過以下三種方式:
同步發送
生產者發送消息后,會阻塞等待Broker的響應,確認消息是否成功寫入。這種方式可靠性高,但吞吐量較低。代碼示例:
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
RecordMetadata metadata = producer.send(record).get();
異步發送
生產者發送消息后立即返回,通過回調函數處理Broker的響應。這種方式吞吐量高,但需要自行處理失敗情況。代碼示例:
producer.send(record, (metadata, exception) -> {if (exception != null) {// 處理失敗邏輯}
});
異步發送(無回調)
生產者直接發送消息而不關心結果,適用于對可靠性要求不高的場景。吞吐量最高,但可能丟失消息。代碼示例:
producer.send(record);
Kafka生產者發送消息的特點
分區策略
生產者可以通過指定分區鍵(Key)控制消息寫入的分區。若未指定Key,則采用輪詢策略分配分區。支持自定義分區器(Partitioner)。
消息確認機制(acks)
acks=0
:生產者不等待Broker確認,消息可能丟失。acks=1
:Leader副本寫入成功后即返回響應。acks=all/-1
:需所有ISR副本寫入成功,可靠性最高。
批量發送(Batch)
生產者會將多條消息合并為一個批次發送,減少網絡開銷。通過linger.ms
和batch.size
參數控制批處理行為。
消息重試
網絡異常或Leader切換時,生產者會自動重試發送消息。可通過retries
和retry.backoff.ms
參數配置重試策略。
冪等性與事務
- 冪等性:通過啟用
enable.idempotence=true
避免消息重復發送。 - 事務:支持跨分區原子性寫入,需配置
transactional.id
。
緩沖區機制
生產者維護一個內存緩沖區(buffer.memory
),暫存待發送消息。緩沖區滿時,發送調用會被阻塞或拋出異常。
Kafka消費者的基本工作流程
Kafka消費者通過訂閱主題(Topic)或特定分區(Partition)來消費消息。消費者組(Consumer Group)機制允許并行處理消息,每個消費者組內的消費者獨立消費不同分區的數據。
消費者啟動時會向Kafka集群發送元數據請求,獲取訂閱主題的分區信息。消費者與分區建立連接后,通過輪詢(Poll)機制從分區拉取消息。消費者會定期提交偏移量(Offset)到Kafka,記錄消費進度。
消費者配置關鍵參數
bootstrap.servers
: Kafka集群的地址列表。group.id
: 消費者所屬的組名稱。auto.offset.reset
: 當無初始偏移量時如何處理(earliest
或latest
)。enable.auto.commit
: 是否自動提交偏移量(默認true
)。max.poll.records
: 單次Poll返回的最大消息數。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
消息消費模式
訂閱主題模式
消費者訂閱一個或多個主題,Kafka自動分配分區:
consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}
}
手動分配分區模式
直接指定消費的分區,繞過消費者組協調:
TopicPartition partition = new TopicPartition("topic1", 0);
consumer.assign(Arrays.asList(partition));
偏移量管理
自動提交
配置enable.auto.commit=true
,Kafka定期提交偏移量(默認5秒一次)。
手動同步提交
精確控制提交時機,確保消息處理完成后再提交:
consumer.commitSync();
手動異步提交
非阻塞式提交,需處理回調:
consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + offsets);}
});
消費者再平衡(Rebalance)
當消費者組內成員變化(如新增或下線消費者)時,Kafka觸發再平衡,重新分配分區。可通過ConsumerRebalanceListener
接口實現自定義邏輯:
consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分區被回收前的處理(如提交偏移量)}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 新分區分配后的處理(如恢復狀態)}
});
處理消費延遲與積壓
- 調整
max.poll.records
減少單次Poll的數據量。 - 優化消息處理邏輯,避免阻塞Poll線程。
- 增加消費者實例數量(不超過分區數)。
- 監控消費者延遲指標(如
consumer_lag
)。