Kafka的配置文件詳細描述

在kafka/config/目錄下面有3個配置文件:

producer.propertiesconsumer.propertiesserver.properties

?

(1).producer.properties:生產端的配置文件

#指定kafka節點列表,用于獲取metadata,不必全部指定
#需要kafka的服務器地址,來獲取每一個topic的分片數等元數據信息。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092#生產者生產的消息被發送到哪個block,需要一個分組策略。
#指定分區處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區
#partitioner.class=kafka.producer.DefaultPartitioner#生產者生產的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮后發送到broker集群,
#而broker集群是不會進行解壓縮的,broker集
群只會把消息發送到消費者集群,然后由消費者來解壓縮。 #是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。 #壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。 #文本數據會以1比10或者更高的壓縮比進行壓縮。 compression.codec=none#指定序列化處理類,消息在網絡上傳輸就需要序列化,它有String、數組等許多種實現。 serializer.class=kafka.serializer.DefaultEncoder#如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮。 #如果上面啟用了壓縮,那么這里就需要設置 #compressed.topics= #這是消息的確認機制,默認值是0。在面試中常被問到。 #producer有個ack參數,有三個值,分別代表: #(1)不在乎是否寫入成功; #(2)寫入leader成功; #(3)寫入leader和所有副本都成功; #要求非常可靠的話可以犧牲性能設置成最后一種。 #為了保證消息不丟失,至少要設置為1,也就 #是說至少保證leader將消息保存成功。 #設置發送數據是否需要服務端的反饋,有三個值0,1,-1,分別代表3種狀態: #0: producer不會等待broker發送ack。生產者只要把消息發送給broker之后,就認為發送成功了,這是第1種情況; #1: 當leader接收到消息之后發送ack。生產者把消息發送到broker之后,并且消息被寫入到本地文件,才認為發送成功,這是第二種情況;#-1: 當所有的follower都同步消息成功后發送ack。不僅是主的分區將消息保存成功了,
#而且其所有的分區的副本數也都同步好了,才會被認為發動成功,這是第3種情況。
request.required.acks=0#broker必須在該時間范圍之內給出反饋,否則失敗。 #在向producer發送ack之前,broker允許等待的最大時間 ,如果超時, #broker將會向producer發送一個error ACK.意味著上一次消息因為某種原因 #未能成功(比如follower未能同步成功) request.timeout.ms=10000#生產者將消息發送到broker,有兩種方式,一種是同步,表示生產者發送一條,broker就接收一條;
#還有一種是異步,表示生產者積累到一批的消息,裝到一個池子里面緩存起來,再發送給broker,
#這個池子不會無限緩存消息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的參數供我們來設置。
#一般我們會選擇異步。
#同步還是異步發送消息,默認“sync”表同步,"async"表異步。異步可以提高發送吞吐量,
#也意味著消息將會在本地buffer中,并適時批量發送,但是也可能導致丟失未發送過去的消息
producer.type=sync#在async模式下,當message被緩存的時間超過此值后,將會批量發送給broker, #默認為5000ms #此值和batch.num.messages協同工作. queue.buffering.max.ms = 5000#異步情況下,緩存中允許存放消息數量的大小。 #在async模式下,producer端允許buffer的最大消息量 #無論如何,producer都無法盡快的將消息發送給broker,從而導致消息在producer端大量沉積 #此時,如果消息的條數達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000條消息。 queue.buffering.max.messages=20000#如果是異步,指定每次批量發送數據量,默認為200 batch.num.messages=500#在生產端的緩沖池中,消息發送出去之后,在沒有收到確認之前,該緩沖池中的消息是不能被刪除的,
#但是生產者一直在生產消息,這個時候緩沖池可能會被撐爆,所以這就需要有一個處理的策略。
#有兩種處理方式,一種是讓生產者先別生產那么快,阻塞一下,等會再生產;另一種是將緩沖池中的消息清空。
#當消息在producer端沉積的條數達到"queue.buffering.max.meesages"后阻塞一定時間后,
#隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
#此時producer可以繼續阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時間 #-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候消息就不會被拋棄 #0: 立即清空隊列,消息被拋棄 queue.enqueue.timeout.ms=-1#當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發的次數 #因為broker并沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失) #有可能導致broker接收到重復的消息,默認值為3. message.send.max.retries=3#producer刷新topic metada的時間間隔,producer需要知道partition leader #的位置,以及當前topic的情況 #因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時, #將會立即刷新 #(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數來配置 #額外的刷新機制,默認值600000 topic.metadata.refresh.interval.ms=60000

