大綱
1.RPC的相關概念
2.RPC服務調用端動態代理實現
3.Netty客戶端之RPC遠程調用過程分析
4.RPC網絡通信中的編碼解碼器
5.Netty服務端之RPC服務提供端的處理
6.RPC服務調用端實現超時功能
5.Netty服務端之RPC服務提供端的處理
(1)RPC服務提供端NettyRpcServer
(2)基于反射調用請求對象的目標方法
(1)RPC服務提供端NettyRpcServer
public class ServiceConfig {private String serviceName;//調用方的服務名稱private Class serviceInterfaceClass;//服務接口類型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到這一步為止,server啟動了而且監聽指定的端口號了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//進入一個阻塞的狀態,同步一直等待到你的server端要關閉掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多個服務public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}
(2)基于反射調用請求對象的目標方法
//RpcRequest類需要修改字段調整為如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//參數類型private Object[] args;//參數值private String invokerApplicationName;//調用方的服務名稱private String invokerIp;//調用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此時我們要實現什么呢?//我們需要根據RpcRequest指定的class,獲取到這個class//然后通過反射構建這個class對象實例//接著通過反射獲取到這個RpcRequest指定方法和入參類型的method//最后通過反射調用,傳入方法,拿到返回值//根據接口名字拿到接口實現類的名字后再獲取類ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc調用結果封裝到響應里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}
6.RPC服務調用端實現超時功能
public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//長時間沒有通信就發送一個檢測包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}}); try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//標記一下請求發起的時間NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除發起請求時間的標記NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}