Table of Contents
Sending and Receiving Messages with kafka-clients
Adding the Dependency
Producer:
Consumer:
Spring Boot Integration
Adding the Maven Dependency
Producer
Consumer
In the previous article we took a deep dive into how Kafka works internally and how message consumption is coordinated across a cluster. You can find it at the link below:
Kafka Message Queue: Deep Analysis and Practical Guide
This article focuses on hands-on practice.
Sending and Receiving Messages with kafka-clients
Adding the Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
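Both test classes below serialize a Message entity that the original post does not include. Here is a minimal sketch, assuming Lombok-generated constructor and getters. Note it is a hypothetical reconstruction: the kafka-clients tests construct it with an Integer id, while the Spring Boot service later uses a String id, so the real entity may differ; the getMegId spelling follows the source.

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Hypothetical reconstruction of the Message entity used throughout the examples.
 * Adjust the field types if your actual entity differs.
 */
@Data
@AllArgsConstructor
public class Message {
    private Integer megId;   // record key (spelling as in the source)
    private String megInfo;  // payload text
}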
Producer:
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import com.google.gson.Gson;
import com.muse.springbootdemo.entity.Message; // added: the Message entity sketched above
import lombok.extern.slf4j.Slf4j;

@Slf4j
@SpringBootTest
class KafkaProducerTests {

    private final static String TOPIC_NAME = "muse-rp";
    private final static Integer PARTITION_ONE = 0;
    private final static Integer PARTITION_TWO = 1;
    private final static Gson GSON = new Gson();

    /**
     * Synchronous (blocking) message sending
     */
    @Test
    void testBlockingSendMsg() {
        /* Initialize producer properties */
        Properties properties = initProducerProp();
        /* Create the producer client */
        Producer<Integer, String> producer = new KafkaProducer<>(properties);
        Message message;
        for (int i = 0; i < 3; i++) {
            /* Build the message */
            message = new Message(i, "BLOCKING_MSG_" + i);
            ProducerRecord<Integer, String> producerRecord;
            int SEND_MSG_METHOD = 0;
            switch (SEND_MSG_METHOD) {
                case 0: /* [Option 1] No partition specified */
                    producerRecord = new ProducerRecord<>(TOPIC_NAME, GSON.toJson(message));
                    break;
                case 1: /* [Option 2] No partition specified; the key (2nd argument) determines the partition */
                    producerRecord = new ProducerRecord<>(TOPIC_NAME, message.getMegId(), GSON.toJson(message));
                    break;
                default: /* [Option 3] Partition specified explicitly */
                    producerRecord = new ProducerRecord<>(TOPIC_NAME, PARTITION_ONE, message.getMegId(), GSON.toJson(message));
            }
            /* Block until the send completes */
            try {
                Future<RecordMetadata> recordMetadataFuture = producer.send(producerRecord);
                log.info("send() invoked, msg={}", producerRecord.value());
                RecordMetadata recordMetadata = recordMetadataFuture.get();
                log.info("[topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(),
                        recordMetadata.partition(), recordMetadata.offset());
            } catch (Throwable e) {
                log.error("Failed to send message!", e);
            }
        }
        producer.close(); // close() blocks until all previously sent requests complete, then shuts down the KafkaProducer
    }

    /**
     * Asynchronous (callback-based) message sending
     */
    @Test
    void testNoBlockingSendMsg() throws InterruptedException {
        /* Initialize producer properties */
        Properties properties = initProducerProp();
        /* Create the producer client */
        Producer<Integer, String> producer = new KafkaProducer<>(properties);
        CountDownLatch latch = new CountDownLatch(5);
        Message message;
        for (int i = 0; i < 5; i++) {
            message = new Message(i, "NO_BLOCKING_MSG_" + i);
            /* Send to an explicit partition */
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, PARTITION_ONE,
                    message.getMegId(), GSON.toJson(message));
            /* Send asynchronously with a callback */
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    log.error("Failed to send message!", exception);
                }
                if (metadata != null) {
                    log.info("[topic]={}, [partition]={}, [offset]={}", metadata.topic(), metadata.partition(),
                            metadata.offset());
                }
                latch.countDown();
            });
            log.info("send() invoked, msg={}", producerRecord.value());
        }
        latch.await(); // added: wait for all callbacks; the original created the latch but never awaited it
        producer.close();
    }

    /**
     * Initialize producer properties
     */
    private Properties initProducerProp() {
        Properties properties = new Properties();
        // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095"); // broker list for a cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        /*
         * Durability of acknowledgements:
         * acks=0: the producer does not wait for any broker ACK before sending the next message.
         *         Highest throughput, but messages are most easily lost.
         * acks=1: the producer waits until the leader has written the message to its local log, but not
         *         for all followers. If a follower has not yet replicated the data and the leader then
         *         crashes, the message is lost.
         * acks=-1 (all): the broker returns success only after every replica in the ISR has replicated
         *         the message — the strongest guarantee. min.insync.replicas sets a lower bound on the
         *         number of in-sync replicas; it does not mean an ACK is returned once that many copies
         *         succeed — with acks=all, every replica currently in the ISR must replicate the message.
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        /*
         * Number of retries on send failure (retries are spaced by retry.backoff.ms, default 100ms).
         * Retries improve delivery reliability but can produce duplicates, so consumers must handle
         * messages idempotently.
         */
        properties.put(ProducerConfig.RETRIES_CONFIG, 3); // retry up to 3 times
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 300ms between retries
        /*
         * Local send buffer. Messages are first written to this buffer, which improves send
         * performance. Default: 32MB.
         */
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
        /*
         * Batch size. A local sender thread drains the buffer and ships batches to the broker.
         * Default: 16KB — a batch is sent as soon as it fills up.
         */
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        /*
         * linger.ms defaults to 0, i.e. messages are sent immediately, which hurts throughput.
         * A common setting is ~10ms: a message joins a local batch, and if the batch reaches 16KB
         * within 10ms it is sent with the batch; otherwise it is sent anyway once 10ms elapse,
         * so that send latency stays bounded.
         */
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        /* Serialize the key and value to byte arrays */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }
}
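The tests assume the topic muse-rp already exists with at least two partitions (PARTITION_ONE and PARTITION_TWO). The original post does not show how it was created; here is a hedged sketch using the AdminClient that ships in the same kafka-clients dependency:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // assumed: same broker as the tests
        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions, replication factor 1 (fine for a single local broker)
            NewTopic topic = new NewTopic("muse-rp", 2, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get(); // block until the topic is created
        }
    }
}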
Consumer:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@SpringBootTest
class KafkaConsumerTests {

    private final static String TOPIC_NAME = "muse-rp";
    private final static String CONSUMER_GROUP_NAME = "museGroup";
    private final static Integer PARTITION_ONE = 0;
    private final static Gson GSON = new Gson();

    /**
     * Auto-commit of offsets
     */
    @Test
    void testAutoCommitOffset() throws Throwable {
        Properties properties = initConsumerProp();
        /* Whether to auto-commit offsets. Default: true */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        /* Interval between automatic offset commits */
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        /* Rebalance strategy */
        // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays
        //         .asList(RangeAssignor.class, CooperativeStickyAssignor.class));
        /* Create the consumer client */
        Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
        int RECV_MSG_METHOD = 0;
        switch (RECV_MSG_METHOD) {
            case 0: /* [Option 1] Subscribe without specifying partitions */
                consumer.subscribe(Lists.newArrayList(TOPIC_NAME));
                break;
            case 1: /* [Option 2] Consume from a specific partition */
                consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                break;
            case 2: /* [Option 3] Consume from the beginning */
                consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                consumer.seekToBeginning(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                break;
            default: /* [Option 4] Consume from a specific partition and offset */
                consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
                consumer.seek(new TopicPartition(TOPIC_NAME, PARTITION_ONE), 10);
        }
        ConsumerRecords<Integer, String> records;
        while (true) {
            /* Long-poll for messages */
            records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<Integer, String> record : records) {
                log.info("[topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
                        record.offset(), record.value());
            }
            Thread.sleep(3000);
        }
    }

    /**
     * Manual commit of offsets
     */
    @Test
    void testManualCommitOffset() throws Throwable {
        Properties properties = initConsumerProp();
        /* Disable auto-commit (default: true) */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /* Create the consumer client */
        Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Lists.newArrayList(TOPIC_NAME));
        // consumer.assign(Lists.newArrayList(new TopicPartition(TOPIC_NAME, PARTITION_ONE)));
        ConsumerRecords<Integer, String> records;
        while (true) {
            /* Long-poll for messages */
            records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<Integer, String> record : records) {
                log.info("[topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
                        record.offset(), record.value());
            }
            boolean isSync = true;
            if (records.count() > 0) {
                if (isSync) {
                    /* [Synchronous commit] blocks the current thread until the offsets are committed; the common choice */
                    consumer.commitSync();
                } else {
                    /* [Asynchronous commit] does not block; execution continues immediately */
                    consumer.commitAsync((offsets, exception) -> {
                        log.info("offset={}", GSON.toJson(offsets)); // originally logged at error level; info is appropriate
                        if (exception != null) {
                            log.error("Failed to commit offsets!", exception);
                        }
                    });
                }
            }
            Thread.sleep(1000);
        }
    }

    /**
     * Initialize consumer properties
     */
    private Properties initConsumerProp() {
        Properties properties = new Properties();
        // Broker list for a cluster
        // properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // fixed: the original used ProducerConfig here
        /* Consumer group: museGroup */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        /*
         * Offset reset policy — e.g. for a brand-new consumer group no offset exists yet, so where should
         * consumption start?
         * latest (default): consume only messages produced to the topic after this consumer starts.
         * earliest: start from the beginning the first time, then continue from the committed offset.
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        /* Interval at which the consumer sends heartbeats to the broker */
        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        /* If no heartbeat arrives within 10 seconds, the consumer is evicted from the group and a rebalance reassigns its partitions to other consumers */
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        /* Maximum number of records returned by a single poll(); tune to the consumer's processing speed */
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        /* If more than 30 seconds pass between two polls, Kafka deems the consumer too slow, evicts it from the group, and reassigns its partitions */
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        /* Key and value deserializers */
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return properties;
    }
}
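commitSync() and commitAsync() above commit everything returned by the last poll(). When finer control is needed, an overload accepts explicit per-partition offsets. A small helper sketch (not in the original post) that commits the position just past a single processed record:

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class OffsetUtil {
    /** Commit the offset immediately after the given record, so a restart resumes right behind it. */
    public static void commitUpTo(Consumer<?, ?> consumer, ConsumerRecord<?, ?> record) {
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1))); // +1: the next offset to consume
    }
}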
Spring Boot Integration
Adding the Maven Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
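Before a KafkaTemplate<String, Message> can be injected, Spring also needs to know the broker address and how to serialize the Message payload. The original post does not show this configuration; here is a minimal sketch, assuming Spring Kafka's JsonSerializer and the single-broker address used earlier:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.muse.springbootdemo.entity.Message;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // assumed: same broker as above
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // Message -> JSON
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

The same settings can equally come from spring.kafka.* entries in application.yml via Spring Boot's auto-configuration.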
Producer

package com.muse.springbootdemo.service;

import javax.annotation.Resource;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.muse.springbootdemo.entity.Message;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class ProducerService {

    private final static String TOPIC_NAME = "muse-rp";
    private final static Integer PARTITION_ONE = 0;
    private final static Integer PARTITION_TWO = 1;

    @Resource
    private KafkaTemplate<String, Message> kafkaTemplate;

    /**
     * Synchronous (blocking) message sending
     */
    public void blockingSendMsg() throws Throwable {
        Message message;
        for (int i = 0; i < 5; i++) {
            message = new Message(String.valueOf(i), "BLOCKING_MSG_SPRINGBOOT_" + i);
            ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
                    "" + message.getMegId(), message);
            SendResult<String, Message> sendResult = future.get(); // block until the broker acknowledges
            RecordMetadata recordMetadata = sendResult.getRecordMetadata();
            log.info("---BLOCKING_MSG_SPRINGBOOT--- [topic]={}, [partition]={}, [offset]={}", recordMetadata.topic(),
                    recordMetadata.partition(), recordMetadata.offset());
        }
    }

    /**
     * Asynchronous (callback-based) message sending
     */
    public void noBlockingSendMsg() {
        Message message;
        for (int i = 0; i < 5; i++) {
            message = new Message(String.valueOf(i), "NO_BLOCKING_MSG_SPRINGBOOT_" + i);
            ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC_NAME, PARTITION_ONE,
                    "" + message.getMegId(), message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("Failed to send message!", ex);
                }

                @Override
                public void onSuccess(SendResult<String, Message> result) {
                    RecordMetadata recordMetadata = result.getRecordMetadata();
                    log.info("---NO_BLOCKING_MSG_SPRINGBOOT---[topic]={}, [partition]={}, [offset]={}",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                }
            });
        }
    }
}
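A quick way to exercise the service is a test like the following (hypothetical class, not in the original post):

import javax.annotation.Resource;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProducerServiceTests {

    @Resource
    private ProducerService producerService;

    @Test
    void sendDemo() throws Throwable {
        producerService.blockingSendMsg();   // five synchronous sends
        producerService.noBlockingSendMsg(); // five asynchronous sends
    }
}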
Consumer
package com.muse.springbootdemo.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.muse.springbootdemo.entity.Message;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class ConsumerService {

    private final static String TOPIC_NAME = "muse-rp";
    private final static String CONSUMER_GROUP_NAME = "museGroup";

    /**
     * Message consumption demo
     */
    @KafkaListener(topics = TOPIC_NAME, groupId = CONSUMER_GROUP_NAME)
    public void listenGroup(ConsumerRecord<String, Message> record) {
        log.info("[topic]={}, [partition]={}, [offset]={}, [value]={}", record.topic(), record.partition(),
                record.offset(), record.value());
    }
}
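For the listener to receive Message objects rather than raw bytes, the consumer side needs a matching deserializer. This configuration is not shown in the original post; here is a minimal sketch, assuming Spring Kafka's JsonDeserializer:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.muse.springbootdemo.entity.Message;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Message> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // assumed: same broker as above
        JsonDeserializer<Message> valueDeserializer = new JsonDeserializer<>(Message.class);
        valueDeserializer.addTrustedPackages("com.muse.springbootdemo.entity"); // allow deserializing our entity
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}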
The next article will analyze common problems in real-world Kafka usage along with high-frequency big-tech interview questions,
including: 1) preventing message loss; 2) preventing duplicate consumption through idempotent handling; 3) guaranteeing ordered consumption with a single partition and a single consumer; 4) scaling up consumption capacity when messages back up; 5) implementing delay queues via timestamp checks; 6) achieving high throughput with page cache, sequential writes, and zero-copy. It will be published within 12 hours.
High-Frequency Problems in Kafka Applications