一、背景
二、Netty 單體架構的優缺點
????????優點
????????缺點
三、Netty 集群架構的優缺點
????????優點
????????缺點
四、適用場景對比
五、Netty單體架構代碼實現
六、Netty集群架構方案實現
????????方案一、Nginx負載均衡實現集群(較為簡單)
????????????????Nginx配置
????????????????前端連接方式
????????方案二、Nacos+Gateway(結合SpringCloud生態)
????????????????Netty服務
????????????????gateway網關服務
????????????????前端連接方式
????????方案三、Zookeeper + Redis + RabbitMQ方案實現
????????????????redis自動分配端口
????????????????Zookeeper實現Netty服務的注冊、在線人數
????????????????Chat-Web服務根據人數最少策略拿到Netty地址
????????????????RabbitMQ實現Netty服務對消息的監聽消費
????????????????Chat-Web服務監聽Zookeeper節點清理Redis與RabbitMQ殘留數據
七、結語
?一、背景
Netty 是一個基于 Java NIO 的高性能網絡應用框架,廣泛應用于高并發、低延遲的通信場景(如游戲服務器、即時通訊、RPC 框架等)。
單體架構:單臺服務器運行一個 Netty 實例,處理所有客戶端請求,適合輕量級應用或開發測試階段,資源集中但存在單點風險。
集群架構:多臺服務器協同工作,通過負載均衡、分布式通信等技術共同處理請求,提升性能和可靠性,通過橫向擴展解決性能瓶頸,適合高并發、高可用性要求的場景,但需處理分布式復雜性。
二、Netty 單體架構的優缺點
優點
簡單易用:無需考慮分布式協調、數據分片等問題,開發邏輯直接(如直接操作 Channel 和 EventLoop)。部署方便,一臺服務器即可運行,適合快速驗證業務邏輯。
低延遲通信:所有請求在同一進程內處理,避免網絡傳輸和序列化開銷,適合對延遲敏感的場景(如實時游戲)。
資源集中管理:共享線程池、緩存等資源,減少重復創建開銷。調試方便,可直接通過日志或調試工具定位問題。
成本低:無需額外負載均衡器或分布式中間件,硬件和運維成本較低。
缺點
單點故障風險:服務器宕機或網絡中斷會導致整個服務不可用,缺乏容災能力。
性能瓶頸:單臺服務器的 CPU、內存、網絡帶寬有限,無法支撐超大規模并發(如百萬級連接)。
擴展性差:垂直擴展(升級硬件)成本高,且受物理限制;水平擴展(增加服務器)需重構為集群架構。
維護困難:隨著業務增長,單體代碼可能變得臃腫,模塊間耦合度高,難以維護和迭代。
三、Netty 集群架構的優缺點
優點
高可用性:通過多節點部署和心跳檢測,實現故障自動轉移(如使用 ZooKeeper 或 etcd 管理節點狀態)。單節點故障不影響整體服務,適合金融、電商等對穩定性要求高的場景。
彈性擴展:水平擴展方便,通過增加服務器即可提升處理能力(如支持千萬級連接)。結合負載均衡(如 Nginx、LVS)或服務發現(如 Consul)動態分配流量。
負載均衡:請求均勻分發到多個節點,避免單節點過載,提升資源利用率。支持根據業務優先級或用戶特征進行智能路由(如灰度發布)。
數據一致性支持:結合分布式緩存(如 Redis)或數據庫分片,解決多節點數據同步問題。適合需要強一致性的場景(如訂單處理、支付系統)。
缺點
復雜性增加:需處理分布式事務、序列化、網絡分區(腦裂)等問題,開發難度顯著提升。需要引入中間件(如 Kafka、RocketMQ)或框架(如 Spring Cloud)協調節點間通信。
性能開銷:節點間通信需經過網絡傳輸和序列化/反序列化,增加延遲(如 gRPC 的 Protobuf 編碼)。負載均衡器可能成為瓶頸(如 Nginx 性能不足時需升級或分片)。
運維成本高:需監控多節點狀態、日志聚合(如 ELK)、分布式追蹤(如 SkyWalking)等。部署和升級需考慮滾動重啟、數據遷移等操作,流程復雜。
一致性挑戰:分布式環境下難以保證強一致性,需權衡 CAP 理論(如采用最終一致性模型)。需設計冪等、重試、補償機制應對網絡異常。
四、適用場景對比
?
選擇單體:若業務規模小、對延遲敏感且無需高可用,單體 Netty 是簡單高效的選擇。
選擇集群:若需支撐高并發、高可用或未來擴展,集群架構是必然趨勢,但需投入更多資源解決分布式問題。
五、Netty單體架構代碼實現
請參考:【Netty實戰】基于Netty+WebSocket的IM通信后臺服務代碼詳解-CSDN博客
六、Netty集群架構方案實現
方案一、Nginx負載均衡實現集群(較為簡單)
Nginx配置
http {upstream netty_cluster {server 192.168.1.101:875; # 節點1server 192.168.1.102:875; # 節點2ip_hash; # 基于客戶端IP的會話保持}server {listen 80;location /ws {proxy_pass http://netty_cluster;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}}
}
前端連接方式
const socket = new WebSocket("ws://your-nginx-ip:875/ws");
由于單節點不可能有全部的channel信息,后續的會話轉發可參考方案三中的RabbitMQ實現
方案二、Nacos+Gateway(結合SpringCloud生態)
Netty服務
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project><dependencies><!-- Netty核心 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version></dependency><!-- Nacos服務發現 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
</project>
application.yml?
server:port: 875 # Netty服務端口spring:application:name: netty-servicecloud:nacos:discovery:server-addr: nacos-server:8848namespace: prodephemeral: truenetty:websocket:path: /ws
啟動類?
@SpringBootApplication
@EnableDiscoveryClient
public class NettyServerApplication {public static void main(String[] args) {SpringApplication.run(NettyServerApplication.class, args);}@Beanpublic ApplicationRunner nettyStarter() {return args -> {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new WSServerInitializer()).bind(875).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}};}
}
gateway網關服務
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project><dependencies><!-- Spring Cloud Gateway --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency><!-- Nacos服務發現 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency></dependencies>
</project>
application.yml
server:port: 8080spring:application:name: api-gatewaycloud:gateway:discovery:locator:enabled: trueroutes:- id: netty-ws-routeuri: lb://netty-servicepredicates:- Path=/ws/**filters:- StripPrefix=1- name: RequestRateLimiterargs:redis-rate-limiter.replenishRate: 100redis-rate-limiter.burstCapacity: 200nacos:discovery:server-addr: nacos-server:8848
前端連接方式
// 通過Gateway連接
const socket = new WebSocket("ws://your-gateway-ip/ws");
由于單節點不可能有全部的channel信息,后續的會話轉發可參考方案三中的RabbitMQ實現
方案三、Zookeeper + Redis + RabbitMQ方案實現
redis自動分配端口
其實這里也可以將端口與在線人數放在Redis中,改成zookeeper方案可以不需要在中斷連接后,監聽并且清理在線人數和端口,因為netty與zk建立的臨時節點,中斷連接后,會自動刪除該臨時節點。
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.4.3</version>
</dependency>
/*** Jedis 連接池工具類*/
public class JedisPoolUtils {private static final JedisPool jedisPool;static {//配置連接池JedisPoolConfig poolConfig = new JedisPoolConfig();//最大連接數poolConfig.setMaxTotal(10);//最大空閑連接poolConfig.setMaxIdle(10);//最小空閑連接poolConfig.setMinIdle(5);//最長等待時間,mspoolConfig.setMaxWaitMillis(1500);//創建連接池對象jedisPool = new JedisPool(poolConfig,"127.0.0.1",6379,1000,"root");}public static Jedis getJedis(){return jedisPool.getResource();}}
// 動態分配端口
public static Integer selectPort(Integer port) {String portKey = "netty_port";Jedis jedis = JedisPoolUtils.getJedis();Map<String, String> portMap = jedis.hgetAll(portKey);System.out.println(portMap);// 由于map中的key都應該是整數類型的port,所以先轉換成整數后,再比對,否則string類型的比對會有問題List<Integer> portList = portMap.entrySet().stream().map(entry -> Integer.valueOf(entry.getKey())).collect(Collectors.toList());// step1: 編碼到此處先運行測試看一下結果System.out.println(portList);Integer nettyPort = null;if (portList == null || portList.isEmpty()) {// step2: 編碼到此處先運行測試看一下結果jedis.hset(portKey, port+"", initOnlineCounts);nettyPort = port;} else {// 循環portList,獲得最大值,并且累加10Optional<Integer> maxInteger = portList.stream().max(Integer::compareTo);Integer maxPort = maxInteger.get().intValue();Integer currentPort = maxPort + 10;jedis.hset(portKey, currentPort+"", initOnlineCounts);nettyPort = currentPort;}// step3: 編碼到此處先運行測試看一下最終結果return nettyPort;}// 刪除端口分配關系
public static void removePort(Integer port) {String portKey = "netty_port";Jedis jedis = JedisPoolUtils.getJedis();jedis.hdel(portKey, port+"");}
?這樣就可以在啟動類中自動分配端口
public static void main(String[] args) throws Exception {// 定義主從線程組// 定義主線程池,用于接受客戶端的連接,但是不做任何處理,比如老板會談業務,拉到業務就會交給下面的員工去做了EventLoopGroup bossGroup = new NioEventLoopGroup();// 定義從線程池,處理主線程池交過來的任務,公司業務員開展業務,完成老板交代的任務EventLoopGroup workerGroup = new NioEventLoopGroup();// Netty服務啟動的時候,從redis中查找有沒有端口,如果沒有則用875,如果有則把端口累加1(或10)再啟動Integer nettyPort = selectPort(875);try {// 構建Netty服務器ServerBootstrap server = new ServerBootstrap(); // 服務的啟動類server.group(bossGroup, workerGroup) // 把主從線程池組放入到啟動類中.channel(NioServerSocketChannel.class) // 設置Nio的雙向通道.childHandler(new WSServerInitializer()); // 設置處理器,用于處理workerGroup// 啟動server,并且綁定分配的端口號,同時啟動方式為"同步"ChannelFuture channelFuture = server.bind(nettyPort).sync();// 監聽關閉的channelchannelFuture.channel().closeFuture().sync();} finally {// 優雅的關閉線程池組bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();// 移除現有的redis與netty的端口關系removePort(nettyPort);}}
Zookeeper實現Netty服務的注冊、在線人數
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.5.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.5.0</version>
</dependency>
/*** Zookeeper 配置類*/
public class CuratorConfig {private static String host = "127.0.0.1:3191"; // 單機/集群的ip:port地址private static Integer connectionTimeoutMs = 30 * 1000; // 連接超時時間private static Integer sessionTimeoutMs = 3 * 1000; // 會話超時時間private static Integer sleepMsBetweenRetry = 2 * 1000; // 每次重試的間隔時間private static Integer maxRetries = 3; // 最大重試次數private static String namespace = "IM"; // 命名空間(root根節點名稱)// curator客戶端private static CuratorFramework client;static {// 聲明重試策略RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);// 聲明初始化客戶端client = CuratorFrameworkFactory.builder().connectString(host).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(backoffRetry).namespace(namespace).build();client.start(); // 啟動curator客戶端}public static CuratorFramework getClient() {return client;}}
/*** Netty服務節點類*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class NettyServerNode {private String ip; // IP地址private Integer port; // 服務端口private Integer onlineCounts = 0; // 在線人數}
/*** Zookeeper注冊工具類 - 用于注冊Netty服務節點和管理在線人數統計*/
public class ZookeeperRegister {/*** 注冊Netty服務到Zookeeper* @param nodeName 節點名稱(如服務名稱)* @param ip Netty服務IP地址* @param port Netty服務端口號* @throws Exception 可能拋出的異常*/public static void registerNettyServer(String nodeName,String ip,Integer port) throws Exception {// 獲取Zookeeper客戶端連接CuratorFramework zkClient = CuratorConfig.getClient();String path = "/" + nodeName;// 檢查父節點是否存在,不存在則創建持久化節點Stat stat = zkClient.checkExists().forPath(path);if (stat == null) {zkClient.create().creatingParentsIfNeeded() // 自動創建父節點.withMode(CreateMode.PERSISTENT) // 持久化節點.forPath(path);} else {System.out.println(stat.toString());}// 創建臨時順序節點存儲Netty服務信息(EPHEMERAL_SEQUENTIAL表示臨時順序節點)NettyServerNode serverNode = new NettyServerNode();serverNode.setIp(ip);serverNode.setPort(port);String nodeJson = JsonUtils.objectToJson(serverNode); // 對象轉JSONzkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) // 臨時順序節點.forPath(path + "/im-", nodeJson.getBytes()); // 節點路徑格式:/nodeName/im-0000000001}/*** 獲取本機IP地址* @return 本機IP地址* @throws Exception 可能拋出的異常*/public static String getLocalIp() throws Exception {InetAddress addr = InetAddress.getLocalHost();String ip = addr.getHostAddress();System.out.println("本機IP地址:" + ip);return ip;}/*** 增加在線人數統計* @param serverNode Netty服務節點信息* @throws Exception 可能拋出的異常*/public static void incrementOnlineCounts(NettyServerNode serverNode) throws Exception {dealOnlineCounts(serverNode, 1); // 增加1個在線人數}/*** 減少在線人數統計* @param serverNode Netty服務節點信息* @throws Exception 可能拋出的異常*/public static void decrementOnlineCounts(NettyServerNode serverNode) throws Exception {dealOnlineCounts(serverNode, -1); // 減少1個在線人數}/*** 處理在線人數的增減操作(核心方法)* @param serverNode Netty服務節點信息* @param counts 變化量(+1表示增加,-1表示減少)* @throws Exception 可能拋出的異常*/public static void dealOnlineCounts(NettyServerNode serverNode,Integer counts) throws Exception {// 獲取Zookeeper客戶端連接CuratorFramework zkClient = CuratorConfig.getClient();// 創建分布式讀寫鎖(防止并發修改問題)InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient, "/rw-locks");readWriteLock.writeLock().acquire(); // 獲取寫鎖try {String path = "/server-list";// 獲取所有子節點List<String> list = zkClient.getChildren().forPath(path);// 遍歷所有節點for (String node : list) {String pendingNodePath = path + "/" + node;// 獲取節點數據String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));// 反序列化為NettyServerNode對象NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);// 匹配IP和端口的服務節點if (pendingNode.getIp().equals(serverNode.getIp()) &&(pendingNode.getPort().intValue() == serverNode.getPort().intValue())) {// 更新在線人數pendingNode.setOnlineCounts(pendingNode.getOnlineCounts() + counts);String nodeJson = JsonUtils.objectToJson(pendingNode);// 寫回ZookeeperzkClient.setData().forPath(pendingNodePath, nodeJson.getBytes());}}} finally {readWriteLock.writeLock().release(); // 釋放寫鎖}}
}
?然后啟動服務時將節點注冊到Zookeeper上
public static void main(String[] args) throws Exception {// 定義主從線程組// 定義主線程池,用于接受客戶端的連接,但是不做任何處理,比如老板會談業務,拉到業務就會交給下面的員工去做了EventLoopGroup bossGroup = new NioEventLoopGroup();// 定義從線程池,處理主線程池交過來的任務,公司業務員開展業務,完成老板交代的任務EventLoopGroup workerGroup = new NioEventLoopGroup();// Netty服務啟動的時候,從redis中查找有沒有端口,如果沒有則用875,如果有則把端口累加1(或10)再啟動Integer nettyPort = selectPort(875);// 注冊當前netty服務到zookeeper中ZookeeperRegister.registerNettyServer("server-list",ZookeeperRegister.getLocalIp(),nettyPort);try {// 構建Netty服務器ServerBootstrap server = new ServerBootstrap(); // 服務的啟動類server.group(bossGroup, workerGroup) // 把主從線程池組放入到啟動類中.channel(NioServerSocketChannel.class) // 設置Nio的雙向通道.childHandler(new WSServerInitializer()); // 設置處理器,用于處理workerGroup// 啟動server,并且綁定自動分配的端口號,同時啟動方式為"同步"ChannelFuture channelFuture = server.bind(nettyPort).sync();// 監聽關閉的channelchannelFuture.channel().closeFuture().sync();} finally {// 優雅的關閉線程池組bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();// 移除現有的redis與netty的端口關系removePort(nettyPort);}}
Chat-Web服務根據人數最少策略拿到Netty地址
<!-- zookeeper -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.9.2</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.3.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency>
zookeeper:curator:host: 127.0.0.1:3191connectionTimeoutMs: 30000sessionTimeoutMs: 3000sleepMsBetweenRetry: 2000maxRetries: 3namespace: itzixi-im
@Component
@ConfigurationProperties(prefix = "zookeeper.curator")
@Data
public class CuratorConfig extends BaseInfoProperties {private String host; // 單機/集群的ip:port地址private Integer connectionTimeoutMs; // 連接超時時間private Integer sessionTimeoutMs; // 會話超時時間private Integer sleepMsBetweenRetry; // 每次重試的間隔時間private Integer maxRetries; // 最大重試次數private String namespace; // 命名空間(root根節點名稱)public static final String path = "/server-list";@Bean("curatorClient")public CuratorFramework curatorClient() {// 三秒后重連一次,只連一次//RetryPolicy retryOneTime = new RetryOneTime(3000);// 每3秒重連一次,重連3次//RetryPolicy retryNTimes = new RetryNTimes(3, 3000);// 每3秒重連一次,總等待時間超過10秒則停止重連//RetryPolicy retryPolicy = new RetryUntilElapsed(10 * 1000, 3000);// 隨著重試次數的增加,重試的間隔時間也會增加(推薦)RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);// 聲明初始化客戶端CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(backoffRetry).namespace(namespace).build();client.start(); // 啟動curator客戶端return client;}
@Resource(name = "curatorClient")private CuratorFramework zkClient;@PostMapping("getNettyOnlineInfo")public GraceJSONResult getNettyOnlineInfo() throws Exception {// 從zookeeper中獲得當前已經注冊的netty 服務列表String path = "/server-list";List<String> list = zkClient.getChildren().forPath(path);List<NettyServerNode> serverNodeList = new ArrayList<>();for (String node:list) {// System.out.println(node);String nodeValue = new String(zkClient.getData().forPath(path + "/" + node));// System.out.println(nodeValue);NettyServerNode serverNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);serverNodeList.add(serverNode);}// 計算當前哪個zk的node是最少人數連接,獲得[ip:port]并且返回給前端Optional<NettyServerNode> minNodeOptional = serverNodeList.stream().min(Comparator.comparing(nettyServerNode -> nettyServerNode.getOnlineCounts()));NettyServerNode minNode = minNodeOptional.get();return Result.ok(minNode);}
這樣前端就可以根據調用此接口獲得的Netty節點進行連接
RabbitMQ實現Netty服務對消息的監聽消費
我們這里將使用RabbitMQ的topic消息隊列將消息廣播到所有Netty服務,各個Netty服務進行查找要發送的用戶的channel,最終會有一臺找到了并且進行發送或者都沒找到存儲到數據庫。
當然我們也可以用Redis實現,只需要將用戶ID與Netty服務的節點進行綁定,當發送消息時去Redis找到要發送的用戶channel所在的節點,使用RabbitMQ發送到對應節點的隊列即可,可以不用廣播到所有Netty節點了。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
/*** RabbitMQ連接工具類 - 提供RabbitMQ連接池管理和消息收發功能*/
public class RabbitMQConnectUtils {// 連接池集合,用于復用連接private final List<Connection> connections = new ArrayList<>();// 連接池最大連接數限制private final int maxConnection = 20;// RabbitMQ服務器配置private final String host = "127.0.0.1";private final int port = 5682;private final String username = "root";private final String password = "1234";private final String virtualHost = "IM"; // 虛擬主機// RabbitMQ連接工廠public ConnectionFactory factory;/*** 獲取RabbitMQ連接工廠* @return ConnectionFactory實例*/public ConnectionFactory getRabbitMqConnection() {return getFactory();}/*** 獲取連接工廠(單例模式)* @return 初始化好的ConnectionFactory*/public ConnectionFactory getFactory() {initFactory();return factory;}/*** 初始化連接工廠配置*/private void initFactory() {try {if (factory == null) {factory = new ConnectionFactory();factory.setHost(host); // 設置主機地址factory.setPort(port); // 設置端口factory.setUsername(username); // 設置用戶名factory.setPassword(password); // 設置密碼factory.setVirtualHost(virtualHost); // 設置虛擬主機}} catch (Exception e) {e.printStackTrace();}}/*** 發送消息到RabbitMQ* @param message 消息內容* @param exchange 交換機名稱* @param routingKey 路由鍵* @throws Exception 可能拋出的異常*/public void sendMsg(String message, String exchange, String routingKey) throws Exception {// 從連接池獲取連接Connection connection = getConnection();// 創建通道Channel channel = connection.createChannel();// 發布消息(消息持久化)channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));// 關閉通道channel.close();// 歸還連接到連接池setConnection(connection);}/*** 從指定隊列獲取單條消息* @param queue 隊列名稱* @param autoAck 是否自動確認* @return GetResponse對象,包含消息內容* @throws Exception 可能拋出的異常*/public GetResponse basicGet(String queue, boolean autoAck) throws Exception {GetResponse getResponse = null;// 從連接池獲取連接Connection connection = getConnection();// 創建通道Channel channel = connection.createChannel();// 獲取消息getResponse = channel.basicGet(queue, autoAck);// 關閉通道channel.close();// 歸還連接到連接池setConnection(connection);return getResponse;}/*** 從連接池獲取連接* @return RabbitMQ連接* @throws Exception 可能拋出的異常*/public Connection getConnection() throws Exception {return getAndSetConnection(true, null);}/*** 歸還連接到連接池* @param connection 要歸還的連接* @throws Exception 可能拋出的異常*/public void setConnection(Connection connection) throws Exception {getAndSetConnection(false, connection);}/*** 監聽指定交換機的隊列消息(FANOUT模式)* @param fanout_exchange 交換機名稱* @param queueName 隊列名稱* @throws Exception 可能拋出的異常*/public void listen(String fanout_exchange, String queueName) throws Exception {// 獲取連接Connection connection = getConnection();// 創建通道Channel channel = connection.createChannel();// 聲明FANOUT類型交換機(持久化)channel.exchangeDeclare(fanout_exchange,BuiltinExchangeType.FANOUT,true, false, false, null);// 聲明隊列(持久化,非排他,非自動刪除)channel.queueDeclare(queueName, true, false, false, null);// 綁定隊列到交換機(FANOUT模式不需要路由鍵)channel.queueBind(queueName, fanout_exchange, "");// 創建消費者Consumer consumer = new DefaultConsumer(channel){/*** 消息處理回調方法* @param consumerTag 消費者標簽* @param envelope 消息信封(包含交換機和路由信息)* @param properties 消息屬性* @param body 消息體*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// 解析消息內容String msg = new String(body);System.out.println("body = " + msg);String exchange = envelope.getExchange();System.out.println("exchange = " + exchange);// 處理fanout_exchange類型的消息if (exchange.equalsIgnoreCase("fanout_exchange")) {// 反序列化消息內容DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);String senderId = dataContent.getChatMsg().getSenderId();String receiverId = dataContent.getChatMsg().getReceiverId();// 1. 發送消息給接收者(支持多設備)List<io.netty.channel.Channel> receiverChannels =UserChannelSession.getMultiChannels(receiverId);UserChannelSession.sendToTarget(receiverChannels, dataContent);// 2. 同步消息給發送者的其他設備(排除當前設備)String currentChannelId = dataContent.getExtend();List<io.netty.channel.Channel> senderChannels =UserChannelSession.getMyOtherChannels(senderId, currentChannelId);UserChannelSession.sendToTarget(senderChannels, dataContent);}}};// 開始消費消息(自動確認模式)channel.basicConsume(queueName, true, consumer);}/*** 連接池核心管理方法(線程安全)* @param isGet true表示獲取連接,false表示歸還連接* @param connection 要歸還的連接(isGet為false時有效)* @return 獲取到的連接(isGet為true時有效)* @throws Exception 可能拋出的異常*/private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {// 確保連接工廠已初始化getRabbitMqConnection();if (isGet) {// 獲取連接邏輯if (connections.isEmpty()) {// 連接池為空,創建新連接return factory.newConnection();}// 從連接池取出第一個連接Connection newConnection = connections.get(0);connections.remove(0);// 檢查連接是否有效if (newConnection.isOpen()) {return newConnection;} else {// 連接已關閉,創建新連接return factory.newConnection();}} else {// 歸還連接邏輯if (connections.size() < maxConnection) {// 連接池未滿,回收連接connections.add(connection);}// 連接池已滿,不回收(連接會被自動關閉)return null;}}
}
修改ChatHandler信息處理類,消息不再在此類中處理,而是發給RabbitMQ
/*** ChatHandler類*/
// SimpleChannelInboundHandler: 對于請求來說,相當于入站(入境)
// TextWebSocketFrame: 用于為websocket專門處理的文本數據對象,Frame是數據(消息)的載體
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 用于記錄和管理所有客戶端的channel組public static ChannelGroup clients =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {// 獲得客戶端傳輸過來的消息String content = msg.text();System.out.println("接受到的數據:" + content);// 1. 獲取客戶端發來的消息并且解析DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);ChatMsg chatMsg = dataContent.getChatMsg();String msgText = chatMsg.getMsg();String receiverId = chatMsg.getReceiverId();String senderId = chatMsg.getSenderId();// 判斷是否黑名單 start// 如果雙方只要有一方是黑名單,則終止發送Result result = OkHttpUtil.get("http://127.0.0.1:1000/friendship/isBlack?friendId1st=" + receiverId+ "&friendId2nd=" + senderId);boolean isBlack = (Boolean)result.getData();System.out.println("當前的黑名單關系為: " + isBlack);if (isBlack) {return;}// 判斷是否黑名單 end// 時間校準,以服務器的時間為準chatMsg.setChatTime(LocalDateTime.now());Integer msgType = chatMsg.getMsgType();// 獲取channelChannel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();String currentChannelIdShort = currentChannel.id().asShortText();// 2. 判斷消息類型,根據不同的類型來處理不同的業務if (msgType == MsgTypeEnum.CONNECT_INIT.type) {// 當websocket初次open的時候,初始化channel,把channel和用戶userid關聯起來UserChannelSession.putMultiChannels(senderId, currentChannel);UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId);NettyServerNode minNode = dataContent.getServerNode();// System.out.println(minNode);// 初次連接后,該節點下的在線人數累加ZookeeperRegister.incrementOnlineCounts(minNode);// 獲得ip+端口,在redis中設置關系,以便在前端設備斷線后減少在線人數Jedis jedis = JedisPoolUtils.getJedis();jedis.set(senderId, JsonUtils.objectToJson(minNode));} else if (msgType == MsgTypeEnum.WORDS.type|| msgType == MsgTypeEnum.IMAGE.type|| msgType == MsgTypeEnum.VIDEO.type|| msgType == MsgTypeEnum.VOICE.type) {// 此處為mq異步解耦,保存信息到數據庫,數據庫無法獲得信息的主鍵id,// 所以此處可以用snowflake直接生成唯一的主鍵idSnowflake snowflake = new Snowflake(new IdWorkerConfigBean());String sid = snowflake.nextId();System.out.println("sid = " + sid);String iid = IdWorker.getIdStr();System.out.println("iid = " + iid);chatMsg.setMsgId(sid);// 此處receiverId所對應的channel為空// 發送消息// List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId);// if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) {// receiverChannels為空,表示用戶離線/斷線狀態,消息不需要發送,后續可以存儲到數據庫// chatMsg.setIsReceiverOnLine(false);// } else {// chatMsg.setIsReceiverOnLine(true);if (msgType == MsgTypeEnum.VOICE.type) {chatMsg.setIsRead(false);}dataContent.setChatMsg(chatMsg);String chatTimeFormat = LocalDateUtils.format(chatMsg.getChatTime(),LocalDateUtils.DATETIME_PATTERN_2);dataContent.setChatTime(chatTimeFormat);// UserChannelSession.sendToTarget(receiverChannels, dataContent);// 通過RabbitMQ發送消息MessagePublisher.sendMsgToOtherNettyServer(JsonUtils.objectToJson(dataContent)); // 當receiverChannels為空不為空的時候,同賬戶多端設備接受消息// for (Channel c : receiverChannels) {// Channel findChannel = clients.find(c.id());// if (findChannel != null) {//// // if (msgType == MsgTypeEnum.VOICE.type) {// // chatMsg.setIsRead(false);// // }// // dataContent.setChatMsg(chatMsg);// // String chatTimeFormat = LocalDateUtils// // .format(chatMsg.getChatTime(),// // LocalDateUtils.DATETIME_PATTERN_2);// // dataContent.setChatTime(chatTimeFormat);// // 發送消息給在線的用戶// findChannel.writeAndFlush(// new TextWebSocketFrame(// JsonUtils.objectToJson(dataContent)));// }//// }// }// TODO: 消息持久化到數據庫(通過MQ異步處理或者其他方式)}// 此處也不需要了,都在mq的監聽中完成// dataContent.setChatMsg(chatMsg);// String chatTimeFormat = LocalDateUtils// .format(chatMsg.getChatTime(),// LocalDateUtils.DATETIME_PATTERN_2);// dataContent.setChatTime(chatTimeFormat);// dataContent.setExtend(currentChannelId);//// List<Channel> myOtherChannels = UserChannelSession// .getMyOtherChannels(senderId, currentChannelId);// UserChannelSession.sendToMyOthers(myOtherChannels, dataContent);// for (Channel c : myOtherChannels) {// Channel findChannel = clients.find(c.id());// if (findChannel != null) {// // dataContent.setChatMsg(chatMsg);// // String chatTimeFormat = LocalDateUtils// // .format(chatMsg.getChatTime(),// // LocalDateUtils.DATETIME_PATTERN_2);// // dataContent.setChatTime(chatTimeFormat);// // 同步消息給在線的其他設備端// findChannel.writeAndFlush(// new TextWebSocketFrame(// JsonUtils.objectToJson(dataContent)));// }// }// currentChannel.writeAndFlush(new TextWebSocketFrame(currentChannelId));// clients.writeAndFlush(new TextWebSocketFrame(currentChannelId));// 調試輸出當前會話狀態UserChannelSession.outputMulti();}/*** 客戶端連接到服務端之后(打開鏈接)* @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();System.out.println("客戶端建立連接,channel對應的長id為:" + currentChannelId);// 獲得客戶端的channel,并且存入到ChannelGroup中進行管理(作為一個客戶端群組)clients.add(currentChannel);}/*** 關閉連接,移除channel* @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();System.out.println("客戶端關閉連接,channel對應的長id為:" + currentChannelId);// 移除多余的會話String userId = UserChannelSession.getUserIdByChannelId(currentChannelId);UserChannelSession.removeUselessChannels(userId, currentChannelId);clients.remove(currentChannel);// zk中在線人數累減Jedis jedis = JedisPoolUtils.getJedis();NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),NettyServerNode.class);ZookeeperRegister.decrementOnlineCounts(minNode);}/*** 發生異常并且捕獲,移除channel* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {Channel currentChannel = ctx.channel();String currentChannelId = currentChannel.id().asLongText();System.out.println("發生異常捕獲,channel對應的長id為:" + currentChannelId);// 發生異常之后關閉連接(關閉channel)ctx.channel().close();// 隨后從ChannelGroup中移除對應的channelclients.remove(currentChannel);// 移除多余的會話String userId = UserChannelSession.getUserIdByChannelId(currentChannelId);UserChannelSession.removeUselessChannels(userId, currentChannelId);// zk中在線人數累減Jedis jedis = JedisPoolUtils.getJedis();NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),NettyServerNode.class);ZookeeperRegister.decrementOnlineCounts(minNode);}}public class RabbitMQConnectUtils {private final List<Connection> connections = new ArrayList<>();private final int maxConnection = 20;private final String host = "127.0.0.1";private final int port = 5682;private final String username = "root";private final String password = "1234";private final String virtualHost = "IM";public ConnectionFactory factory;public ConnectionFactory getRabbitMqConnection() {return getFactory();}public ConnectionFactory getFactory() {initFactory();return factory;}private void initFactory() {try {if (factory == null) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);factory.setVirtualHost(virtualHost);}} catch (Exception e) {e.printStackTrace();}}public void sendMsg(String message, String exchange, String routingKey) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));channel.close();setConnection(connection);}public GetResponse basicGet(String queue, boolean autoAck) throws Exception {GetResponse getResponse = null;Connection connection = getConnection();Channel channel = connection.createChannel();getResponse = channel.basicGet(queue, autoAck);channel.close();setConnection(connection);return getResponse;}public Connection getConnection() throws Exception {return getAndSetConnection(true, null);}public void setConnection(Connection connection) throws Exception {getAndSetConnection(false, connection);}public void listen(String fanout_exchange, String queueName) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();// FANOUT 發布訂閱模式(廣播模式)channel.exchangeDeclare(fanout_exchange,BuiltinExchangeType.FANOUT,true, false, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, fanout_exchange, "");Consumer consumer = new DefaultConsumer(channel){/*** 重寫消息配送方法* @param consumerTag 消息的標簽(標識)* @param envelope 信封(一些信息,比如交換機路由等等信息)* @param properties 配置信息* @param body 收到的消息數據* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body);System.out.println("body = " + msg);String exchange = envelope.getExchange();System.out.println("exchange = " + exchange);if (exchange.equalsIgnoreCase("fanout_exchange")) {DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);String senderId = dataContent.getChatMsg().getSenderId();String receiverId = dataContent.getChatMsg().getReceiverId();// 廣播至集群的其他節點并且發送給用戶聊天信息List<io.netty.channel.Channel> receiverChannels =UserChannelSession.getMultiChannels(receiverId);UserChannelSession.sendToTarget(receiverChannels, dataContent);// 廣播至集群的其他節點并且同步給自己其他設備聊天信息String currentChannelId = dataContent.getExtend();List<io.netty.channel.Channel> senderChannels =UserChannelSession.getMyOtherChannels(senderId, currentChannelId);UserChannelSession.sendToTarget(senderChannels, dataContent);}}};/*** queue: 監聽的隊列名* autoAck: 是否自動確認,true:告知mq消費者已經消費的確認通知* callback: 回調函數,處理監聽到的消息*/channel.basicConsume(queueName, true, consumer);}private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {getRabbitMqConnection();if (isGet) {if (connections.isEmpty()) {return factory.newConnection();}Connection newConnection = connections.get(0);connections.remove(0);if (newConnection.isOpen()) {return newConnection;} else {return factory.newConnection();}} else {if (connections.size() < maxConnection) {connections.add(connection);}return null;}}
}
public class MessagePublisher {public static void sendMsgToOtherNettyServer(String msg) throws Exception {RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();String fanout_exchange = "fanout_exchange";connectUtils.sendMsg(msg, fanout_exchange, "");}
}
Chat-Web服務監聽Zookeeper節點清理Redis與RabbitMQ殘留數據
spring: rabbitmq:host: 127.0.0.1port: 5682username: rootpassword: 1234virtual-host: wechat-dev
刪除隊列只可以使用RabbitAdmin,RabbitTemplate無法刪除
/*** RabbitAdmin的配置類*/
@Configuration
public class RabbitAdminConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private Integer port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;/*** 構建RabbitMQ的連接工廠* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setPassword(username);connectionFactory.setUsername(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}/*** 構建RabbitAdmin* @param connectionFactory* @return*/@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){return new RabbitAdmin(connectionFactory);}}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryPolicy;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;/*** Zookeeper Curator 客戶端配置類* 功能:* 1. 初始化Curator客戶端連接* 2. 監聽Zookeeper節點變化* 3. 處理節點刪除時的清理工作(Redis/RabbitMQ)*/
@Slf4j
@Component
@ConfigurationProperties(prefix = "zookeeper.curator") // 從配置文件中讀取前綴為zookeeper.curator的屬性
@Data // Lombok注解,自動生成getter/setter
public class CuratorConfig extends BaseInfoProperties {// Zookeeper連接配置private String host; // Zookeeper服務器地址(格式:ip:port)private Integer connectionTimeoutMs; // 連接超時時間(毫秒)private Integer sessionTimeoutMs; // 會話超時時間(毫秒)private Integer sleepMsBetweenRetry; // 重試間隔時間(毫秒)private Integer maxRetries; // 最大重試次數private String namespace; // 命名空間(相當于根節點)// 監聽的Zookeeper路徑public static final String path = "/server-list";// Redis和RabbitMQ操作模板@Autowiredprivate RedisTemplate redisTemplate;@Resourceprivate RabbitAdmin rabbitAdmin;/*** 創建CuratorFramework客戶端Bean* @return 配置好的Curator客戶端實例*/@Bean("curatorClient")public CuratorFramework curatorClient() {// 使用指數退避策略進行重試(推薦)// 參數:初始重試間隔時間,最大重試次數RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);// 構建Curator客戶端CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host) // Zookeeper服務器地址.connectionTimeoutMs(connectionTimeoutMs) // 連接超時時間.sessionTimeoutMs(sessionTimeoutMs) // 會話超時時間.retryPolicy(backoffRetry) // 重試策略.namespace(namespace) // 命名空間隔離.build();client.start(); // 啟動客戶端// 注冊節點監聽器add(path, client);return client;}/*** 注冊Zookeeper節點監聽器* @param path 監聽的節點路徑* @param client Curator客戶端實例*/public void add(String path, CuratorFramework client) {// 創建節點緩存CuratorCache curatorCache = CuratorCache.build(client, path);// 添加監聽器curatorCache.listenable().addListener((type, oldData, data) -> {// type: 事件類型(NODE_CREATED, NODE_CHANGED, NODE_DELETED)// oldData: 事件發生前的節點數據// data: 事件發生后的節點數據switch (type.name()) {case "NODE_CREATED":log.info("(子)節點創建");break;case "NODE_CHANGED":log.info("(子)節點數據變更");break;case "NODE_DELETED":log.info("(子)節點刪除");// 反序列化被刪除節點的數據NettyServerNode oldNode = JsonUtils.jsonToPojo(new String(oldData.getData()), NettyServerNode.class);log.info("被刪除節點路徑: {}, 節點值: {}", oldData.getPath(), oldNode);// 1. 清理Redis中的相關數據String oldPort = oldNode.getPort() + "";String portKey = "netty_port";redis.hdel(portKey, oldPort); // 刪除Redis中存儲的端口信息// 2. 刪除RabbitMQ中對應的隊列String queueName = "netty_queue_" + oldPort;rabbitAdmin.deleteQueue(queueName); // 刪除RabbitMQ隊列break;default:log.info("未處理的事件類型: {}", type);break;}});curatorCache.start(); // 啟動監聽}
}
七、結語
在分布式系統日益普及的今天,Netty 作為高性能網絡通信框架,其單體架構與集群架構的選擇需緊密結合業務需求、團隊能力和資源投入進行權衡。
單體架構?以?簡單、低延遲、低成本?為核心優勢,適合快速驗證、輕量級應用或資源受限的場景。然而,其?單點故障風險?和?性能天花板?決定了它難以支撐大規模并發或高可用性要求,長期來看可能成為業務增長的瓶頸。
集群架構?通過?分布式擴展、容災能力和負載均衡?解決了單體架構的痛點,是支撐高并發、高穩定性系統的關鍵方案。但隨之而來的是?復雜性提升、性能開銷增加?以及?運維成本高企?等挑戰,需要團隊具備分布式系統設計、監控治理和故障恢復的成熟經驗。
實踐建議:
初期優先單體:在業務初期或內部工具開發中,優先選擇單體架構以快速迭代,降低開發成本。
漸進式遷移:當并發量接近單機極限(如 10K+ 連接)或可用性要求提升時,通過服務拆分、網關層抽象或消息隊列(如 Kafka)逐步向集群過渡。
技術選型平衡:集群架構中需合理選擇負載均衡策略(如輪詢、最少連接)、序列化協議(如 Protobuf、JSON)和一致性模型(如最終一致性),避免過度設計。
關注可觀測性:集群環境下需加強日志聚合(ELK)、分布式追蹤(SkyWalking)和鏈路壓測,確保問題可定位、性能可優化。
最終目標:無論選擇單體還是集群,均應以?業務價值?為導向,避免為“分布式而分布式”。在技術復雜性與業務需求間找到平衡點,才能構建出既高效又穩定的網絡通信系統。
上述三種方案可大致實現Netty集群,如果有更高性能的方案或者疑問歡迎評論區留言討論!?