文章目錄
- 背景
- 問題復現
- 解決問題
- 原理分析
- fetch.min.bytes
- fetch.max.wait.ms
- 源碼分析
- ReplicaManager#fetchMessages
背景
開發過程中,使用kafka批量消費,發現拉取數量一直為1,如何提高批量拉取數量,記錄下踩坑記錄。
問題復現
- kafka maven依賴
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
- 配置消費者
@Configuration
public class KafkaBlukConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.max-poll-records:30}")private Integer maxPollRecords;@Value("${spring.kafka.consumer.groupId:group1}")private String group;/*** 消費者配置信息*/@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}/*** 消費者批量?程*/@Beanpublic KafkaListenerContainerFactory<?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//設置為批量消費,每個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.setBatchListener(true);return factory;}
}
- 消費端代碼
@Component
public class KafkaBatchConsumer {private static final Logger log = LoggerFactory.getLogger(KafkaBatchConsumer.class);@KafkaListener(id = "consumer1", topics = "topic2", containerFactory = "batchFactory")public void consume(List<ConsumerRecord<String, String>> record) throws Exception {log.info("KafkaBatchConsumer recode size : {} ", record.size());}}
- 使用yml配置生產者
spring:kafka:bootstrap-servers: 192.168.56.112:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
- 使用生產者發送消息
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {// 自定義的主題名稱public static final String TOPIC_NAME = "topic2";private KafkaTemplate<String, String> kafkaTemplate;/*** http://localhost:8080/kafka/send?msg=a* @param msg*/@RequestMapping("/send")public String send(@RequestParam("msg") String msg) {log.info("準備發送消息為:{}", msg);// 1.發送消息ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {// 2.發送失敗的處理log.error("生產者 發送消息失敗:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> stringObjectSendResult) {// 3.發送成功的處理log.info("生產者 發送消息成功:" + stringObjectSendResult.toString());}});return "接口調用成功";}
}
- 發送消息,觀察消費者批量消費情況
http://localhost:9999/kafka/send?msg=a
多次調用發現如下:
發現拉取消息的大小始終為1
解決問題
- 添加下面兩行代碼
@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);################ 添加下面兩行 ###########props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);######################################props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}
- 再次發送消息,觀察消費情況
可以看到批量消費成功。
原理分析
fetch.min.bytes
消費者從服務器獲取記錄的最小字節數,broker 收到消費者拉取數據的請求的時候,如果可用數據量小于設置的值,那么 broker 將會等待有足夠可用的數據的時候才返回給消費者,這樣可以降低消費者和 broker 的工作負載。如果消費者的數量比較多,把該屬性的值設置得大一點可以降低 broker 的工作負載。
fetch.max.wait.ms
如果 Kafka 僅僅參考 fetch.min.bytes 參數的要求,那么有可能會因為獲取不到足夠大小的消息而一直阻塞等待,從而無法發送響應給 Consumer,顯然這是不合理的。fetch.max.wait.ms 參數用于指定 等待 FetchResponse 的最長時間,服務端根據此時間決定何時進行響應,默認值為 500(ms)。如果 Kafka 中沒有足夠多的消息而滿足不了 fetch.min.bytes 參數的要求,那么最終會等待 500ms 再響應消費者請求。這個參數的設定需要參考 Consumer 與 Kafka 之間的延遲大小,如果業務應用對延遲敏感,那么可以適當調小這個參數。
源碼分析
ReplicaManager#fetchMessages
/*** 能夠立即返回給客戶端的4種情況* 1. fetch請求沒有大于0的wait時間,參考fetch.max.wait.ms設置* 2. fetch請求要拉取的分區為空* 3. 根據fetch.min.bytes的設置,有足夠的數據返回* 4. 出現異常*/if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {// fetchPartitionData是一個TopicPartition -> FetchPartitionData 的map集合val fetchPartitionData = logReadResults.map { case (tp, result) =>tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,result.lastStableOffset, result.info.abortedTransactions)}// 調用響應回調函數responseCallback(fetchPartitionData)}