文章目錄
- fetch.min.bytes
- fetch.max.wait.ms
- fetch.max.bytes
- max.poll.records
- max.partition.fetch.bytes
- session.timeout.ms和heartbeat.interval.ms
- max.poll.interval.ms
- request.timeout.ms
- auto.offset.reset
- enable.auto.commit
- partition.assignment.strategy
- 區間(range)
- 輪詢(roundRobin)
- 黏性(sticky)
- 協作黏性(cooperative sticky)
fetch.min.bytes
這個屬性指定了消費者從服務器獲取記錄的最小字節數,默認是1字節。broker在收到消費者的獲取數據請求時,如果可用數據量小于fetch.min.bytes指定的大小,那么它就會等到有足夠可用數據時才將數據返回。這樣可以降低消費者和broker的負載,因為它們在主題流量不是很大的時候(或者一天里的低流量時段)不需要來來回回地傳輸消息。如果消費者在沒有太多可用數據時CPU使用率很高,或者在有很多消費者時為了降低broker的負載,那么可以把這個屬性的值設置得比默認值大。但需要注意的是,在低吞吐量的情況下,加大這個值會增加延遲。
fetch.max.wait.ms
通過設置fetch.min.bytes,可以讓Kafka等到有足夠多的數據時才將它們返回給消費者,feth.max.wait.ms則用于指定broker等待的時間,默認是500毫秒。如果沒有足夠多的數據流入Kafka,那么消費者獲取數據的請求就得不到滿足,最多會導致500毫秒的延遲。如果要降低潛在的延遲(為了滿足SLA),那么可以把這個屬性的值設置得小一些。如果fetch.max.wait.ms被設置為100毫秒,fetch.min.bytes被設置為1 MB,那么Kafka在收到消費者的請求后,如果有1 MB數據,就將其返回,如果沒有,就在100毫秒后返回,就看哪個條件先得到滿足。
fetch.max.bytes
這個屬性指定了Kafka返回的數據的最大字節數(默認為50 MB)。消費者會將服務器返回的數據放在內存中,所以這個屬性被用于限制消費者用來存放數據的內存大小。需要注意的是,記錄是分批發送給客戶端的,如果broker要發送的批次超過了這個屬性指定的大小,那么這個限制將被忽略。這樣可以保證消費者能夠繼續處理消息。值得注意的是,broker端也有一個與之對應的配置屬性,Kafka管理員可以用它來限制最大獲取數量。broker端的這個配置屬性可能很有用,因為請求的數據量越大,需要從磁盤讀取的數據量就越大,通過網絡發送數據的時間就越長,這可能會導致資源爭用并增加broker的負載。
max.poll.records
這個屬性用于控制單次調用poll()方法返回的記錄條數。可以用它來控制應用程序在進行每一次輪詢循環時需要處理的記錄條數(不是記錄的大小)。
max.partition.fetch.bytes
這個屬性指定了服務器從每個分區里返回給消費者的最大字節數(默認值是1 MB)。當KafkaConsumer.poll()方法返回ConsumerRecords時,從每個分區里返回的記錄最多不超過max.partition.fetch.bytes指定的字節。需要注意的是,使用這個屬性來控制消費者的內存使用量會讓事情變得復雜,因為你無法控制broker返回的響應里包含多少個分區的數據。因此,對于這種情況,建議用fetch.max.bytes替代,除非有特殊的需求,比如要求從每個分區讀取差不多的數據量。
session.timeout.ms和heartbeat.interval.ms
session.timeout.ms指定了消費者可以在多長時間內不與服務器發生交互而仍然被認為還“活著”,默認是10秒。如果消費者沒有在session.timeout.ms指定的時間內發送心跳給群組協調器,則會被認為已“死亡”,協調器就會觸發再均衡,把分區分配給群組里的其他消費者。session.timeout.ms與heartbeat.interval.ms緊密相關。heartbeat.interval.ms指定了消費者向協調器發送心跳的頻率,session.timeout.ms指定了消費者可以多久不發送心跳。因此,我們一般會同時設置這兩個屬性,heartbeat.interval.ms必須比session.timeout.ms小,通常前者是后者的1/3。如果session.timeout.ms是3秒,那么heartbeat.interval.ms就應該是1秒。把session.timeout.ms設置得比默認值小,可以更快地檢測到崩潰,并從崩潰中恢復,但也會導致不必要的再均衡。把session.timeout.ms設置得比默認值大,可以減少意外的再均衡,但需要更長的時間才能檢測到崩潰。
max.poll.interval.ms
這個屬性指定了消費者在被認為已經“死亡”之前可以在多長時間內不發起輪詢。前面提到過,心跳和會話超時是Kafka檢測已“死亡”的消費者并撤銷其分區的主要機制。我們也提到了心跳是通過后臺線程發送的,而后臺線程有可能在消費者主線程發生死鎖的情況下繼續發送心跳,但這個消費者并沒有在讀取分區里的數據。要想知道消費者是否還在處理消息,最簡單的方法是檢查它是否還在請求數據。但是,請求之間的時間間隔是很難預測的,它不僅取決于可用的數據量、消費者處理數據的方式,有時還取決于其他服務的延遲。在需要耗費時間來處理每個記錄的應用程序中,可以通過max.poll.records來限制返回的數據量,從而限制應用程序在再次調用poll()之前的等待時長。但是,即使設置了max.poll.records,調用poll()的時間間隔仍然很難預測。于是,設置max.poll.interval.ms就成了一種保險措施。它必須被設置得足夠大,讓正常的消費者盡量不觸及這個閾值,但又要足夠小,避免有問題的消費者給應用程序造成嚴重影響。這個屬性的默認值為5分鐘。當這個閾值被觸及時,后臺線程將向broker發送一個“離開群組”的請求,讓broker知道這個消費者已經“死亡”,必須進行群組再均衡,然后停止發送心跳。
request.timeout.ms
這個屬性指定了消費者在收到broker響應之前可以等待的最長時間。如果broker在指定時間內沒有做出響應,那么客戶端就會關閉連接并嘗試重連。它的默認值是30秒。不建議把它設置得比默認值小。在放棄請求之前要給broker留有足夠長的時間來處理其他請求,因為向已經過載的broker發送請求幾乎沒有什么好處,況且斷開并重連只會造成更大的開銷。
auto.offset.reset
這個屬性指定了消費者在讀取一個沒有偏移量或偏移量無效(因消費者長時間不在線,偏移量對應的記錄已經過期并被刪除)的分區時該做何處理。它的默認值是latest,意思是說,如果沒有有效的偏移量,那么消費者將從最新的記錄(在消費者啟動之后寫入Kafka的記錄)開始讀取。另一個值是earliest,意思是說,如果沒有有效的偏移量,那么消費者將從起始位置開始讀取記錄。如果將auto.offset.reset設置為none,并試圖用一個無效的偏移量來讀取記錄,則消費者將拋出異常。
enable.auto.commit
**這個屬性指定了消費者是否自動提交偏移量,默認值是true。**你可以把它設置為false,選擇自己控制何時提交偏移量,以盡量避免出現數據重復和丟失。如果它被設置為true,那么還有另外一個屬性auto.commit.interval.ms可以用來控制偏移量的提交頻率。本章后續部分將深入介紹與提交偏移量相關的其他內容。
partition.assignment.strategy
我們知道,分區會被分配給群組里的消費者。PartitionAssignor根據給定的消費者和它們訂閱的主題來決定哪些分區應該被分配給哪個消費者。Kafka提供了幾種默認的分配策略。
區間(range)
這個策略會把每一個主題的若干個連續分區分配給消費者。假設消費者C1和消費者C2同時訂閱了主題T1和主題T2,并且每個主題有3個分區。那么消費者C1有可能會被分配到這兩個主題的分區0和分區1,消費者C2則會被分配到這兩個主題的分區2。因為每個主題擁有奇數個分區,并且都遵循一樣的分配策略,所以第一個消費者會分配到比第二個消費者更多的分區。只要使用了這個策略,并且分區數量無法被消費者數量整除,就會出現這種情況。
輪詢(roundRobin)
這個策略會把所有被訂閱的主題的所有分區按順序逐個分配給消費者。如果使用輪詢策略為消費者C1和消費者C2分配分區,那么消費者C1將分配到主題T1的分區0和分區2以及主題T2的分區1,消費者C2將分配到主題T1的分區1以及主題T2的分區0和分區2。一般來說,如果所有消費者都訂閱了相同的主題(這種情況很常見),那么輪詢策略會給所有消費者都分配相同數量(或最多就差一個)的分區。
黏性(sticky)
設計黏性分區分配器的目的有兩個:一是盡可能均衡地分配分區,二是在進行再均衡時盡可能多地保留原先的分區所有權關系,減少將分區從一個消費者轉移給另一個消費者所帶來的開銷。如果所有消費者都訂閱了相同的主題,那么黏性分配器初始的分配比例將與輪詢分配器一樣均衡。后續的重新分配將同樣保持均衡,但減少了需要移動的分區的數量。如果同一個群組里的消費者訂閱了不同的主題,那么黏性分配器的分配比例將比輪詢分配器更加均衡。
協作黏性(cooperative sticky)
這個分配策略與黏性分配器一樣,只是它支持協作(增量式)再均衡,在進行再均衡時消費者可以繼續從沒有被重新分配的分區讀取消息。可以參考4.1.2節了解更多有關協作再均衡的內容。需要注意的是,如果你從Kafka 2.3之前的版本開始升級,并希望使用協作黏性分配策略,則需要遵循特定的升級路徑,具體請參看相關升級指南。
可以通過partition.assignment.strategy來配置分區策略,默認值是org.apache.kafka.clients.consumer.RangeAssignor,它實現了區間策略。你也可以把它改成org.apache.kafka.clients.consumer.RoundRobinAssignor、org.apache.kafka.clients.consumer.StickyAssignor或org.apache.kafka.clients.consumer.CooperativeStickyAssignor。還可以使用自定義分配策略,如果是這樣,則需要把partition.assignment.strategy設置成自定義類的名字。