佛教中除不聽各種淫邪聲音之外,更不可聽別人的秘密。因他論耳根最到家,故取挖耳之形,以示耳根清凈。
來看看netty的核心組件解碼器Decoder
- ?Decoder的作用
- 半包,粘包問題
- 從模板和裝飾器模式看Decoder解碼原理
1.Decoder作用
?最根本的就是將網絡中的二進制數據,解析成為目標對象(Object)。
抽象出來本質就是上圖,其它任何動作都是在完成這個目的。
對吧,學習一定要抓住本質。
先來看看網絡數據再網絡和操作系統底層是怎么傳輸的:
a.網絡數據是以二進制的數據包進行傳輸的,在netty中是以bytefuf數據包進行傳遞。這個概念之? ???后? 章節會講到。
b.會涉及tcp/ipc傳輸協議。
?先來看看TCP/IP協議的數據封裝和分用過程,大致如下圖所示:
?
可以看到沒往下傳輸一層,會加上一個每層的首部。這個可以理解為,唐僧西天取經的過程中,通關文牒都要蓋一個章一樣。不加這個人家不認你。用的時候在解析出來報文體。
這里就不得不提一個概念MSS(TCP[數據包]每次能夠傳輸的最大數據分段)
后面在傳輸過程中,超過這個值,數據包會進行拆分;小于這個值,數據包會進行合并。
c.dma復制
簡單講 就是繞過cpu,直接操作內存在設備間傳輸數據。
具體流程如下:
dam傳輸數據時,cpu不參與,dma處理完之后通知cpu進行后續處理。
理解這幾個點之后,我們再來看一看正真的數據傳播流程:
1.首先發送端,在用戶緩存區里的bytebuf數據包會進行一次cpu復制;
2.cpu復制之后數據會緩存在發送緩沖區的內核空間里,這時候數據是完整的;
3.內核緩沖區進行一次DMA復制,數據被寫入網卡設備中,此時網卡里面的數據包會進行重組;
4.寫入網卡的數據通過TCP/IP協議進行傳輸,組裝成新的二進制數據包,有最大數據限制;
5.接收端通過TCP/IP協議接收到二進制數據包,此時數據是完整的;
6.接收端在內核緩沖區進行一次DMA復制,形成新的bytebuf數據包;
7.接收端進行一次cpu復制,bytebuf數據包被寫入用戶緩沖區。
具體流程如下圖:
這個過程會涉及兩次分割二進制包:
a.傳輸過程中的分割;
b.系統復制過程中的分割;
所以粘包,和半包問題的出現就發生在這兩個過程中,數據包少了,多個合在一起就會造成粘包。數據包多了,就會進行分割,分割就會打破原來的數據結構,出現半包問題。
來看看netty Decoder是怎么解決這個問題的
1.通過ByteToMessageDecoder 將二進制數據轉換成對象
2.通過MessageToMessageDecoder將對象數據轉換成對象
先來看看ByteToMessageDecoder 它是一個基類,主要使用模板方法接受管理bytebuf,子類通過實現鉤子方法,處理業務邏輯,處理完業務邏輯將寫入List<Object>中,之后在流水線上發送到下一站。流水線概念會在之后一章單獨講解。
public class Byte2IntegerDecoder extends ByteToMessageDecoder {//鉤子實現@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {while (in.readableBytes() >= 4) {Integer anInt = in.readInt();Logger.info("解碼出一個整數: " + anInt);out.add(anInt);}}
}
來看看它的具體使用,繼承ByteToMessageDecoder,鉤子方法里實現業務邏輯。
回顧一下什么是模板方法:
核心點:抽象類會實現一系列公共方法共子類使用。
1.抽象類會定義一系列的模板方法,子類自動繼承。
2.子類通過鉤子方法處理具體業務邏輯。
好,我們再看一個它的核心子類ReplayingDecoder(回放解碼器)。
什么是回放呢?
我們讀取數據的時候是在操作一個二進制數組,數組元素完整的話,指針移動沒有問題。但是數據元素不完整,再次讀取的時候記數指針會回到開頭重新執行。
netty通過checkpoint指針來完成指針回放
再來看看ReplayingDecoder怎么結合模板模式和裝飾器模式完成整個解碼操作的:
使用模板模式主要是它繼承ByteToMessageDecoder父類解碼器,實現decode方法。重點看看包裝器的使用:
回放解碼器在解碼的時候會對傳進來的bytebuf進行一次包裝:
就是這個?replayable.setCumulation(in);
netty 實現了一個?ReplayingDecoderByteBuf,它繼承了ByteBuf,所以可以對對傳進來的bytebuf進行操作,這個過程就是包裝器在起作用。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {replayable.setCumulation(in);try {while (in.isReadable()) {int oldReaderIndex = checkpoint = in.readerIndex();int outSize = out.size();if (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();
簡化一下看個簡單的包裝器的例子
public class WrapperModelDemo {// Pojo -- 被包裝的類型static class Pojo implements Sth {public void a() {System.out.println("a");}public void b() {System.out.println("b");}public void c() {System.out.println("c");}public void X() {System.out.println("ALL - X");}}// Wrapper模式static class PojoWrapper implements Sth {private Sth inner;protected PojoWrapper() {}public void setInner(Sth inner) {this.inner = inner;}@Overridepublic void a() {System.out.println("PojoWrapper - a");inner.a();}@Overridepublic void b() {System.out.println("PojoWrapper - a");inner.b();}@Overridepublic void c() {throw new UnsupportedOperationException("... c ");}@Overridepublic void X() {throw new UnsupportedOperationException("... X");}}@Testpublic void testWrapper() throws Exception {Sth pojo = new Pojo();pojo.a();pojo.b();pojo.c();PojoWrapper pojoWrapper = new PojoWrapper();pojoWrapper.setInner(pojo);// 可以嘗試注釋掉上面的某一行代碼, 查看輸出結果pojoWrapper.a();pojoWrapper.b();pojoWrapper.c();}}
運行結果:
核心是包裝類在實現sth接口時,又把它作為了一個屬性設置進來了。
這樣結合著看,對回放解碼器的解碼過程理解起來就會更加清楚。
現在來看看?MessageToMessageDecoder解碼器,它的傳入對象不再是一個bytebuf,而是一個具體的對象
public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {@Overridepublic void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {String target = String.valueOf(msg);out.add(target);}
}
它有幾個重要的子類:
LineBasedFrameDecoder:基于行的解碼器,通過換行符(\n 或者 \r\n)來作為幀的分隔符。當接收到數據時,它會在數據中查找換行符,一旦找到,就將從上次解碼位置到換行符之間的數據作為一個完整的幀進行解碼并返回。如果在接收到的數據中沒有找到換行符,那么數據會被緩存起來,直到換行符出現或者緩沖區滿了才進行處理。
DelimiterBasedFrameDecoder:基于分隔符的解碼器,它允許用戶自定義幀的分隔符。在接收到數據后,它會根據用戶提供的分隔符(一個或多個字節數組)在數據中進行查找。一旦找到分隔符,就將從上次解碼位置到分隔符之前的數據作為一個完整的幀返回。同樣,如果沒有找到分隔符,數據會被緩存,直到分隔符出現或緩沖區達到一定條件。
LengthFieldBasedFrameDecoder:通過消息中定義的長度字段來確定幀的長度。解碼器會首先讀取指定長度的字節,這些字節表示后續幀數據的長度。然后,根據這個長度值,從數據流中讀取相應長度的數據作為一個完整的幀。這種方式可以精確地定位每個幀的邊界,即使數據存在粘包和拆包的情況。
?下面是幾個具體實例,可以跑跑看看效果
public class NettyOpenBoxDecoder {public static final int MAGICCODE = 9999;public static final int VERSION = 100;static final String SPLITER = "\r\n";static final String SPLITER_3 = "\n";static final String SPLITER_2 = "\t";static final String CONTENT = "netty:18羅漢系列!";/*** LineBasedFrameDecoder 使用實例*/@Testpublic void testLineBasedFrameDecoder() {try {ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 0; j < 100; j++) {//1-3之間的隨機數int random = RandomUtil.randInMod(3);ByteBuf buf = Unpooled.buffer();for (int k = 0; k < random; k++) {buf.writeBytes(CONTENT.getBytes("UTF-8"));}buf.writeBytes(SPLITER.getBytes("UTF-8"));channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LineBasedFrameDecoder 使用實例*/@Testpublic void testDelimiterBasedFrameDecoder() {try {final ByteBuf delimiter = Unpooled.copiedBuffer(SPLITER_2.getBytes("UTF-8"));ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, delimiter));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 0; j < 100; j++) {//1-3之間的隨機數int random = RandomUtil.randInMod(3);ByteBuf buf = Unpooled.buffer();for (int k = 0; k < random; k++) {buf.writeBytes(CONTENT.getBytes("UTF-8"));}buf.writeBytes(SPLITER_2.getBytes("UTF-8"));channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LineBasedFrameDecoder 使用實例*/@Testpublic void testLengthFieldBasedFrameDecoder() {try {// 1、單字節(無符號):0到255;(有符號):-128到127。
//
//2、雙字節(無符號):0到65535;(有符號):-32768 到 32765。
//
//3、四字節(無符號):0到42 9496 7295;(有符號):-21 4748 3648到21 4748 3647。//定義一個 基于長度域解碼器final LengthFieldBasedFrameDecoder decoder =new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(decoder);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 0; j < 100; j++) {//1-3之間的隨機數int random = RandomUtil.randInMod(3);ByteBuf buf = Unpooled.buffer();byte[] bytes = CONTENT.getBytes("UTF-8");//首先 寫入頭部 head,也就是后面的數據長度buf.writeInt(bytes.length * random);//然后 寫入contentfor (int k = 0; k < random; k++) {buf.writeBytes(bytes);}channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LengthFieldBasedFrameDecoder 使用實例*/@Testpublic void testLengthFieldBasedFrameDecoder1() {try {final LengthFieldBasedFrameDecoder spliter =new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(spliter);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 1; j <= 100; j++) {ByteBuf buf = Unpooled.buffer();String s = j + "次發送->" + CONTENT;byte[] bytes = s.getBytes("UTF-8");buf.writeInt(bytes.length);System.out.println("bytes length = " + bytes.length);buf.writeBytes(bytes);channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LengthFieldBasedFrameDecoder 使用實例*/@Testpublic void testLengthFieldBasedFrameDecoder2() {try {final LengthFieldBasedFrameDecoder spliter =new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(spliter);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 1; j <= 100; j++) {// 分配一個bytebufByteBuf buf = Unpooled.buffer();String s = j + "次發送->" + CONTENT;byte[] bytes = s.getBytes("UTF-8");//首先 寫入頭部 head,包括 后面的數據長度buf.writeInt(bytes.length);buf.writeChar(VERSION);//然后 寫入 contentbuf.writeBytes(bytes);channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LengthFieldBasedFrameDecoder 使用實例 3*/@Testpublic void testLengthFieldBasedFrameDecoder3() {try {final LengthFieldBasedFrameDecoder spliter =new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(spliter);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 1; j <= 100; j++) {ByteBuf buf = Unpooled.buffer();String s = j + "次發送->" + CONTENT;byte[] bytes = s.getBytes("UTF-8");buf.writeChar(VERSION);buf.writeInt(bytes.length);buf.writeInt(MAGICCODE);buf.writeBytes(bytes);channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}
最后總結一下:
本文主要是對netty解碼器的理解,核心點在 數據在網絡和操作系統間的傳播和netty結合模板模式裝飾器模式解決netty解碼過程中的數據問題。
預告一下,下一篇是18羅漢的第二章,netty編碼器。