Kafka的Producer和Consumer源碼學習

先解釋下兩個概念:

high watermark (HW)

??????? 它表示已經被commited的最后一個message offset(所謂commited, 應該是ISR中所有replica都已寫入),HW以下的消息都已被ISR中各個replica同步,從而保持一致。HW以上的消息可能是臟數據:部分replica寫成功,但最終失敗了。

Kafka Partition:? 1> 均衡各個Broker之間的數據和請求壓力; 2> 分攤處理不同的消費者進程; 3> 在partition內可以保證局部有序和狀態記錄;

Producer發送數據時,Broker的內部處理流程: ??????? a. Broker Server接到一個Producer request

??????? b. 會先從ZK中獲取該topic的metadata

??????? c. 進而找到partition的metadata

??????? d. 從而確定對應的partition leader

??????? e. 接下來通過該leader partition來append log

???????? f. 最后計算是否調整leader的High Watermark (它是ISR中所有replica的logEndOffset的最小值與leader的logEndOffset比較得出的最大值) ??????? 當然,Broker Server還要根據Producer request的需要決定是否回復ack給client;

Kafka Producer配置:

Kafka的Producer采用了 linkedBlockingQueue, 所以用戶設置的batchNumMessages不能大于queueBufferingMaxMessages.
Producer線程可以設定為定期更新Topic Metadata (topic.metadata.refresh.interval.ms, 若該值為負值,則取消定期更新);但是一旦Producer遇到失敗情況(partition missing or leader not available), 她會自動更新Metadata;
message.send.max.retries: Kafka有可能會出現某個partition leader暫時不可訪問的情況,這個配置參數描述了Producer在此種情況下最多retry的次數。
compression.codec:?????????? 在Producer配置中,該參數指定壓縮模式,默認是NoCompressionCodec(對所有Topic都不使用壓縮)
compressed.topics: ? ? ? ? ?? 在compression.codec不是NoCompressionCodec的前提下,則為指定的若干Topic執行寫壓縮,當compressed.topics為空時則是為所有topic執行壓縮。
producer.type:?????????????????????? sync or async
metadata.broker.list:?????????? Producer通過這個參數指定的Broker來獲取Topic Metadata
partitioner.class:???????????????????? 關于生產者向指定的分區發送數據,通過設置partitioner.class的屬性來指定向那個分區發送數據;如果自己指定必須編寫相應的程序,默認是kafka.producer.DefaultPartitioner,分區程序是基于散列的鍵( Utils.abs(key.hashCode) % numPartitions )。
retry.backoff.ms: ? ? ? ? ? ? ? ?? 設置Producer在refresh metadata之前要等待的時間 (Producer在每次retry之前都要refresh metadata, 但是可能partition的leader selection等需要一定的執行時間),默認100毫秒;

Kafka Consumer配置:

group.id:???????????????????????????????? ? ? ?? ?? 指定consumer所屬的consumer group
consumer.id:????????????????????????? ? ? ?? ?? 如果不指定會自動生成
socket.timeout.ms:??????????? ? ? ? ? ? 網絡請求的超時設定
socket.receive.buffer.bytes: ? ? ?? Socket的接收緩存大小
fetch.message.max.bytes: ? ? ? ?? 試圖獲取的消息大小之和(bytes)
num.consumer.fetchers: ? ? ? ? ? ? 該消費去獲取data的總線程數
auto.commit.enable: ? ? ? ? ? ? ? ? ? ? ? 如果是true, 定期向zk中更新Consumer已經獲取的last message offset(所獲取的最后一個batch的first message offset)
auto.commit.interval.ms: ? ? ? ? ? ? Consumer向ZK中更新offset的時間間隔
queued.max.message.chunks:? 默認為2
rebalance.max.retries: ? ? ? ? ? ? ? ? ? ? 在rebalance時retry的最大次數,默認為4
fetch.min.bytes:????????????????????????????? 對于一個fetch request, Broker Server應該返回的最小數據大小,達不到該值request會被block, 默認是1字節。
fetch.wait.max.ms:?????????????????????????? Server在回答一個fetch request之前能block的最大時間(可能的block原因是返回數據大小還沒達到fetch.min.bytes規定);
rebalance.backoff.ms: ? ? ? ? ? ? ? ?? 當rebalance發生時,兩個相鄰retry操作之間需要間隔的時間。
refresh.leader.backoff.ms: ? ? ? ? ? 如果一個Consumer發現一個partition暫時沒有leader, 那么Consumer會繼續等待的最大時間窗口(這段時間內會refresh partition leader);
auto.offset.reset:??????????????????????????? 當發現offset超出合理范圍(out of range)時,應該設成的大小(默認是設成offsetRequest中指定的值):
???????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? smallest: 自動把該consumer的offset設為最小的offset;
??????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? largest: 自動把該consumer的offset設為最大的offset;
??????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? anything else: throw exception to the consumer;
consumer.timeout.ms:???????????????? 如果在該規定時間內沒有消息可供消費,則向Consumer拋出timeout exception;
??????????????????? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? 該參數默認為-1, 即不指定Consumer timeout;
client.id:???????????????????????????????????????? ? 區分不同consumer的ID,默認是group.id

