Netty基礎—6.Netty實現RPC服務三

大綱

1.RPC的相關概念

2.RPC服務調用端動態代理實現

3.Netty客戶端之RPC遠程調用過程分析

4.RPC網絡通信中的編碼解碼器

5.Netty服務端之RPC服務提供端的處理

6.RPC服務調用端實現超時功能

5.Netty服務端之RPC服務提供端的處理

(1)RPC服務提供端NettyServer

(2)基于反射調用請求對象的目標方法

(1)RPC服務提供端NettyRpcServer

public class ServiceConfig {private String serviceName;//調用方的服務名稱private Class serviceInterfaceClass;//服務接口類型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到這一步為止,server啟動了而且監聽指定的端口號了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//進入一個阻塞的狀態,同步一直等待到你的server端要關閉掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多個服務public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}

(2)基于反射調用請求對象的目標方法

//RpcRequest類需要修改字段調整為如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//參數類型private Object[] args;//參數值private String invokerApplicationName;//調用方的服務名稱private String invokerIp;//調用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此時我們要實現什么呢?//我們需要根據RpcRequest指定的class,獲取到這個class//然后通過反射構建這個class對象實例//接著通過反射獲取到這個RpcRequest指定方法和入參類型的method//最后通過反射調用,傳入方法,拿到返回值//根據接口名字拿到接口實現類的名字后再獲取類ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc調用結果封裝到響應里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}

6.RPC服務調用端實現超時功能

public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//長時間沒有通信就發送一個檢測包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}});       try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//標記一下請求發起的時間NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除發起請求時間的標記NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/73545.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/73545.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/73545.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

路由器與防火墻配置命令

路由器與防火墻配置命令 小明啊&#xff0c;你不是學計算機的嘛&#xff0c;叔叔家的路由器壞了&#xff0c;可以過來幫叔叔看看嗎 命令可以用縮寫&#xff0c;造就一堆容易造成歧義的縮寫&#xff0c;比如add是address的縮寫&#xff0c;sh是shutdown的縮寫。 默認為Cisco路…

Go語言進化之旅:從1.18到1.24的語法變革

文章目錄 里程碑變革&#xff1a;泛型支持Go 1.18&#xff1a;泛型的引入Go 1.19-1.21&#xff1a;泛型的完善Go 1.24&#xff1a;泛型類型別名全面支持 循環與迭代的進化Go 1.22&#xff1a;循環變量作用域變化與整數遍歷Go 1.23&#xff1a;迭代器函數的支持Go 1.24&#xff…

發現一個GoVCL的問題

之前用govcl寫了一個服務端的界面程序&#xff0c;用來控制服務的開啟和關閉。 由于這個服務程序運行的時間比較長&#xff0c;經常是掛著在服務器上24小時不間斷運行。 后來經過調試發現&#xff0c;govcl的界面按鈕控件&#xff0c;在程序長時間運行后&#xff0c;會出現無法…

34個適合機械工程及自動化專業【論文選題】

論文選題具有極其重要的意義&#xff0c;它直接關系到論文的質量、價值以及研究的可行性和順利程度。選題明確了研究的具體領域和核心問題&#xff0c;就像給研究旅程設定了方向和目的地。例如&#xff0c;選擇 “人工智能在醫療影像診斷中的應用” 這一選題&#xff0c;就確定…

電腦實用小工具--VMware常用功能簡介

一、創建、編輯虛擬機 1.1 創建新的虛擬機 詳見文章新創建虛擬機流程 1.2 編輯虛擬機 創建完成后&#xff0c;點擊編輯虛擬機設置&#xff0c;可對虛擬機內存、處理器、硬盤等各再次進行編輯設置。 二、虛擬機開關機 2.1 打開虛擬機 虛擬機創建成功后&#xff0c;點擊…

雙指針算法專題之——有效三角形的個數

文章目錄 題目介紹思路分析AC代碼 題目介紹 鏈接: 611. 有效三角形的個數 思路分析 如果判斷三個數能否構成一個三角形&#xff0c;相信大家都知道&#xff1a; 只要任意兩邊之和大于第三邊即可。 比如三條邊長度為a&#xff0c;b&#xff0c;c 那只要滿足 ab>c ac>b b…

Linux內核實時機制27 - RT調度器10 - RT throttling 帶寬控制下

文章目錄 1、初始化帶寬 init_rt_bandwidth1.1、init_rt_bandwidth2、定時器處理2.1、sched_rt_period_timer2.2、do_sched_rt_period_timer3、總結1、初始化帶寬 init_rt_bandwidth rt_runtime : 一個時間周期內的運行時間,超過則限流,默認值為0.95ms 1、init_rt_bandwidth…

1.5[hardware][day5]

Link類跳轉指令可以拆分為兩個部分&#xff0c;一個是跳轉&#xff0c;即下一個PC的生成&#xff0c;如果將分支條件的比較放到譯碼級來進行&#xff0c;則這部分只涉及取值級和譯碼級流水&#xff1b;另一個是Link操作&#xff0c;簡單來說就是寫寄存器&#xff0c;這部則主要…

Tomcat 與 Java 環境變量配置簡明教程

Tomcat 與 Java 環境變量配置簡明教程 一、Tomcat 環境變量配置 1. 確認安裝路徑 假設 Tomcat 安裝在&#xff1a;D:\Tomcat\apache-tomcat-9.0.70 2. 設置 CATALINA_HOME 步驟&#xff1a; 右鍵點擊「此電腦」→「屬性」點擊「高級系統設置」→「環境變量」在「系統變量…

3.16學習總結

學習了Java的知識點 基本數據類型 byte占1字節&#xff0c;儲存范圍-128~127 short占2字節&#xff0c;儲存范圍-32768~32767 int占4字節&#xff0c;儲存范圍-2147483648~2147483647 long占8字節&#xff0c;儲存范圍是-9223372036854775808~9223372036854775807 float占…

Android手機中各類安全相關知識總結

更多內容請見: 爬蟲和逆向教程-專欄介紹和目錄 文章目錄 1. Android 安全威脅2. Android 安全防護措施3. Android 安全建議和最佳實踐4. Android 安全工具推薦5. Android 安全常見問題5.1 如何檢測設備是否感染惡意軟件?5.2 如何防止應用濫用權限?5.3 如何保護設備免受網絡攻…

【Ratis】項目總覽

Apache Ratis 項目源碼分析與運行原理 Apache Ratis 是一個高性能、可擴展的分布式一致性協議實現,是對Raft協議的Java版本的很好的工程實現。它提供了靈活的 API 和多種傳輸層支持(如 gRPC 和 Netty),適用于構建分布式系統中的核心組件,例如分布式存儲、配置管理和服務發…

以太網 MAC 幀格式

文章目錄 以太網 MAC 幀格式以太網幀間隔參考 本文為筆者學習以太網對網上資料歸納整理所做的筆記&#xff0c;文末均附有參考鏈接&#xff0c;如侵權&#xff0c;請聯系刪除。 以太網 MAC 幀格式 以太網技術的正式標準是 IEEE 802.3&#xff0c;它規定了以太網傳輸數據的幀結…

pycharm配置鏡像源【pycharm最新版(23.2.5及以上)方法】

經常遇到pycharm中無法安裝或者安裝慢的問題&#xff0c;糾結了好久&#xff0c;終于找到這個解決辦法了。 為什么要配置鏡像源&#xff1a; 因為Python的包管理工具pip一般從PyPI&#xff08;Python Package Index&#xff09;下載安裝包&#xff0c;但是PyPI位于國外&#x…

駕馭 DeepSeek 科技之翼,翱翔現代學習新天際

在當今這個信息爆炸的時代&#xff0c;學習的方式和途徑正在經歷著前所未有的變革。人工智能技術的飛速發展&#xff0c;為我們的學習帶來了全新的機遇和挑戰。DeepSeek 作為一款強大的大語言模型&#xff0c;憑借其卓越的性能和豐富的功能&#xff0c;為現代學習注入了新的活力…

科普:WOE編碼與One-Hot編碼

WOE編碼是業務邏輯與統計建模的結合&#xff0c;適合強業務導向的場景&#xff1b; One-Hot編碼是數據驅動的特征工程&#xff0c;適合追求模型性能的場景。 編碼方式核心價值典型案例WOE編碼保留變量預測能力&#xff0c;適配線性模型銀行違約預測邏輯回歸One-Hot編碼釋放特征…

Python----數據分析(Pandas一:pandas庫介紹,pandas操作文件讀取和保存)

一、Pandas庫 1.1、概念 Pandas是一個開源的、用于數據處理和分析的Python庫&#xff0c;特別適合處理表格類數 據。它建立在NumPy數組之上&#xff0c;提供了高效的數據結構和數據分析工具&#xff0c;使得數據操作變得更加簡單、便捷和高效。 Pandas 的目標是成為 Python 數據…

Type-C:智能家居的電力革命與空間美學重構

在萬物互聯的時代浪潮中&#xff0c;家居空間正經歷著從功能容器到智慧終端的蛻變。當意大利設計師安東尼奧奇特里奧提出"消失的設計"理念二十年后&#xff0c;Type-C充電技術正以潤物無聲的方式重塑著現代家居的形態與內核&#xff0c;開啟了一場靜默的居住革命。 【…

C++ 左值(lvalue)和右值(rvalue)

在 C 中&#xff0c;左值&#xff08;lvalue&#xff09;和右值&#xff08;rvalue&#xff09;是指對象的不同類別&#xff0c;區分它們對于理解 C 中的表達式求值和資源管理非常重要&#xff0c;尤其在現代 C 中涉及到移動語義&#xff08;Move Semantics&#xff09;和完美轉…

【含文檔+PPT+源碼】基于SpringBoot和Vue的編程學習系統

項目介紹 本課程演示的是一款 基于SpringBoot和Vue的編程學習系統&#xff0c;主要針對計算機相關專業的正在做畢設的學生與需要項目實戰練習的 Java 學習者。 1.包含&#xff1a;項目源碼、項目文檔、數據庫腳本、軟件工具等所有資料 2.帶你從零開始部署運行本套系統 3.該項…