ByteUtils
?是 Kafka 中一個非常基礎且核心的工具類。從包名?common.utils
?就可以看出,它被廣泛用于 Kafka 的各個模塊中。它的主要職責是提供一套高效、底層的靜態方法,用于在字節緩沖區 (ByteBuffer
)、字節數組 (byte[]
) 以及輸入/輸出流 (InputStream
/OutputStream
) 中讀寫 Java 的基本數據類型。
ZigZag 編解碼過程的數學原理詳解
康托爾對角線映射。
可以找到一種方式,任何一個有理數都可以在有限步驟被枚舉到。
可以見?哥德爾定理?的討論
ZigZag 編碼是一種巧妙的算法,它能將有符號整數(正數、負數、零)映射到無符號整數數軸上,其核心優勢在于能將絕對值小的數(無論正負)都映射為小的無符號整數。這使得它與 Varint 編碼結合使用時,能極大地壓縮數據體積。
其編解碼過程可以分為對非負數和負數兩種情況進行討論。
編碼過程 (Signed -> Unsigned)
編碼操作由公式 (n << 1) ^ (n >> 63)
(以64位 long 為例)實現,我們可以將其拆解為兩種情況:
對于非負數 (x >= 0):
- ??編碼公式??:
x -> 2*x
- ??推導??:
當 x 為非負數時,x >> 63
的結果是 0。
因此編碼公式簡化為(x << 1) ^ 0
,即x * 2
。 - ??示例??:
0 -> 0
,1 -> 2
,2 -> 4
, ...
對于負數 (x < 0):
- ??編碼公式??:
x -> -2*x - 1
- ??推導??:
當 x 為負數時,x >> 63
的結果是 -1(二進制全為1)。
編碼公式變為(x << 1) ^ -1
。
^ -1
在二進制中等價于按位取反 (~
)。
因此,編碼結果為~(x * 2)
。
根據二進制補碼的性質,~a = -a - 1
,所以~(x * 2)
等于- (x * 2) - 1
,即-2x - 1
。 - ??示例??:
-1 -> 1
,-2 -> 3
,-3 -> 5
, ...
??效果??:
通過這種方式,正數被映射到偶數,負數被映射到奇數,實現了在無符號數軸上的“之”字形(ZigZag)交錯排列。
解碼過程 (Unsigned -> Signed)
解碼操作由公式 (y >>> 1) ^ -(y & 1)
實現,其中 y 是編碼后的無符號數。
(1) y >>> 1
(無符號右移一位):
- ??數學意義??:
等價于y / 2
(向下取整)。 - ??對非負數編碼結果 (y 為偶數)??:
y/2
直接得到原始值 x,解碼完成。 - ??對負數編碼結果 (y 為奇數)??:
已知y = -2x - 1
,此時y/2 = (-2x - 1) / 2 = -x - 1
(向下取整)。
(2) -(y & 1)
(判斷奇偶并生成掩碼):
- ??作用??:
y & 1
用于判斷 y 的奇偶性:- 若 y 為偶數,結果為 0;
- 若 y 為奇數,結果為 1。
- ??掩碼生成??:
-(y & 1)
生成掩碼:- y 為偶數時,掩碼為 0;
- y 為奇數時,掩碼為 -1(二進制全為1)。
(3) ^
(異或操作):
- ??當 y 為偶數(來自非負數)??:
解碼公式為(y/2) ^ 0
,結果即y/2
(原始值 x)。 - ??當 y 為奇數(來自負數)??:
解碼公式為(y/2) ^ -1
。
已知此時y/2 = -x - 1
,因此:
(-x - 1) ^ -1
^ -1
等價于按位取反 (~
),故結果為~(-x - 1)
。
根據補碼性質~a = -a - 1
,推導如下:
~(-x - 1) = -(-x - 1) - 1 = (x + 1) - 1 = x
。
最終還原為原始負數 x。
??總結??:
通過這一系列精巧的位運算,解碼過程成功將無符號數還原為原始有符號數。
可變長度整數(Varints)和長整數(Varlongs)
這是?ByteUtils
?中非常重要的一部分,也是 Kafka 實現高效數據壓縮的關鍵技術之一。Varint 是一種使用一個或多個字節序列化整數的方法,數值越小的整數(絕對值)占用的字節數越少。這對于存儲大量小整數(如長度、數量等)的場景能有效節省空間。
Kafka 的 Varint 實現參考了 Google Protocol Buffers 的編碼方案。
無符號 Varint (Unsigned Varint
)
這是 Varint 的基礎。它將一個 32 位整數編碼為 1 到 5 個字節。每個字節的最高位(MSB)是標志位,1
?表示后面還有字節,0
?表示這是最后一個字節。剩下的 7 位用于存儲數據。
- 讀取 (
readUnsignedVarint
):// ... existing code ... public static int readUnsignedVarint(ByteBuffer buffer) {byte tmp = buffer.get();if (tmp >= 0) {return tmp;} else {int result = tmp & 127;if ((tmp = buffer.get()) >= 0) {result |= tmp << 7;} else {result |= (tmp & 127) << 7;if ((tmp = buffer.get()) >= 0) {result |= tmp << 14;} else {result |= (tmp & 127) << 14;if ((tmp = buffer.get()) >= 0) {result |= tmp << 21;} else {result |= (tmp & 127) << 21;result |= (tmp = buffer.get()) << 28;if (tmp < 0) {throw illegalVarintException(result);}}}}return result;} } // ... existing code ...
- 代碼分析: 這段代碼通過一系列的?
if-else
?結構展開了循環,這是一種為了性能的優化(循環展開)。 - 它逐字節讀取,檢查最高位(通過?
tmp >= 0
?判斷,如果為正數,說明最高位是0)。 - 如果最高位是1,就取其低7位 (
tmp & 127
),并將其拼接到結果?result
?的高位上,然后繼續讀取下一個字節。 - 如果讀取超過5個字節仍然沒有結束,會拋出異常。
- 代碼分析: 這段代碼通過一系列的?
有符號 Varint (Signed Varint
?- ZigZag 編碼)
直接用 Varint 編碼負數效率很低(例如 -1 會被編碼成 5 個字節)。為了高效地編碼有符號數,特別是那些絕對值較小的負數,Kafka 使用了 ZigZag 編碼。它通過一種位操作,將有符號數映射到無符號數上,使得絕對值小的數(無論正負)都映射為小的無符號數。
- 映射規則:
(n << 1) ^ (n >> 31)
?for signed?n
// ... existing code ...
public static int readVarint(ByteBuffer buffer) {int value = readUnsignedVarint(buffer);return (value >>> 1) ^ -(value & 1);
}
// ... existing code ...
這個方法的核心作用是解碼一個經過?ZigZag(對角線)編碼?和?Varint 編碼?的整數。整個過程分為兩步:
- 從?
ByteBuffer
?中讀取一個經過 Varint 編碼的無符號整數。 - 對這個無符號整數進行 ZigZag 解碼,將其還原為原始的有符號整數。
1.?int value = readUnsignedVarint(buffer);
這一步是 Varint 解碼。它首先調用?readUnsignedVarint
?方法,該方法會從字節流中讀取 1 到 5 個字節,并將它們解析成一個32位的無符號整數。這個解析出來的?value
?并不是最終結果,而是經過 ZigZag 編碼后的中間值。
2.?return (value >>> 1) ^ -(value & 1);
這是整個方法最關鍵的部分,即?ZigZag(對角線)解碼。這一行代碼非常精妙,它將上一步得到的無符號整數?value
?還原回它所代表的原始有符號整數。
為什么需要 ZigZag 編碼?
Varint 編碼對于小的正整數效率很高(例如,0-127 只需要1個字節)。但對于負數,其二進制補碼表示通常是一個很大的正數(例如,-1 的補碼是?0xFFFFFFFF
),如果直接用 Varint 編碼,會占用最多的5個字節,完全失去了 Varint 的優勢。
ZigZag 編碼解決了這個問題。它通過一種位運算,將有符號整數“之”字形地映射到無符號整數上,從而保證絕對值小的數(無論正負)都會被映射成小的無符號數。
映射關系(對角線/ZigZag 編碼)
原始有符號值 (Original Signed) | 編碼后無符號值 (Encoded Unsigned) |
---|---|
0 | 0 |
-1 | 1 |
1 | 2 |
-2 | 3 |
2 | 4 |
... | ... |
2,147,483,647 | 4,294,967,294 |
-2,147,483,648 | 4,294,967,295 |
解碼公式?(value >>> 1) ^ -(value & 1)
?的剖析
讓我們通過兩個例子來理解這個解碼過程:
-
示例 1: 解碼?
-1
- 從映射表可知,
-1
?編碼后的值為?1
。所以?value = 1
。 value & 1
?=>?1 & 1
?=>?1
。 (取最低位,用于判斷原始值的符號)-(value & 1)
?=>?-1
。在二進制補碼中,-1
?是?...11111111
。value >>> 1
?=>?1 >>> 1
?=>?0
。 (無符號右移一位,獲取數值部分)0 ^ -1
?=>?000...000 ^ 111...111
?=>?111...111
。結果是?-1
。解碼正確。
- 從映射表可知,
-
示例 2: 解碼?
2
- 從映射表可知,
2
?編碼后的值為?4
。所以?value = 4
?(二進制?...00000100
)。 value & 1
?=>?4 & 1
?=>?0
。-(value & 1)
?=>?-0
?=>?0
。value >>> 1
?=>?4 >>> 1
?=>?2
。2 ^ 0
?=>?2
。解碼正確。
- 從映射表可知,
readVarlong
readVarlong
?和?writeVarlong
?是?Varint
?的 64 位版本,原理完全相同,只是最多可以占用 10 個字節,同樣也使用了 ZigZag 編碼來處理有符號長整型。
// ... existing code ...public static long readVarlong(ByteBuffer buffer) {long raw = readUnsignedVarlong(buffer);return (raw >>> 1) ^ -(raw & 1);}// visible for testingstatic long readUnsignedVarlong(ByteBuffer buffer) {long value = 0L;int i = 0;long b;while (((b = buffer.get()) & 0x80) != 0) {value |= (b & 0x7f) << i;i += 7;if (i > 63)throw illegalVarlongException(value);}value |= b << i;return value;}
// ... existing code ...
- 代碼分析:?
readUnsignedVarlong
?使用了?while
?循環,邏輯更清晰。它不斷讀取字節,只要字節的最高位是1 ((b & 0x80) != 0
),就將其低7位拼接到結果中,并增加位移量?i
。當讀到最高位為0的字節時,循環結束。
writeVarlong
此方法的作用是將一個64位的有符號長整型 (long
) 編碼后寫入到一個?DataOutput
?輸出流中。這個編碼過程與我們之前討論的?writeVarint
?非常相似,同樣是?ZigZag(對角線)編碼?和?Varint 編碼?的組合,只不過這次是針對64位的?long
?類型。
// ... existing code .../*** Write the given integer following the variable-length zig-zag encoding from* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol Buffers</a>* into the output.** @param value The value to write* @param out The output to write to*/public static void writeVarlong(long value, DataOutput out) throws IOException {long v = (value << 1) ^ (value >> 63);while ((v & 0xffffffffffffff80L) != 0L) {out.writeByte(((int) v & 0x7f) | 0x80);v >>>= 7;}out.writeByte((byte) v);}
// ... existing code ...
整個方法的執行可以分為兩個主要步驟:
ZigZag(對角線)編碼
long v = (value << 1) ^ (value >> 63);
這是編碼的第一步,也是核心的 ZigZag 編碼步驟。
value << 1
: 將原始的?long
?值向左移動一位。這個操作的目的是為符號位騰出空間。value >> 63
: 這是一個算術右移操作。對于?long
?類型,算術右移63位會得到一個全為符號位的值。如果?value
?是正數或0,結果是?0L
;如果?value
?是負數,結果是?-1L
?(二進制?0xFFFFFFFFFFFFFFFF
)。^
: 異或操作。- 如果?
value
?是正數或0:?(value << 1) ^ 0
,結果就是?value
?的兩倍。 - 如果?
value
?是負數:?(value << 1) ^ -1
,結果是對?value
?左移一位后的值進行按位取反。
- 如果?
這個公式巧妙地將有符號的?long
?映射到了無符號的?long
?數軸上,實現了我們之前討論過的“對角線”映射,確保了絕對值小的數(無論正負)都會得到一個小的無符號編碼值?v
。
Varint 編碼
接下來的?while
?循環負責將上一步得到的無符號編碼值?v
?進行 Varint 編碼,并逐字節寫入輸出流。
while ((v & 0xffffffffffffff80L) != 0L) {out.writeByte(((int) v & 0x7f) | 0x80);v >>>= 7;
}
out.writeByte((byte) v);
-
while ((v & 0xffffffffffffff80L) != 0L)
: 這是循環的條件。0xffffffffffffff80L
?是一個掩碼,它的低7位是0,其余位都是1。這個條件檢查?v
?是否還有超過7位的數據。換句話說,只要?v
?的值大于等于 128 (2^7),循環就會繼續。 -
out.writeByte(((int) v & 0x7f) | 0x80);
: 這是循環體內的核心操作。(int) v & 0x7f
: 取出?v
?的低7位數據。| 0x80
: 將這7位數據與?0x80
?(二進制?10000000
) 進行或運算。這會將該字節的最高位(MSB)設置為?1
,表示后面還有更多的字節。out.writeByte(...)
: 將這個構造好的字節寫入輸出流。
-
v >>>= 7;
: 將?v
?無符號右移7位,準備處理下一組7位數據。 -
out.writeByte((byte) v);
: 當循環結束時,意味著?v
?的剩余值已經小于128,可以用7位來表示。這時,將?v
?的最后部分直接作為一個字節寫入。這個字節的最高位自然是?0
,表示這是 Varint 序列的最后一個字節。
總結
writeVarlong
?方法通過一個兩步過程高效地序列化一個?long
?值:
- ZigZag 編碼: 使用?
(value << 1) ^ (value >> 63)
?將有符號?long
?映射為無符號?long
,使得小數值(無論正負)編碼后依然是小數值。 - Varint 編碼: 使用?
while
?循環,每次從編碼后的值中取出7位數據,并加上一個“續傳”標志位(MSB=1),然后寫入字節流,直到最后不足7位的數據作為最后一個字節(MSB=0)寫入。
這種組合編碼方式是 Kafka 協議中節省空間、提升效率的關鍵技術之一,尤其在傳輸大量包含小整數(如時間戳增量、偏移量增量等)的消息時效果顯著。
無符號整數(Unsigned Integers)的處理
Java 的基本數據類型中沒有無符號整數(unsigned int)。但在網絡協議或與其他系統交互時,經常需要處理無符號數。ByteUtils
?提供了方法來模擬對 32 位無符號整數的讀寫。
讀取無符號整數
為了避免將一個最高位為1的32位整數錯誤地解釋為負數,ByteUtils
?在讀取后將其轉換為?long
?類型。
// ... existing code .../*** Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes** @param buffer The buffer to read from* @return The integer read, as a long to avoid signedness*/public static long readUnsignedInt(ByteBuffer buffer) {return buffer.getInt() & 0xffffffffL;}
// ... existing code ...
- 代碼分析:?
buffer.getInt()
?讀取一個標準的 32 位有符號整數。關鍵在于?& 0xffffffffL
?這個操作。它是一個按位與操作,通過一個?long
?類型的掩碼,將讀取到的?int
?值(可能會被當作負數)轉換為一個正的?long
?值,從而正確地表示了原始的無符號整數值。
寫入無符號整數
寫入時,邏輯類似,將一個?long
?值截斷為 32 位?int
?再寫入。
// ... existing code .../*** Write the given long value as a 4 byte unsigned integer. Overflow is ignored.** @param buffer The buffer to write to* @param value The value to write*/public static void writeUnsignedInt(ByteBuffer buffer, long value) {buffer.putInt((int) (value & 0xffffffffL));}
// ... existing code ...
- 代碼分析:?
(value & 0xffffffffL)
?確保只取?long
?值的低 32 位,然后強制轉換為?int
?并寫入?ByteBuffer
。
此外,該類還提供了處理小端字節序(Little-Endian)的方法,如?readUnsignedIntLE
?和?writeUnsignedIntLE
,這在需要與采用不同字節序的系統交互時非常有用。Kafka 的網絡協議本身是網絡字節序,即大端(Big-Endian)。
其他工具方法
除了上述核心功能,ByteUtils
?還包含一些其他有用的方法,例如:
readDouble
/writeDouble
: 讀寫 64 位浮點數。EMPTY_BUF
: 提供一個靜態的、空的?ByteBuffer
?實例,避免重復創建。
總結
ByteUtils
?是 Kafka 中一個至關重要的底層工具類,它封裝了對 Java 基本類型與字節之間進行高效轉換的邏輯。它的設計體現了對性能的極致追求,例如在?readUnsignedVarint
?中使用循環展開,以及提供 Varint/Varlong 這種空間高效的編碼方式。理解這個類的工作原理,特別是 Varint 和 ZigZag 編碼,對于深入理解 Kafka 的網絡協議、消息格式以及存儲機制非常有幫助。