?

?

(2).consumer.properties:消費端的配置文件

#消費者集群通過連接Zookeeper來找到broker。
#zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper的session過期時間,默認5000ms,用于檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000#當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000#這是一個時間閾值。
#指定多久消費者更新offset到zookeeper中。
#注意offset更新時基于time而不是每次獲得的消息。
#一旦在更新zookeeper發生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000#指定消費
group.id=xxxxx#這是一個數量閾值,經測試是500條。
#當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息#注意offset信息并不是每消費一次消息就向zk提交
#一次,而是現在本地保存(內存),并定期提交,默認為true
auto.commit.enable=true# 自動更新時間。默認60 * 1000
auto.commit.interval.ms=1000# 當前consumer的標識,可以設定,也可以有系統生成,
#主要用來跟蹤消息消費情況,便于觀察
conusmer.id=xxx# 消費者客戶端編號,用于區分不同客戶端,默認客戶端程序自動產生
client.id=xxxx# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50# 當有新的consumer加入到group時,將會reblance,此后將會
#有partitions的消費端遷移到新  的consumer上,如果一個
#consumer獲得了某個partition的消費權限,那么它將會向zk
#注冊 "Partition Owner registry"節點信息,但是有可能
#此時舊的consumer尚沒有釋放此節點, 此值用于控制,
#注冊節點的重試次數.
rebalance.max.retries=5#每拉取一批消息的最大字節數
#獲取消息的最大尺寸,broker不會像consumer輸出大于
#此值的消息chunk 每次feth將得到多條消息,此值為總大小,
#提升此值,將會消耗更多的consumer端內存
fetch.min.bytes=6553600#當消息的尺寸不足時,server阻塞的時間,如果超時,
#消息將立即發送給consumer
#數據一批一批到達,如果每一批是10條消息,如果某一批還
#不到10條,但是超時了,也會立即發送給consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360# 如果zookeeper沒有offset值或offset值超出范圍。
#那么就給個初始的offset。有smallest、largest、
#anything可選,分別表示給當前最小的offset、
#當前最大的offset、拋異常。默認largest
auto.offset.reset=smallest# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

?

?

(3).server.properties:服務端的配置文件

#broker的全局唯一編號,不能重復
broker.id=0#用來監聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092#處理網絡請求的線程數量,也就是接收消息的線程數。
#接收線程會將接收到的消息放到內存中,然后再從內存中寫入磁盤。
num.network.threads=3#消息從內存中寫入磁盤是時候使用的線程數量。
#用來處理磁盤IO的線程數量
num.io.threads=8#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400#請求套接字的緩沖區大小
socket.request.max.bytes=104857600#kafka運行日志存放的路徑
log.dirs=/export/servers/logs/kafka#topic在當前broker上的分片個數
num.partitions=2#我們知道segment文件默認會被保留7天的時間,超時的話就
#會被清理,那么清理這件事情就需要有一些線程來做。這里就是
#用來設置恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1#segment文件保留的最長時間,默認保留7天(168小時),
#超時將被刪除,也就是說7天之前的數據將被清理掉。
log.retention.hours=168#滾動生成新的segment文件的最大時間
log.roll.hours=168#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824#上面的參數設置了每一個segment文件的大小是1G,那么
#就需要有一個東西去定期檢查segment文件有沒有達到1G,
#多長時間去檢查一次,就需要設置一個周期性檢查文件大小
#的時間(單位是毫秒)。
log.retention.check.interval.ms=300000#日志清理是否打開
log.cleaner.enable=true#broker需要使用zookeeper保存meta數據
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000#上面我們說過接收線程會將接收到的消息放到內存中,然后再從內存
#寫到磁盤上,那么什么時候將消息從內存中寫入磁盤,就有一個
#時間限制(時間閾值)和一個數量限制(數量閾值),這里設置的是
#數量閾值,下一個參數設置的則是時間閾值。
#partion buffer中,消息的條數達到閾值,將觸發flush到磁盤。
log.flush.interval.messages=10000#消息buffer的時間,達到閾值,將觸發將消息從內存flush到磁盤,
#單位是毫秒。
log.flush.interval.ms=3000#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:
#Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01advertised.host.name=192.168.239.128

