? ? ? ? 起因:車聯網項目開發,車輛發生故障需要給三個系統推送消息,故障上報較為頻繁,所以為了不阻塞主流程,采用了使用kafka。消費方負責推送并保存推送記錄,但在一次壓測中發現,實際只發生了10次故障,但是推送記錄卻有30多條。
????????問題排查,發現是因為其中一個系統宕機,導致往這個系統推送消息時,一直連接超時,導致每條消息的推送時長被拉長。而且kafka消息拉取參數max-poll-records設置了500,意味著一次會批量拉取500條消息到本地處理,而max.poll.interval.ms參數默認是5分鐘,當500條消息處理時長超過5分鐘后,就會認為消費者死掉了,觸發再均衡,導致同一個消息被重復消費。
解決:
? ? ? ? 主要是提高消費者的處理速度,避免不必要的Rebalance。主要采用2種措施:
- 減少每次拉去消息數max-poll-records,從500,降到20
- 拉取到消息之后異步處理(創建線程池,對推送消息的部分利用多線程處理)
常見配置
fetch.min.byte:配置Consumer一次拉取請求中能從Kafka中拉取的最小數據量,默認為1B,如果小于這個參數配置的值,就需要進行等待,直到數據量滿足這個參數的配置大小。調大可以提交吞吐量,但也會造成延遲
fetch.max.bytes,一次拉取數據的最大數據量,默認為52428800B,也就是50M,但是如果設置的值過小,甚至小于每條消息的值,實際上也是能消費成功的
fetch.wait.max.ms,若是不滿足fetch.min.bytes時,等待消費端請求的最長等待時間,默認是500ms
max.poll.records,單次poll調用返回的最大消息記錄數,如果處理邏輯很輕量,可以適當提高該值。一次從kafka中poll出來的數據條數,max.poll.records條數據需要在在session.timeout.ms這個時間內處理完,默認值為500
consumer.poll(100)?,100 毫秒是一個超時時間,一旦拿到足夠多的數據(fetch.min.bytes 參數設置),consumer.poll(100)會立即返回 ConsumerRecords<String, String> records。如果沒有拿到足夠多的數據,會阻塞100ms,但不會超過100ms就會返回
max.poll.interval.ms,兩次拉取消息的間隔,默認5分鐘;通過消費組管理消費者時,該配置指定拉取消息線程最長空閑時間,若超過這個時間間隔沒有發起poll操作,則消費組認為該消費者已離開了消費組,將進行再均衡操作(將分區分配給組內其他消費者成員)
若超過這個時間則報如下異常:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has alreadyrebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or byreducing the maximum size of batches returned in poll() with max.poll.records.
即:無法完成提交,因為組已經重新平衡并將分區分配給另一個成員。這意味著對poll()的后續調用之間的時間比配置的max.poll.interval.ms長,這通常意味著poll循環花費了太多的時間來處理消息。
可以通過增加max.poll.interval.ms來解決這個問題,也可以通過減少在poll()中使用max.poll.records返回的批的最大大小來解決這個問題。
max.partition.fetch.bytes:該屬性指定了服務器從每個分區返回給消費者的最大字節數,默認為 1MB。
session.timeout.ms:消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s,將觸發再均衡操作。
對于每一個Consumer Group,Kafka集群為其從Broker集群中選擇一個Broker作為其Coordinator。Coordinator主要做兩件事:
-
維持Group成員的組成。這包括加入新的成員,檢測成員的存活性,清除不再存活的成員。
-
協調Group成員的行為。
poll機制
- 每次poll的消息處理完成之后再進行下一次poll,是同步操作
- 每次poll之前檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移
- 每次poll時,consumer都將嘗試使用上次消費的offset作為起始offset,然后依次拉取消息
- poll(long timeout),timeout指等待輪詢緩沖區的數據所花費的時間,單位是毫秒