kafka 日志索引 AbstractIndex

AbstractIndex

AbstractIndex?是 Kafka 日志(Log)子系統中一個至關重要的基礎類。它為 Kafka 的各種索引文件(如偏移量索引?.index?和時間戳索引?.timeindex)提供了一個統一的、抽象的框架。這個類的設計目標是實現極高的讀寫性能和可靠的文件管理。

AbstractIndex?最核心的設計思想是使用內存映射文件(mmap)來管理索引數據。這在類的注釋和實現中都有清晰的體現。

// ... existing code ...private volatile MappedByteBuffer mmap;
// ... existing code ...private void createAndAssignMmap() throws IOException {
// ... existing code ...MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());this.length = length;this.mmap = mmap;
// ... existing code ...}
// ... existing code ...

分析與邏輯:

  • 是什么:內存映射是一種將文件或設備直接映射到進程地址空間的技術。映射完成后,對這塊內存的讀寫操作會由操作系統自動同步到對應的磁盤文件中。
  • 為什么用
    1. 高性能:Kafka 無需在用戶空間(Java 堆)和內核空間之間頻繁地復制數據。所有的讀寫操作都直接在?MappedByteBuffer?上進行,這本質上是在操作操作系統的頁緩存(Page Cache)。這極大地減少了系統調用和內存拷貝的開銷。
    2. 利用操作系統優化:將文件的緩存管理完全交給操作系統。現代操作系統在頁緩存管理上(如 LRU 算法)已經做得非常成熟和高效,能夠很好地適應 Kafka 索引的訪問模式(通常是順序寫入和接近末尾的讀取)。
    3. 持久化:通過調用?mmap.force()?方法,可以確保內存中的修改被刷寫到磁盤,保證了數據的持久性。

文件管理與生命周期

AbstractIndex?封裝了對底層索引文件的完整生命周期管理。

// ... existing code ...private volatile File file;
// ... existing code ...public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
// ... existing code ...createAndAssignMmap();
// ... existing code ...}public boolean resize(int newSize) throws IOException {
// ... existing code ...}public void renameTo(File f) throws IOException {
// ... existing code ...}public boolean deleteIfExists() throws IOException {
// ... existing code ...}public void close() throws IOException {trimToValidSize();closeHandler();}
// ... existing code ...

分析與邏輯:

  • 創建:構造函數?AbstractIndex(...)?負責初始化。如果文件不存在,它會創建文件并預分配?maxIndexSize?大小的空間。預分配可以避免在寫入過程中頻繁地擴展文件,這是一種性能優化。
  • 調整大小 (resize):允許動態地改變索引文件的大小。一個關鍵點是,在 Windows 或 z/OS 上,必須先解除內存映射(safeForceUnmap())才能修改文件長度,這個方法處理了這種跨平臺的兼容性問題。
  • 重命名 (renameTo):當日志段(Log Segment)滾動時,對應的索引文件也需要被重命名(例如,從?000...1.index?變為?000...1.snapshot),這個方法提供了原子性的重命名操作。
  • 關閉與清理 (close,?closeHandler,?deleteIfExists)
    • close()?方法在關閉前會調用?trimToValidSize(),將文件裁剪到只包含有效數據的大小,回收未使用的預分配空間。
    • closeHandler()?會強制解除內存映射。注釋中提到,這樣做是為了避免 JVM 垃圾回收器在回收?MappedByteBuffer?時可能引發的長時間 STW(Stop-The-World)暫停(KAFKA-4614)。
    • deleteIfExists()?負責安全地刪除索引文件。

并發控制

索引文件可能會被多個線程訪問(例如,寫入線程和讀取線程),因此必須保證線程安全。

// ... existing code ...protected final ReentrantLock lock = new ReentrantLock();
// ... existing code ...public boolean resize(int newSize) throws IOException {lock.lock();try {
// ... existing code ...} finally {lock.unlock();}}public void flush() {lock.lock();try {mmap.force();} finally {lock.unlock();}}
// ... existing code ...

分析與邏輯:

  • AbstractIndex?使用了?java.util.concurrent.locks.ReentrantLock?來保護所有關鍵的修改操作,如?resize,?flush,?closeHandler?等。
  • 這確保了在進行文件結構性變更(如調整大小、刷盤、關閉)時,不會與其他操作發生沖突,保證了數據的一致性和完整性。

緩存友好的搜索算法(Warm Section 優化)

這是?AbstractIndex?中一個非常精妙的性能優化,在類的尾部有大段注釋詳細解釋。

// ... existing code .../** Kafka mmaps index files into memory...* ...* However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary* page faults...* ...* Here, we use a more cache-friendly lookup algorithm:* if (target > indexEntry[end - N]) // if the target is in the last N entries of the index*    binarySearch(end - N, end)* else*    binarySearch(begin, end - N)** If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync* lookups should go to the 1st branch. We call the last N entries the "warm" section...** We set N (_warmEntries) to 8192, because...*/
// ... existing code ...

