?netty系列文章:
01-netty基礎-socket |
02-netty基礎-java四種IO模型 |
03-netty基礎-多路復用select、poll、epoll |
04-netty基礎-Reactor三種模型 |
05-netty基礎-ByteBuf數據結構 |
06-netty基礎-編碼解碼 |
07-netty基礎-自定義編解碼器 |
08-netty基礎-自定義序列化和反序列化 |
09-netty基礎-手寫rpc-原理-01 |
10-netty基礎-手寫rpc-定義協議頭-02 |
11-netty基礎-手寫rpc-支持多序列化協議-03 |
12-netty基礎-手寫rpc-編解碼-04 |
13-netty基礎-手寫rpc-消費方生成代理-05 |
14-netty基礎-手寫rpc-提供方(服務端)-06 |
1 自定義編輯碼
編解碼都采用原生的ByteBuf,分別為MessageToByteEncoder、ByteToMessageDecoder;解決了拆包、粘包問題
編碼:將需要發送的數據封裝成RpcProtocol形式進行發送
解碼:將接收到的數據解釋成RpcProtocol形式然后處理相應的業務邏輯
2 代碼
2.1 編碼
package com.bonnie.protocol.code;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;/*** 編碼*/
@Slf4j
public class BonnieEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf out) throws Exception {log.info("============begin BonnieEncoder=========");Header header = msg.getHeader();// 魔數out.writeShort(header.getMagic());// 序列化類型out.writeByte(header.getSerialType());// 消息類型out.writeByte(header.getReqType());// 請求idout.writeLong(header.getRequestId());// 消息體序列化ISerializer serializer = SerializerManager.getSerializer(header.getSerialType());byte[] contentByteArray = serializer.serialize(msg.getContent());System.out.println("body長度"+contentByteArray.length);// 消息體長度,4個字節out.writeInt(contentByteArray.length);System.out.println("發送數據:"+JSONObject.toJSONString(msg));// 寫入消息體out.writeBytes(contentByteArray);}}
2.2 解碼
package com.bonnie.protocol.code;import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Objects;/*** 解碼*/
@Slf4j
public class BonnieDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {log.info("========begin BonnieDecoder==========");// 首先判斷可讀的字節是否小于頭的長度,如果小于,說明沒有body數據,甚至數據有問題,不解碼if (in.readableBytes()<= RpcConstant.HEAD_TOTOAL_LEN) {return;}// 標記讀取開始索引in.markReaderIndex();// 魔數short magic = in.readShort();if (!Objects.equals(magic, RpcConstant.MAGIC)) {throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);}// 序列化類型byte serialType = in.readByte();// 消息類型byte reqType = in.readByte();// 請求idlong requestId = in.readLong();// 報文長度int dataLength = in.readInt();// 可讀字節是否小于body的長度,如果小于,則不讀取,并且重置到讀指針的地方,等下一次讀if(in.readableBytes()<dataLength) {in.resetReaderIndex();return;}// 消息體byte[] bodyByteArray = new byte[dataLength];// body內容讀取到body中in.readBytes(bodyByteArray);// 封裝頭信息Header header = new Header();header.setMagic(magic);header.setSerialType(serialType);header.setReqType(reqType);header.setRequestId(requestId);header.setLength(dataLength);// 拿到對應的序列化ISerializer serializer = SerializerManager.getSerializer(serialType);/*** 根據請求類型,比如客戶端發送數據,就是REQUWST,服務端給客戶端回復數據就是RESPONSE,當然都是* 相對的,每一段都會發送REQUEST請求,每一段也會發送RESPONSE請求*/ReqTypeEnum reqTypeEnum = ReqTypeEnum.findByCode(reqType);switch (reqTypeEnum) {// 如果是請求報文 反序列化得到數據,封裝數據,繼續傳遞case REQUEST:RpcProtocol rpcProtocol = dealRequest(bodyByteArray, serializer, header);out.add(rpcProtocol);break;case RESPONSE:RpcProtocol rpcProtocolResponse = dealResponse(bodyByteArray, serializer, header);out.add(rpcProtocolResponse);break;case HEARTBEAT:// TODObreak;}}private RpcProtocol dealResponse(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcResponse rpcResponse = serializer.deserialize(bodyByteArray, RpcResponse.class);RpcProtocol<RpcResponse> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcResponse);return rpcProtocol;}private RpcProtocol dealRequest(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcRequest rpcRequest = serializer.deserialize(bodyByteArray, RpcRequest.class);RpcProtocol<RpcRequest> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcRequest);return rpcProtocol;}}