pom dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.14.RELEASE</version>
</dependency>
or
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>
PS: the spring-kafka dependency above already includes kafka-clients, so the second dependency is not needed when spring-kafka is used.
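Since spring-kafka bundles kafka-clients, the same topic could also be consumed through a Spring listener instead of the raw client used in the demo below. A minimal sketch, assuming a Spring Boot application with spring-kafka auto-configuration (bootstrap servers and deserializers set in application.properties); the class name is illustrative:

KafkaListenerSketch.java (sketch, not part of the demo below):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaListenerSketch {

    // Topic and groupId mirror the values used by the raw-client demo below.
    @KafkaListener(topics = "ods_carbon_rfid_device_record", groupId = "11111111111111111111111")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("Received: " + record.value());
    }
}

The rest of this article uses the plain kafka-clients consumer API directly, as shown next.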
KafkaConsumerDemo.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
public class KafkaConsumerDemo {

    static Map<String, Object> properties = new HashMap<String, Object>();
    private static KafkaConsumer<String, String> kafkaConsumer = null;

    /**
     * On Windows, add the following 8 lines to "C:\Windows\System32\drivers\etc\hosts":
     * xxx.xxx.xxx.xxx1 xxx-data01
     * xxx.xxx.xxx.xxx2 xxx-data02
     * xxx.xxx.xxx.xxx3 xxx-data03
     * xxx.xxx.xxx.xxx4 xxx-data04
     * xxx.xxx.xxx.xxx5 xxx-data05
     * xxx.xxx.xxx.xxx6 xxx-data06
     * xxx.xxx.xxx.xxx7 xxx-data07
     * xxx.xxx.xxx.xxx8 xxx-data08
     * @param args
     */
    public static void main(String[] args) {
        // Silence the org.apache.kafka.xxx loggers so they do not flood the console
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);
        loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);

        properties.put("bootstrap.servers", "127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192"); // broker list
        // Consumer group ID. Use a value that cannot clash with any production consumer group,
        // so that pulling messages here does not take them away from other environments' consumers.
        properties.put("group.id", "11111111111111111111111");
        properties.put("max.poll.records", "1000"); // maximum number of records returned by a single poll()
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class);   // deserialize the key bytes into a Java String
        properties.put("value.deserializer", StringDeserializer.class); // deserialize the value bytes into a Java String
        kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // List<String> topics = queryAllTopics( kafkaConsumer );
        kafkaConsumer.subscribe(Collections.singletonList("ods_carbon_rfid_device_record")); // subscribe to the topic

        new Thread(new Runnable() {
            @Override
            public void run() {
                receiveMessage();
            }
        }).start();
    }

    /**
     * Lists all topics visible to this consumer.
     * @param kafkaConsumer
     * @return
     */
    private static List<String> queryAllTopics(KafkaConsumer<String, String> kafkaConsumer) {
        if (kafkaConsumer == null) {
            return null;
        }
        Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();
        if (map == null) {
            return null;
        }
        return new ArrayList<String>(map.keySet());
    }

    public static void receiveMessage() {
        try {
            while (true) {
                synchronized (KafkaConsumerDemo.class) {
                    // ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    // 30 seconds is the poll timeout: return immediately when messages are available,
                    // otherwise wait at most 30 seconds before returning an empty batch.
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));
                    String date = sdf.format(new Date());
                    if (records.isEmpty()) { // poll() never returns null; an empty batch means no messages
                        System.out.println(date + " no messages pulled this time");
                    } else {
                        System.out.println(date + " pulled " + records.count() + " messages this time");
                        int i = 1;
                        for (ConsumerRecord<String, String> record : records) {
                            String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
                            System.out.println("Message #" + i + ": " + info);
                            i++;
                        }
                        kafkaConsumer.commitSync();
                    }
                    /*
                     * After the messages returned by poll() have been processed, commitSync() synchronously
                     * reports the processing progress (the offsets) back to Kafka. Kafka then knows how far
                     * this group has read; if the consumer crashes or restarts, it resumes from the last
                     * committed offset instead of skipping or re-processing messages.
                     * In short, commitSync() "saves the progress", keeping consumption reliable and ordered.
                     */
                    // Thread.sleep( 5000L );
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }
    }
}
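The parameterless commitSync() above commits the offsets of everything returned by the last poll(). The consumer API also accepts an explicit offset map for finer-grained commits, e.g. committing after each processed record rather than per batch. A small sketch under that assumption; the class and method names are illustrative and not part of the demo above:

OffsetCommitSketch.java (sketch):

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetCommitSketch {

    /**
     * Commits the position immediately after the given record, so that a restart
     * resumes at the next unprocessed message of that partition.
     */
    static void commitAfter(KafkaConsumer<String, String> consumer,
                            ConsumerRecord<String, String> record) {
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                // the committed offset is the *next* offset to read: processed offset + 1
                new OffsetAndMetadata(record.offset() + 1)));
    }
}

Per-record commits trade throughput for a smaller replay window after a crash; the per-batch commitSync() used in the demo is usually sufficient for a test consumer like this one.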