分析與邏輯:

  • 問題:標準的二分查找算法在訪問大文件時,訪問模式是跳躍式的,這對于操作系統的頁緩存(通常使用 LRU 策略)非常不友好。它可能會導致頻繁的“缺頁中斷”(Page Fault),即需要從磁盤加載數據到內存,從而阻塞線程,導致延遲飆升。
  • 解決方案:Kafka 觀察到,絕大多數的索引查找(來自消費者或副本同步)都集中在索引文件的末尾部分。因此,AbstractIndex?將索引邏輯上劃分為兩部分:
    1. "Warm Section"(熱區):索引文件末尾的 N 個條目(N 被設為 8192)。
    2. "Cold Section"(冷區):熱區之前的所有條目。
  • 查找邏輯:當進行查找時,首先判斷目標是否可能在“熱區”內。如果是,則只在熱區內進行二分查找;否則,才在整個“冷區”進行查找。
  • 效果:由于絕大多數查找都命中“熱區”,而“熱區”范圍較小(8192 個條目)且被頻繁訪問,因此它所對應的內存頁會一直保留在操作系統的頁緩存中,從而避免了磁盤 I/O,保證了低延遲的查找性能。選擇 8192 這個值也是經過計算的,以確保在常見 4KB 頁大小的系統上,一次熱區查找就能“觸摸”到所有相關的內存頁,使其保持“溫熱”。

indexSlotRangeFor

這是查找邏輯的入口點。它清晰地展示了如何決定是在熱區還是冷區進行搜索。

// ... existing code .../*** Lookup lower or upper bounds for the given target.*/private int indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity,SearchResultType searchResultType) {// check if the index is emptyif (entries == 0)return -1;// 1. 計算熱區的起始位置int firstHotEntry = Math.max(0, entries - 1 - warmEntries());// 2. 判斷目標是否大于熱區的第一個條目,如果是,則在熱區 [firstHotEntry, entries - 1] 中搜索if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {return binarySearch(idx, target, searchEntity,searchResultType, firstHotEntry, entries - 1);}// 3. 檢查目標是否比整個索引的第一個條目還小(邊界檢查)if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) {switch (searchResultType) {case LARGEST_LOWER_BOUND:return -1;case SMALLEST_UPPER_BOUND:return 0;}}// 4. 如果目標不在熱區,并且不小于第一個條目,則在冷區 [0, firstHotEntry] 中搜索return binarySearch(idx, target, searchEntity, searchResultType, 0, firstHotEntry);}
// ... existing code ...

代碼邏輯分析:

  1. int firstHotEntry = Math.max(0, entries - 1 - warmEntries());
    • 這行代碼計算出“熱區”的起始條目索引。warmEntries()?方法返回熱區包含的條目數量。
  2. if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0)
    • 這是一個關鍵的判斷。它取出熱區的第一個條目,并與目標值?target?進行比較。
    • 如果目標值比熱區的起始值還要大,那么目標值只可能存在于熱區中。此時,就調用?binarySearch?方法,但搜索范圍被限定在?[firstHotEntry, entries - 1]?這個熱區內。
  3. return binarySearch(idx, target, searchEntity, searchResultType, 0, firstHotEntry);
    • 如果上一步的判斷不成立,說明目標值小于或等于熱區的起始值,那么目標值就應該在“冷區”中。此時,調用?binarySearch?的搜索范圍是?[0, firstHotEntry]?這個冷區。

熱區大小的定義:warmEntries

這個方法定義了“熱區”的大小。它的大小是固定的8KB,然后根據每個索引條目的大小(由子類定義)來計算出具體包含多少個條目。

// ... existing code ...* 1) support larger warm section* 2) make sure the warm section of low QPS topic-partitions are really warm.*/protected final int warmEntries() {return 8192 / entrySize();}protected void safeForceUnmap() {
// ... existing code ...

底層二分查找實現:binarySearch

這個方法是標準的二分查找算法,但它的特別之處在于,它接收?begin?和?end?參數,使其可以對索引的任意一個子區間(無論是熱區還是冷區)進行操作。

// ... existing code ...private int binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity,SearchResultType searchResultType, int begin, int end) {// binary search for the entryint lo = begin;int hi = end;while (lo < hi) {int mid = (lo + hi + 1) >>> 1;IndexEntry found = parseEntry(idx, mid);int compareResult = compareIndexEntry(found, target, searchEntity);if (compareResult > 0)hi = mid - 1;else if (compareResult < 0)lo = mid;elsereturn mid;}
// ... existing code ...

AbstractIndex?通過?indexSlotRangeFor?方法作為分流器,先判斷目標值的大致范圍,然后調用通用的?binarySearch?方法在更小的、更有可能被操作系統緩存的“熱區”或者“冷區”中進行精確查找,從而實現了高效且緩存友好的查詢。

maybeLock?

這是一個設計得非常巧妙的輔助方法,它的主要目的是處理跨操作系統的兼容性問題,同時優化性能

// ... existing code .../*** Execute the given function in a lock only if we are running on windows or z/OS. We do this* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it* and this requires synchronizing reads.*/protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)lock.lock();try {return action.execute();} finally {if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)lock.unlock();}}
// ... existing code ...

