先解釋下兩個概念:
high watermark (HW)
??????? 它表示已經被commited的最后一個message offset(所謂commited, 應該是ISR中所有replica都已寫入),HW以下的消息都已被ISR中各個replica同步,從而保持一致。HW以上的消息可能是臟數據:部分replica寫成功,但最終失敗了。
Kafka Partition:? 1> 均衡各個Broker之間的數據和請求壓力; 2> 分攤處理不同的消費者進程; 3> 在partition內可以保證局部有序和狀態記錄;
Producer發送數據時,Broker的內部處理流程: ??????? a. Broker Server接到一個Producer request
??????? b. 會先從ZK中獲取該topic的metadata
??????? c. 進而找到partition的metadata
??????? d. 從而確定對應的partition leader
??????? e. 接下來通過該leader partition來append log
???????? f. 最后計算是否調整leader的High Watermark (它是ISR中所有replica的logEndOffset的最小值與leader的logEndOffset比較得出的最大值) ??????? 當然,Broker Server還要根據Producer request的需要決定是否回復ack給client;
Kafka Producer配置:
Kafka的Producer采用了 linkedBlockingQueue, 所以用戶設置的batchNumMessages不能大于queueBufferingMaxMessages.
Producer線程可以設定為定期更新Topic Metadata (topic.metadata.refresh.interval.ms, 若該值為負值,則取消定期更新);但是一旦Producer遇到失敗情況(partition missing or leader not available), 她會自動更新Metadata;
message.send.max.retries: Kafka有可能會出現某個partition leader暫時不可訪問的情況,這個配置參數描述了Producer在此種情況下最多retry的次數。
compression.codec:?????????? 在Producer配置中,該參數指定壓縮模式,默認是NoCompressionCodec(對所有Topic都不使用壓縮)
compressed.topics: ? ? ? ? ?? 在compression.codec不是NoCompressionCodec的前提下,則為指定的若干Topic執行寫壓縮,當compressed.topics為空時則是為所有topic執行壓縮。
producer.type:?????????????????????? sync or async
metadata.broker.list:?????????? Producer通過這個參數指定的Broker來獲取Topic Metadata
partitioner.class:???????????????????? 關于生產者向指定的分區發送數據,通過設置partitioner.class的屬性來指定向那個分區發送數據;如果自己指定必須編寫相應的程序,默認是kafka.producer.DefaultPartitioner,分區程序是基于散列的鍵( Utils.abs(key.hashCode) % numPartitions )。
retry.backoff.ms: ? ? ? ? ? ? ? ?? 設置Producer在refresh metadata之前要等待的時間 (Producer在每次retry之前都要refresh metadata, 但是可能partition的leader selection等需要一定的執行時間),默認100毫秒;
Kafka Consumer配置:
group.id:???????????????????????????????? ? ? ?? ?? 指定consumer所屬的consumer group
consumer.id:????????????????????????? ? ? ?? ?? 如果不指定會自動生成
socket.timeout.ms:??????????? ? ? ? ? ? 網絡請求的超時設定
socket.receive.buffer.bytes: ? ? ?? Socket的接收緩存大小
fetch.message.max.bytes: ? ? ? ?? 試圖獲取的消息大小之和(bytes)
num.consumer.fetchers: ? ? ? ? ? ? 該消費去獲取data的總線程數
auto.commit.enable: ? ? ? ? ? ? ? ? ? ? ? 如果是true, 定期向zk中更新Consumer已經獲取的last message offset(所獲取的最后一個batch的first message offset)
auto.commit.interval.ms: ? ? ? ? ? ? Consumer向ZK中更新offset的時間間隔
queued.max.message.chunks:? 默認為2
rebalance.max.retries: ? ? ? ? ? ? ? ? ? ? 在rebalance時retry的最大次數,默認為4
fetch.min.bytes:????????????????????????????? 對于一個fetch request, Broker Server應該返回的最小數據大小,達不到該值request會被block, 默認是1字節。
fetch.wait.max.ms:?????????????????????????? Server在回答一個fetch request之前能block的最大時間(可能的block原因是返回數據大小還沒達到fetch.min.bytes規定);
rebalance.backoff.ms: ? ? ? ? ? ? ? ?? 當rebalance發生時,兩個相鄰retry操作之間需要間隔的時間。
refresh.leader.backoff.ms: ? ? ? ? ? 如果一個Consumer發現一個partition暫時沒有leader, 那么Consumer會繼續等待的最大時間窗口(這段時間內會refresh partition leader);
auto.offset.reset:??????????????????????????? 當發現offset超出合理范圍(out of range)時,應該設成的大小(默認是設成offsetRequest中指定的值):
???????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? smallest: 自動把該consumer的offset設為最小的offset;
??????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? largest: 自動把該consumer的offset設為最大的offset;
??????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? anything else: throw exception to the consumer;
consumer.timeout.ms:???????????????? 如果在該規定時間內沒有消息可供消費,則向Consumer拋出timeout exception;
??????????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? 該參數默認為-1, 即不指定Consumer timeout;
client.id:???????????????????????????????????????? ? 區分不同consumer的ID,默認是group.id
Kafka Consumer如何與ZK交互:
/brokers/ids/[0]...[N-1]: N是Broker個數,每個[i]是一個Broker,里面存儲著每個Broker相關的信息(ip,port,epoch等);
/brokers/topics/[topic_name]/partitions/[0]...[N-1]/state: N是這個topic的partition數目,state里存放了每個partition的leader及ISR等信息;
???????????????????????????????????????????????????????????????????????????????????????????????????? 備注: [topic_name]這個znode本身也存儲了所有partition對應的leader broker;
/consumers/[group_id]/ids/[consumer_id]/[topic0]-[topicN]: [consumer_id]是一個臨時znode, 其子node是該consumer監聽的topic
/consumers/[group_id]/offsets/[topic_name]/[partition_id]: [partition_id]結點中的值即為offset
/consumers/[group_id]/owners/[topic_name]: ?(會陸續補充)
ConsumerFetcherThread的內部命名:?
????????????????????????????????? "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id)
Kafka消費端如何知道從哪個partition消費,以及如何在多個消費者之間平衡對于多個partition的消費? 目前Broker Server端不會做類似處理,client端可以利用zookeeper來動態分配。(此處還有些不解)
Kafka Consumer在ZK中注冊后會監聽Broker變化及同組內(Consumer Group)consumer的加入或推出,從而自動實現partitions與consumers的平衡。
關于平衡算法,簡單的說就是在實現平衡過程中,盡量保證一個consumer只和盡可能少的Broker維持連接。