背景:
? ?對kafka消息進行監聽,生產者發了消息,但是消費端沒有接到消息,監聽代碼
消費端,kafka配置
spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量發送消息的數量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默認消費者group id
spring.kafka.consumer.group-id=dq
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"}) public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {
}
offset explorer發現生產者發送了消息,offset是0
問題解決:
后來查看生產者kafka配置,發現他們的enable-auto-commit是false:
spring.kafka.consumer.enable-auto-commit=false
修改kafka配置
spring.kafka.consumer.enable-auto-commit=false
# 在偵聽器容器中運行的線程數
spring.kafka.listener.concurrency=5
# listner負責ack,每調用commit方法,立即向服務器提交
spring.kafka.listener.ack-mode=manual_immediate