方法簽名解析

  • protected final:?protected?意味著這個方法可以被同一個包下的類以及?AbstractIndex?的子類訪問。final?意味著子類不能重寫(override)這個方法,保證了其行為的一致性。
  • <T, E extends Exception>: 這是泛型定義。
    • T: 代表方法執行后返回值的類型。
    • E extends Exception: 代表方法在執行過程中可能拋出的異常類型,它必須是?Exception?的子類。
  • T maybeLock(Lock lock, StorageAction<T, E> action) throws E:
    • maybeLock: 方法名,意為“可能加鎖”。
    • Lock lock: 接收一個?Lock?對象作為參數,用于實際的加鎖解鎖操作。
    • StorageAction<T, E> action: 接收一個?StorageAction?類型的對象。這本質上是一個函數式接口(類似于?Callable?或?Runnable),它封裝了真正需要被執行的業務邏輯。
    • throws E: 聲明了該方法可能會拋出?action?中定義的異常。

方法的注釋已經清晰地解釋了其設計意圖:

Execute the given function in a lock only if we are running on windows or z/OS. We do this because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it and this requires synchronizing reads.

翻譯和解讀:

  • 問題背景:在 Windows 和 z/OS 操作系統上,有一個限制:如果一個文件被內存映射(mmapped),你就不能改變這個文件的長度(比如?resize)。要改變文件長度,必須先解除內存映射(unmap),執行完操作后再重新映射。
  • 并發風險:解除和重新映射的過程不是原子的。如果在解除映射后、重新映射前,有另一個線程來讀取這個索引,就可能會讀到不一致或已失效的數據。因此,這個過程必須被同步機制(鎖)保護起來,以防止并發讀寫沖突。
  • 性能優化:在 Linux/Unix 等其他操作系統上,沒有這個限制,可以在文件被映射的同時調整其大小。在這些系統上,如果每次讀取都加鎖,會帶來不必要的性能開銷。
  • 解決方案 (maybeLock)
    1. 方法首先檢查當前操作系統:if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
    2. 如果是 Windows 或 z/OS,它會執行?lock.lock(),獲取鎖。
    3. 然后,在?try?塊中,執行傳入的?action.execute(),也就是真正的業務邏輯(比如?lookup?操作)。
    4. 最后,在?finally?塊中,再次檢查操作系統,如果是 Windows 或 z/OS,則執行?lock.unlock()?釋放鎖。
    5. 如果不是 Windows 或 z/OSif?條件不滿足,代碼會直接執行?action.execute(),完全不會進行任何加鎖解鎖操作

在?OffsetIndex?類的?lookup?方法中,就使用了?maybeLock

