ByteBuffer
當我們進行數據傳輸的時候,往往需要使用到緩沖區,常用的緩沖區就是JDK NIO類庫提供的java.nio.Buffer。
實際上,7種基礎類型(Boolean除外)都有自己的緩沖區實現,對于NIO編程而言,我們主要使用的是ByteBuffer。從功能角度而言,ByteBuffer完全可以滿足NIO編程的需要,但是由于NIO編程的復雜性,ByteBuffer也有其局限性,它的主要缺點如下。
(1)ByteBuffer長度固定,一旦分配完成,它的容量不能動態擴展和收縮,當需要編碼的POJO對象大于ByteBuffer的容量時,會發生索引越界異常;
(2)ByteBuffer只有一個標識位置的指針position,讀寫的時候需要手工調用flip()和rewind()等,使用者必須小心謹慎地處理這些API,否則很容易導致程序處理失敗;
(3)ByteBuffer的API功能有限,一些高級和實用的特性它不支持,需要使用者自己編程實現。
ByteBuf
為了彌補這些不足,Netty提供了自己的ByteBuffer實現——ByteBuf。
網絡數據的基本單位總是字節。Java NIO 提供了ByteBuffer 作為它的字節容器,但是這個類使用起來過于復雜,而且也有些繁瑣。
Netty 的ByteBuffer 替代品是ByteBuf,一個強大的實現,既解決了JDK API 的局限性,又為網絡應用程序的開發者提供了更好的API。在本章中我們將會說明和JDK 的ByteBuffer 相比,ByteBuf 的卓越功能性和靈活性。這
也將有助于更好地理解Netty 數據處理的一般方式。
繼承關系圖
ByteBuf的優點
Netty 的數據處理API 通過兩個組件暴露——abstract class ByteBuf 和interface ByteBufHolder。
下面是一些ByteBuf API 的優點:
- 它可以被用戶自定義的緩沖區類型擴展;
- 通過內置的復合緩沖區類型實現了透明的零拷貝;
- 容量可以按需增長(類似于JDK 的StringBuilder);
- 在讀和寫這兩種模式之間切換不需要調用ByteBuffer 的flip()方法;
- 讀和寫使用了不同的索引;
- 支持方法的鏈式調用;
- 支持引用計數;
- 支持池化。
其他類可用于管理ByteBuf 實例的分配,以及執行各種針對于數據容器本身和它所持有的數據的操作。我們將在仔細研究ByteBuf 和ByteBufHolder 時探討這些特性。
ByteBuf動態擴容
通常情況下,當我們對ByteBuffer進行put操作的時候,如果緩沖區剩余可寫空間不夠,就會發生BufferOverflowException異常。為了避免發生這個問題,通常在進行put操作的時候會對剩余可用空間進行校驗,如果剩余空間不足,需要重新創建一個新的ByteBuffer,并將之前的ByteBuffer復制到新創建的ByteBuffer中,最后釋放老的ByteBuffer,代碼示例如下。
public ByteBuffer put(ByteBuffer src) {if (src instanceof HeapByteBuffer) {if (src == this)throw new IllegalArgumentException();HeapByteBuffer sb = (HeapByteBuffer)src;int n = sb.remaining();if (n > remaining())throw new BufferOverflowException();System.arraycopy(sb.hb, sb.ix(sb.position()),hb, ix(position()), n);sb.position(sb.position() + n);position(position() + n);} else if (src.isDirect()) {int n = src.remaining();if (n > remaining())throw new BufferOverflowException();src.get(hb, ix(position()), n);position(position() + n);} else {super.put(src);}return this;}
ByteBuf的兩種索引
因為所有的網絡通信都涉及字節序列的移動,所以高效易用的數據結構明顯是必不可少的。Netty 的ByteBuf 實現滿足并超越了這些需求。讓我們首先來看看它是如何通過使用不同的索引來簡化對它所包含的數據的訪問的吧。
ByteBuf 維護了兩個不同的索引:一個用于讀取,一個用于寫入。當你從ByteBuf 讀取時,它的readerIndex 將會被遞增已經被讀取的字節數。同樣地,當你寫入ByteBuf 時,它的writerIndex 也會被遞增。圖5-1 展示了一個空ByteBuf 的布局結構和狀態(一個讀索引和寫索引都設置為0 的16 字節ByteBuf)。
可以看到,正常情況下,一個ByteBuf被兩個索引分成三部分。
readerIndex 達到和writerIndex 位于同一位置,表示我們到達"可以讀取的"數據的末尾。就如同試圖讀取超出數組末尾的數據一樣,試圖讀取超出該點的數據將會觸發一個IndexOutOfBoundsException。
名稱以read 或者write 開頭的ByteBuf 方法,將會推進其對應的索引,而名稱以set 或者get 開頭的操作則不會。后面的這些方法將在作為一個參數傳入的一個相對索引上執行操作。
ByteBuf的三種緩存區類型
和ByteBuffer 一樣,ByteBuf也是一個緩存區類,它有三種緩存區類型:
堆緩存
最常用的ByteBuf 模式是將數據存儲在JVM 的堆空間中,可以被jvm自動回收。這種模式被稱為支撐數組(backing array),它能在沒有使用池化的情況下提供快速的分配和釋放。這種方式,如代碼清單5-1 所示,非常適合于有遺留的數據需要處理的情況。
ByteBuf heapBuf = ...;//檢查ByteBuf 是否有一個支撐數組if (heapBuf.hasArray()) {//如果有,則獲取對該數組的引用byte[] array = heapBuf.array();//計算第一個字節的偏移量。int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();//獲得可讀字節數int length = heapBuf.readableBytes();//使用數組、偏移量和長度作為參數調用你的方法handleArray(array, offset, length);}
當hasArray()方法返回false 時,嘗試訪問支撐數組將觸發一個UnsupportedOperationException。這個模式類似于JDK 的ByteBuffer 的用法。
堆緩存區的缺點在于如果進行Socket的I/O讀寫,需要額外進行一次內存復制,將堆內存對應的緩沖區復制到內核的channel中,性能會有一定的下降。
直接緩存區
直接緩存區和非直接緩存區的區別
我們先來了解一下什么是直接緩存區:
我們知道java的ByteBuffer類型就有直接和非直接緩存區這兩種類型。
- 非直接緩沖區:通過 ByteBuffer的allocate() 方法分配緩沖區,將緩沖區建立在 JVM 的內存中。
- 直接緩沖區:通過 ByteBuffer的allocateDirect() 方法分配直接緩沖區,將緩沖區建立在物理內存中,不再對其進行復制,可以提高效率。雖然直接緩沖區使JVM可以進行高效的I/o操作,但它使用的內存是操作系統分配的,繞過了JVM堆棧,建立和銷毀比堆棧上的緩沖區要更大的開銷。
他們的區別如下:
- 字節緩沖區要么是直接的,要么是非直接的。如果為直接字節緩沖區,則 Java 虛擬機會盡最大努力直接在此緩沖區上執行本機 I/O 操作。也就是說,在每次調用基礎操作系統的一個本機 I/O 操作之前(或之后)
- 直接緩沖區的內容可以駐留在常規的垃圾回收堆之外,因此,它們對應用程序的內存需求量造成的影響可能并不明顯。所以,建議將直接緩沖區主要分配給那些易受基礎系統的本機 I/O 操作影響的大型、持久的緩沖區。一般情況下,最好僅在直接緩沖區能在程序性能方面帶來明顯好處時分配它們。
ByteBuf 直接緩存區的使用
我們直接看代碼:
ByteBuf directBuf = ...;//檢查ByteBuf 是否由數組支撐。如果不是,則這是一個直接緩沖區if (!directBuf.hasArray()) {//獲取可讀字節數int length = directBuf.readableBytes();//分配一個新的數組來保存具有該長度的字節數據byte[] array = new byte[length];//將字節復制到該數組directBuf.getBytes(directBuf.readerIndex(), array);//使用數組、偏移量和長度作為參數調用你的方法handleArray(array, 0, length);}
復合緩沖區
第三種也是最后一種模式使用的是復合緩沖區,它為多個ByteBuf 提供一個聚合視圖。在這里你可以根據需要添加或者刪除ByteBuf 實例,這是一個JDK 的ByteBuffer 沒有的特性。
Netty 通過一個ByteBuf 子類——CompositeByteBuf——實現了這個模式,它提供了一個將多個緩沖區表示為單個合并緩沖區的虛擬表示。
為了舉例說明,讓我們考慮一下一個由兩部分——頭部和主體——組成的將通過HTTP 協議傳輸的消息。這兩部分由應用程序的不同模塊產生,將會在消息被發送的時候組裝。該應用程序可以選擇為多個消息重用相同的消息主體。當這種情況發生時,對于每個消息都將會創建一個新的頭部。
因為我們不想為每個消息都重新分配這兩個緩沖區,所以使用CompositeByteBuf 是一個完美的選擇。它在消除了沒必要的復制的同時,暴露了通用的ByteBuf API。圖5-2 展示了生成的消息布局。
代碼清單5-3 展示了如何通過使用JDK 的ByteBuffer 來實現這一需求。創建了一個包含兩個ByteBuffer 的數組用來保存這些消息組件,同時創建了第三個ByteBuffer 用來保存所有這些數據的副本。
// Use an array to hold the message partsByteBuffer[] message = new ByteBuffer[] { header, body };// Create a new ByteBuffer and use copy to merge the header and bodyByteBuffer message2 =ByteBuffer.allocate(header.remaining() + body.remaining());message2.put(header);message2.put(body);message 2.flip();
分配和復制操作,以及伴隨著對數組管理的需要,使得這個版本的實現效率低下而且笨拙。
代碼清單5-4 展示了一個使用了CompositeByteBuf 的版本。
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();//將ByteBuf 實例追加到CompositeByteBufByteBuf headerBuf = ...; // can be backing or directByteBuf bodyBuf = ...; // can be backing or directmessageBuf.addComponents(headerBuf, bodyBuf);.....//刪除位于索引位置為 0(第一個組件)的ByteBufmessageBuf.removeComponent(0); // remove the header//循環遍歷所有的ByteBuf 實例for (ByteBuf buf : messageBuf) {System.out.println(buf.toString());}
復合緩存區的使用
CompositeByteBuf 可能不支持訪問其支撐數組,因此訪問CompositeByteBuf 中的數據類似于(訪問)直接緩沖區的模式,如代碼清單5-5 所示。
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
//獲得可讀字節數
int length = compBuf.readableBytes();
//分配一個具有可讀字節數長度的新數組
byte[] array = new byte[length];
//將字節讀到該數組中
compBuf.getBytes(compBuf.readerIndex(), array);
//使用偏移量和長度作為參數使用該數組
handleArray(array, 0, array.length);
總結
經驗表明:ByteBuf的最佳實踐是在I/O通信線程的讀寫緩沖區使用DirectByteBuf,后端業務消息的編解碼模塊使用HeapByteBuf
字節級操作
ByteBuf 提供了許多超出基本讀、寫操作的方法用于修改它的數據。在接下來的章節中,我們將會討論這些中最重要的部分。
通過索引訪問數據
如同在普通的Java 字節數組中一樣,ByteBuf 的索引是從零開始的:第一個字節的索引是0,最后一個字節的索引總是capacity() - 1。代碼清單5-6 表明,對存儲機制的封裝使得遍歷ByteBuf 的內容非常簡單。
ByteBuf buffer = ...;for (int i = 0; i < buffer.capacity(); i++) {byte b = buffer.getByte(i);System.out.println((char)b);}
需要注意的是,使用那些需要一個索引值參數的方法來訪問數據既不會改變readerIndex 也不會改變writerIndex。如果有需要,也可以通過調用readerIndex(index)或者writerIndex(index)來手動移動這兩者。
通過數據反查索引
在ByteBuf中有多種可以用來確定指定值的索引的方法。最簡單的是使用indexOf()方法。較復雜的查找可以通過那些需要一個ByteBufProcessor作為參數的方法達成。這個接口只定義了一個方法:
boolean process(byte value)
它將檢查輸入值是否是正在查找的值。
ByteBufProcessor針對一些常見的值定義了許多便利的枚舉。假設你的應用程序需要和所謂的包含有以NULL結尾的內容的Flash套接字,可以調用:
forEach Byte(ByteBufProcessor.FIND_NUL)
如代碼清單展示了一個查找回車符(r)的索引的例子。:
ByteBuf buffer = ...;int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);
常規讀/寫操作
正如我們所提到過的,有兩種類別的讀/寫操作:
- get()和set()操作,從給定的索引開始,并且保持索引不變;
- read()和write()操作,從給定的索引開始,并且會根據已經訪問過的字節數對索引進行調整。
get()和set()操作
表5-1 列舉了最常用的get()方法。完整列表請參考對應的API 文檔。
這里面getBytes方法我們需要強調一下,比如buf.getBytes(buf.readerIndex(), array);表示將從buf實例的readerIndex為起點的數據傳入指定的目的地(一個數組中)。
read()和write()操作
現在,讓我們研究一下read()操作,其作用于當前的readerIndex 或writerIndex。這些方法將用于從ByteBuf 中讀取數據,如同它是一個流。表5-3 展示了最常用的方法。
幾乎每個read()方法都有對應的write()方法,用于將數據追加到ByteBuf 中。注意,表5-4 中所列出的這些方法的參數是需要寫入的值,而不是索引值
刪除已讀字節
正如我們之前看過的這張圖:
在上圖中標記為可丟棄字節的分段包含了已經被讀過的字節。通過調用discardReadBytes()方法,可以丟棄它們并回收空間。這個分段的初始大小為0,存儲在readerIndex 中,會隨著read 操作的執行而增加(get*操作不會移動readerIndex)。
上圖展示了下圖中所展示的緩沖區上調用discardReadBytes()方法后的結果。可以看到,可丟棄字節分段中的空間已經變為可寫的了。注意,在調用discardReadBytes()之后,對可寫分段的內容并沒有任何的保證。
雖然你可能會傾向于頻繁地調用discardReadBytes()方法以確保可寫分段的最大化,但是請注意,這將極有可能會導致內存復制,因為可讀字節(圖中標記為CONTENT 的部分)必須被移動到緩沖區的開始位置。我們建議只在有真正需要的時候才這樣做,例如,當內存非常寶貴的時候。
讀取可讀字節
ByteBuf 的可讀字節分段存儲了實際數據。新分配的、包裝的或者復制的緩沖區的默認的readerIndex 值為0。任何名稱以read 或者skip 開頭的操作都將檢索或者跳過位于當前readerIndex 的數據,并且將它增加已讀字節數。
以下代碼清單展示了如何讀取所有可以讀的字節。
ByteBuf buffer = ...;while (buffer.isReadable()) {System.out.println(buffer.readByte());}
寫數據
可寫字節分段是指一個擁有未定義內容的、寫入就緒的內存區域。新分配的緩沖區的writerIndex 的默認值為0。任何名稱以write 開頭的操作都將從當前的writerIndex 處開始寫數據,并將它增加已經寫入的字節數。如果嘗試往目標寫入超過目標容量的數據,將會引發一個IndexOutOfBoundException。
以下代碼清單是一個用隨機整數值填充緩沖區,直到它空間不足為止的例子。writeableBytes()方法在這里被用來確定該緩沖區中是否還有足夠的空間。
// Fills the writable bytes of a buffer with random integers.ByteBuf buffer = ...;//因為一個int為四個字節while (buffer.writableBytes() >= 4) {buffer.writeInt(random.nextInt());}
手動設置索引
JDK 的InputStream 定義了mark(int readlimit)和reset()方法,這些方法分別被用來將流中的當前位置標記為指定的值,以及將流重置到該位置。
同樣,可以通過調用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()來標記和重置ByteBuf 的readerIndex 和writerIndex。這些和InputStream 上的調用類似,只是沒有readlimit 參數來指定標記什么時候失效。
也可以通過調用readerIndex(int)或者writerIndex(int)來將索引移動到指定位置。試圖將任何一個索引設置到一個無效的位置都將導致一個IndexOutOfBoundsException。可以通過調用clear()方法來將readerIndex 和writerIndex 都設置為0。注意,這并不會清除內存中的內容。
調用clear()比調用discardReadBytes()輕量得多,因為它將只是重置索引而不會復制任何的內存。
復制指向緩存區的指針
派生緩沖區為ByteBuf 提供了以專門的方式來呈現該ByteBuf內容的視圖。這類視圖可以通過以下方法被創建的:
- duplicate();
- slice();獲取調用者的子緩沖區,且與原緩沖區共享緩沖區
- slice(int, int);獲取調用者的子緩沖區,且與原緩沖區共享緩沖區
- Unpooled.unmodifiableBuffer(…);
- order(ByteOrder);
- readSlice(int)。
每個這些方法都將返回一個新的ByteBuf 實例,它具有自己的讀索引、寫索引和標記索引。其內部存儲和JDK 的ByteBuffer一樣也是共享的。這使得派生緩沖區的創建成本是很低廉的,但是這也意味著,如果你修改了它的內容,也同時修改了其對應的源實例,所以要小心。
Charset utf8 = Charset.forName("UTF-8");//創建一個ByteBuf "Netty in Action"ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);//創建該ByteBuf 從索引0 開始到索引15結束的一個新切片ByteBuf sliced = buf.slice(0, 15);System.out.println(sliced.toString(utf8));//更新索引0 處的字節buf.setByte(0, (byte)'J');//將會成功,因為數據是共享的,對其中一個所做的更改對另外一個也是可見的assert buf.getByte(0) == sliced.getByte(0);
復制緩存區的內容
如果需要一個現有緩沖區的真實副本,請使用copy()或者copy(int, int)方法。不同于派生緩沖區,由這個調用所返回的ByteBuf 擁有獨立的數據副本。
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte) 'J');
//將會成功,因為數據不是共享的
assert buf.getByte(0) != copy.getByte(0);
如果我們不修改原始ByteBuf 的切片或者副本,這兩種場景是相同的。只要有可能,我們盡量使用slice()方法來避免復制內存的開銷。
其他api
ByteBufHolder 接口
我們經常發現,除了實際的數據負載之外,我們還需要存儲各種屬性值。HTTP 響應便是一個很好的例子,除了表示為字節的內容,還包括狀態碼、cookie 等。
為了處理這種常見的用例,Netty 提供了ByteBufHolder,我們可以看看他的默認實現:
可以看出,它主要就是封裝了一個ByteBuf對象,以及對這個對象的一些操作api。現在假如我們要構造一個HTTP響應的對象,那么就可以在繼承ByteBufHolder的基礎上在拓展其他的比如狀態碼、cookie等字段,達到自己的目的。
它常用的api如下:
ByteBuf分配機制
在這一節中,我們將描述管理ByteBuf 實例的不同方式。
按需分配:ByteBufAllocator 接口
為了降低分配和釋放內存的開銷,Netty 通過interface ByteBufAllocator 實現了(ByteBuf 的)池化,它可以用來分配我們所描述過的任意類型的ByteBuf 實例。
關于ioBuffer,默認地,當所運行的環境具有sun.misc.Unsafe 支持時,返回基于直接內存存儲的ByteBuf,否則返回基于堆內存存儲的ByteBuf;當指定使用PreferHeapByteBufAllocator 時,則只會返回基于堆內存存儲的ByteBuf。
我們可以通過Channel(每個都可以有一個不同的ByteBufAllocator 實例)或者綁定到ChannelHandler 的ChannelHandlerContext 獲取一個到ByteBufAllocator 的引用。代碼清單5-14 說明了這兩種方法。
獲取一個到ByteBufAllocator 的引用
//從Channel 獲取一個到ByteBufAllocator 的引用
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//從ChannelHandlerContext 獲取一個到ByteBufAllocator 的引用
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
...
Netty提供了兩種ByteBufAllocator的實現:PooledByteBufAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的實例以提高性能并最大限度地減少內存碎片。此實現使用了一種稱為jemalloc的已被大量現代操作系統所采用的高效方法來分配內存。后者的實現不池化ByteBuf實例,并且在每次它被調用時都會返回一個新的實例。
Netty默認使用了PooledByteBufAllocator
Unpooled 緩沖區
可能某些情況下,你未能獲取一個到ByteBufAllocator 的引用。對于這種情況,Netty 提供了一個簡單的稱為Unpooled 的工具類,它提供了靜態的輔助方法來創建未池化的ByteBuf實例。表5-8 列舉了這些中最重要的方法。
Unpooled 類還使得ByteBuf 同樣可用于那些并不需要Netty 的其他組件的非網絡項目,使得其能得益于高性能的可擴展的緩沖區API。
ByteBufUtil 類
ByteBufUtil 提供了用于操作ByteBuf 的靜態的輔助方法。因為這個API 是通用的,并且和池化無關,所以這些方法已然在分配類的外部實現。
這些靜態方法中最有價值的可能就是hexdump()方法,它以十六進制的表示形式打印ByteBuf 的內容。這在各種情況下都很有用,例如,出于調試的目的記錄ByteBuf 的內容。十六進制的表示通常會提供一個比字節值的直接表示形式更加有用的日志條目,此外,十六進制的版本還可以很容易地轉換回實際的字節表示。
另一個有用的方法是boolean equals(ByteBuf, ByteBuf),它被用來判斷兩個ByteBuf實例的相等性。如果你實現自己的ByteBuf 子類,你可能會發現ByteBufUtil 的其他有用方法。
引用計數
引用計數是一種通過在某個對象所持有的資源不再被其他對象引用時釋放該對象所持有的資源來優化內存使用和性能的技術。Netty 在第4 版中為ByteBuf 和ByteBufHolder 引入了引用計數技術,它們都實現了interface ReferenceCounted。
引用計數背后的想法并不是特別的復雜;它主要涉及跟蹤到某個特定對象的活動引用的數量。一個ReferenceCounted 實現的實例將通常以活動的引用計數為1 作為開始。只要引用計數大于0,就能保證對象不會被釋放。當活動引用的數量減少到0 時,該實例就會被釋放。注意,雖然釋放的確切語義可能是特定于實現的,但是至少已經釋放的對象應該不可再用了。
引用計數對于池化實現(如PooledByteBufAllocator)來說是至關重要的,它降低了內存分配的開銷。代碼清單5-15 展示了相關的示例。
//從Channel 獲取ByteBufAllocator
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//從ByteBufAllocator分配一個ByteBuf
ByteBuf buffer = allocator.directBuffer();
//檢查引用計數是否為預期的1
assert buffer.refCnt() == 1;
...
//減少到該對象的活動引用。當減少到0 時,該對象被釋放,并且該方法返回true
ByteBuf buffer = ...;
boolean released = buffer.release();
試圖訪問一個已經被釋放的引用計數的對象,將會導致一個IllegalReferenceCountException。
注意,一個特定的(ReferenceCounted 的實現)類,可以用它自己的獨特方式來定義它的引用計數規則。例如,我們可以設想一個類,其release()方法的實現總是將引用計數設為零,而不用關心它的當前值,從而一次性地使所有的活動引用都失效。
誰負責釋放release呢 一般來說,是由最后訪問(引用計數)對象的那一方來負責將它釋放。在第6 章中,
我們將會解釋這個概念和ChannelHandler 以及ChannelPipeline 的相關性。