本篇介紹Netty調優,在上篇聊天室的案例中進行改造,手寫一個簡單的RPC實現。
1、超時時間參數
??????? CONNECT_TIMEOUT_MILLIS 是Netty的超時時間參數,屬于客戶端SocketChannel的參數,客戶端連接時如果一定時間沒有連接上,就會拋出 timeout 異常
??????? 如何在代碼中添加參數?在new Bootstrap()時使用
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
??????? 啟動客戶端,不啟動服務器,發現連接超時
??????? 打上斷點(選擇多線程模式):
??????? 這一行獲取到的值是創建BootStrap時添加CONNECT_TIMEOUT_MILLIS 的值(300)
int connectTimeoutMillis = config().getConnectTimeoutMillis();
???????? 滿足條件,進入If塊:
???????? 這是一個定時任務,延遲CONNECT_TIMEOUT_MILLIS 的值(300)后觸發,執行Runnable中的邏輯,拋出超時異常。
connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {ChannelPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
??????? 主線程和NIO線程也是通過connectPromise進行異步通信的,兩個線程持有的是同一個connectPromise對象。
2、SO_BACKLOG
??????? SO_BACKLOG是一個與服務器套接字相關的參數,主要用于配置服務器套接字的接受隊列大小,屬于ServerSocketChannel的參數
什么是套接字?
套接字是計算機網絡中的一種通信端點,用于在兩個節點之間建立連接并進行數據傳輸,它包含了IP地址和端口號,通過這兩個標識符,網絡上的設備可以相互定位和通信。
套接字在客戶端和服務器的工作順序:
服務器端:
- 創建套接字。
- 綁定到指定的IP地址和端口號。
- 監聽連接請求。
- 接受連接,創建一個新的套接字用于與客戶端通信。
- 讀取或寫入數據。
客戶端:
- 創建套接字。
- 連接到服務器的IP地址和端口號。
- 讀取或寫入數據。
???????? 而SO_BACKLOG參數決定了服務器套接字在操作系統內核中維護的一個掛起連接隊列的最大長度。
????????當一個服務器應用程序啟動并監聽某個端口時,它會創建一個服務器套接字,用于等待客戶端的連接請求。
????????當客戶端嘗試連接服務器時,連接請求會首先進入服務器端的一個等待隊列,稱為掛起連接隊列。這個隊列中的連接請求還沒有被服務器應用程序正式接受處理。
????????在大多數操作系統中,套接字操作是由操作系統內核負責管理的。內核會為每個監聽中的服務器套接字維護一個掛起連接隊列。
??????? 如果參數設置的如果隊列已滿,新的連接請求將被拒絕或被操作系統忽略。
????????假設SO_BACKLOG設置為50,這意味著掛起連接隊列的最大長度是50。當第51個連接請求到達時,如果前面的請求還沒有被處理,新的請求將被拒絕。
3、ulimit -n
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
???????ulimit -n控制了操作系統中一個進程可以打開的最大文件描述符(file descriptor)數量。
什么是文件描述符?
文件描述符是操作系統內核用于管理打開文件的一個抽象概念,包括普通文件、套接字、管道等。每個打開的文件、網絡連接都會占用一個文件描述符。
??????? 為什么要設置最大文件描述符
????????Netty 是一個高性能的網絡框架,設計用于處理大量并發連接。如果文件描述符的限制太低,當連接數超過此限制時,服務器將無法接受新的連接,這將導致連接失敗。?
4、TCP_NODELAY
????????TCP_NODELAY 是 TCP 協議中的一個選項,用于控制 Nagle 算法的啟用或禁用。
????????Nagle 算法 在前篇中有所提及,簡單的說,當發送方有小數據包要發送時,如果前一個數據包的確認(ACK)尚未收到,Nagle 算法會將這些小數據包暫時存儲起來,直到收到前一個數據包的確認或足夠多的數據可以組成一個較大的數據包。
??????? 可以通過以下的代碼設置是否開啟Nagle 算法 ,同樣地,這個參數屬于ServerSocketChannel
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
??????? 什么場景下應該禁用Nagle 算法?
- 實時應用:在需要低延遲的實時應用中(例如在線游戲、實時通信應用)。
- 小數據包頻繁發送:如果應用程序頻繁發送小數據包,并且對每個數據包的發送延遲敏感。
5、SO_SNDBUF & SO_RCVBUF
??????? SO_SNDBUF & SO_RCVBUF 和SO_BACKLOG類似,也是與網絡套接字相關的兩個重要參數,用于配置發送和接收緩沖區的大小。
????????發送緩沖區用于臨時存儲應用程序要發送到網絡的數據。
????????接收緩沖區用于臨時存儲從網絡接收到的數據,直到應用程序讀取它們。
????????緩沖區過大可能增加延遲,因為數據在緩沖區中停留的時間更長;緩沖區過小可能導致頻繁的緩沖區溢出和數據包丟失。
??????? 可以通過以下的代碼進行設置:
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // 發送緩沖區大小32KB
bootstrap.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); // 接收緩沖區大小32KB
????????如何選擇合適的緩沖區大小
- 根據網絡帶寬和延遲:在高帶寬和高延遲的網絡環境中,需要更大的緩沖區來充分利用帶寬。例如,寬帶網絡和跨國連接可能需要更大的緩沖區。
- 根據應用需求:不同的應用有不同的需求。實時應用(如視頻流和在線游戲)通常需要較小的緩沖區以減少延遲,而大數據傳輸(如文件下載和大數據處理)可能需要較大的緩沖區以提高吞吐量。
- 測試和調優:最佳的緩沖區大小通常需要通過測試和調優來確定。可以通過逐步調整緩沖區大小并監測網絡性能來找到最佳配置。
6、ALLOCATOR
????????ALLOCATOR 參數用于配置 ByteBuf 分配器,ByteBuf的相關概念在前篇中也提到過,大致可以分為池化和非池化:
- PooledByteBufAllocator:池化分配器,重復使用內存以減少分配和釋放內存的開銷,適用于高并發和性能敏感的應用。
- UnpooledByteBufAllocator:非池化分配器,每次都進行新的內存分配,適用于內存使用模式不可預測的應用。
??????? 可以通過以下代碼進行設置:
ServerBootstrap bootstrap = new ServerBootstrap();// 使用池化分配器
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);// 使用非池化分配器
// bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
7、RCVBUF_ALLOCATOR
????????RCVBUF_ALLOCATOR 是一個用于管理接收緩沖區大小的機制,用于確定和管理網絡連接上每次讀取操作時分配的字節緩沖區的大小。
??????? 它是一個接口,常用的實現類有:
- FixedRecvByteBufAllocator:每次讀操作分配固定大小的緩沖區。
???
- DefaultMaxBytesRecvByteBufAllocator:一個可以限制每次讀取消息數量的實現。
- ?AdaptiveRecvByteBufAllocator:根據流量動態調整緩沖區大小,這是最常用的實現之一。 ?
8、RPC簡單實現
??????? 接下來會通過一個案例實現簡單的RPC框架。
什么是RPC框架?
RPC(Remote Procedure Call,遠程過程調用)框架是一種使程序能夠通過網絡調用遠程服務器上的函數或方法的技術。
在表面上這種調用方式對用戶是透明的,就像調用本地函數一樣簡單,但實際上底層會通過網絡協議進行通信。
????????8.1、1.0版 ?????
????????首先需要新增RPC的請求和響應消息:
@Data
public abstract class Message implements Serializable {// 省略舊的代碼public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}
???????? 然后定義一個RPC請求消息類,在請求消息類中,包括了調用接口及接口中方法的信息:
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {/*** 調用的接口全限定名,服務端根據它找到實現*/private String interfaceName;/*** 調用接口中的方法名*/private String methodName;/*** 方法返回類型*/private Class<?> returnType;/*** 方法參數類型數組*/private Class[] parameterTypes;/*** 方法參數值數組*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}
??????? 再定義一個響應消息類,包括正常返回的值以及發生異常時的返回值。
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 異常值*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}
??????? 定義一個獲取配置文件中接口實現類的工廠類:
public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = SerializedConfig.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}
????????application.properties?
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
?????? HelloService接口及實現類:
public interface HelloService {String sayHello(String name);
}
public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String msg) {
// int i = 1 / 0;return "你好, " + msg;}
}
??????? 準備RPC服務器端和客戶端的代碼,和聊天室案例類似,但是加上了對應的RPC請求消息和響應消息的處理器:
??????? 服務器端:
/**
*
* RPC服務器端
**/
@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
??????? 客戶端:
/**
*
*RPC 客戶端
**/
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 響應消息處理器,待實現RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
??????? 先編寫服務器端的自定義RPC消息處理器RpcRequestMessageHandler :
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage responseMessage = new RpcResponseMessage();int sequenceId = message.getSequenceId();responseMessage.setSequenceId(sequenceId);try {//獲取RPC消息對象中將要調用的接口的實現類 寫在配置文件中HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));//獲取實現類中的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());//通過反射調用方法Object result = method.invoke(service, message.getParameterValue());responseMessage.setReturnValue(result);} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {e.printStackTrace();responseMessage.setExceptionValue(e);}//觸發出站事件ctx.writeAndFlush(responseMessage);}
}
??????? 通過main方法測試一下:
/*** 測試代碼* @param args* @throws ClassNotFoundException* @throws NoSuchMethodException* @throws InvocationTargetException* @throws IllegalAccessException*/public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {//封裝RPC消息對象RpcRequestMessage message = new RpcRequestMessage(1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"張三"});//獲取RPC消息對象中將要調用的接口的實現類HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));//獲取實現類中的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());//調用方法Object result = method.invoke(service, message.getParameterValue());System.out.println(result);}
??????? 編寫客戶端的代碼以及自定義RPC消息返回處理器RpcResponseMessageHandler 暫時只將接收到的消息返回出去:
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}
??????? 改造客戶端的代碼,發送調用方法請求:
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"張三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});
??????? 它的執行順序是:
??????? 客戶端發送消息,觸發所有出站處理器:
??????? 然后到服務器:
??????? 在RpcRequestMessageHandler 中無論消息處理是否報錯,都會觸發出站處理器將返回值傳遞給客戶端:
??????? 最后再回到客戶端:
??????? 注意:LOGGING_HANDLER和MESSAGE_CODEC是雙向處理,既可以是入站,也可以是出站!
??????? 這樣一個簡單的RPC通信案例就已經實現了。
??????? 8.2、2.0版
??????? 但是在第一版中,用戶在客戶端發送調用請求時,需要自己封裝RpcRequestMessage 請求對象,參數復雜,換做是我是絕對不愿意這樣做的。那么我們對其進行優化。
??????? 改造客戶端,首先定義一個成員變量channel:
private static volatile Channel channel = null;
??????? 然后將原有客戶端的代碼抽取成一個初始化channel的方法:
private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());//雙向事件ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//入站事件ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (Exception e) {log.error("client error", e);}}
??????? 這個channel只應該存在一個實例,采用雙檢鎖單例的方式獲取:
/*** 初始化單例channel* @return*/private static Channel getChannel(){if (channel != null){return channel;}synchronized (LOCK){if (channel!=null){return channel;}initChannel();return channel;}}
??????? 復習一下,為什么要使用雙檢鎖模式?
??????? (成員位置的channel可以不用volatile關鍵字?此時的channel對象不是走構造方法new出來的)
???????? 然后創建一個代理對象,代理對象負責將請求參數打包并發送給遠程服務器:
public static <T> T getProxyService(Class<T> serviceClass){ClassLoader classLoader = serviceClass.getClassLoader();Class<?>[] interfaces = new Class[]{serviceClass};Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {//將方法調用轉換成消息對象int sequenceId = SequenceIdGenerator.nextId();RpcRequestMessage message = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);//發送消息Channel channel = getChannel();channel.writeAndFlush(message);//異步通信獲取結果DefaultPromise<Object> objectDefaultPromise = new DefaultPromise<>(channel.eventLoop());//向PROMISE中注冊ID和DefaultPromiseRpcResponseMessageHandler.PROMISES.put(sequenceId,objectDefaultPromise);//等待結果objectDefaultPromise.await();if (objectDefaultPromise.isSuccess()){return objectDefaultPromise.getNow();}else {throw new RuntimeException(objectDefaultPromise.cause());}});return (T) o;}
??????? 重點在于向客戶端接收服務器響應的RpcResponseMessageHandler? 中注冊自己的消息ID和promise對象。
//向PROMISE中注冊ID和DefaultPromise
RpcResponseMessageHandler.PROMISES.put(sequenceId,objectDefaultPromise);
??????? 這樣用戶只需要調用代理對象的方法就可以了:
public static void main(String[] args) {HelloService helloService = getProxyService(HelloService.class);System.out.println(helloService.sayHello("張三"));
}
??????? 同時需要修改客戶端接受服務器響應的RpcResponseMessageHandler ,去找到對應消息ID的promise對象,并且移除,然后根據服務器返回的結果寫入成功或異常情況,這時客戶端的
//等待結果
objectDefaultPromise.await();
??????? 獲取到了結果,進行最后的處理。
/*** 接受服務器的響應*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {/*** k:消息id* v:消息ID對應的promise對象*/public static final ConcurrentHashMap<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {int sequenceId = msg.getSequenceId();Promise<Object> promise = PROMISES.remove(sequenceId);Exception exceptionValue = msg.getExceptionValue();Object returnValue = msg.getReturnValue();if (exceptionValue == null) {promise.setSuccess(returnValue);} else {promise.setFailure(exceptionValue);}}
}