Part 5: Multi-Node Netty Server (Scalable)

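Overview

The message exchange and broadcast features from the earlier parts could all be tested on a single machine. In a real deployment the number of client connections is large, so several server nodes are needed to carry the load. This part therefore tests with virtual machines.

1. Virtual machine preparation

1.1 Prepare one 1-core, 1 GB virtual machine (160); set up the Java environment and install Redis and MinIO

1.2 Prepare six empty 1-core, 1 GB virtual machines (161 to 166); they only need the Java environment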

2. Server changes

2.1 Modify pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.hahashou.netty</groupId>
    <artifactId>server</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>server</name>
    <description>Netty Server Project For Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.100.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-crypto</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.2 Modify application.yml (each server node has a different id)

server:
  port: 32000
spring:
  redis:
    host: 192.168.109.160
    port: 6379
    password: root
logging:
  level:
    com.hahashou.netty: info
netty:
  server:
    # Unique identifier (must match the entry in the hosts file)
    id: netty-server-1
    # Port that clients connect to
    port: 35000

2.3 Add the NettyStatic class to the config package

package com.hahashou.netty.server.config;

import io.netty.channel.Channel;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description: Static constants
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
public class NettyStatic {

    /** key: user code; value: channelId */
    public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);

    /** key: channelId; value: Channel */
    public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);

    /** key: hostName@port of the remote server node; value: the handler for that connection */
    public static Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);

    /** key: handler; value: the client that owns it */
    public static Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}

2.4 Add the RedisConfig class to the config package

package com.hahashou.netty.server.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @description: Redis configuration
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        // Use GenericJackson2JsonRedisSerializer to serialize and deserialize Redis values
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        // Use StringRedisSerializer to serialize and deserialize Redis keys
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // To enable transactions: redisTemplate.setEnableTransactionSupport(true);
        // In my view this is rarely needed (it exists for executing a group of commands together)
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }

    @Bean
    public ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    public static String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";

    public static String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";

    public static String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";
}

2.5 Modify the EventLoopGroupConfig class

package com.hahashou.netty.server.config;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @description: Netty thread groups
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Configuration
public class EventLoopGroupConfig {

    private int bossNum = 1;

    private int workerNum = 4;

    private int businessNum = 1;

    private int maxPending = 100000;

    /** ------------------------------ Server side ------------------------------ */

    @Bean("bossGroup")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossNum);
    }

    @Bean("workerGroup")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerNum);
    }

    @Bean("businessGroup")
    public EventExecutorGroup businessGroup() {
        return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),
                maxPending, RejectedExecutionHandlers.reject());
    }

    /** ------------------------------ Client side ------------------------------ */

    @Bean("clientWorkerGroup")
    public NioEventLoopGroup clientWorkerGroup() {
        return new NioEventLoopGroup(workerNum);
    }

    @Bean("clientBusinessGroup")
    public EventExecutorGroup clientBusinessGroup() {
        return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),
                maxPending, RejectedExecutionHandlers.reject());
    }

    static class BusinessThreadFactory implements ThreadFactory {

        private final ThreadGroup group;

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        private final String namePrefix;

        BusinessThreadFactory() {
            SecurityManager securityManager = System.getSecurityManager();
            group = (securityManager != null)
                    ? securityManager.getThreadGroup()
                    : Thread.currentThread().getThreadGroup();
            namePrefix = "netty-server-";
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }
}

2.6 Add the SpringBean class to the config package

package com.hahashou.netty.server.config;

import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;

/**
 * @description: Spring bean management
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Configuration
public class SpringBean {

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }

    /**
     * No more than 64 instances should be created
     * (private static final int INSTANCE_COUNT_LIMIT = 64), so a single shared bean is used.
     * @return the shared timer
     */
    @Bean
    public HashedWheelTimer hashedWheelTimer() {
        // Default tick duration is 100 ms and the wheel size is 512
        return new HashedWheelTimer();
    }
}

2.7 Add the ApplicationInitial class to the server package

package com.hahashou.netty.server;

import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * @description: Application initialization
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Component
@Slf4j
public class ApplicationInitial implements ApplicationRunner {

    @Resource
    private HashedWheelTimer hashedWheelTimer;

    @Resource
    private NettyServer nettyServer;

    @Override
    public void run(ApplicationArguments args) {
        // Start the Netty server one second after the application is ready
        hashedWheelTimer.newTimeout(nettyServer, 1, TimeUnit.SECONDS);
    }
}

2.8 Modify the Message class

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * @description:
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Data
public class Message {

    /** Broadcast secret key */
    private String secretKey;

    /** Sender's user code */
    private String userCode;

    /** Id of the server node that relays the message */
    private String serverId;

    /** Recipient's user code */
    private String friendUserCode;

    /** Used only while establishing a connection */
    private String channelId;

    /** Message type */
    private Integer type;

    public enum TypeEnum {

        TEXT(0, "text", "", new ArrayList<>()),
        IMAGE(1, "image", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),
        VOICE(2, "voice", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),
        VIDEO(3, "video", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),
        ;

        @Getter
        private Integer key;

        @Getter
        private String describe;

        @Getter
        private String bucketName;

        @Getter
        private List<String> formatList;

        TypeEnum(int key, String describe, String bucketName, List<String> formatList) {
            this.key = key;
            this.describe = describe;
            this.bucketName = bucketName;
            this.formatList = formatList;
        }

        /** Returns the type whose format list contains the given extension, or null if none matches */
        public static TypeEnum select(String format) {
            TypeEnum result = null;
            for (TypeEnum typeEnum : TypeEnum.values()) {
                if (typeEnum.getFormatList().contains(format)) {
                    result = typeEnum;
                    break;
                }
            }
            return result;
        }
    }

    /** Text content, or the full path name of a file */
    private String text;

    public static ByteBuf transfer(Message message) {
        return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);
    }

    /**
     * Generates a random string of the given length (capped at 64, without repeated characters)
     * @param length requested length
     * @return the random string
     */
    public static String randomString(int length) {
        if (length > 64) {
            length = 64;
        }
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(i + "");
        }
        for (char i = 'A'; i <= 'Z'; i++) {
            list.add(String.valueOf(i));
        }
        for (char i = 'a'; i <= 'z'; i++) {
            list.add(String.valueOf(i));
        }
        list.add("α");
        list.add("ω");
        Collections.shuffle(list);
        String string = list.toString();
        return string.replace("[", "")
                .replace("]", "")
                .replace(", ", "")
                .substring(0, length);
    }
}

2.9 Add the NettyClientHandler class to the config package

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * @description:
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Getter
    @Setter
    private String userCode;

    @Getter
    @Setter
    private String hostName;

    @Getter
    @Setter
    private int port;

    // Note: NettyServer creates this handler with "new", so Spring does not process @Resource here;
    // redisOperation stays null unless it is assigned explicitly before offlineMessage is called.
    @Resource
    private ValueOperations<String, Object> redisOperation;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("{}, connected to another server node as a client", LocalDateTime.now());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        NettyStatic.CHANNEL.remove(ctx.channel().id().asLongText());
        NettyClientHandler nettyClientHandler = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);
        if (nettyClientHandler != null) {
            // ConcurrentHashMap.remove(null) throws NullPointerException, so guard the lookup
            NettyStatic.NETTY_CLIENT.remove(nettyClientHandler);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg != null) {
            Message message = JSON.parseObject(msg.toString(), Message.class);
            String channelId = message.getChannelId(),
                    text = message.getText();
            if (StringUtils.hasText(channelId)) {
                // Handshake: the remote node sent us our channelId; register it and reply with our server id
                Channel channel = ctx.channel();
                message.setUserCode(userCode);
                NettyStatic.USER_CHANNEL.put(hostName, channelId);
                NettyStatic.CHANNEL.put(channelId, channel);
                channel.writeAndFlush(Message.transfer(message));
            } else if (StringUtils.hasText(text)) {
                String friendUserCode = message.getFriendUserCode();
                if (StringUtils.hasText(message.getServerId())) {
                    String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
                    if (StringUtils.hasText(queryChannelId)) {
                        Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
                        if (channel == null) {
                            offlineMessage(friendUserCode, message);
                            return;
                        }
                        // serverId is no longer needed at this point
                        message.setServerId(null);
                        channel.writeAndFlush(Message.transfer(message));
                    } else {
                        offlineMessage(friendUserCode, message);
                    }
                }
            }
        }
    }

    /**
     * Stores an offline message in Redis
     * @param friendUserCode recipient's user code
     * @param message message to store
     */
    public void offlineMessage(String friendUserCode, Message message) {
        List<Message> messageList = new ArrayList<>();
        Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
        if (offlineMessage != null) {
            messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
        }
        messageList.add(message);
        redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Exceptions are intentionally ignored here
    }
}

2.10 Add the NettyClient class to the config package

package com.hahashou.netty.server.config;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;

/**
 * @description: Netty TCP client (used by a server node to connect to other server nodes)
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Slf4j
public class NettyClient {

    @Getter
    @Setter
    private NioEventLoopGroup clientWorkerGroup;

    @Getter
    @Setter
    private EventExecutorGroup clientBusinessGroup;

    public void createClient(NettyClientHandler nettyClientHandler) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(clientWorkerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                        pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                        pipeline.addLast(clientBusinessGroup, nettyClientHandler);
                    }
                });
        try {
            // The host name is resolved through the hosts file configured in section 4.3
            InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());
            SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());
            bootstrap.connect(socketAddress).sync().channel();
        } catch (UnknownHostException exception) {
            log.error("Please check that the hosts file is configured correctly : {}", exception.getMessage());
        } catch (InterruptedException exception) {
            log.error("Client interrupted : {}", exception.getMessage());
        }
    }

    @PreDestroy
    public void destroy() {
        clientWorkerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("Client shut down successfully");
    }
}

2.11 Modify the NettyServer class

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @description: Netty TCP server
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Component
@Slf4j
public class NettyServer implements TimerTask {

    @Value("${netty.server.id}")
    private String serverId;

    @Value("${netty.server.port}")
    private int port;

    @Resource
    private NioEventLoopGroup bossGroup;

    @Resource
    private NioEventLoopGroup workerGroup;

    @Resource
    private EventExecutorGroup businessGroup;

    @Resource
    private NettyServerHandler nettyServerHandler;

    @Resource
    private NioEventLoopGroup clientWorkerGroup;

    @Resource
    private EventExecutorGroup clientBusinessGroup;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private ValueOperations<String, Object> redisOperation;

    @Resource
    private HashedWheelTimer hashedWheelTimer;

    @Override
    public void run(Timeout timeout) {
        // Another node is starting up right now; try again in 10 seconds
        Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);
        if (nettyServerLock != null) {
            hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);
            return;
        }
        try {
            redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);
            //String hostAddress = InetAddress.getLocalHost().getHostAddress();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                            pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                            pipeline.addLast(businessGroup, nettyServerHandler);
                        }
                    })
                    // Size of the server's pending-connection queue; corresponds to the backlog parameter of listen()
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // There is a major pitfall here; see the pitfalls guide in section 3
                    .bind(port)
                    .sync();
            if (channelFuture.isSuccess()) {
                log.info("{} started successfully", serverId);
                redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);
            }
            thisNodeHandle(port);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException exception) {
            log.error("{} failed to start: {}", serverId, exception.getMessage());
        } finally {
            redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);
        }
    }

    /**
     * Connects to every node already registered in Redis, then registers this node as serverId@port
     */
    private void thisNodeHandle(int port) {
        Set<String> nodeList = new HashSet<>();
        Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);
        if (nettyServerList != null) {
            nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));
            for (String hostAndPort : nodeList) {
                String[] split = hostAndPort.split("@");
                String connectHost = split[0];
                int connectPort = Integer.parseInt(split[1]);
                NettyClient nettyClient = new NettyClient();
                nettyClient.setClientWorkerGroup(clientWorkerGroup);
                nettyClient.setClientBusinessGroup(clientBusinessGroup);
                NettyClientHandler nettyClientHandler = new NettyClientHandler();
                nettyClientHandler.setUserCode(serverId);
                nettyClientHandler.setHostName(connectHost);
                nettyClientHandler.setPort(connectPort);
                nettyClient.createClient(nettyClientHandler);
                NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);
                NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);
            }
        }
        nodeList.add(serverId + "@" + port);
        redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));
    }

    public void stop() {
        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("TCP service shut down successfully");
        // Remove this node from the list of registered nodes
        Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);
        List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);
        hostList.remove(serverId + "@" + port);
        if (CollectionUtils.isEmpty(hostList)) {
            redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);
        } else {
            redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));
        }
    }

    @PreDestroy
    public void destroy() {
        stop();
    }
}
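
The startup lock in NettyServer.run reads NETTY_SERVER_LOCK and then sets it in two separate steps, so two nodes starting at the same instant could both see the key as absent. Below is a minimal sketch of doing the check and the set in one atomic step. It uses a ConcurrentHashMap as an in-memory stand-in for Redis; the class StartupLockSketch and its method names are hypothetical and not part of the project. With Spring Data Redis, the comparable call would be ValueOperations.setIfAbsent, ideally with an expiry so that a crashed node cannot hold the lock forever.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration of an atomic "set if absent" startup lock. */
public class StartupLockSketch {

    // In-memory stand-in for Redis (assumption: one key per lock, value = owner id)
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns true only for the caller that actually created the key. */
    public boolean tryLock(String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    /** Releases the lock only if it is still held by the given owner. */
    public boolean unlock(String key, String owner) {
        return store.remove(key, owner);
    }

    public static void main(String[] args) {
        StartupLockSketch lock = new StartupLockSketch();
        String key = "NETTY_SERVER_LOCK";
        System.out.println(lock.tryLock(key, "netty-server-1")); // true
        System.out.println(lock.tryLock(key, "netty-server-2")); // false
        System.out.println(lock.unlock(key, "netty-server-2"));  // false
        System.out.println(lock.unlock(key, "netty-server-1"));  // true
        System.out.println(lock.tryLock(key, "netty-server-2")); // true
    }
}
```

A node that gets false from tryLock would reschedule itself, in the same way run reschedules through hashedWheelTimer after 10 seconds.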

2.12 Modify the NettyServerHandler class

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @description:
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Value("${netty.server.id}")
    private String serverId;

    public static String SERVER_PREFIX = "netty-server-";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private ValueOperations<String, Object> redisOperation;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String channelId = channel.id().asLongText();
        log.info("Client connected, channelId : {}", channelId);
        NettyStatic.CHANNEL.put(channelId, channel);
        // Tell the client its channelId so that it can register itself
        Message message = new Message();
        message.setChannelId(channelId);
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        String channelId = ctx.channel().id().asLongText();
        log.info("Client disconnected, channelId : {}", channelId);
        NettyStatic.CHANNEL.remove(channelId);
        for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {
            if (entry.getValue().equals(channelId)) {
                // Remove the userCode -> serverId record from Redis
                redisTemplate.delete(entry.getKey());
                break;
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg != null) {
            Message message = JSON.parseObject(msg.toString(), Message.class);
            String userCode = message.getUserCode(),
                    channelId = message.getChannelId(),
                    friendUserCode = message.getFriendUserCode();
            if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {
                connect(userCode, channelId);
            } else if (StringUtils.hasText(message.getText())) {
                // Look up which server node the recipient is connected to
                Object code = redisOperation.get(friendUserCode);
                if (code != null) {
                    String queryServerId = code.toString();
                    message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);
                    if (StringUtils.hasText(friendUserCode)) {
                        sendOtherClient(message);
                    } else {
                        sendAdmin(ctx.channel(), message);
                    }
                } else {
                    offlineMessage(friendUserCode, message);
                }
            }
        }
    }

    /**
     * Establishes a connection
     * @param userCode user code (or server id) of the connecting peer
     * @param channelId channel id assigned to the peer
     */
    private void connect(String userCode, String channelId) {
        log.info("{} connected", userCode);
        NettyStatic.USER_CHANNEL.put(userCode, channelId);
        if (!userCode.startsWith(SERVER_PREFIX)) {
            redisOperation.set(userCode, serverId);
        }
    }

    /**
     * Sends to another client
     * @param message message to deliver
     */
    private void sendOtherClient(Message message) {
        String friendUserCode = message.getFriendUserCode(),
                serverId = message.getServerId();
        String queryChannelId;
        if (StringUtils.hasText(serverId)) {
            log.info("Forwarding to {}", serverId);
            queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);
        } else {
            queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
        }
        if (StringUtils.hasText(queryChannelId)) {
            Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
            if (channel == null) {
                offlineMessage(friendUserCode, message);
                return;
            }
            channel.writeAndFlush(Message.transfer(message));
        } else {
            offlineMessage(friendUserCode, message);
        }
    }

    /**
     * Stores an offline message in Redis
     * @param friendUserCode recipient's user code
     * @param message message to store
     */
    public void offlineMessage(String friendUserCode, Message message) {
        // One message takes roughly 100 B in Redis, so 10,000 messages are about 1 MB;
        // maxmemory in redis.conf is set to 256 MB
        List<Message> messageList = new ArrayList<>();
        Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
        if (offlineMessage != null) {
            messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
        }
        messageList.add(message);
        redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
    }

    /**
     * Handles a message addressed to the server: replies as ADMIN with the current time
     * @param channel channel of the sending client
     * @param message received message
     */
    private void sendAdmin(Channel channel, Message message) {
        message.setUserCode("ADMIN");
        message.setText(LocalDateTime.now().toString());
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("Client raised an exception, channelId : {}", ctx.channel().id().asLongText());
    }
}
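
The routing decision in channelRead can be summarized as: look up the recipient's server id, deliver locally if it equals this node's id, forward to the other node if it differs, and store the message offline if no record exists. The sketch below isolates that decision with a plain Map standing in for the Redis userCode -> serverId records. RoutingSketch, Route and decide are hypothetical names introduced for illustration only; the real handler additionally falls back to offline storage when the channel lookup fails.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the local / forward / offline routing decision. */
public class RoutingSketch {

    public enum Route { LOCAL, FORWARD, OFFLINE }

    /**
     * @param localServerId  id of the node that received the message
     * @param userToServer   stand-in for the Redis records userCode -> serverId
     * @param friendUserCode recipient's user code
     */
    public static Route decide(String localServerId, Map<String, String> userToServer, String friendUserCode) {
        String targetServerId = userToServer.get(friendUserCode);
        if (targetServerId == null) {
            // No record: the recipient is not connected to any node
            return Route.OFFLINE;
        }
        return localServerId.equals(targetServerId) ? Route.LOCAL : Route.FORWARD;
    }

    public static void main(String[] args) {
        Map<String, String> userToServer = new HashMap<>();
        userToServer.put("Aa", "netty-server-1");
        userToServer.put("Bb", "netty-server-1");
        userToServer.put("Cc", "netty-server-2");

        System.out.println(decide("netty-server-1", userToServer, "Bb")); // LOCAL
        System.out.println(decide("netty-server-1", userToServer, "Cc")); // FORWARD
        System.out.println(decide("netty-server-1", userToServer, "Zz")); // OFFLINE
    }
}
```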

2.13 Create a service package and add the ServerService interface

package com.hahashou.netty.server.service;

import com.hahashou.netty.server.config.Message;

/**
 * @description:
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
public interface ServerService {

    /**
     * Sends a message
     * @param dto message to send
     */
    void send(Message dto);

    /**
     * Stops the service (in preparation for the reconnection feature in a later part)
     */
    void stop();
}

2.14 Create an impl package under service and add the ServerServiceImpl class

package com.hahashou.netty.server.service.impl;

import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @description:
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {

    @Value("${netty.server.id}")
    private String serverId;

    @Resource
    private PasswordEncoder passwordEncoder;

    @Resource
    private ValueOperations<String, Object> redisOperation;

    @Resource
    private NettyServer nettyServer;

    @Override
    public void send(Message dto) {
        String friendUserCode = dto.getFriendUserCode();
        if (StringUtils.hasText(friendUserCode)) {
            Object code = redisOperation.get(friendUserCode);
            if (code != null) {
                String queryServerId = code.toString();
                dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);
                if (StringUtils.hasText(friendUserCode)) {
                    sendOtherClient(dto);
                }
            } else {
                offlineMessage(friendUserCode, dto);
            }
        } else {
            // Broadcast to everyone; the secret key must be verified (inputSecretKey should be a dynamic value
            // obtained via phone number plus verification code for each broadcast; implement this yourself)
            String inputSecretKey = dto.getSecretKey();
            // See the main method for how encodedPassword is generated
            String encodedPassword = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";
            if (StringUtils.hasText(inputSecretKey) && passwordEncoder.matches(inputSecretKey, encodedPassword)) {
                dto.setSecretKey(null);
                for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {
                    String key = entry.getKey();
                    if (key.startsWith(NettyServerHandler.SERVER_PREFIX)) {
                        // Other server nodes could be called over HTTP here; fill this in yourself
                        // (Redis has all the information)
                        continue;
                    }
                    // Only handle clients connected to this node
                    String value = entry.getValue();
                    Channel channel = NettyStatic.CHANNEL.get(value);
                    if (channel == null) {
                        // Store the broadcast for this user and keep going;
                        // returning here would skip the remaining clients
                        offlineMessage(key, dto);
                        continue;
                    }
                    channel.writeAndFlush(Message.transfer(dto));
                }
            }
        }
    }

    public static void main(String[] args) {
        String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";
        PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
        String encode = passwordEncoder.encode(text);
        log.info(encode);
        if (passwordEncoder.matches(text, encode)) {
            log.info("Secret key is correct");
        }
    }

    /**
     * Sends to another client
     * @param message message to deliver
     */
    private void sendOtherClient(Message message) {
        String friendUserCode = message.getFriendUserCode(),
                serverId = message.getServerId();
        String queryChannelId;
        if (StringUtils.hasText(serverId)) {
            log.info("Forwarding to {}", serverId);
            queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);
        } else {
            queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);
        }
        if (StringUtils.hasText(queryChannelId)) {
            Channel channel = NettyStatic.CHANNEL.get(queryChannelId);
            if (channel == null) {
                offlineMessage(friendUserCode, message);
                return;
            }
            channel.writeAndFlush(Message.transfer(message));
        } else {
            offlineMessage(friendUserCode, message);
        }
    }

    /**
     * Stores an offline message in Redis
     * @param friendUserCode recipient's user code
     * @param message message to store
     */
    public void offlineMessage(String friendUserCode, Message message) {
        List<Message> messageList = new ArrayList<>();
        Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);
        if (offlineMessage != null) {
            messageList = JSON.parseArray(offlineMessage.toString(), Message.class);
        }
        messageList.add(message);
        redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));
    }

    @Override
    public void stop() {
        nettyServer.stop();
    }
}
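
offlineMessage reads the whole stored list, appends one entry, and writes the list back. If two threads do this for the same recipient at the same time, one append can overwrite the other. The following is a minimal sketch of an atomic append, assuming an in-memory ConcurrentHashMap in place of Redis. OfflineQueueSketch, append and drain are hypothetical names, and drain (handing stored messages to a user who reconnects) is not shown in the article. With Redis itself, a list written through redisTemplate.opsForList().rightPush would be one way to get a server-side append.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration of appending offline messages without lost updates. */
public class OfflineQueueSketch {

    private static final String PREFIX = "OFFLINE_MESSAGE_";

    // In-memory stand-in for Redis: key -> messages as JSON strings
    private final ConcurrentMap<String, List<String>> store = new ConcurrentHashMap<>();

    /** Appends one message; compute() runs atomically per key. */
    public void append(String friendUserCode, String messageJson) {
        store.compute(PREFIX + friendUserCode, (key, existing) -> {
            List<String> updated = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
            updated.add(messageJson);
            return updated;
        });
    }

    /** Removes and returns everything stored for the user (empty list if nothing is stored). */
    public List<String> drain(String friendUserCode) {
        List<String> pending = store.remove(PREFIX + friendUserCode);
        return pending == null ? Collections.emptyList() : pending;
    }

    public static void main(String[] args) {
        OfflineQueueSketch queue = new OfflineQueueSketch();
        queue.append("Cc", "{\"userCode\":\"Aa\",\"text\":\"first\"}");
        queue.append("Cc", "{\"userCode\":\"Aa\",\"text\":\"second\"}");
        System.out.println(queue.drain("Cc").size()); // 2
        System.out.println(queue.drain("Cc").size()); // 0
    }
}
```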

2.15 Modify the ServerController class

package com.hahashou.netty.server.controller;

import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * @description:
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {

    @Resource
    private ServerService serverService;

    /**
     * Secret key for reference: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU
     * @param dto message to send
     * @return "success"
     */
    @PostMapping("/send")
    public String send(@RequestBody Message dto) {
        serverService.send(dto);
        return "success";
    }

    @GetMapping("/stop")
    public String stop() {
        serverService.stop();
        return "stop netty success";
    }
}

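3. Pitfalls guide for the NettyServer class

Tools

yum -y install net-tools
netstat -tunlp

With the firewall enabled and bind(String inetHost, int inetPort) in use, inetHost was 127.0.0.1, so port 35000 was reachable only from the local machine. For other machines to connect, use bind(int inetPort) instead. The figure below shows the port listing before and after the change.
Figure: port usage
Conclusion
With bind(String inetHost, int inetPort), the virtual machines had problems whether the firewall was off or on. On a machine with a public IP, with the firewall off or the port opened, access through a DNS mapping worked fine. bind(int inetPort) is still the recommended choice.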
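
The difference between the two bind variants can be reproduced without Netty. The sketch below uses the JDK's ServerSocket (a stand-in, not the Netty API) on an ephemeral port: binding to the loopback address yields a listener that only local processes can reach, while binding with a port alone listens on the wildcard address, which covers every interface. BindAddressSketch and its method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical illustration of loopback-only versus all-interfaces binding. */
public class BindAddressSketch {

    /** Binds to 127.0.0.1 (or ::1) and reports whether the listener is loopback-only. */
    public static boolean boundToLoopback() {
        // Port 0 asks the OS for any free port; 50 is the backlog
        try (ServerSocket socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            return socket.getInetAddress().isLoopbackAddress();
        } catch (IOException exception) {
            throw new UncheckedIOException(exception);
        }
    }

    /** Binds with a port only and reports whether the listener uses the wildcard address. */
    public static boolean boundToAllInterfaces() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getInetAddress().isAnyLocalAddress();
        } catch (IOException exception) {
            throw new UncheckedIOException(exception);
        }
    }

    public static void main(String[] args) {
        System.out.println("loopback only: " + boundToLoopback());
        System.out.println("all interfaces: " + boundToAllInterfaces());
    }
}
```

In the netstat -tunlp output, the first case shows up as a local address of 127.0.0.1:port, and the second as 0.0.0.0:port or :::port.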

4. Server preparation

4.1 Build three server jar packages with the ids netty-server-1, netty-server-2 and netty-server-3, and place them on 161 to 163 respectively

4.2 Open the ports on 161, 162 and 163

firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload

4.3 Edit the hosts file on 161, 162 and 163

vi /etc/hosts

Append the following

192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3

4.4 Start 161, 162 and 163 in that order

java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar

161
Figure: server 1 startup
162
Figure: server 2 startup
163
Figure: server 3 startup
Server list recorded in Redis
Figure: server list recorded in Redis
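
Each entry in NETTY_SERVER_LIST has the form serverId@port (netty-server-1@35000 with the configuration above), and thisNodeHandle splits it on "@" before connecting. A slightly more defensive parse might look like the sketch below. NodeAddressSketch is a hypothetical helper, not part of the project, and the validation rules are assumptions.

```java
/** Hypothetical illustration of parsing one "serverId@port" entry. */
public final class NodeAddressSketch {

    private final String host;
    private final int port;

    private NodeAddressSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    /** Parses an entry such as "netty-server-1@35000"; rejects malformed input. */
    public static NodeAddressSketch parse(String entry) {
        int at = entry == null ? -1 : entry.lastIndexOf('@');
        if (at <= 0 || at == entry.length() - 1) {
            throw new IllegalArgumentException("Expected host@port but got: " + entry);
        }
        int port;
        try {
            port = Integer.parseInt(entry.substring(at + 1));
        } catch (NumberFormatException exception) {
            throw new IllegalArgumentException("Port is not a number in: " + entry, exception);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in: " + entry);
        }
        return new NodeAddressSketch(entry.substring(0, at), port);
    }

    public static void main(String[] args) {
        NodeAddressSketch node = NodeAddressSketch.parse("netty-server-1@35000");
        System.out.println(node.getHost() + " -> " + node.getPort()); // netty-server-1 -> 35000
    }
}
```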

5. Client changes

5.1 Modify application.yml

server:
  port: 32001
logging:
  level:
    com.hahashou.netty: info
spring:
  servlet:
    multipart:
      max-file-size: 128MB
      max-request-size: 256MB
userCode: Aa
host: 192.168.109.161
minio:
  endpoint: http://192.168.109.160:9000
  accessKey: root
  secretKey: root123456
5.2 Modify the NettyClient class

package com.hahashou.netty.client.config;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;

/**
 * @description: Netty TCP service
 * @author: 哼唧獸
 * @date: 9999/9/21
 **/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {

    @Value("${host}")
    private String host;

    public static int PORT = 35000;

    @Resource
    private NioEventLoopGroup workerGroup;

    @Resource
    private EventExecutorGroup businessGroup;

    @Resource
    private NettyClientHandler nettyClientHandler;

    public static Channel CHANNEL;

    @SneakyThrows
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);
    }

    public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,
                             NettyClientHandler nettyClientHandler, String host, int port) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
                        pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
                        pipeline.addLast(businessGroup, nettyClientHandler);
                    }
                });
        try {
            CHANNEL = bootstrap.connect(host, port).sync().channel();
        } catch (InterruptedException exception) {
            log.error("Client interrupted : {}", exception.getMessage());
        }
    }

    @PreDestroy
    public void destroy() {
        workerGroup.shutdownGracefully().syncUninterruptibly();
        log.info("Client shut down successfully");
    }
}

6. Client preparation

6.1 Build six client jars, edit application.yml in each one, and deploy them to the machines as follows

Aa goes on 163, Bb on 164, Cc on 165, Dd on 166, Ee on 161, Ff on 162

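# application.yml for Aa
userCode: Aa
host: 192.168.109.161

# application.yml for Bb
userCode: Bb
host: 192.168.109.161

# application.yml for Cc
userCode: Cc
host: 192.168.109.162

# application.yml for Dd
userCode: Dd
host: 192.168.109.162

# application.yml for Ee
userCode: Ee
host: 192.168.109.163

# application.yml for Ff
userCode: Ff
host: 192.168.109.163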

6.2 Open the port on machines 161 through 166

firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload
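
Before starting the clients, it can help to confirm that the opened port is actually reachable from another machine. The following is a minimal sketch using only the JDK; the class name PortCheck and the method isReachable are hypothetical and not part of the article's project. The default host and port are simply the values that appear in this article, so pass whichever address and port your server really listens on.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the article's code base.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, filtered by a firewall, timed out, or host unknown
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions taken from the examples in this article
        String host = args.length > 0 ? args[0] : "192.168.109.161";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 32001;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

A result of false only means that no connection could be made: the port may be closed in the firewall, or nothing may be listening on it yet.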

6.3 Start all clients

[Screenshot: Aa and Bb connected]
[Screenshot: Cc and Dd connected]
[Screenshot: Ee and Ff connected]

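7. Testing

[Screenshot: request parameters]

7.1 Two clients connected to the same server: no forwarding takes place

Aa sends a message to Bb, and Bb replies to Aa after receiving it.
[Screenshot: Aa to Bb]
[Screenshot: Bb to Aa]

7.2 Two clients connected to different servers

Aa sends a message to Cc (forwarded from server 1 to server 2), and Cc replies to Aa after receiving it (forwarded from server 2 to server 1).
[Screenshot: forwarding from A to C]
[Screenshot: Aa to Cc]
[Screenshot: forwarding from C to A]
[Screenshot: Cc to Aa]
Aa sends a message to Ee, and Ee replies to Aa after receiving it.
[Screenshot: Aa to Ee]
[Screenshot: Ee to Aa]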
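
The two cases in 7.1 and 7.2 come down to one decision: if sender and receiver are attached to the same server, the message is delivered locally; otherwise it is forwarded to the receiver's server. The sketch below illustrates only that decision, using an in-memory map filled with the host values from 6.1. The class RoutingSketch and the method route are hypothetical names for illustration; this is not the article's server implementation, which may look up the mapping differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; none of these names come from the article's code.
class RoutingSketch {

    // userCode -> server the client connects to (values from the application.yml files in 6.1)
    static final Map<String, String> USER_TO_SERVER = new HashMap<>();

    static {
        USER_TO_SERVER.put("Aa", "192.168.109.161");
        USER_TO_SERVER.put("Bb", "192.168.109.161");
        USER_TO_SERVER.put("Cc", "192.168.109.162");
        USER_TO_SERVER.put("Dd", "192.168.109.162");
        USER_TO_SERVER.put("Ee", "192.168.109.163");
        USER_TO_SERVER.put("Ff", "192.168.109.163");
    }

    /** Describes the path a message from one user to another would take. */
    static String route(String from, String to) {
        String source = USER_TO_SERVER.get(from);
        String target = USER_TO_SERVER.get(to);
        if (source == null || target == null) {
            return "unknown user";
        }
        if (source.equals(target)) {
            // Same server: no forwarding is needed (case 7.1)
            return "local delivery on " + source;
        }
        // Different servers: the sender's server forwards to the receiver's server (case 7.2)
        return "forward from " + source + " to " + target;
    }

    public static void main(String[] args) {
        System.out.println(route("Aa", "Bb")); // local delivery on 192.168.109.161
        System.out.println(route("Aa", "Cc")); // forward from 192.168.109.161 to 192.168.109.162
        System.out.println(route("Ee", "Aa")); // forward from 192.168.109.163 to 192.168.109.161
    }
}
```

A reply takes the same path in reverse, which is why the Cc to Aa answer is forwarded from server 2 back to server 1.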

7.3 Broadcast

[Screenshot: broadcast request parameters]
[Screenshot: broadcast received]