Kafka Consumer如何與ZK交互:

/brokers/ids/[0]...[N-1]: N是Broker個數,每個[i]是一個Broker,里面存儲著每個Broker相關的信息(ip,port,epoch等);
/brokers/topics/[topic_name]/partitions/[0]...[N-1]/state: N是這個topic的partition數目,state里存放了每個partition的leader及ISR等信息;
???????????????????????????????????????????????????????????????????????????????????????????????????? 備注: [topic_name]這個znode本身也存儲了所有partition對應的leader broker;

/consumers/[group_id]/ids/[consumer_id]/[topic0]-[topicN]: [consumer_id]是一個臨時znode, 其子node是該consumer監聽的topic
/consumers/[group_id]/offsets/[topic_name]/[partition_id]: [partition_id]結點中的值即為offset
/consumers/[group_id]/owners/[topic_name]: ?(會陸續補充)

ConsumerFetcherThread的內部命名:?
????????????????????????????????? "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id)

Kafka消費端如何知道從哪個partition消費,以及如何在多個消費者之間平衡對于多個partition的消費? 目前Broker Server端不會做類似處理,client端可以利用zookeeper來動態分配。(此處還有些不解)

Kafka Consumer在ZK中注冊后會監聽Broker變化及同組內(Consumer Group)consumer的加入或推出,從而自動實現partitions與consumers的平衡。

關于平衡算法,簡單的說就是在實現平衡過程中,盡量保證一個consumer只和盡可能少的Broker維持連接。

轉載于:https://www.cnblogs.com/tonychai/p/4528208.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/375007.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/375007.shtml
英文地址,請注明出處:http://en.pswp.cn/news/375007.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

for+next()實現數組的遍歷及while list each 的使用

//要求使用for循環語句來完成該數組的遍歷//輸出每一項的鍵名和對應值&#xff1a; $a array( a > 34, 5 > 51, 13, 32, bb>15, 2 > 31 ); $len count($a); for($i0;$i<$len;$i)…

讀Pyqt4教程,帶你入門Pyqt4 _005

對話框窗體或對話框是現代GUI應用不可或缺的一部分。dialog定義為兩個或多個人之間的交談。在計算機程序中dialog是一個窗體&#xff0c;用來和程序“交談”。對話框用來輸入數據、修改數據、改變程序設置等等。對話框是用戶和計算機程序溝通的重要手段。 QColorDialog 顏色對話…

Linux內核的啟動過程分析

秦鼎濤 《Linux內核分析》MOOC課程http://mooc.study.163.com/course/USTC-1000029000  一、實驗目的及要求&#xff1a; 使用gdb跟蹤調試內核從start_kernel到init進程啟動 詳細分析從start_kernel到init進程啟動的過程并結合實驗截圖撰寫一篇署名博客&#xff0c;并在博客文…

static修飾符詳解

static表示“全局”或者“靜態”的意思&#xff0c;用來修飾成員變量和成員方法&#xff0c;也可以形成靜態static代碼塊&#xff0c;但是Java語言中沒有全局變量的概念。被static修飾的成員變量和成員方法獨立于該類的任何對象。也就是說&#xff0c;它不依賴類特定的實例&…

四則運算2+psp0

程序要求&#xff1a; 1.題目避免重復 2.可定制&#xff08;數量\打印方式&#xff09; 3.可以一下控制參數 ① 是否有乘除法 ② 是否有括號&#xff08;最多支持十個數參與運算&#xff09; ③ 數值范圍 ④加減有無負數 ⑤除法有無余數 分析&#xff1a;① 如果是兩個數…

kettle作業中的js如何寫日志文件

在kettle作業中JavaScript腳本有時候也扮演非常重要的角色&#xff0c;此時我們希望有一些日志記錄。下面是job中JavaScript記錄日志的方式。 job的js寫日志的方法。 得到日志輸出實例 org.pentaho.di.core.logging.LogWriter.getInstance();按照日志的級別輸出&#xff1a; pu…

淺析Kerberos原理,及其應用和管理

