《Apache Kafka實戰》讀書筆記-調優Kafka集群
作者:尹正杰
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
?
?
?
一.確定調優目標
1>.常見的非功能性要求
一.性能(performance)最重要的非功能性需求之一。大多數生產環境對集群性能都有著嚴格的要求。不同的系統對于性能有著不同的訴求。比如對數據庫系統來說,最重要的性能是請求的響應時間(response time)。用戶總是希望一條查詢或更新操作的整體響應時間越短越好;而對kafak而言,性能一般指的是吞吐量和延時兩個方面 1>.吞吐量(throughput/TPS)broker或clients應用程序每秒能處理多少字節(或消息)。 2>.延時(latency)通常指代producer端發送消息到broker端持久化保存消息之間的時間間隔。該概念也用于統計端到端(end-to-end,E2E)的延時,比如producer端發送一條消息到consumer端消費這條消息的時間間隔。二.可用性在某段時間內,系統或組建正常運行的概率或在時間上的比率。業界一般用N個9來量化可用性,比如常見的“年度4個9”即指系統可用性要達到99.99%, 即一年中系統宕機的時間不能超過53分鐘(365 x 24 x 60 x 0.01% = 53).三.持久性確保了已提交操作系統產生的消息需要被持久化的保持,即使系統出現崩潰。對于kafka來說,持久性通常意味著已提交的消息需要持久化到broker底層的物理日志而不能發送丟失。由于篇幅有限,上述僅僅羅列出了對Kafka非常重要的的非功能性需求,接下來我們會進行詳細的展開來討論如何從這些方面調優kafka集群。?
2>.只有明確目標才能明確調優的方向
? 如上所述,我們需要從四個方面來考慮調優目標:吞吐量,延時,持久性和可用性。為了明確用戶生產環境中的目標,用戶需要結合業務仔細思考Kafka集群的使用場景和初衷。比如使用kafka集群的目的是什么?是作為一個消息隊列使用,還是作為數據存儲,抑或是用作流失數據處理,更或是以上所有?
?
明確這個目標有兩個重要的作用:
2.1>.萬物皆有度,世界上沒有十全十美的事情。你不可能同時最大化上述四個目標。它們彼此之間可能是矛盾的,到底看重哪個方面實際上是一個權衡選擇(trade-off)。
舉個例子:假設producer每秒發送一條消息需要花費2毫秒(即延時是2毫秒),那么顯然producer的吞吐量就應該是500條/秒,因為1秒可以發送1/0.002=500條消息。因此,吞吐量和延時的關系似乎可以使用共識來表示:TPS=1000/Latency(毫秒)。其實,兩者的關系遠非上面公司表示的那么簡單。我們依然以kafka producer來舉例子,假設它仍然 以2毫秒的延時來發送消息。如果每秒之發送一條消息,那么TPS自然就是500條/秒。由此可見,延時增大了4倍,但TPS卻增大的了將近200倍。上面的場景解釋了目前為什么批次化(batchingi)以及微批次話(micro-batchingh)流行的原因。實際環境中用戶幾乎總是愿意用增加較小延時的待見去換取TPS的顯著提升。畢竟從2毫秒到10毫秒的延遲增加通常是可以忍受的。值得一提的是,kafka producer也采取了這樣的理念,這里的9毫秒就是producer等待8毫秒積攢出的消息數遠遠多于同等時間內producer能夠發送的消息數。有的讀者會問:producer等待8毫秒就能積累1000條消息嗎?不是發送一條消息就需要2毫秒嗎?這里需要解釋一下,producer累積消息一般僅僅是將消息發送到內存中的緩沖區,而發送消息卻需要設計網絡I/O傳輸。內存操作和I/O操作的時間量級不是不同的,前者通常是基百納秒級別,而后者從毫秒到秒級別不等,故producer等待8毫秒積攢出的消息數遠遠多于同等時間內producer能夠發送的消息數。說了這么多其實就想強調上面4個調優目標統籌規劃時互相關聯,互相制約的。當然,這種制約覺得不是完全互斥,即提高一個目標一定會使其他目標降低。換句話說,就是用戶不能同時使這4個目標達到最優成都。這就是在調優前明確優化目標的第一個含義。
?
2.2>.用戶需要明確調優的重點才能針對性的調整不同的Kafka參數。Kafka提供的各類參數已達幾百個之多。只有我們明確要調優哪些方面才能確定適合的參數。比如如果要為集群中所有topic進行優化,那么就需要調整broker的參數,而如果是只是為某些topic進行優化,
則需要調整topic級別的參數。
?
二.集群基礎調優
可參考我之前的筆記:《Kafka權威指南》讀書筆記-操作系統調優篇(https://www.cnblogs.com/yinzhengjie/p/9993719.html)
? 關于內核調優的參數,我只是微調了一下,僅僅調試了12個內核參數,如下:(僅供參考,大家可以根據自己的實際環境調試相應的內核參數)
[root@yinzhengjie ~]# tail -13 /etc/sysctl.conf #Add by yinzhengjie net.ipv6.conf.all.disable_ipv6=1 vm.dirty_ratio = 80 vm.dirty_background_ratio = 5 vm.swappiness = 1 net.core.wmem_default = 256960 net.core.rmem_default = 256960 net.ipv4.tcp_wmem = 8760 256960 4088000 net.ipv4.tcp_rmem = 8760 256960 4088000 net.ipv4.tcp_window_scaling = 1 net.ipv4.tcp_max_syn_backlog = 2048 net.core.netdev_max_backlog = 2000 vm.max_map_count=262144 [root@yinzhengjie ~]#
?
三.調優吞吐量
? 若要調優吞吐量(TPS),producer,broker和consumer都需要進行調整,以便讓他們在相同的時間內傳輸更多的數據。眾所周知,Kafka基本的并行單元就是分區。producer在設計時就被要求能夠同時向多個分區發送消息,這些消息也要能夠被寫入到多個broker中供多個consumer同時消費。因此通常來說,分區數越多TPS越高。那么這是否意味著我們每次創建topic時都要創建大量的分區呢?答案顯然是否定的。這依然是一個權衡(trade-off)的問題:
一.過多的分區可能有哪些弊端呢? 1>.server/clients端將占用更多的內存。producer默認使用緩沖區為每個分區緩存消息,一旦滿足條件producer便會批量發出緩存的消息。看上去這是一個提升性能的設計,不過由于該參數時分區級別的,因此如果分區很多,這部分緩存的內存占用也會變大;而在broker端,每個broker內部都維護了很多分區的元數據,比如controller,副本管理器,分區管理器等等。顯然,分區數越多,緩存的成本越大。2>.分區屬越多,系統文件句柄書也越多每個分區在底層文件系統都有專屬目錄。該目錄下廚了3個索引文件之外還會保存日志段文件。通常生產環境下的日志短文件可能有多個,因此保守估計一個分區就可能要占用十幾個甚至幾十個文件句柄。當kafka一旦打開文件并不會顯式關閉該文件,故文件句柄書時不會釋放的。那么隨著分區數的增加,系統文件句柄書也會相應地增長。3>.分區數過多會導致controller處理周期過長每個分區通常都有若干個副本而副本保存在不同的broker上。當leader副本掛掉了,controller回自動監測到,然后在zookeeper的幫助下選擇新的leader。雖然在大部分情況下leader選舉只有很短的延時,但若分區數很多,當broker掛掉后,需要進行leader選舉的分區數就會很多。當前controller時單線程處理事件的,所以controller只能一個個地處理leader的變更請求,可能會拉長整體系統恢復的時間。二.設置合理的分區數:基于以上3點,分區數的選擇絕不是多多益善的。用戶需要結合自身的實際環境基于吞吐量等指標進行一系列測試來確定需要選擇多少分區數。有的用戶總是抱怨自身環境無法達到官網給出的性能結果。其實官網的基準測試對用戶的實際意義不大,因為不同的硬件,軟件,網絡,負載情況必然會帶來不同的測試結果。比如用戶使用1KB大小的啊消息進行測試,最后發現吞吐量才1MB/是,而官網說的每秒能達到10MB/s。這是因為官網使用的是100字節的消息體進行測試的,故根本沒有可比性。雖然沒法給出統一的分區數,但用戶基本上可以遵循下面的步驟來嘗試確定分區數:1>.創建單分區的topic,然后在實際生產機器上分別測試producer和consumer的TPS,分別為T(p) 和T(c)。2>.假設目標TPS是T(t),那么分區數大致可以確定為T(t)/max(T(p),T(c))。Kafka提供了專門的腳本kafka-producer-perf-test.sh和Kafka-consumer-perf-test.sh用于計算T(p) 和T(c)。這兩個腳本的具體使用方啊可參考我筆記:https://www.cnblogs.com/yinzhengjie/p/9953212.html。
好了,關于吞吐量的調優我就不廢話了,直接總結一下調優TPS的一些參數清單和要點:
一.broker端 1>.適當增加num.replica.fetchers(該值控制了broker端follower副本從leader副本處獲取消息的最大線程數,默認值1表示follower副本只使用一個線程去實時拉取leader處的最新消息。對于設置了acks=all的producer而言,主要的延時可能都耽誤在follower與leader同步的過程,故增加該值通常能夠縮短同步的時間間隔,從而間接地提升producer端的TPS),但不要超過CPU核數2>.調優GC避免經常性的Full GC二.producer端 1>.適當增加batch.size,比如100~512KB2>.適當增加limger.ms,比如10~100ms3>.設置compression.type=lz4(當前Kafka支持GZIP,Snappy和LZ4,但由于目前一些固有配置等原因,Kafka+LZ4組合的性能是最好的,因此推薦在那些CPU資源充足的環境中啟用producer端壓縮。)4>.ack=0或15>.retries=06>.若干線程共享producer或分區數很多,增加buffer.memory三.consumer端 1>.采用多consumer實例2>.增加fetch.min.bytes(該參數控制了leader副本每次返回consumer的最小數據字節數。通過增加該參數值,默認值是1,Kafka會為每個FETCH請求的repsonse填入更多的數據,從而減少了網絡開銷并提升了TPS。),比如10000
?
?
四.調優延時
針對不同的組件,延時(latency)的定義可能不同。對于producer而言,延時主要是消息發送的延時,即producer發送PRODUCE請求到broker端返回請求response的時間間隔;對consumer而言,延時衡量了consumer發送FETCH請求到broker端返回請求resonse的時間間隔。還有只用延時定義表示的是集群的端到端延時(end-to-end latency),即producer端發送消息到consumer端“看到”這條消息的時間間隔。不論是那種延時,他們調優的思想大致是相同的。
下面總結一下調優延時的一些參數清單:
一.broker端1>.適度增加num.replica.fetchers(該值控制了broker端follower副本從leader副本處獲取消息的最大線程數,默認值1表示follower副本只使用一個線程去實時拉取leader處的最新消息。對于設置了acks=all的producer而言,主要的延時可能都耽誤在follower與leader同步的過程,故增加該值通常能夠縮短同步的時間間隔,從而間接地提升producer端的TPS)2>.避免創建過多topic分區二.producer端 1>.設置linger.ms=02>.設置compression.type=none3>.設置acks=1或者0三.consumer端1>.設置fetch.min.bytes=1(該參數控制了leader副本每次返回consumer的最小數據字節數,默認值是1)
?
?
五.調優持久性
? 顧名思義,持久性(durability)定義了Kafka集群中消息不容易丟失的程度。持久性越高表面kafka越不會丟失消息。持久性通常由冗余的手段就是備份機制(reolication)。它保證每條Kafka消息最終會保存在多臺broker上。這樣即使單個broker崩潰,數據依然是可用的。
下面是總結調優丑就行的參數清單和要點:
一.broker端 1>.設置unclean.leader.election.enable=false(0.11.0.0之前版本需要顯式設置。0.11.0.0版本開始unclean.leader.election.enable參數的默認值由原來的true改為false,可以關閉unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不會被提升為新的 leader partition。kafka集群的持久化力大于可用性,如果ISR中沒有其它的replica,會導致這個partition不能讀寫。)2>.設置auto.create.topics.enable=false(否允許自動創建topic,默認值是true,若是false,就需要通過命令創建topic,推薦設置為false)3>.設置replication.factor=3(指定分區的副本數,默認是1),min.insync.replicas(指定最少分區數據同步的個數,默認是1.)=replication.factor -14>.default.replication.factor=3(指定默認的分區數,默認是1)5>.設置broker.rack屬性分區數據到不同幾家6>.設置log.flush.interval.messages(指定了kafka寫入多少條消息后執行一次消息“落盤”,是頻率緯度上的參數)和log.flush.interval.ms( 指定款Kafka多長時間執行一次消息“落盤”,是時間維度上的參數)為一個較小的值二.producer端 1>.設置acks=all2>.設置retries為一個較大的值,比如10~30。3>.設置max.in.flight.requests.per.connection=1(客戶端在阻塞之前在單個連接上發送的未確認請求的最大數量。注意,如果該設置設置大于1,并且發送失敗,則存在由于重試(即,如果啟用了重試)而導致消息重新排序的風險。默認值是5。簡單的來說,設置為1可以規避消息亂序的風險。)4>.設置enable.idempotence=true(保證同一條消息只被broker寫入一次)啟用冪等性三.consumer端 1>.設置auto.commit.enable=false(關閉自動提交位移) 2>.消息消費成功后調用commitSync提交位移(設置auto.commit.enable=false既然是手動提交,用戶需要調用commitSync方法來提位移,而不是使用commitAsync方法)。
?
六.調優可用性
所謂可用性(availability)反應的是Kafka集群應對崩潰的能力。無論broker,producer或consumer出現崩潰,kafka服務器依然保持可用狀態而不會因為failure就中斷服務。調優可用性就是要讓Kafka 更快地從崩潰中恢復。
一.broker端 1>.避免創建過多地分區2>.設置unclean.leader.election.enable=true3>.設置min.insync.replicas=14>.設置num.recovery.threads.per.data.dir=broker端參數log.dirs中設置的目錄數。二.producer端1>.設置acks=1,若一定要設置為all,則遵循上面broker端的min.insync.replicas配置三.consumer端 1>.設置session.timeout.ms為較低的值,比如5~10秒。2>.設置max.poll.interval.ms(默認是300000ms,即5分鐘。該參數賦予了consumer實例更多的時間來處理消息)為此消息平均處理時間稍大的值。(0.10.1.0及之后版本)3>.設置max.poll.records和max.partition.fetch.bytes減少consumer處理消息的總時長,避免頻繁rebalance。(0.10.1.0之前版本)
?