一,生產者架構

生產者客戶端由兩個線程協調運行,分別為主線程和Sender線程(發送線程)。
- 主線程:KafkaProducer創建消息,通過攔截器、序列化器和分區器之后緩存到消息收集器RecordAccumulator中;
- Sender線程:從RecordAccumulator中獲取消息并發送到Kafka集群;
1,RecordAccumulator
- RecordAccumulator用來緩存消息以便Sender 線程批量發送,進而減少網絡傳輸的資源消耗;
- 消息會被追加到RecordAccumulator的某個雙端隊列中, 每個partition都維護了一個雙端隊列;
- 雙端隊列內容是Deque<ProducerBatch>,ProducerBatch包含一至多個ProducerRecord;
2,主線程寫入消息到RecordAccumulator
消息在發送之前會緩存在java.io.ByteBuffer的內存區域。RecordAccumulator的內部有一個BufferPool,用來實現ByteBuffer的復用,以實現緩存的高效利用。 BufferPool只對特定大小的ByteBuffer進行管理,而其他大小的ByteBuffer不會緩存進BufferPool中,這個大小由batch.size參數指定;
消息流入RecordAccumulator過程:
1,先尋找與partition對應的雙端隊列(如果沒有則新建);2,從這個雙端隊列的尾部獲取一個ProducerBatch(如果沒有則新建);3,判斷ProducerBatch中是否還可以寫入這個ProducerRecord,如果可以則寫入,如果不可以則創建一個新的ProducerBatch;4,新建ProducerBatch時,評估這條消息的大小是否超過batch.size參數的大小:? ? a,如果不超過,那么就以 batch.size 的大小來創建ProducerBatch,這樣在使用完這段內存區域之后,可以通過BufferPool的管理來進行復用;? ? b,如果超過,那么就以評估的大小來創建ProducerBatch,這段內存區域不會被復用。
3,Sender線程讀取RecordAccumulator并發送
1. Sender從RecordAccumulator中獲取消息,會將原本<分區,Deque<ProducerBatch>>的形式轉變成<Node,List<ProducerBatch>,其中Node表示Kafka集群的broker節點;2. Sender將消息進一步封裝成<Node,Request>的形式,這樣就可以將Request請求發往各個Node;3,Sender線程發送Request之前,請求還會保存到InFlightRequests(保存的形式為 Map<NodeId,Deque<Request>>,緩存了已經發出去但還沒有收到響應的請求)中;
InFlightRequests還可以獲得leastLoadedNode,即所有Node中負載最小的那一個。負載最小是通過每個Node在InFlightRequests中 還未確認的請求決定的,未確認的請求越多則認為負載越大。
選擇leastLoadedNode發送請求可以使它能夠盡快發出。
二,元數據的更新
元數據是指Kafka集群的元數據,包括主題、分區、副本分布、哪些副本在AR、ISR等集合、集群中有哪些節點、控制器節點又是哪一個等等。
元數據的更新操作是由Sender線程發起的,對客戶端的外部使用者不可見。