文章目錄
- 1、生產者調優
- batch.size(重要)
- linger.ms
- compression.type
- acks(重要)
- buffer.memory
- max.in.flight.requests.per.connection(重要)
- message.max.bytes(重要)
- 2、消費者調優
- fetch.min.bytes(重要)
- fetch.max.wait.ms(重要)
- max.poll.records(重要)
- auto.offset.reset(重要)
- session.timeout.ms
- heartbeat.interval.ms
- 3、Broker 調優
- num.network.threads
- num.io.threads
- log.flush.interval.messages(重要)
- log.flush.interval.ms(重要)
- num.replica.fetchers
- auto.create.topics.enable
- unclean.leader.election.enable
- log.retention.hours
1、生產者調優
batch.size(重要)
建議:64KB ~ 1MB
batch.size
指的是生產者在將消息發送到 Broker 之前,會將多條消息收集到一個批次進行發送。
增大batch.size
可以顯著減少網絡請求次數,因為每次發送更多的消息,從而提高了網絡傳輸的效率,有助于提升整體的吞吐量。
但是batch.size
調整過大,也會帶來一定的延遲,因為生產者需要等待更多的消息填滿批次,如果批次一直無法填滿,消息就會在生產者端停留更長時間,直到達到其他觸發發送的條件。在高吞吐量且對延遲要求不是特別苛刻的場景,可以適當增大該值。
linger.ms
建議:10 ~ 100ms
linger.ms
是生產者等待batch.size
填滿的時間。
當設置該值后,即使沒有填滿,只要到了等待時間 linger.ms
值,生產者也會將當前批次的消息發送出去。
增大 linger.ms
可以讓生產者有更多的時間去收集消息,從而形成更大的批次,提升吞吐量。這樣消息在生產者端等待的時間會變長,會增加消息的延遲。在一些允許一定延遲的批量數據處理場景中,可以適當增大該值來提高吞吐量。
compression.type
建議:snappy 或 lz4
用于減少生產者發送到 Broker 的數據量。使用compression.type
可以將消息進行壓縮后再傳輸,這樣可以有效節省網絡帶寬,提高網絡傳輸效率。
snappy
壓縮算法的特點是壓縮和解壓縮速度快,對 CPU 資源的消耗相對較低,適用于對壓縮和解壓縮速度要求較高的場景;lz4
同樣具有較高的壓縮和解壓縮速度,并且在壓縮比上可能比 snappy
稍好一些。使用壓縮算法本質上是用 CPU 換取帶寬,在網絡帶寬有限的情況下非常有用。
acks(重要)
建議:1 或 all
acks
參數控制了生產者在發送消息后需要收到多少確認才認為消息發送成功。
當 acks = 1
時,只要 Leader 副本
確認接收到消息,生產者就會收到響應,這種方式在保證一定可靠性的同時,具有較高的吞吐量。
當 acks = all
時,生產者需要等待所有的 ISR(In-Sync Replicas)
副本都確認接收到消息才會收到響應,這提供了最高的可靠性,但會略微降低吞吐量,因為需要等待更多的確認信息。在對消息可靠性要求極高的場景下,如金融交易數據的傳輸,建議使用 acks = all。
buffer.memory
默認:32MB
建議:32 ~ 64MB
buffer.memory
用于暫存生產者未發送到 Broker 的消息。
當生產者發送消息的速度超過了 Broker 接收的速度時,消息會被存儲在buffer.memory
中。
如果緩沖區設置過小,可能會導致緩沖區很快被填滿,從而使生產者阻塞,影響消息的發送效率。適當增大緩沖區大小可以避免因緩沖區滿導致的阻塞問題,但也不能設置過大,否則會占用過多的系統內存資源。
max.in.flight.requests.per.connection(重要)
建議:1(性能差、順序強一致)、n(高性能、但可能亂序)
max.in.flight.requests.per.connection
限制了單個連接上允許的最大未確認請求數。生產者可以在一個連接上同時發送多個請求而無需等待每個請求的響應。
如果將該值設置得過高,可能會導致消息的順序被破壞。例如:當一個請求失敗并重試時,后續已經發送的請求可能會先到達 Broker,從而導致消息的順序不一致。需要根據實際情況合理設置該值,以平衡吞吐量和消息順序性。
message.max.bytes(重要)
建議:小于128KB
生產者能夠發送的最大消息大小。
設置合適的 message.max.bytes
可以確保生產者發送的消息不會超出 Broker 和消費者的處理能力。如果設置得過大,可能會導致 Broker 和消費者的內存壓力增大,處理效率降低;如果設置得過小,可能無法滿足某些業務場景下大消息的傳輸需求。需要根據實際業務中的消息大小分布來合理調整該值。
2、消費者調優
fetch.min.bytes(重要)
建議:1MB - 10MB
消費者在單次拉取數據時,從 Broker 獲得的最小數據量。
通過設置一個較大的值,可以減少消費者向 Broker 發送拉取請求的頻率,因為只有當 Broker 上有足夠的數據滿足 fetch.min.bytes
的要求時,才會將數據返回給消費者。這樣可以降低網絡開銷,提高整體的性能。但如果設置得過大,可能會導致消費者等待時間過長,影響實時性。
fetch.max.wait.ms(重要)
建議:500ms
消費者等待 fetch.min.bytes
數據量的超時時間。
當消費者向 Broker 發送拉取請求后,如果在 fetch.max.wait.ms
時間內,Broker 上的數據量仍然沒有達到 fetch.min.bytes
的要求,Broker 也會將當前可用的數據返回給消費者。
該參數用于平衡延遲和吞吐量。設置得較短,消費者可能會頻繁地拉取少量數據,增加網絡開銷;設置得較長,可能會導致消息處理的延遲增加。
max.poll.records(重要)
建議:500 - 5000
該參數限制了消費者單次拉取的最大消息數。合理設置該值可以避免消費者處理消息時出現阻塞的情況。
設置得過大,消費者可能會一次性拉取過多的消息,導致處理這些消息的時間過長,從而影響后續消息的拉取和處理;設置得過小,消費者需要頻繁地向 Broker 發送拉取請求,增加了網絡開銷。需要根據消費者的處理能力和 Broker 的負載情況來調整該值。
auto.offset.reset(重要)
建議:earliest
該參數決定了在消費者沒有有效的偏移量時(例如,新的消費者組或者偏移量已經過期),從何處開始消費消息。當設置為 latest
時,消費者將從最新的消息開始消費,即只消費后續產生的新消息;
當設置為 earliest
時,消費者將從分區的最早消息開始消費。在需要處理歷史數據的場景中,可以選擇 earliest
;而在只關注最新數據的場景下,選擇 latest
更為合適(會丟數據)。
session.timeout.ms
建議:10s - 30s
消費者會定期向協調者發送心跳信息,以表明自己處于活躍狀態。如果在 session.timeout.ms
時間內,協調者沒有收到消費者的心跳信息,就會認為該消費者已經失效,并將其從消費者組中剔除。
如果該值設置得過短,可能會因為網絡抖動等原因導致消費者被誤剔除;如果設置得過長,當消費者真正出現故障時,協調者不能及時發現并重新分配分區,會影響整個消費者組的性能。
heartbeat.interval.ms
建議:1/3 * session.timeout.ms
指定消費者向協調者發送心跳的時間間隔。
心跳機制用于維持消費者與協調者之間的連接,確保協調者知道消費者處于活躍狀態。通常建議將其設置為 session.timeout.ms
的三分之一,這樣可以在保證及時檢測到消費者故障的同時,避免過于頻繁的心跳請求增加網絡開銷。
3、Broker 調優
num.network.threads
建議:CPU 核心數 × 3
指定 Broker 用于處理網絡請求的線程數。
在高并發場景下,大量的生產者和消費者會同時向 Broker 發送網絡請求,如果處理網絡請求的線程數不足,會導致請求處理不及時,影響系統的性能。
通過將該值設置為 CPU 核心數的 3 倍可以充分利用 CPU 資源,提高網絡請求的處理能力。但具體的值還需要根據實際的并發情況和系統資源進行調整。
num.io.threads
建議:CPU 核心數 × 8
用于處理磁盤 I/O 的線程數。
Kafka 的消息存儲依賴于磁盤 I/O,因此磁盤 I/O 的性能對系統的整體性能有很大影響。增加處理磁盤 I/O 的線程數可以提高磁盤 I/O 的并發能力,加快消息的讀寫速度。
但需要注意的是,該值需要根據磁盤的性能進行調整,如果磁盤的性能較差,過多的線程可能會導致磁盤競爭加劇,反而降低性能。
log.flush.interval.messages(重要)
建議:1(立即刷盤,性能差)、n (批量刷盤,性能好,會丟數據)
指定將消息刷寫到磁盤之前,需要累積的消息數。
增大該值可以減少磁盤 I/O 次數,因為每次刷盤會將更多的消息一次性寫入磁盤,從而提高磁盤的寫入效率。但這也會犧牲部分持久性,因為如果在消息累積過程中 Broker 發生故障,可能會導致部分未刷盤的消息丟失。需要根據業務對數據持久性的要求來合理設置該值。
log.flush.interval.ms(重要)
建議:1000 - 5000ms
強制刷盤的時間間隔,與 log.flush.interval.messages
配合使用。
即使消息數量沒有達到 log.flush.interval.messages
的要求,只要時間達到 log.flush.interval.ms
,也會將當前的消息刷寫到磁盤。通過設置合理的刷盤間隔,可以在保證一定磁盤 I/O 效率的同時,提高數據的持久性。
num.replica.fetchers
建議:2 - 4
指定 Broker 用于副本同步的線程數。
副本同步是 Kafka 保證數據可靠性的重要機制,增加副本同步線程數可以提升副本同步的速度,減少副本之間的延遲。但過多的線程可能會占用過多的系統資源,因此需要根據實際情況進行調整。
auto.create.topics.enable
建議:false
在生產環境中,建議禁用自動創建 Topic 的功能。
如果啟用該功能,當生產者或消費者嘗試訪問一個不存在的 Topic 時,Kafka 會自動創建該 Topic。這可能會導致生產環境中產生大量不必要的 Topic,增加系統的管理成本和資源消耗。
為了避免這種情況的發生,應該手動創建所需的 Topic,并將 auto.create.topics.enable
設置為 false
。
unclean.leader.election.enable
建議:false
禁止非 ISR 副本成為 Leader 可以確保數據的一致性。
當 Leader 副本失效時,如果啟用了 unclean.leader.election.enable
,非 ISR 副本可能會被選舉為新的 Leader。由于非 ISR 副本可能落后于其他副本,成為 Leader 后可能會導致數據丟失或不一致。為了保證數據的一致性,建議將該參數設置為 false
。
log.retention.hours
建議:72小時
該參數指定了 Kafka 消息在磁盤上的保留時間。
根據存儲容量和業務需求,可以調整該值。如果存儲容量充足,且業務需要長期保留數據,可以適當增大該值;如果存儲容量有限,或者業務只需要短期的數據,可以減小該值。