Kafka 生產者和消費者高級用法
1 生產者的事務支持
Kafka 從版本0.11開始引入了事務支持,使得生產者可以實現原子操作,確保消息的可靠性。
// 示例代碼:使用 Kafka 事務
producer.initTransactions();
try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key", "value"));producer.send(new ProducerRecord<>("my-other-topic", "key", "value"));producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {producer.close();throw e;
}
2 消費者的多線程處理
在高吞吐量的場景下,多線程消費消息是提高效率的重要手段。消費者可以通過多線程同時處理多個分區的消息。
// 示例代碼:多線程消費者
properties.put("max.poll.records", 500);
properties.put("max.poll.interval.ms", 300000);Consumer<String, String> consumer = new KafkaConsumer<>(properties);// 訂閱主題 "my-topic"
consumer.subscribe(Collections.singletonList("my-topic"));// 多線程消費消息
int numberOfThreads = 5;
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {executor.submit(() -> processRecord(record));}
}// 關閉消費者
consumer.close();
executor.shutdown();
3 自定義序列化和反序列化
Kafka 默認提供了一些基本的序列化和反序列化器,但你也可以根據需求自定義實現。這在處理復雜數據結構時非常有用。
// 示例代碼:自定義序列化器
public class CustomSerializer implements Serializer<MyObject> {@Overridepublic byte[] serialize(String topic, MyObject data) {// 實現自定義序列化邏輯}
}