本文將基于前文實現的編解碼與心跳機制,構建一個簡單的 RPC 框架,包括請求封裝、響應解析、動態代理調用。為打造微服務通信基礎打下基礎。
一、什么是 RPC?
RPC(Remote Procedure Call,遠程過程調用)允許你像調用本地方法一樣調用遠程服務。
RPC 框架的核心包括:
-
通信協議(我們用 Netty 實現)
-
服務注冊與發現(此處簡化為直連)
-
編碼/解碼機制(TLV、自定義協議)
-
動態代理與調用(Java 反射)
二、定義協議數據結構
請求對象
public class RpcRequest {private String className;private String methodName;private Class<?>[] paramTypes;private Object[] args;
}
響應對象
public class RpcResponse {private Object result;private Throwable error;
}
三、編碼器和解碼器(簡化版)
此處建議使用 Java 內置序列化或 JSON,避免自行實現復雜字節協議
public class RpcEncoder<T> extends MessageToByteEncoder<T> {@Overrideprotected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) throws Exception {byte[] data = SerializationUtil.serialize(msg); // 自定義序列化工具out.writeInt(data.length);out.writeBytes(data);}
}
public class RpcDecoder<T> extends ByteToMessageDecoder {private final Class<T> clazz;public RpcDecoder(Class<T> clazz) { this.clazz = clazz; }@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) return;in.markReaderIndex();int length = in.readInt();if (in.readableBytes() < length) {in.resetReaderIndex();return;}byte[] bytes = new byte[length];in.readBytes(bytes);out.add(SerializationUtil.deserialize(bytes, clazz));}
}
四、客戶端動態代理
public class RpcClientProxy {private final String host;private final int port;public RpcClientProxy(String host, int port) {this.host = host;this.port = port;}@SuppressWarnings("unchecked")public <T> T getProxy(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),new Class<?>[]{serviceClass},(proxy, method, args) -> {RpcRequest request = new RpcRequest();request.setClassName(serviceClass.getName());request.setMethodName(method.getName());request.setParamTypes(method.getParameterTypes());request.setArgs(args);// Netty 同步發送請求,獲取響應(略)RpcResponse response = NettyClient.send(request, host, port);return response.getResult();});}
}
五、服務端調用分發
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) {RpcResponse response = new RpcResponse();try {Class<?> clazz = Class.forName(request.getClassName());Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());Object result = method.invoke(clazz.getDeclaredConstructor().newInstance(), request.getArgs());response.setResult(result);} catch (Exception e) {response.setError(e);}ctx.writeAndFlush(response);}
}
六、服務端注冊
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new RpcDecoder(RpcRequest.class));pipeline.addLast(new RpcEncoder<>(RpcResponse.class));pipeline.addLast(new RpcServerHandler());}});
bootstrap.bind(8080).sync();
七、調用示例
// 定義服務接口
public interface HelloService {String hello(String name);
}// 服務實現類
public class HelloServiceImpl implements HelloService {public String hello(String name) {return "Hello, " + name;}
}// 客戶端調用
RpcClientProxy proxy = new RpcClientProxy("localhost", 8080);
HelloService service = proxy.getProxy(HelloService.class);
System.out.println(service.hello("Netty"));
八、總結
通過本篇,你已經實現了:
-
基于 Netty 的 RPC 協議通信
-
編解碼框架構建
-
動態代理與服務遠程調用
-
基礎版 Netty RPC 框架原型
雖然還不完善(無注冊中心、無連接池、無異步支持),但已具備通信能力,是 Netty 技術棧走向分布式架構的重要一步。