我們在上篇實現了一個輕量級 RPC 框架,現在要進一步優化 —— 加入異步響應支持,讓 RPC 通信變得真正高效、非阻塞、支持并發。
一、為什么需要異步調用?
上篇的 RPC 框架是“同步阻塞”的:
-
每次發送請求后,必須等待服務端響應后才能返回結果
-
對于高并發請求,會導致線程阻塞、資源浪費、吞吐低下
理想方案:客戶端請求后立即返回一個 Future 對象,等響應來了再異步設置結果,主線程不用阻塞。
二、核心機制:RequestId + ChannelFuture + Map 緩存
我們引入一個全局 ConcurrentHashMap<Long, CompletableFuture<RpcResponse>>:
public class RpcResponseRegistry {public static final Map<Long, CompletableFuture<RpcResponse>> futures = new ConcurrentHashMap<>();
}
每次發送請求時:
-
生成唯一 requestId
-
創建一個 CompletableFuture 存入
RpcResponseRegistry
-
異步監聽服務端響應,當收到響應時設置結果
三、發送請求(非阻塞)
public class NettyClient {private static AtomicLong REQUEST_ID_GEN = new AtomicLong();public static CompletableFuture<RpcResponse> send(RpcRequest request, Channel channel) {long requestId = REQUEST_ID_GEN.incrementAndGet();request.setRequestId(requestId);CompletableFuture<RpcResponse> future = new CompletableFuture<>();RpcResponseRegistry.futures.put(requestId, future);channel.writeAndFlush(request); // 不阻塞等待return future;}
}
四、接收響應并完成 Future
服務端處理完成后寫出響應:
ctx.writeAndFlush(response);
客戶端接收時通過 requestId
找到對應的 future 并完成它:
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) {long requestId = response.getRequestId();CompletableFuture<RpcResponse> future = RpcResponseRegistry.futures.remove(requestId);if (future != null) {future.complete(response);}}
}
五、客戶端調用示例
// 獲取動態代理
HelloService proxy = proxyFactory.getProxy(HelloService.class);// 發起異步調用
CompletableFuture<RpcResponse> future = NettyClient.send(request, channel);
future.thenAccept(response -> {System.out.println("異步結果:" + response.getResult());
});// 主線程可以干其他事情
System.out.println("我可以繼續做別的事情了!");
六、增強:自動類型轉換 + 異常處理
你可以在 Future 結果返回時:
-
自動封裝返回值類型(比如String,List<T>)
-
自動處理異常并打印日志
future.whenComplete((resp, ex) -> {if (ex != null) {System.err.println("RPC 異常: " + ex.getMessage());} else {System.out.println("返回結果: " + resp.getResult());}
});
七、總結
通過本篇的異步機制改造,我們的 Netty RPC 框架已經擁有了:
? 異步非阻塞調用
? CompletableFuture 響應映射
? 并發安全的請求響應管理
? 更高的吞吐能力和性能提升