歡迎來到啾啾的博客🐱。
記錄學習點滴。分享工作思考和實用技巧,偶爾也分享一些雜談💬。
有很多很多不足的地方,歡迎評論交流,感謝您的閱讀和評論😄。
目錄
- 1 引言
- 2 緩沖區
- 2.1 消息在Partition內有序
- 2.2 批次消息ProducerBatch
- 2.2.1 內存分配
- 2.2.2 線程安全
- 3 發送消息Sender
- 4 總結
1 引言
繼續看Kafka源碼,看其是如何批量發送消息的。
2 緩沖區
當調用producer.send(record)時,消息將先到緩沖區,在緩沖區按照目標的Topic-Partition進行組織,滿足以條件后隨批次發送給Broker。
// KafkaProducer.java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// ... 省略了部分代碼 ...return doSend(record, callback); // 轉交給 doSend 方法
}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {// ...// 1. 等待元數據更新(如果需要的話)// ...// 2.【核心步驟】調用 RecordAccumulator 的 append 方法RecordAccumulator.RecordAppendResult result = accumulator.append(tp,timestamp, key, value, headers, interceptors, remainingWaitMs);// ...// 3. 喚醒 Sender 線程,告訴他“可能有新活兒干了”this.sender.wakeup();// ...return result.future;
}
我們看一下“緩沖區”RecordAccumulator。
2.1 消息在Partition內有序
RecordAccumulator維護了一個數據結構:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
append代碼簡化如下:
// RecordAccumulator.java
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, ...) {// 1. 獲取該分區的批次隊列 batches中獲取,沒有則創建Deque<ProducerBatch> dq = getOrCreateDeque(tp);synchronized (dq) { // 對該分區的隊列加鎖,保證線程安全// 2. 嘗試追加到最后一個(當前活躍的)批次中ProducerBatch last = dq.peekLast();if (last != null) {FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, ...);if (future != null) {// 如果追加成功,直接返回return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false);}}// 3. 如果最后一個批次滿了,或者不存在,就需要一個新的批次// 從 BufferPool 申請一塊內存,大小由 batch.size 配置決定ByteBuffer buffer = free.allocate(batchSize, maxTimeToBlock);// 4. 創建一個新的 ProducerBatch (貨運箱)ProducerBatch batch = new ProducerBatch(tp, memoryRecordsBuilder, now);FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, headers, ...); // 把當前消息放進去// 5. 將新的批次加入到隊列的末尾dq.addLast(batch);// ...return new RecordAppendResult(future, ...);}
}
可以看到batches的value類型為Deque,所以生產者可以維護發送時partition內的順序結構。
但是在網絡抖動時這樣做還是不夠,時序性還是難以保障,所以生產者還有別的配置:
每個連接上允許發送的未確認請求的最大數量
max.in.flight.requests.per.connection
- 當 max.in.flight.requests.per.connection = 1 時,Sender 線程在發送完 Batch-1 后,會阻塞自己,直到 Batch-1 的請求得到響應(成功或失敗),它絕不會在此期間發送 Batch-2。這樣一來,即使 Batch-1 需要重試,Batch-2 也只能乖乖地在后面排隊。這就從根本上杜絕了因重試導致亂序的可能。
- 默認 max.in.flight.requests.per.connection = 5。即它允許 Producer 在還沒收到 Batch-1 的 ACK 時,就繼續發送 Batch-2、3、4、5。這極大地提升了吞吐量(不用傻等),但犧牲了順序性。
一般max.in.flight.requests.per.connection還需要與生產者冪等性配合。
enable.idempotence = true
開啟冪等后,Producer 會被分配一個唯一的 Producer ID (PID),并且它發送的每一批消息都會帶上一個從0開始遞增的序列號。Broker 端會為每個 TopicPartition 維護這個 PID 和序列號。如果收到的消息序列號不是預期的下一個,Broker 就會拒絕它。
// NetworkClient.java// 這個方法判斷我們是否可以向某個節點發送更多數據
@Override
public boolean isReady(Node node, long now) {// ... 省略了連接狀態的檢查 ...// 檢查在途請求數是否小于該連接配置的上限return !connectionStates.isBlackedOut(node.idString(), now) &&canSendRequest(node.idString(), now);
}// canSendRequest 方法內部會調用 inFlightRequests.canSendMore()
// InFlightRequests.java
public boolean canSendMore(String nodeId) {// this.requests 是一個 Map<String, Deque<NetworkClient.InFlightRequest>>// 它記錄了每個節點上所有在途(已發送但未收到響應)的請求Deque<InFlightRequest> queue = requests.get(nodeId);// 如果隊列為空,當然可以發送if (queue == null) {return true;}// 將在途請求數 與 從配置中讀到的max.in.flight.requests.per.connection比較// this.maxInFlightRequestsPerConnection 就是你配置的那個值return queue.size() < this.maxInFlightRequestsPerConnection;
}
2.2 批次消息ProducerBatch
可以看到在結構private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
中,批次消息被封裝為ProducerBatch。
2.2.1 內存分配
這個類的核心是MemoryRecordsBuilder。
總所周知,頻繁地創建和銷毀對象,特別是大塊的byte[]對GC非常不友好。MemoryRecordsBuilder內部管理者一個巨大的、連續的ByteBuffer。
這個 ByteBuffer 不是每次創建 ProducerBatch 時都 new 出來的。它是在 RecordAccumulator 初始化時(我們在上面的RecordAccumulator中有看到BufferPool),從一個叫 BufferPool 的內存池中借用 (allocate) 的。當 ProducerBatch 發送完畢,這塊內存會歸還 (deallocate) 給池子,供下一個 ProducerBatch 復用
當你調用 tryAppend 添加消息時,消息的 key, value 等內容被直接序列化成字節,并寫入到這個 ByteBuffer 的末尾。它不是在發送時才做序列化,而是在追加時就完成了。
池化:對于那些需要頻繁創建和銷毀的、生命周期短暫的、昂貴的對象(如數據庫連接、線程、大塊內存),一定要使用池化技術。這能極大地降低GC壓力,提升系統穩定性。
Redis和Kafka都有共同的高頻內存使用的特性,也都設計了預分配和復用。Kafka生產者與其用多少申請多少,不如一次性申請一塊大內存,然后通過內部的指針移動(position, limit)來管理這塊內存的使用。
2.2.2 線程安全
ProducerBatch 會被多個線程訪問:
- 你的業務線程(Producer主線程):調用 tryAppend() 往里面寫數據。
- Sender 線程:檢查它是否已滿 (isFull)、是否超時 (isExpired),并最終把它發送出去。
ProducerBatch 內部有一個精密的“狀態機”,并用 volatile 和 synchronized 保護。
// ProducerBatch.java (簡化后)
private final List<Thunk> thunks;
private final MemoryRecordsBuilder recordsBuilder;// 【關鍵狀態】volatile 保證了多線程間的可見性
private volatile boolean closed;
private int appends; // 記錄追加次數public FutureRecordMetadata tryAppend(...) {// 【關鍵檢查】在方法入口處檢查狀態,快速失敗if (this.closed) {return null; }// ... 將消息寫入 recordsBuilder ...// ...
}// 這個方法會被 Sender 線程調用
public void close() {this.closed = true;
}// 當批次被確認后,由 Sender 線程調用
public void done(long baseOffset, long logAppendTime, RuntimeException exception) {// for-each 循環是線程安全的,因為 thunks 列表在 close 之后就不再被修改for (Thunk thunk : this.thunks) {try {// 【核心】執行每個 send() 調用對應的回調函數thunk.callback.onCompletion(metadata, exception);} catch (Exception e) {// ...}}
}
總的來說是職責分離+最小化鎖的設計以保證線程安全。
3 發送消息Sender
Sender 是一個實現了 Runnable 接口的類,它在一個獨立的線程里無限循環,最終發送消息。
// Sender.java
public void run() {while (running) {try {runOnce();} catch (Exception e) {// ...}}
}void runOnce() {// ...// 1. 【核心】找出所有可以發送的批次// linger.ms 決定了可以等待的最長時間RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);// 2. 如果有準備好的節點(分區),就發送它們if (!result.readyNodes.isEmpty()) {// ...// 從累加器中“榨干”所有準備好的批次Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, ...);// ...// 將批次轉換成網絡請求并發送sendProducerData(now, ... , batches);}// ...
}
RecordAccumulator.ready() 方法是決定何時發送的關鍵。它會遍歷所有的 ProducerBatch,滿足以下任意一個條件的批次,就會被認為是 “ready”(準備就緒):
- 批次已滿:批次大小達到了 batch.size。
- 等待超時:批次從創建到現在,等待的時間超過了 linger.ms。
- 其他原因:比如 Producer 被關閉,或者有新的 Producer 加入導致需要立即發送等。
Sender 的工作模式是:
不斷地問accumulator.ready()有到linger.ms時間的或者裝滿batch.size的批次沒有。然后依據節點列表,通過NetworkClient發送ProducerBatch到Kafka Broker。
4 總結
Kafka Producer 在客戶端內部通過 RecordAccumulator 維護了一個按 TopicPartition 分類的內存緩沖區。當用戶調用 send() 方法時,消息并不會立即發送,而是被追加到對應分區的某個 ProducerBatch 中。一個獨立的 Sender 線程在后臺運行,它會持續檢查 RecordAccumulator 中的批次,一旦某個批次滿足了“大小達到 batch.size”或“等待時間超過 linger.ms”這兩個條件之一,Sender 線程就會將這個批次以及其他所有準備好的批次一同取出,打包成一個請求,通過網絡一次性發送給 Broker,從而實現批量發送,極大地提升了吞吐能力。
這個設計是經典的 “空間換時間” 和 “攢一批再處理” 的思想,通過犧牲一點點延遲(linger.ms),換取了巨大的吞吐量提升。理解了這個機制,你就能更好地去配置 batch.size 和 linger.ms 這兩個核心參數,以平衡你的業務對吞吐和延遲的需求。