// 在 OffsetIndex.java 中
public OffsetPosition lookup(long targetOffset) {return maybeLock(lock, () -> { // 使用 lambda 表達式傳入一個 StorageActionByteBuffer idx = mmap().duplicate();int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);if (slot == -1)return new OffsetPosition(baseOffset(), 0);elsereturn parseEntry(idx, slot);});
}

當這段代碼運行時:

  • 在 Linux 上:maybeLock?不會加鎖,直接執行 lambda 表達式中的查找邏輯。
  • 在 Windows 上:maybeLock?會先獲取?lock,然后執行 lambda 表達式中的查找邏輯,最后釋放?lock

maybeLock?是一個典型的策略模式應用,它根據運行環境(操作系統)動態地選擇不同的執行策略(加鎖或不加鎖)。它通過將“是否加鎖”的判斷邏輯與“具體業務邏輯”解耦,實現了:

  1. 跨平臺兼容性:確保了在有特殊限制的操作系統上(Windows/z/OS)數據操作的線程安全。
  2. 性能最優化:避免了在沒有限制的操作系統上(Linux/Unix)引入不必要的鎖開銷。
  3. 代碼簡潔性:將平臺相關的鎖邏輯封裝在一個地方,使得調用方的代碼(如?lookup)無需關心底層操作系統的差異,保持了業務邏輯的純粹和清晰。

抽象與擴展

作為一個抽象類,它定義了所有索引的通用行為,并將與具體索引格式相關的部分留給子類實現。

// ... existing code ...public abstract class AbstractIndex implements Closeable {
// ... existing code ...public abstract void sanityCheck();public abstract void truncateTo(long offset);protected abstract void truncate();protected abstract int entrySize();protected abstract IndexEntry parseEntry(ByteBuffer buffer, int n);
// ... existing code ...}

分析與邏輯:

  • entrySize(): 不同的索引(偏移量索引、時間戳索引)每個條目的大小不同,由子類定義。
  • parseEntry(...): 如何從?ByteBuffer?中解析出一個具體的索引條目,其邏輯也由子類實現。
  • sanityCheck(),?truncateTo(...): 這些操作的具體邏輯也可能因索引類型而異。

這種設計遵循了面向對象的設計原則,使得代碼結構清晰,易于擴展。如果未來需要引入新的索引類型(例如,基于 Producer ID 的索引),只需繼承?AbstractIndex?并實現這些抽象方法即可。

總結

public abstract class AbstractIndex?是 Kafka 高性能存儲引擎的基石。它通過內存映射文件實現了高效的 I/O,通過精細的文件生命周期管理和并發控制保證了數據安全和可靠性,并通過創新的緩存友好搜索算法解決了大規模數據查找的性能瓶頸。其良好的抽象設計也為系統的可擴展性提供了堅實的基礎。理解了這個類,就等于掌握了 Kafka 日志索引實現的核心。

OffsetIndex

OffsetIndex?是 Kafka 存儲引擎中一個具體且至關重要的索引實現。顧名思義,它負責維護 邏輯偏移量(Offset)物理文件位置(Position) 之間的映射關系。當消費者或副本需要從某個特定偏移量開始讀取數據時,Kafka 正是利用?OffsetIndex?來快速定位到數據在日志文件(.log?文件)中的大致位置,從而避免了從頭掃描整個巨大的日志文件。

OffsetIndex?的核心職責是提供高效的偏移量查找。它對應磁盤上的?.index?文件。為了實現極致的性能和空間效率,它采用了非常緊湊的文件格式。

文件格式分析:

類的注釋中清晰地描述了其格式:

The physical format is a 4 byte "relative" offset and a 4 byte file location for the message with that offset.

  • 8字節條目(Entry): 每個索引條目固定為8個字節。這在代碼中由常量定義:
    // ... existing code ...
    public final class OffsetIndex extends AbstractIndex {private static final Logger log = LoggerFactory.getLogger(OffsetIndex.class);private static final int ENTRY_SIZE = 8;
    // ... existing code ...
    
  • 4字節相對偏移量 (Relative Offset): 為了節省空間,索引文件中存儲的不是絕對偏移量,而是相對于該日志段(Log Segment)的?baseOffset?的相對值。例如,如果一個日志段的?baseOffset?是 5000,那么邏輯偏移量 5050 在索引文件中會被存儲為?50。這使得偏移量可以用一個4字節的整數表示,極大地擴展了單個日志段能覆蓋的范圍。
  • 4字節物理位置 (Physical Position): 這4個字節存儲的是該消息在對應的?.log?文件中的物理字節位置。

這種設計體現了 Kafka 對性能和存儲效率的極致追求。

繼承?AbstractIndex?并實現其抽象方法

OffsetIndex?是?final?類,它繼承了?AbstractIndex,這意味著它自動獲得了父類提供的所有強大功能:

  • 內存映射(mmap):直接在操作系統的頁緩存中進行讀寫,性能極高。
  • 文件生命周期管理:創建、關閉、重命名、調整大小等。
  • 并發控制:通過?ReentrantLock?保證線程安全。
  • 緩存友好的查找算法:自動擁有了“熱區/冷區”分段二分查找的能力。

OffsetIndex?需要做的就是實現父類定義的抽象方法,告訴框架如何處理自己特有的8字節條目格式。

  • entrySize():

    // ... existing code ...@Overrideprotected int entrySize() {return ENTRY_SIZE;}
    

    這個方法非常簡單,直接返回常量?8

  • parseEntry(ByteBuffer buffer, int n):

    // ... existing code ...@Overrideprotected OffsetPosition parseEntry(ByteBuffer buffer, int n) {return new OffsetPosition(baseOffset() + relativeOffset(buffer, n), physical(buffer, n));}private int relativeOffset(ByteBuffer buffer, int n) {return buffer.getInt(n * ENTRY_SIZE);}private int physical(ByteBuffer buffer, int n) {return buffer.getInt(n * ENTRY_SIZE + 4);}
    // ... existing code ...
    

    這是連接抽象框架和具體實現的橋梁。它根據給定的條目序號?n,從?ByteBuffer?中:

    1. 讀取前4個字節作為相對偏移量 (relativeOffset)。
    2. 讀取后4個字節作為物理位置 (physical)。
    3. 將相對偏移量加上?baseOffset()?轉換回絕對偏移量。
    4. 最后,用絕對偏移量和物理位置創建一個?OffsetPosition?對象并返回。

lookup(long targetOffset)

// ... existing code ...public OffsetPosition lookup(long targetOffset) {return maybeLock(lock, () -> {ByteBuffer idx = mmap().duplicate();int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);if (slot == -1)return new OffsetPosition(baseOffset(), 0);elsereturn parseEntry(idx, slot);});}
// ... existing code ...

邏輯分析

  1. 它直接調用了從?AbstractIndex?繼承的?largestLowerBoundSlotFor?方法。這個方法的作用是:找到小于或等于?targetOffset?的那個最大的偏移量所在的索引槽位(slot)。
  2. 這個調用自動享受了“熱區/冷區”優化,性能非常高。
  3. 如果找不到(slot == -1),說明?targetOffset?比索引中最小的偏移量還要小,此時返回該日志段的起始位置?(baseOffset, 0)
  4. 如果找到了,就用?parseEntry?解析該槽位的數據,返回對應的?OffsetPosition

append: 唯一的寫入方法。

// ... existing code ...public void append(long offset, int position) {lock.lock();try {if (isFull())throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");if (entries() == 0 || offset > lastOffset) {log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath());mmap().putInt(relativeOffset(offset));mmap().putInt(position);incrementEntries();lastOffset = offset;
// ... existing code ...} elsethrow new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());} finally {lock.unlock();}}
// ... existing code ...

邏輯分析

  1. 該方法是線程安全的,通過?lock?保護。
  2. 它會檢查索引是否已滿,以及待追加的?offset?是否大于當前索引中最后的?lastOffset索引條目必須是按偏移量單調遞增的
  3. 如果檢查通過,它會將?offset?轉換為相對偏移量,然后將4字節的相對偏移量和4字節的物理位置?position?寫入?mmap
  4. 最后更新內部狀態,如條目數?entries?和?lastOffset
  • truncateTo(long offset)?和?truncate(): 用于日志截斷。當 Kafka 需要刪除某個偏移量之后的數據時(例如日志清理策略或副本同步失敗),會調用這些方法來同步截斷索引文件,確保索引和日志文件的一致性。

總結

OffsetIndex?是一個設計精良、高度優化的類。它完美地展示了 Kafka 如何通過繼承和組合來復用通用邏輯,同時通過專用的、緊湊的數據結構精巧的算法(如相對偏移量、熱區查找)來滿足特定的高性能需求。

簡單來說,OffsetIndex?就是 Kafka 能夠快速在海量數據中定位到任意一條消息的“目錄”或“索引頁”,是其實現高性能隨機讀取能力的關鍵所在。

TimeIndex

TimeIndex?是 Kafka 存儲引擎中與?OffsetIndex?并列的另一個核心索引實現。它的主要職責是建立 時間戳(Timestamp)邏輯偏移量(Offset) 之間的映射關系。這個功能對于 Kafka 的許多高級特性至關重要,比如:

  • 按時間消費:允許消費者從指定的時間點開始消費消息(例如,消費過去一小時內的所有消息)。
  • 基于時間的日志保留策略:根據消息的時間戳來刪除過期的日志段(例如,刪除超過7天的數據)。

TimeIndex?對應于磁盤上的?.timeindex?文件。

TimeIndex?的核心是提供高效的時間戳查找。為了實現這一點,它采用了與?OffsetIndex?類似但又不同的緊湊文件格式。

文件格式分析:

類的注釋中清晰地描述了其格式:

The physical format is a 8 bytes timestamp and a 4 bytes "relative" offset... A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen before OFFSET is TIMESTAMP.

  • 12字節條目(Entry): 每個索引條目固定為12個字節。這在代碼中由常量定義:
    // ... existing code ...
    public class TimeIndex extends AbstractIndex {private static final Logger log = LoggerFactory.getLogger(TimeIndex.class);private static final int ENTRY_SIZE = 12;
    // ... existing code ...
    
  • 8字節時間戳 (Timestamp): 使用一個64位的長整型來存儲時間戳,足以應對未來的需求。
  • 4字節相對偏移量 (Relative Offset): 與?OffsetIndex?一樣,這里存儲的也是相對于日志段?baseOffset?的相對偏移量,以節省空間。

關鍵語義:一個?(TIMESTAMP, OFFSET)?條目的含義是:在該日志段中,所有偏移量小于?OFFSET?的消息,其時間戳都小于或等于?TIMESTAMP。這個定義對于查找算法至關重要。

繼承?AbstractIndex?并實現其抽象方法

TimeIndex?同樣繼承自?AbstractIndex,因此它也自動獲得了內存映射、文件管理、并發控制和緩存友好查找等所有底層能力。它需要做的就是根據自己12字節的條目格式來實現父類的抽象方法。

  • entrySize():

    // ... existing code ...@Overrideprotected int entrySize() {return ENTRY_SIZE;}
    // ... existing code ...
    

    直接返回常量?12

  • parseEntry(ByteBuffer buffer, int n):

    // ... existing code ...@Overrideprotected TimestampOffset parseEntry(ByteBuffer buffer, int n) {return new TimestampOffset(timestamp(buffer, n), baseOffset() + relativeOffset(buffer, n));}private long timestamp(ByteBuffer buffer, int n) {return buffer.getLong(n * ENTRY_SIZE);}private int relativeOffset(ByteBuffer buffer, int n) {return buffer.getInt(n * ENTRY_SIZE + 8);}
    // ... existing code ...
    

    這個實現從?ByteBuffer?的指定位置?n

    1. 讀取前8個字節作為時間戳。
    2. 讀取后4個字節作為相對偏移量。
    3. 將相對偏移量加上?baseOffset()?轉換回絕對偏移量。
    4. 最后,用時間戳和絕對偏移量創建一個?TimestampOffset?對象并返回。

lookup: 時間戳查找方法。

// ... existing code ...public TimestampOffset lookup(long targetTimestamp) {return maybeLock(lock, () -> {ByteBuffer idx = mmap().duplicate();int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY);if (slot == -1)return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset());elsereturn parseEntry(idx, slot);});}
// ... existing code ...

邏輯分析

  1. 它調用了繼承自?AbstractIndex?的?largestLowerBoundSlotFor?方法,并指定按?KEY(即時間戳)搜索。
  2. 該方法會利用“熱區/冷區”優化,高效地找到索引中時間戳小于或等于?targetTimestamp?的那個最大的條目
  3. 返回這個條目對應的?TimestampOffset。這個結果告訴調用者,從返回的?offset?開始掃描?.log?文件,就有可能找到時間戳大于等于?targetTimestamp?的第一條消息。

maybeAppend: 唯一的寫入方法。

// ... existing code ...public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {lock.lock();try {if (!skipFullCheck && isFull())throw new IllegalArgumentException("Attempt to append to a full time index (size = " + entries() + ").");
// ... existing code ...if (entries() != 0 && timestamp < lastEntry.timestamp)throw new IllegalStateException("Attempt to append a timestamp (" + timestamp + ") to slot " + entries()+ " no larger than the last timestamp appended (" + lastEntry.timestamp + ") to " + file().getAbsolutePath());// We only append to the time index when the timestamp is greater than the last inserted timestamp.if (timestamp > lastEntry.timestamp) {log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath());MappedByteBuffer mmap = mmap();mmap.putLong(timestamp);mmap.putInt(relativeOffset(offset));incrementEntries();this.lastEntry = new TimestampOffset(timestamp, offset);
// ... existing code ...}} finally {lock.unlock();}}
// ... existing code ...

邏輯分析

  1. 該方法是線程安全的,通過?lock?保護。
  2. 它有嚴格的校驗:待追加的?timestamp?和?offset?都不能小于最后一條已存入的條目,這保證了索引的單調遞增特性,這是二分查找正確性的前提。
  3. 一個關鍵點是?if (timestamp > lastEntry.timestamp):只有當新消息的時間戳嚴格大于上一條索引的時間戳時,才會追加新條目。如果時間戳相同,則不追加。這是一種優化,避免了在時間戳沒有變化時(例如,Producer沒有設置時間戳,使用LogAppendTime,而一批消息在同一毫秒內寫入)產生冗余的索引項。
  • isFull():

    // ... existing code ...// We override the full check to reserve the last time index entry slot for the on roll call.@Overridepublic boolean isFull() {return entries() >= maxEntries() - 1;}
    // ... existing code ...
    

    這個方法重寫了父類的實現。它故意保留了最后一個索引槽位。注釋解釋了原因:這是為日志段滾動(roll over)時準備的。當一個日志段即將被關閉時,Kafka需要確保該段最后一條消息的時間戳被準確記錄下來,即使此時索引按正常標準已經“滿了”。這個預留的槽位就是為了這個目的。

總結

TimeIndex?是 Kafka 實現高級時間相關功能(如按時間點查找、按時間保留數據)的基石。它與?OffsetIndex?協同工作,構成了 Kafka 高性能存儲引擎的索引雙雄。

它繼承了?AbstractIndex?的通用高性能框架,并針對時間戳->偏移量的映射需求,定義了自己專屬的12字節條目格式和讀寫邏輯。通過強制的單調遞增規則和精巧的追加策略,TimeIndex?在保證數據正確性的同時,實現了極高的空間和時間效率。

TimeIndex類和具體索引條目

TimeIndex?類并不是只保存一項索引,而是作為一個管理者(Manager)或訪問器(Accessor),負責管理和操作整個?.timeindex?文件,而這個文件里包含了成百上千條索引條目。

讓我們一步步來分析:

可以把?TimeIndex?類想象成一個文件的“遙控器”或者“句柄”(Handle)。它本身不是數據集合,而是提供了操作數據集合(也就是?.timeindex?文件)的所有方法。

  • 文件 (.timeindex): 這是物理存儲,是真正的數據倉庫。它在磁盤上,里面按順序存放著一條又一條的索引條目。
  • 類 (TimeIndex): 這是在內存中的一個對象,它“知道”如何去讀、寫、和管理那個物理文件。

當創建一個?TimeIndex?對象時,構造函數會打開對應的文件,并使用 內存映射(mmap) 技術將文件內容映射到內存中,以便進行高性能的讀寫。

// ... existing code ...
public class TimeIndex extends AbstractIndex {
// ... existing code ...@SuppressWarnings("this-escape")public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {// super(...) 負責打開文件、進行內存映射等底層操作super(file, baseOffset, maxIndexSize, writable);// 從文件中讀取最后一個條目,并緩存起來this.lastEntry = lastEntryFromIndexFile();log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());}
// ... existing code ...

在構造函數中,super(...)?調用了父類?AbstractIndex?的邏輯,正是這一步完成了對?file?的打開和內存映射。

文件內容:一系列的索引條目

.timeindex?文件并不是只存一個?TimestampOffset。它的內部格式是一連串的12字節的條目:

[Timestamp 1 (8 bytes), Offset 1 (4 bytes)], [Timestamp 2 (8 bytes), Offset 2 (4 bytes)], [Timestamp 3 (8 bytes), Offset 3 (4 bytes)], ...

TimeIndex?類提供了?parseEntry?方法,用于從文件的任意位置解析出這樣一條條的?TimestampOffset?對象。

// ... existing code ...@Overrideprotected TimestampOffset parseEntry(ByteBuffer buffer, int n) {// 從 buffer (也就是映射的內存) 的第 n 個條目位置,解析出一個 TimestampOffset 對象return new TimestampOffset(timestamp(buffer, n), baseOffset() + relativeOffset(buffer, n));}
// ... existing code ...

lastEntry?字段的作用:性能優化緩存

現在我們來解釋最關鍵的一點:private volatile TimestampOffset lastEntry;?這個字段為什么存在?

它并不是說?TimeIndex?只存這一個條目,而是對文件中最后一個條目的內存緩存(Cache)

為什么需要這個緩存??當 Kafka 往日志段中寫入新消息時,需要判斷是否要向?.timeindex?文件中追加新的索引條目。追加的規則之一是,新消息的時間戳必須大于索引中最后一條記錄的時間戳。

// ... existing code ...public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {lock.lock();try {
// ...// 檢查時間戳是否比上一個條目大if (entries() != 0 && timestamp < lastEntry.timestamp)throw new IllegalStateException("...");// 只有當新時間戳 > 上一個時間戳時,才寫入if (timestamp > lastEntry.timestamp) {// ... 寫入文件 ...// 更新內存中的緩存this.lastEntry = new TimestampOffset(timestamp, offset);}
// ...} finally {lock.unlock();}}
// ... existing code ...

如果沒有?lastEntry?這個緩存,那么每次調用?maybeAppend?時,都需要去訪問文件(即使是通過mmap)來讀取最后12個字節,以獲取上一個時間戳。通過?lastEntry?字段將這個值緩存在內存中,就可以極快地完成這個比較,這是一個非常重要的性能優化。volatile?關鍵字確保了多線程之間的可見性。

總結

  • TimeIndex?類?是一個管理器,它引用并操作一個文件
  • .timeindex?文件?是一個數據容器,它包含了一個索引條目序列
  • TimestampOffset?對象?是索引條目在內存中的邏輯表示
  • lastEntry?字段?是對文件中最后一個條目的性能緩存,而不是?TimeIndex?的全部內容。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89959.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89959.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89959.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

重學前端008 --- 響應式網頁設計 CSS 無障礙 Quiz

文章目錄meta 總結html 頁面結構img 尺寸子選擇器 >a 錨點僅屏幕閱讀器可見li 元素的懸停設置小屏幕防止溢出meta 總結 <head><!-- 基礎字符編碼聲明 --><meta charset"UTF-8"><!-- 視口設置&#xff0c;響應式設計必備 --><meta nam…

C# 調用CodeSoft模板打印標簽,編輯模板覆蓋根目錄的文件,不能拷貝

C# 調用CodeSoft模板打印標簽&#xff0c;編輯模板覆蓋根目錄的文件&#xff0c;不能拷貝&#xff0c;報文件已經打開。 原因&#xff1a;C#窗體關閉時&#xff0c;沒有關閉LabelManager2.ApplicationClass labApp&#xff0c;別忘記寫labApp1.Quit(); if (labApp1 ! null) {la…

Logback簡單使用

Logback 日志框架介紹 正如你所知&#xff0c;開發者擁有大量日志工具可供選擇。本節中&#xff0c;我們將學習一個非常流行的日志庫 —— Logback。它是 Log4j 日志庫的繼任者&#xff0c;基于相似的理念構建。Logback 在同步和異步日志記錄方面都非常快速&#xff0c;并提供了…

Python爬蟲實戰:研究langid.py庫相關技術

一、引言 在當今全球化的網絡環境下,互聯網上的內容呈現出多語言的特點。對于許多自然語言處理 (NLP) 任務,如文本分類、情感分析和信息檢索,準確識別文本的語言是首要步驟。網絡爬蟲作為獲取互聯網內容的重要工具,結合語言識別技術,可以為多語言信息處理提供豐富的數據來…

打車代駕 app 派單接單系統模塊搭建

一、邏輯分析打車代駕 APP 的派單接單系統模塊是整個應用的核心部分&#xff0c;它需要高效、準確地處理訂單分配和司機接單流程&#xff0c;以確保用戶能夠快速得到服務&#xff0c;司機能夠合理地接到訂單。用戶端下單邏輯&#xff1a;用戶打開 APP&#xff0c;輸入出發地、目…

Java Stream API性能優化:原理深度解析與實戰指南

Java Stream API性能優化&#xff1a;原理深度解析與實戰指南 技術背景與應用場景 隨著大數據量處理和高并發場景的普及&#xff0c;傳統的集合遍歷方式在代碼可讀性和性能上逐漸顯現瓶頸。Java 8引入的Stream API&#xff0c;通過聲明式的流式編程極大提升了開發效率和可讀性&…

Nginx配置proxy protocol代理獲取真實ip

Nginx配置proxy protocol 文章目錄Nginx配置proxy protocol前言一、PROXY Protocol協議二、配置方法代理服務器配置http模塊代理??Stream 模塊?代理測試配置是否生效端口檢查測試ip記錄驗證http驗證tcp注意事項和理解誤區應用程序機器配置總結前言 在現代開發中有很多場景需…

什么是商業智能BI數據分析的指標爆炸?

指標爆炸這個詞大家可能都是第一次聽說&#xff0c;指標怎么會爆炸呢&#xff1f;其實這個是我們很多年前在一些商業智能BI項目上總結出來的一種場景或者現象&#xff0c;就是過于的開放給業務人員在BI自助分析過程中創造了很多衍生性的分析指標&#xff0c;結果就造成了前端指…

Spring AI 系列之十八 - ChatModel

之前做個幾個大模型的應用&#xff0c;都是使用Python語言&#xff0c;后來有一個項目使用了Java&#xff0c;并使用了Spring AI框架。隨著Spring AI不斷地完善&#xff0c;最近它發布了1.0正式版&#xff0c;意味著它已經能很好的作為企業級生產環境的使用。對于Java開發者來說…

Linux學習之Linux系統權限

在上一篇的內容中我們學習到了Linux系統命令相關的知識及其相關的擴展內容&#xff0c;本期我們將學習Linux基礎的另一個重要部分&#xff1a;Linux系統權限管理 作者的個人gitee&#xff1a;樓田莉子 (riko-lou-tian) - Gitee.com 目錄 權限概念及必要性 什么是權限 為什么要…

Web3.0 能為你帶來哪些實質性的 改變與突破

如今各種大廠裁員消息層出不窮&#xff0c;今年又添飛書、剪映、微軟、思科... 這有一張網友整理的去年互聯網大廠裁員裁員信息表&#xff1a; 目前國內很多大廠都在裁員&#xff0c;非常現實、且越來越多 35 技術人&#xff0c;正在面臨這樣的問題&#xff0c;那么Web3.0 確實…

doker centos7安裝1

1.什么是doker Docker 是一個開源的應用容器引擎&#xff0c;它允許開發者將應用程序及其依賴項打包到一個可移植的容器中&#xff0c;然后發布到任何支持 Docker 的操作系統上&#xff0c;實現 “一次構建&#xff0c;到處運行”。 容器是一種輕量級的虛擬化技術&#xff0c…

自動化面試題

1、什么是測試套件測試套件是多個測試用例的集合。2、搭建接口自動化框架中&#xff0c;你遇到最大的難點是什么&#xff0c;以及怎么解決的?測試數據動態管理難點:接口依賴動態參數(如Token、訂單ID)&#xff0c;數據無法硬編碼.解決方案:使用關聯提取(如正則提取響應中的Tok…

【Linux】LVS(Linux virual server)環境搭建

一、LVS的運行原理1.1 LVS簡介LVS:Linux Virtual Server&#xff0c;負載調度器&#xff0c;內核集成&#xff0c;章文嵩&#xff0c;阿里的四層SLB(Server LoadBalance)是基于LVSkeepalived實現LVS 官網: http://www.linuxvirtualserver.org/ LVS 相關術語 VS: Virtual Server…

算法競賽備賽——【圖論】求最短路徑——Dijkstra

Dijkstra 用來計算從一個點到其他所有點的最短路徑的算法&#xff0c;是一種單源最短路徑算法。也就是說&#xff0c;只能計算起點只有一個的情況。Dijkstra的時間復雜度是O (|v|^2)&#xff0c;它不能處理存在負邊權的情況。 鄰接矩陣存圖 #include<iostream> using …

影刀 RPA:批量修改 Word 文檔格式,高效便捷省時省力

在日常辦公和文檔處理中&#xff0c;Word 文檔格式的統一和規范是許多企業和個人用戶的重要需求。無論是撰寫報告、制作提案&#xff0c;還是整理資料&#xff0c;都需要確保文檔格式的一致性。然而&#xff0c;手動修改多個 Word 文檔的格式不僅耗時費力&#xff0c;還容易因疏…

GitLab 社區版 10.8.4 安裝、漢化與使用教程

一、GitLab 安裝 GitLab 提供了集成所需軟件的 RPM 包&#xff0c;簡化了安裝流程。我們選擇安裝社區版&#xff08;CE&#xff09;10.8.4&#xff0c;可通過官方網站或國內鏡像源&#xff08;如清華鏡像&#xff09;獲取安裝包。 1. 準備工作 首先創建工具目錄并進入&#…

[硬件電路-64]:模擬器件 -二極管在穩壓電路中的應用

二極管在穩壓電路中的應用主要基于其單向導電性和特定類型二極管&#xff08;如穩壓二極管&#xff09;的電壓穩定特性。以下是詳細解釋&#xff1a;一、普通二極管的穩壓作用&#xff08;有限場景&#xff09;正向導通壓降的利用&#xff1a;原理&#xff1a;普通二極管在正向…

【Linux】重生之從零開始學習運維之Nginx

安裝apt/yum安裝apt imstall nginx yum install nginxRocky源碼編譯安裝基礎編譯環境yum install gcc make gcc-c glibc glibc-devel pcre pcre-devel openssl openssldevel systemd-devel zlib-devel yum install libxml2 libxml2-devel libxslt libxslt-devel php-gd gd-deve…

主流 MQ 的關鍵性能指標

常用消息隊列&#xff08;MQ&#xff09;的“數量級”通常圍繞吞吐量&#xff08;TPS&#xff0c;每秒處理消息數&#xff09;、消息堆積能力、延遲三個核心指標展開&#xff0c;不同MQ因設計目標&#xff08;高吞吐、低延遲、高可靠等&#xff09;不同&#xff0c;數量級差異顯…