?

如果覺得本文對您有幫助,不妨掃描下方微信二維碼打賞點,您的鼓勵是我前進最大的動力:

?

轉載于:https://www.cnblogs.com/jun1019/p/6256371.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/541961.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/541961.shtml
英文地址,請注明出處:http://en.pswp.cn/news/541961.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java StreamTokenizer nextToken()方法與示例

StreamTokenizer類nextToken()方法 (StreamTokenizer Class nextToken() method) nextToken() method is available in java.io package. nextToken()方法在java.io包中可用。 nextToken() method is used to parse the next token from the input stream of this StreamTokeni…

二維數組m的元素是4個字符組成的串_串、數組和廣義表

1. 串1.1 串的定義ADT String{ 數據對象:D{ai|ai∈CharacterSet, i1, 2, …, n, n≧0} 數據關系:R1{|ai-1, ai∈D, i2, …, n} 基本操作: 生成一個值等于chars的串 復制一個串 判斷串是否空串 比較串的大小 返回串元素的個數 將串清空 …

流媒體測試筆記記錄之————阿里云監控、OBS、FFmpeg拉流和推流變化比較記錄...

OBS設置視頻(512kbps)和音頻(128kbps)比特率 阿里云監控結果: 使用FFmpeg拉流到Nginx 服務器測試比特率 第二次測試,修改視頻和音頻比特率 OBS設置 阿里云監控 Nginx 比特率變化 FFMPEG 拉流截圖

Java RandomAccessFile readChar()方法及示例

RandomAccessFile類readChar()方法 (RandomAccessFile Class readChar() method) readChar() method is available in java.io package. readChar()方法在java.io包中可用。 readChar() method is used to read a character value from this file and it can read character up…

python方差分析模型的預測結果怎么看_statsmodels中方差分析表結果解析

引言通常我們在對多個變量進行統計分析的時候,結果的匯總和整理需要耗費大量的時間和精力,稍有不慎還有可能出現錯誤。因此在對多個變量統計分析的時候,使用自動化的腳本對結果進行整理和匯總就十分的方便了。這里筆者使用Python當中的statsm…

Java PipedOutputStream connect()方法與示例

PipedOutputStream類的connect()方法 (PipedOutputStream Class connect() method) connect() method is available in java.io package. connect()方法在java.io包中可用。 connect() method is used to cause this PipedOutputStream to be connected to the given PipedInpu…

[轉載] 中國象棋軟件-引擎實現(一)概述

2005年6月我系第二批科技小組的項目正式確定為實現一款中國象棋對弈軟件。基本功能包括人機對戰、網絡對戰。我負責開發人機對戰的引擎部分,也就是讓計算機下棋。經過了暑假整整兩個月的學習與實踐,我終于初步完成了程序,雖然電腦的下棋水平實…

Java FilePermission getActions()方法與示例

FilePermission類的getActions()方法 (FilePermission Class getActions() method) getActions() method is available in java.io package. getActions()方法在java.io包中可用。 getActions() method is used to check whether this FilePermission and the given object are…

字符與編碼(編碼轉換)

作為一名程序員,肯定有被亂碼困擾的時候,真到了百思不得其解的時候,就會覺得:英文程序員真幸福。但其實只要明白編碼之間的轉換規律,其實亂碼還是很好解決的。我們都知道字符串在保存和傳輸的時候需要先經過編碼成二進…

mysql 刷新二進制日志_使用binlog日志恢復MySQL數據庫刪除數據的方法

binlog日志簡介:binlog 就是binarylog,二進制日志文件,這個文件記錄了MySQL所有的DDL和DML(除了數據查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間。binlog日志包括兩類文件:1)二進制日志索引文件(文件名…

Java FileInputStream available()方法與示例

FileInputStream類的available()方法 (FileInputStream Class available() method) available() method is available in java.io package. available()方法在java.io包中可用。 available() method is used to return the number of bytes left that can be read from this Fi…

mysql 輸出參數 sql語句_MySQL: 詳細的sql語句

1添1.1【插入單行】insert [into] (列名) values (列值)例:insert into Strdents (姓名,性別,出生日期) values (開心朋朋,男,1980/6/15)1.2【將現有表數據添加到一個已有表】insert into (列名) select from 例:insert into tongxunlu (姓名,地址,電子郵…

執行git push出現Everything up-to-date

在github上git clone一個項目,在里面創建一個目錄,然后git push的時候,出現報錯"Everything up-to-date" 原因:1)沒有git add .2)沒有git commit -m "提交信息"如果上面兩個步驟都成功…

Java File類boolean delete()方法(帶示例)

文件類布爾型delete() (File Class boolean delete()) This method is available in package java.io.File.delete(). 軟件包java.io.File.delete()中提供了此方法。 This method is used to delete file or directory by using delete() method and this method is accessible…

Unity3D Adam Demo的學習與研究

1.簡述 這篇文章是對Adam各種相關資料了解后進行一些精簡的內容。如果你想仔細研究某個技術請跳轉至unity相關頁面。 Adam官方頁面: https://unity3d.com/cn/pages/adam 搬運視頻以及資源包網盤下載: http://pan.baidu.com/s/1jH6NF86 Adam這個demo由8個人的團隊耗時6個月(part…

Java File類boolean isFile()方法(帶示例)

File類boolean isFile() (File Class boolean isFile()) This method is available in package java.io.File.isFile(). 軟件包java.io.File.isFile()中提供了此方法。 This method is used to check whether the file is specified by filepath is a file or not. 此方法用于檢…

要加油!

現實中我容易佩服一個人。 一個頑強的女人,一個艱苦奮斗的男人..... 但是在網絡的世界里,我沒有佩服過幾個,但是不得不說的就是冰河。同樣的年齡人家做的事情和我們做的事情差距是多么的大,真的想想心里都是天壤之別。 比一比才知…

Java DataOutputStream writeInt()方法及示例

DataOutputStream類writeInt()方法 (DataOutputStream Class writeInt() method) writeInt() method is available in java.io package. writeInt()方法在java.io包中可用。 writeInt() method is used to write the given integer value to the basic DataOutputStream as 4 b…

python安卓自動化實現方法_uiautomator +python 實現安卓UI自動化

簡單實例注:安卓6.0以上的手機不會自動安裝app-uiautomator.apk和app-uiautomator-test.apk,需要手動安裝,否則報錯ioerror RPC server not starteduiautomator pythonHTMLTestRunner 安卓UI自動化實現#coding:utf-8from uiautomator importD…

ES6特性之:Spread操作符

Spread操作符(...),也稱作展開操作符,作用是將可迭代的(Iterable)對象進行展開。 比如有2個數組,我們要將其中一個數組中所有元素插入到另一個數組中,通過Spread操作符,就可以這樣進行: var fruits ["…