本文將為前面構建的輕量級 RPC 框架添加“服務注冊與發現”功能,支持多服務節點動態上線、自動感知與調用路由,為構建真正可擴展的分布式系統打好基礎。
一、背景:為什么需要注冊中心?
如果每個客戶端都硬編碼連接某個 IP/端口的服務:
-
不利于服務水平擴展(多實例)
-
無法實現負載均衡
-
服務上線/下線無法感知
? 有了注冊中心后:
-
服務啟動時自動注冊
-
客戶端從注冊中心獲取最新服務列表
-
可實現輪詢/哈希/權重等負載均衡
二、系統結構圖
┌──────────────┐
│ 注冊中心 │?────────────┐
│(服務發現) │ │
└─────┬────────┘ │▲ │注冊服務│ │
┌─────┴──────┐ ┌─────────┴─────────┐
│ 服務節點A │ │ 服務節點B │
└────┬───────┘ └─────────┬─────────┘│注冊 │注冊▼ ▼客戶端 ?──── 查詢服務地址列表 ─────┐└── 負載均衡調用 ────────┘
三、注冊中心(基于 Netty 實現)
public class RegisterCenterServer {private static final Map<String, List<InetSocketAddress>> serviceMap = new ConcurrentHashMap<>();public static void main(String[] args) throws Exception {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))).addLast(new ObjectEncoder()).addLast(new RegisterHandler());}});bootstrap.bind(9000).sync();System.out.println("注冊中心啟動成功");}static class RegisterHandler extends SimpleChannelInboundHandler<Object> {protected void channelRead0(ChannelHandlerContext ctx, Object msg) {if (msg instanceof RegisterRequest) {RegisterRequest req = (RegisterRequest) msg;serviceMap.computeIfAbsent(req.getServiceName(), k -> new ArrayList<>()).add(req.getAddress());ctx.writeAndFlush("SUCCESS");} else if (msg instanceof LookupRequest) {LookupRequest req = (LookupRequest) msg;List<InetSocketAddress> list = serviceMap.getOrDefault(req.getServiceName(), Collections.emptyList());ctx.writeAndFlush(list);}}}
}
四、服務節點注冊流程
服務端在啟動時將自己的信息注冊到注冊中心:
RegisterRequest req = new RegisterRequest();
req.setServiceName("helloService");
req.setAddress(new InetSocketAddress("127.0.0.1", 8080));Socket socket = new Socket("127.0.0.1", 9000);
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
out.writeObject(req);
五、客戶端服務發現
public class ServiceDiscovery {public List<InetSocketAddress> lookup(String serviceName) {try (Socket socket = new Socket("127.0.0.1", 9000)) {ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());out.writeObject(new LookupRequest(serviceName));ObjectInputStream in = new ObjectInputStream(socket.getInputStream());return (List<InetSocketAddress>) in.readObject();} catch (Exception e) {throw new RuntimeException("服務發現失敗", e);}}
}
六、負載均衡策略
public class LoadBalancer {public static InetSocketAddress choose(List<InetSocketAddress> list) {return list.get(new Random().nextInt(list.size())); // 簡單輪詢/隨機}
}
七、調用流程整合
ServiceDiscovery discovery = new ServiceDiscovery();
List<InetSocketAddress> providers = discovery.lookup("helloService");
InetSocketAddress address = LoadBalancer.choose(providers);// 使用 address 建立 Netty 連接,發送 RPC 請求
八、總結
通過本篇內容,我們為 Netty RPC 框架實現了:
? 多服務節點注冊與發現
? 基于 Netty 的輕量級注冊中心
? 動態服務列表查詢
? 簡易負載均衡支持