Kafka delete 日志清理策略(日志刪除)
- 原理:按照一定保留策略,直接刪除不符合條件的日志分段。Kafka 把 topic 的一個 partition 大文件分成多個小文件段,通過這種方式,能方便地定期清除或刪除已消費完的文件,以減少磁盤占用 。
- 保留策略
- 按時間刪除:設定一個時間閾值,刪除修改時間在該時間之前的日志。比如設置
log.retention.hours = 1
?,就表示只保存 1 小時內的日志,超出 1 小時的日志分段會被刪除。 - 按大小刪除:指定一個數據大小閾值,當日志數據超過這個大小時,執行刪除操作,保留最后的指定大小數據。例如
log.retention.bytes = 1073741824
?(即 1GB ),表示日志數據超過 1GB 時,會刪除舊的消息。
- 按時間刪除:設定一個時間閾值,刪除修改時間在該時間之前的日志。比如設置
- 相關參數
log.cleanup.policy = delete
?:啟用刪除策略。log.retention.check.interval.ms
?:專門的日志刪除任務周期性檢測的時間間隔,默認 300000ms(5 分鐘 ),即每隔 5 分鐘檢查一次是否有符合刪除條件的日志分段 。log.retention.hours
?:按時間清理時,指定日志保留的小時數 。log.retention.bytes
?:按大小清理時,指定日志保留的字節數 。需注意log.retention.bytes
和log.retention.hours
任意一個達到要求,都會執行刪除操作,且會被 topic 創建時的指定參數覆蓋 。
Kafka compact 日志清理策略(日志壓縮)
- 原理:針對每個消息的 key 進行整合,對于有相同 key 的不同 value 值,只保留最后一個版本 。就像在一個記錄集合里,相同標識(key )的記錄,只留下最新的那條 。比如一個 key 對應的值先后為 “值 1”“值 2”“值 3” ,經過日志壓縮后,只會保留 “值 3” 。清理重復 key 后,一些 segment 文件大小會變小,Kafka 會將小文件再合并成大的 segment 文件 。
- 特殊處理:當某個 key 的最新版本消息沒有內容(value 為 null )時,這個 key 將被刪除,這類消息被稱為 “墓碑消息(tombstone )” 。墓碑消息的存放時間和 broker 的配置
log.cleaner.delete.retention.ms
有關,默認值是 24 小時 。在執行日志清理時,會刪除到期的墓碑消息 。 - 清理流程
- 對于每個 Kafka partition 的日志,以 segment 為單位,分為已清理和未清理部分,未清理部分又細分為可清理和不可清理的。
- 每個日志目錄下有
cleaner - offset - checkpoint
文件記錄清理進度。 - 找出可清理的 segment(active segment 不能清理,根據
min.compaction.lag.ms
配置判斷其他 segment 是否能清理 ,即判斷 segment 最后一條記錄的插入時間是否超過最小保留時間 )。 - 構建 SkimpyOffsetMap 對象(key 與 offset 的映射哈希表 ),遍歷可清理 segment 的每條日志,將 key 和 offset 存入其中。
- 再遍歷已清理和可清理部分的 segment 日志,根據 SkimpyOffsetMap 判斷是否保留日志 。
- 執行清理操作,將可清理部分的 segment 變為已清理的,同時更新 cleaner checkpoint 記錄的 offset 。
- 相關參數
log.cleaner.enable = true
?:開啟日志壓縮功能。log.cleanup.policy = compact
?:啟用日志壓縮策略 。min.compaction.lag.ms
?:設置日志段中消息可被壓縮的最小時間間隔 ,確保消息存留一定時間后才可能被清理 。log.cleaner.delete.retention.ms
?:墓碑消息的保留時長 。