1. Introduction
KafkaConsumer is not thread-safe. It defines an acquire() method that checks whether only a single thread is currently operating on the consumer and throws a ConcurrentModificationException if that is not the case.
acquire() can be seen as a lightweight lock: it does not block, but merely uses the id of the operating thread together with an operation count to detect concurrent access, thereby ensuring that only one thread operates on the consumer at a time. acquire() and release() always appear in pairs, acting as the corresponding lock and unlock operations.
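The idea behind acquire() and release() can be shown with a simplified sketch. The class below illustrates the mechanism rather than copying the Kafka client source: the consumer remembers the id of the thread currently using it plus a reentrancy counter, and any other thread that tries to enter concurrently fails fast with a ConcurrentModificationException instead of blocking.

import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration of the acquire()/release() "lightweight lock"; names are illustrative.
public class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Called at the beginning of every public method of the consumer.
    void acquire() {
        long threadId = Thread.currentThread().getId();
        // If another thread already "owns" the consumer, fail fast instead of waiting.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "KafkaConsumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    // Called in a finally block at the end of every public method of the consumer.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}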
Although KafkaConsumer is designed to be driven by a single thread, this is not always sufficient. For example, when producers publish messages far faster than a single consumer can process them, the consumer falls further and further behind, and over a long period messages may even be lost (for instance, removed by log retention before they are ever consumed). In such cases the consumer side needs to use multiple threads to increase consumption throughput.
2. Multithreaded implementations
2.1. Thread confinement: one KafkaConsumer per thread
In this approach each consumer thread instantiates its own KafkaConsumer: as shown in the figure, one thread corresponds to one KafkaConsumer instance, and all consumer threads belong to the same consumer group.
The degree of parallelism of this approach is limited by the actual number of partitions of the topic; any threads beyond the partition count are assigned no partitions and simply sit idle.
Example implementation:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class kafkaConsumer1 {

    private static final String topic = "topic-demo"; // placeholder topic name

    public void consumerMultithread1() {
        Properties props = initConfig();
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }

    private static Properties initConfig() {
        Properties props = new Properties();
        // bootstrap.servers, group.id, key/value deserializers, etc. omitted here
        return props;
    }

    // consumer thread: each thread owns its own KafkaConsumer instance
    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties prop, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(prop);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // process the record
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}
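Since the parallelism of the thread-per-consumer approach is capped by the partition count, the number of consumer threads is usually derived from the topic metadata rather than hard-coded. The sketch below is one way to do this; the broker address, group id and topic name are placeholders, and it reuses the KafkaConsumerThread class from the example above (assumed to be in the same package).

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerThreadSizing {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-demo");              // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        String topic = "topic-demo"; // placeholder topic name

        // Use a short-lived consumer only to look up how many partitions the topic has.
        int partitionCount;
        try (KafkaConsumer<String, String> metadataConsumer = new KafkaConsumer<>(props)) {
            partitionCount = metadataConsumer.partitionsFor(topic).size();
        }

        // Starting more consumer threads than partitions leaves the extra threads idle,
        // so cap the thread count at the partition count.
        int consumerThreadNum = partitionCount;
        for (int i = 0; i < consumerThreadNum; i++) {
            new kafkaConsumer1.KafkaConsumerThread(props, topic).start();
        }
    }
}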
2.2. Multithreading in the message-handling stage
This approach is a further optimization of the previous one: a thread pool is introduced in the message-handling stage so that processing the fetched records no longer blocks the polling thread, which further increases consumption throughput.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class kafkaConsumer2 {

    private static final String topic = "topic-demo"; // placeholder topic name

    public void consumerMultithread2() {
        Properties props = initConfig();
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            // size each record-handler pool by the number of available processors
            new KafkaConsumerThread(props, topic,
                    Runtime.getRuntime().availableProcessors()).start();
        }
    }

    private static Properties initConfig() {
        Properties props = new Properties();
        // bootstrap.servers, group.id, key/value deserializers, etc. omitted here
        return props;
    }

    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;
        private final ExecutorService executorService;
        private final int threadNumber;

        public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {
            this.kafkaConsumer = new KafkaConsumer<>(prop);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = threadNumber;
            // bounded queue plus CallerRunsPolicy: when the handlers fall behind,
            // the polling thread processes records itself instead of dropping them
            this.executorService = new ThreadPoolExecutor(threadNumber, threadNumber,
                    0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        // hand the whole batch over to the handler pool
                        executorService.submit(new RecordsHandler(records));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler extends Thread {
        public final ConsumerRecords<String, String> records;

        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }

        @Override
        public void run() {
            // process the records here
        }
    }
}
Because the records are now processed asynchronously by the handler pool, offsets can no longer be committed simply in polling order; this approach therefore needs a shared offsets structure that the handler threads update and the polling thread uses when committing.
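A minimal sketch of that shared-offsets idea is shown below. It illustrates the pattern rather than the only possible design, and the class and method names are invented for this example: handler threads record the last offset they have finished for each partition, and the polling thread commits the collected offsets around each poll(). Note that this simple form still cannot guard against two handlers finishing batches of the same partition out of order, which is the usual caveat of this pattern.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Shared between the polling thread and the record-handler threads.
public class SharedOffsets {
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    // Called by a handler thread after it has finished processing the records of one partition.
    public void recordProcessed(TopicPartition tp, long lastProcessedOffset) {
        synchronized (offsets) {
            OffsetAndMetadata previous = offsets.get(tp);
            // the committed offset is "last processed + 1"; only ever move it forward
            if (previous == null || previous.offset() < lastProcessedOffset + 1) {
                offsets.put(tp, new OffsetAndMetadata(lastProcessedOffset + 1));
            }
        }
    }

    // Called by the polling thread, e.g. right before each poll().
    public void commitIfNeeded(KafkaConsumer<String, String> consumer) {
        synchronized (offsets) {
            if (!offsets.isEmpty()) {
                consumer.commitSync(new HashMap<>(offsets));
                offsets.clear();
            }
        }
    }
}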