文章作者&#xff1a;luxianghao 文章來源&#xff1a;http://www.cnblogs.com/luxianghao/p/5269739.html 轉載請注明&#xff0c;謝謝合作。 免責聲明&#xff1a;文章內容僅代表個人觀點&#xff0c;如有不當&#xff0c;歡迎指正。 --- 一&#xff0c;引言 Kerberos簡單來…

2014! 的末尾有多少個0

2014&#xff01; 的末尾有多少個0<?xml version"1.0" encoding"UTF-8"?> 假設 末尾有 k 個0&#xff0c;所以 2014&#xff01; x * 10^k ; 10 ^ k &#xff08;2 * 5 &#xff09;^ k 2^k * 5^k, 明顯所有數字中因數含有2的數字多于含有5的數…

[轉載]一句話插配置文件

http://www.t00ls.net/viewthread.php?tid13901 一句話插入配置文件system.asp沒有過濾雙引號&#xff0c;插入一句就行。常規插法如下&#xff1a;"%><%eval request("d")%><%但金刀客這篇文件&#xff08;http://www.cqzh.cn/post/328.html&…

android插件化-獲取apkplug框架已安裝插件-03

上一篇文章成功的將apkplug框架嵌入了應用中而且啟動 鏈接http://www.apkplug.com/blog/?post10 這一篇文章實現怎樣獲取全部已安裝插件 一 獲取框架的SystemBundle的上下文BundleContext apkplug框架啟動會自己主動創建一個SystemBundle, 它是框架的第一個插件不可停止和卸…

Java實現棧。

定義一個接口MyStack接口&#xff1a; package Stack; public interface MyStack<T> { boolean isEmpty(); int length(); boolean push(T date); T pop();} 數組實現&#xff1a; package Stack; public class ArrayStack<T> implements MyStack<T>{ privat…

轉載]SA權限九種上傳方法

剛看了一種方法&#xff0c;如果是注入點&#xff0c;利用管中窺豹以二進制的方式上傳&#xff0c;上傳的時候最好改下名&#xff0c;比如do.exe&#xff0c;上傳到目標服務器可以改成do.cmd&#xff0c;等傳上去之后用copy 命令改回來。 當然用啊d也可以上傳&#xff0c;還有…

asp.net 導出Excel

asp.net 導出Excel 分享一個asp.net 導出假Excel代碼。優點&#xff0c;不用借助于任何插件比如&#xff08;NPOI&#xff09;,復制代碼&#xff0c;修改grid.DataSource直接導出。 先看導出后的效果圖 1 System.Web.UI.WebControls.DataGrid grid new DataGrid();2 …

bzoj 2300 動態維護上凸殼(不支持刪除)

新技能GET。 用set保存點&#xff0c;然后只需要找前趨和后繼就可以動態維護了。 1 /**************************************************************2 Problem: 23003 User: idy0024 Language: C5 Result: Accepted6 Time:556 ms7 Memory:4824 kb8 …

帶有Guice的富域模型

貧血域模型是一個非常常見的反模式。 在ORM和DI框架的世界中&#xff0c;我們自然會發現自己擁有一個由ORM管理的“域”&#xff0c;該域包含所有數據且無行為。 通過我們的DI框架有幫助地注入了輔助類&#xff0c;這些輔助類都是行為且沒有數據。 在本文中&#xff0c;我將介紹…

php匿名函數小示例

<?php //$fun function($params){ // echo $params; //}; // //$fun(aa);//例一 //在普通函數中定義一個匿名函數 //function printStr(){ // $fun function($something){ // echo $something; // }; // $fun(something); // //} //printStr();//例子…

購書心得

作者&#xff1a;泉哥主頁&#xff1a;http://riusksk.blogbus.com富家不用買良田&#xff0c;書中自有千鐘粟&#xff1b;安居不用架高堂&#xff0c;書中自有黃金屋&#xff1b;出門莫恨無人隨&#xff0c;書中車馬多如簇&#xff1b;娶妻莫恨無良媒&#xff0c;書中自有顏如…

MariaDB?條件語句WHERE

MariaDB 條件語句WHEREWHERE Clause Operators Operator Description Equality<> Nonequality! Nonequality< Less than< Less than or equal to > Greater than > Greater than or equal to BETWEEN Between two specified values BETWEEN AND (jlive)[c…

Spring 3.1緩存抽象教程

即將發布的Spring 3.1版本中引入的新功能之一是緩存抽象之一 。 Spring Framework提供了對將緩存透明添加到現有Spring應用程序中的支持。 與事務支持類似&#xff0c;緩存抽象允許一致使用各種緩存解決方案&#xff0c;而對代碼的影響最小。 從本質上講&#xff0c;抽象將緩存…