Kafka生產者相關-CSDN博客
消費者消費數據基本流程
package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消費者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-1");//創建消費者對象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//訂閱主題consumer.subscribe(Collections.singletonList("test"));//從Kafka主題中獲取數據while (true){ConsumerRecords<String, String> poll = consumer.poll(100);for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//關閉消費者對象 因上面在無線循環//consumer.close();}
}
消費數據偏移量問題
Kafka中的 偏移量(offset)是用于標識每個消費者在某個分區內消費到的位置。每個分區的消息都有一個唯一的偏移量,消費者會根據這個偏移量來讀取消息。
Kafka偏移量的管理
Kafka默認提供兩種方式來管理偏移量:
- 自動提交偏移量(默認方式)
- 手動提交偏移量(需要顯式配置)
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");? //默認設置
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); //默認設置 每5秒提交一次
也就是說默認情況下? ?消費者如果后啟動? 無法讀取到生產者已經發送的消息
偏移量的重置
如果需要重新消費數據,可以通過 auto.offset.reset
配置項來控制消費者的偏移量重置行為。這個配置項有幾個常用的值:
earliest
:如果沒有找到偏移量(比如第一次消費),消費者會從最早的消息開始消費。latest
:如果沒有找到偏移量,消費者會從最新的消息開始消費。none
:如果沒有找到偏移量,消費者會拋出異常。
例如,設置為從最早的消息開始消費:
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
如果想從指定偏移量獲取消息
解決消費者重復消費的問題(不能完全解決)
以上示例,偏移量默認都是5秒一次提交
例如先啟動消費者
然后生產者發送了10000條數據? 不好演示的話可以在生產者那邊每發送一條數據然后
Thread.sleep 1秒
如果消費者在消費到一定程度之后? 突然停止? ?觀察再次啟動消費者? 存在消費者重復消費的情況
原因就是消費者偏移量默認5秒提交一次的原因
那么可以將消費者默認5秒提交偏移量縮短為1秒? 但是這樣不能完全解決問題
示例
消費者
代碼
package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消費者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//設置從最早的消息讀取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//創建消費者對象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//訂閱主題consumer.subscribe(Collections.singletonList("test"));
// boolean flg = true;
// while (flg) {
// // 拉取數據
// consumer.poll(Duration.ofMillis(100));
// final Set<TopicPartition> assignment = consumer.assignment();
//
// if (assignment != null && !assignment.isEmpty()) {
// // 檢查分配的分區
// for (TopicPartition topicPartition : assignment) {
// if ("test".equals(topicPartition.topic())) {
// // 將偏移量設置為2
// consumer.seek(topicPartition, 2);
// // 停止循環
// flg = false;
// }
// }
// }
// }//從Kafka主題中獲取數據while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//關閉消費者對象 因上面在無線循環//consumer.close();}
}
生產者?
代碼
package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//創建配置對象Map<String,Object> configMap=new HashMap<>();//如果是集群隨意指定一個configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//對Key Value進行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置冪等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重試次數configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超時configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事務 事務基于冪等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//創建生產者對象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事務kafkaProducer.initTransactions();try {//開啟事務kafkaProducer.beginTransaction();for(int i=0;i<10000;i++){//key的作用是通過某種算法,放到topic的某個分區中//可以不設置key 默認是按照輪詢的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//發送數據 send方法還可以接收一個參數,就是回調函數 kafkaProducer.send(record);是異步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 處理發送失敗的情況e.printStackTrace();} else {// 處理發送成功的情況System.out.println("發送成功:" + recordMetadata);}}});//這樣變成同步了send.get();Thread.sleep(1000);}//提交事務kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事務kafkaProducer.abortTransaction();}finally {//關閉生產者對象kafkaProducer.close();}}
}
先啟動消費者
然后啟動生產者
觀察消費者控制臺
過會我再次啟動消費者
1.縮短自動提交偏移量的時間
因為默認消費者每5秒自動提交提交
可以縮短自動提交偏移量的時間? ?但這樣只能減少重復消費的量? 并不能徹底解決重復消費的問題
2.手動提交偏移量
package com.hrui;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消費者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消費者組consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//默認自動提交 改成false 變成手動提交偏移量consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//設置從最早的消息讀取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//創建消費者對象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//訂閱主題consumer.subscribe(Collections.singletonList("test"));
// boolean flg = true;
// while (flg) {
// // 拉取數據
// consumer.poll(Duration.ofMillis(100));
// final Set<TopicPartition> assignment = consumer.assignment();
//
// if (assignment != null && !assignment.isEmpty()) {
// // 檢查分配的分區
// for (TopicPartition topicPartition : assignment) {
// if ("test".equals(topicPartition.topic())) {
// // 將偏移量設置為2
// consumer.seek(topicPartition, 2);
// // 停止循環
// flg = false;
// }
// }
// }
// }//從Kafka主題中獲取數據while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}//設置自動提交偏移量之后 這里要手動去保存偏移量//這里有兩種方式 同步提交 和異步提交偏移量consumer.commitAsync();//異步consumer.commitSync();//同步}//關閉消費者對象 因上面在無線循環//consumer.close();}
}
說明以上方式都不能徹底解決重復消費問題
重復消費問題還是存在
如果要進行原子綁定? 并非做不到,Kafka本身沒有提供相關功能
例如把拉取到的數據全部處理完了,才進行事務提交
一旦出現意外,業務數據恢復? 但是Kafka本身沒有提供相關功能 和與其他支持事務處理的應用結合使用
消費數據-事務隔離級別
生產者事務圖
這個報錯 寫在提交之前即可
消費者組介紹
如果兩個應用都是同一個消費者組
生產者A生產消息? ? ? 消費者B和C在同一個消費者組? 那么A的消息如果被B消費過了那么C是消費不到的? B和C默認是競爭關系
如果生產者A生產消息??消費者B和C在不同消費者組? ?那么消息會被B和C都消費
第一個場景:消費者B和C在同一個消費者組
如果消費者B和消費者C在同一個消費者組內,消息會按照負載均衡的方式分配給它們。這意味著生產者A生產的消息會被消費者B或消費者C中的一個消費,而不是同時被兩個消費者消費。所以,如果B已經消費了某條消息,消費者C就無法再消費到這條消息。這種行為是消費者組的基本特性,主要用于確保每條消息只被某個消費者處理一次。
第二個場景:消費者B和C在不同的消費者組
如果消費者B和消費者C在不同的消費者組中,那么生產者A生產的消息會分別被B和C都消費到。因為每個消費者組有自己獨立的消費進度(每個組有獨立的偏移量),所以每個消費者組都能獨立消費該消息。
第三個場景:消費者B和C同時開啟從頭消費
如果B和C都在同一個消費者組,并且設置了從頭消費,那么它們將從消息隊列的最開始位置開始消費。這種情況下,B和C是共享消費隊列的,它們會根據負載均衡規則交替消費消息,而不是同時消費同一條消息。因此,A的消息仍然不會同時被B和C消費。每條消息仍然只會被消費者組內的某個消費者消費,并且消息的消費是共享的,但并不是同時共享。
總結來說,在同一個消費者組內,消息的消費是競爭式的。即使B和C同時開啟從頭消費,它們也不會同時消費同一條消息。每條消息只會由其中一個消費者處理。