Kafka 服務器(Broker) 的配置
server.properties
# broker.id: 每個 Kafka Broker 的唯一標識符。broker.id 必須在整個 Kafka 集群中唯一。
broker.id=0# 配置 Kafka Broker 監聽客戶端請求的地址和端口。這個配置決定了 Kafka 服務將接受來自生產者、消費者以及其他客戶端的連接。
listeners=PLAINTEXT://192.168.65.60:9092# Kafka 消息日志文件的存儲目錄
log.dir=/usr/local/data/kafka‐logs# Kafka 連接到 Zookeeper 的地址
zookeeper.connect=192.168.65.60:2181
每個 Kafka 集群中的節點(Broker)都需要有一個 server.properties 配置文件,并且每個節點的配置可以有所不同。
生產者
生產者配置
Properties props = new Properties();// Kafka服務器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把發送的key和value從字符串序列化為字節數組
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /* * 1. 發出消息持久化機制參數* acks=0: 表示producer不需要等待任何broker確認收到消息的回復,就可以繼續發送下一條消息。性能最高,但是最容易丟消息* acks=1: 至少要等待leader已經成功將數據寫入本地log,但是不需要等待所有follower是否成功寫入,就可以繼續發送下一條消息* 如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失* acks=‐1或all: 需要等待 min.insync.replicas(默認為1,推薦配置大于等于2) 這個參數配置的副本個數都成功寫入日志。這是最強的數據保證。一般金融級別才會使用這種配置*/
props.put(ProducerConfig.ACKS_CONFIG, "1");// 2. 重試相關
//2.1 發送失敗重試次數,重試能保證消息發送的可靠性,但是也可能造成消息重復發送,需要接收者做好消息接收的冪等性處理
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2.2 重試間隔設置,默認重試間隔100ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);/ * 3. 本地緩沖區和延遲發送相關* 在設置本地緩沖區/延遲發送后,消息會先發送到本地緩沖區,當達到批量發送消息的大 * 小時,本地線程會從緩沖區取數據(一個batch),批量發送到broker。同時,需要設置 * batch最大的延遲發送時間,如果一條消息在本地緩沖區中等待的時間達到設置的時間后 * batch沒滿,那么也必須把消息發送出去* /// 3.1 設置本地緩沖區大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// 3.2 設置batch大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/* * 3.3 batch最大的延遲發送時間* 默認值是0:意思就是消息必須立即被發送,但這樣會影響性能* 一般設置10毫秒左右,就是說這個消息發送完后會進入本地的一個batch,如果10毫秒內,這個batch滿了16kb就會隨batch一起被發送出去* 如果10毫秒內,batch沒滿,那么也必須把消息發送出去,不能讓消息的發送延遲時間太長* * 消息 -> 本地緩沖區(32M)-> batch(16k)-> 發送(10ms batch不滿也發送)*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
生產者發送消息
// 創建 Kafka 生產者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 發送消息
String topic = "test-topic"; // 主題名稱
String key = "order1"; // 消息的 key
String value = "Order details: 123"; // 消息的內容// 創建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 發送消息,這里的lambda函數就是onCompletion()方法producer.send(record, (metadata, exception) -> {if (exception != null) {System.out.println("Error sending message: " + exception.getMessage());} else {System.out.println("Message sent successfully to topic " + metadata.topic() +" partition " + metadata.partition() + " with offset " + metadata.offset());}});} catch (Exception e) {e.printStackTrace();
} finally {// 關閉生產者producer.close();
}
// 指定發送分區
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);// 也可以指定發送分區
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);// 等待消息發送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();// 異步回調方式發送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {// 處理異常}
});
// 關閉
producer.close();
此外,為了保證生產者的消息發送成功,可以通過添加回調函數的方式,在send成功后打印日志。
詳細內容參考:Kafka如何保證消息不丟失
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生產者成功發送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生產者發送消失敗,原因:{}", ex.getMessage()));
消費者
消費者配置
Properties properties = new Properties();// Kafka服務器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把發送的key和value從字符串序列化為字節數組
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消費分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自動提交offset,默認就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自動提交offset的間隔時間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/* * 當消費主題的是一個新的消費組,或者指定offset的消費方式,offset不存在,那么應該如何消費* latest(默認) :只消費自己啟動之后發送到主題的消息* earliest:第一次從頭開始消費,以后按照消費offset記錄繼續消費,這個需要區別于 consumer.seekToBeginning(每次都從頭開始消費)*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// consumer給broker發送心跳的間隔時間,broker接收到心跳如果此時有rebalance發生會通過心跳響應將rebalance方案下發給consumer,這個時間可以稍微短一點
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// 服務端broker多久感知不到一個consumer心跳就認為他故障了,會將其踢出消費組,對應的Partition也會被重新分配給其他consumer,默認是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);// 一次poll最大拉取消息的條數,如果消費者處理速度很快,可以設置大點,如果處理速度一般,可以設置小點
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,會將其踢出消費組,將分區分配給別的consumer消費
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
消費者消費消息
// 創建 Kafka 消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 訂閱主題
consumer.subscribe(Collections.singletonList("test-topic"));// 消費指定分區,這段代碼指定了消費者從TOPIC_NAME的第一個分區(分區0)開始消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));/* 回溯消費(從頭消費 - seekToBeginning)* seekToBeginning()方法使消費者回溯到該分區的最初位置,意味著從頭開始消費該分 區的所有消息。* 這對于重新消費主題中的消息或重新同步時非常有用。* /
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消費,即消費者將跳過之前的消息,從該offset開始消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);/* 從指定時間點開始消費 - 1小時前
* partitionsFor()方法獲取指定主題(TOPIC_NAME)的所有分區信息。
* fetchDataTime 是一個時間戳,表示1小時前的時間,new Date().getTime() - 1000 * 60 * 60 用來計算這個時間戳。
* map 用于存儲每個分區與其對應的時間戳(fetchDataTime)。這個時間戳將用于從Kafka中拉取時間戳較早的消息。
*/
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() ‐ 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
// 消費消息
try {while (true) {consumer.poll(1000).forEach(record -> {// 可以修改為具體業務邏輯System.out.println("Consumed record with key: " + record.key() +", value: " + record.value() + ", from partition: " + record.partition());});}
} catch (Exception e) {e.printStackTrace();
} finally {// 關閉消費者consumer.close();
}
消費者提交offset
手動提交offset的意義
- 控制消費進度
手動提交offset能夠讓消費者在每個消息或消息批次消費后,明確地告訴Kafka“我已經消費到這個offset了”。這對于控制消息消費的精確性非常重要,尤其在需要精確控制消費位置的場景中。
- 避免消息丟失或重復消費
如果自動提交offset,可能會發生消費者在處理中出現異常(如程序崩潰),導致已消費的消息的offset提交失敗,導致消息丟失或重復消費。手動提交則可以在處理完消息并確保成功時再提交offset,避免這種問題。比如在金融交易、日志收集系統等場景中,需要確保消息的處理不會丟失,并且不會重復處理。
- 靈活的錯誤處理與恢復
通過手動提交offset,消費者可以在消費過程中靈活地處理錯誤。如果在消費某條消息時發生異常,消費者可以選擇不提交offset,這樣在消費者重啟或恢復時會重新消費該消息。它使得消費者在出錯時能更好地控制重試策略。
代碼實現
同步提交
consumer.commitSync();
當調用該方法時,消費者會將當前消費的偏移量提交到Kafka集群,并且當前線程會阻塞,直到該提交操作完成。
優勢:
- 阻塞:會等待offset提交成功,不會繼續執行后續代碼,直到提交完成。
- 可靠性:如果提交失敗,commitSync()會拋出異常,可以捕獲并進行處理,確保提交正確。
缺點:會導致性能問題,因為它會阻塞當前線程,直到提交完成。
異步提交
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {// 處理異常}
});
回調函數:異步提交會接受一個OffsetCommitCallback回調接口作為參數,該接口的onComplete()方法會在提交操作完成時被調用。這個方法會接收到兩個參數:
offsets:包含提交的偏移量信息(TopicPartition和OffsetAndMetadata)。
ex:如果提交發生錯誤,該參數會包含異常信息。
優勢:
- 非阻塞:不會等待提交完成,允許程序繼續執行其他操作。
- 提高吞吐量:減少等待時間,尤其是在批量消費和提交的情況下,可以提高整體的吞吐量和性能。
缺點:可能會出現提交失敗的情況,回調函數中的異常處理需要做好,以確保異常得到及時處理。
Spring boot集成
1. 添加依賴
在pom.xml 中添加 Kafka 的相關依賴
<dependencies><!-- Spring Boot Starter for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Web (optional if you need a web app) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter for Actuator (optional for monitoring) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Spring Boot Starter Test (optional for testing) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
2. 配置文件
application.yml
spring:kafka:# Kafka broker 地址bootstrap‐servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch‐size: 16384buffer‐memory: 33554432acks: 1key‐serializer: org.apache.kafka.common.serialization.StringSerializervalue‐serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group‐id: default‐groupenable‐auto‐commit: falseauto‐offset‐reset: earliestkey‐deserializer: xxx.StringDeserializervalue‐deserializer: xxx.StringDeserializerlistener:ack‐mode: manual_immediate
注意:
ack‐mode
RECORD:當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
BATCH:當每一批poll()的數據被消費者監聽器處理之后提交
TIME:當每一批poll()的數據被消費者監聽器處理之后,距離上次提交時間大于TIME時提交
COUNT:當每一批poll()的數據被消費者監聽器處理之后,被處理record數量大于等于COUNT時提交
TIME | COUNT:有一個條件滿足時提交
MANUAL:當每一批poll()的數據被消費者監聽器處理之后, 手動調用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE:手動調用Acknowledgment.acknowledge()后立即提交,一般使用這種(一次提交一條消息)
3. 啟動類
package com.example.kafka;import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {@Autowiredprivate KafkaProducer kafkaProducer;public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 發送消息kafkaProducer.sendMessage("test-topic", "Hello, Kafka!");}
}
4. 生產者類
package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Message sent: " + message);}
}
5. 消費者類
package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group")public void consume(String message) {System.out.println("Consumed message: " + message);}@KafkaListener(topics = "test-topic",groupId = "test-group")public void consume1(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();ack.acknowledge(); //手動提交offset}// 配置多個topic,concurrency就是同組下的消費者個數,就是并發消費數,必須小于等于分區總數@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}), // 從topic1的分區0和1讀取消息@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) // 從topic2的分區0讀取消息,并設置分區1的初始偏移量為100}, concurrency = "6")public void listenToMultipleTopics(String message) {// 消費消息的邏輯System.out.println("Group: testGroup, Message: " + message);}
}
Kafka事務
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());// 初始化事務
producer.initTransactions();
try {// 開啟事務producer.beginTransaction();// 發到不同的主題的不同分區producer.send(/*...*/);// 提交事務producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {// 回滾事務producer.abortTransaction();
}
// 關閉
producer.close();
spring框架下Kafka事務
可以通過**@Transactional**實現
配置
可以通過在application.yml文件或KafkaConfig配置類中添加配置的方式,提供事務支持。
1. application.yml
spring:kafka:bootstrap-servers: localhost:9092 # Kafka 集群地址producer:acks: all # 確保消息被所有副本確認transactional-id-prefix: tx- # 事務前綴,Kafka 事務需要一個事務 ID 前綴consumer:group-id: test-group # 消費者組 IDenable-auto-commit: false # 手動提交 offsetlistener:ack-mode: manual # 設置為手動提交確認
2. 配置類
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {// 設置 Kafka 生產者的事務管理器KafkaTransactionManager<String, String> transactionManager =new KafkaTransactionManager<>(producerFactory());KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setTransactionManager(transactionManager);return kafkaTemplate;}@Beanpublic DefaultKafkaProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-"); // 事務 IDreturn new DefaultKafkaProducerFactory<>(configProps);}
}
生產者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.Transactional;
import org.springframework.stereotype.Service;@Service
public class KafkaTransactionProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Transactionalpublic void sendTransactionalMessages() {try {// 發送事務消息kafkaTemplate.send("topic1", "key1", "message1");kafkaTemplate.send("topic2", "key2", "message2");// 你可以在此處加入其他業務邏輯,如果出現異常,會回滾事務if (someConditionFails()) {throw new RuntimeException("Simulating failure to trigger rollback");}// 如果沒有異常,事務提交,消息將被正常發送} catch (Exception e) {// 事務回滾System.out.println("Transaction failed, rolling back...");throw e;}}private boolean someConditionFails() {// 模擬某些條件下事務失敗return true;}
}
消費者
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaTransactionConsumer {@KafkaListener(topics = "topic1", groupId = "test-group")public void listenTopic1(String message) {System.out.println("Received message from topic1: " + message);}@KafkaListener(topics = "topic2", groupId = "test-group")public void listenTopic2(String message) {System.out.println("Received message from topic2: " + message);}
}
在生產者的配置中啟用事務,配置 transactional.id,并設置事務管理器 KafkaTransactionManager,它會自動管理 Kafka 事務的開始、提交和回滾。
事務管理:@Transactional 注解用于標識在發送消息的過程是一個事務操作。如果其中任何消息發送失敗,Spring Kafka 會自動回滾事務。
回滾機制:在 sendTransactionalMessages() 中模擬了一個失敗的條件,確保事務在遇到異常時會被回滾。