5.1 性能調優全攻略
Producer調優
批量發送與延遲發送
通過調整batch.size
和linger.ms
參數提升吞吐量:
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 默認16KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待10ms以積累更多消息
batch.size
:批量發送的字節數,達到該大小或linger.ms
超時即發送。linger.ms
:消息在緩沖區的最大停留時間,即使未達到batch.size
也會發送。
壓縮算法選擇
啟用壓縮可顯著減少網絡傳輸和磁盤存儲開銷:
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 可選:gzip、snappy、lz4、zstd
- Snappy:壓縮速度快,壓縮比適中。
- LZ4:壓縮比和速度平衡,推薦大多數場景。
- ZSTD:壓縮比最高,但CPU開銷較大。
Broker調優
內存與線程配置
調整Broker的網絡和IO線程池大小:
# server.properties
num.network.threads=8 # 網絡處理線程數,默認3
num.io.threads=16 # IO處理線程數,默認8
socket.send.buffer.bytes=102400 # 發送緩沖區大小,默認100KB
socket.receive.buffer.bytes=102400 # 接收緩沖區大小,默認100KB
磁盤與日志管理
優化日志存儲和清理策略:
# 日志段滾動大小,默認1GB
log.segment.bytes=536870912 # 日志保留時間,默認7天
log.retention.hours=168 # 日志清理策略:delete(按時間刪除)或compact(按key壓縮)
log.cleanup.policy=delete # 后臺日志清理線程數
log.cleaner.threads=2
Consumer調優
并行消費與反序列化優化
增加Consumer實例數或使用多線程消費:
// 增加Consumer Group中的Consumer數量,實現分區級并行
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("topic"));
consumer2.subscribe(Collections.singletonList("topic"));// 或在單個Consumer中使用多線程處理消息
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {executor.submit(() -> process(record));}
}
使用高效的序列化格式(如Protobuf替代JSON):
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class.getName());
5.2 實戰場景模擬
場景一:高并發日志采集(每秒10W+消息寫入)
架構設計
- Topic配置:創建100個分區的Topic,利用多分區并行寫入提升吞吐量。
- Producer配置:
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB批次 props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5ms延遲 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); props.put(ProducerConfig.ACKS_CONFIG, "1"); // 犧牲部分可靠性換取高吞吐量
- Broker配置:
num.partitions=100 # 默認分區數 log.flush.interval.messages=100000 # 每10W條消息刷盤一次 log.flush.interval.ms=10000 # 每10秒刷盤一次
性能測試
使用kafka-producer-perf-test.sh
工具測試寫入性能:
bin/kafka-producer-perf-test.sh --topic log-topic --num-records 10000000 \--record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092
場景二:實時數據分析(電商實時大屏)
數據流設計
- 數據源:用戶瀏覽、下單、支付等行為數據實時寫入Kafka。
- 流處理:Kafka Streams計算實時指標(如UV、GMV、轉化率):
KStream<String, String> userEvents = builder.stream("user-events-topic");
KTable<Windowed<String>, Long> hourlyUV = userEvents.selectKey((key, value) -> value.getUserId()).groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1))).count(Materialized.as("hourly-uv-store"));hourlyUV.toStream().map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count)).to("hourly-uv-topic", Produced.with(Serdes.String(), Serdes.Long()));
- 結果存儲:計算結果寫入Redis,供前端大屏實時查詢。
性能優化
- Kafka配置:
# 減少消息延遲 queued.max.requests=1000 replica.lag.time.max.ms=30000
- Kafka Streams配置:
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB緩存 config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // 1秒提交一次
場景三:金融級數據一致性(事務消息實現分布式事務)
架構設計
- 訂單服務:接收用戶訂單請求,發送訂單創建消息到Kafka。
- 庫存服務:消費訂單消息,扣減庫存,發送庫存扣減結果。
- 支付服務:消費庫存扣減結果,處理支付,發送支付結果。
事務消息實現
// 初始化事務
producer.initTransactions();try {producer.beginTransaction();// 發送訂單創建消息producer.send(new ProducerRecord<>("order-topic", orderId, order));// 執行本地事務(如更新訂單狀態)orderService.updateOrderStatus(orderId, "PROCESSING");// 提交事務producer.commitTransaction();
} catch (Exception e) {// 回滾事務producer.abortTransaction();
}
冪等性保障
消費端通過唯一ID去重,確保同一消息只處理一次:
@KafkaListener(topics = "inventory-topic")
public void processInventory(InventoryMessage message) {// 檢查是否已處理過if (inventoryService.isProcessed(message.getId())) {return;}// 處理庫存扣減inventoryService.decreaseStock(message.getProductId(), message.getQuantity());// 標記為已處理inventoryService.markAsProcessed(message.getId());
}