說明
前面消息互發以及廣播都是單機就可以完成測試, 但實際場景中客戶端的連接數量很大, 那就需要有一定數量的服務端去支撐, 所以準備虛擬機測試。
1. 虛擬機準備
1.1 準備1個1核1G的虛擬機(160), 配置java環境, 安裝redis和minio
1.2 準備6個1核1G的空虛擬機(161到166), 只需要java環境即可
2. 服務端改造
2.1 修改 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hahashou.netty</groupId><artifactId>server</artifactId><version>1.0-SNAPSHOT</version><name>server</name><description>Netty Server Project For Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.100.Final</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.security</groupId><artifactId>spring-security-crypto</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
2.2 修改 application.yml (每個服務端的id是不一樣的)
server:port: 32000spring:redis:host: 192.168.109.160port: 6379password: rootlogging:level:com.hahashou.netty: infonetty:server:# 唯一標識(與hosts文件里對應)id : netty-server-1# 客戶端需要連接的端口port: 35000
2.3 config包下增加 NettyStatic類
package com.hahashou.netty.server.config;import io.netty.channel.Channel;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @description: 靜態常量* @author: 哼唧獸* @date: 9999/9/21**/
public class NettyStatic {/** key: 用戶code; value: channelId */public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);/** key: channelId; value: Channel */public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);public static Map<String, NettyClientHandler> NETTY_CLIENT_HANDLER = new ConcurrentHashMap<>(32);public static Map<NettyClientHandler, NettyClient> NETTY_CLIENT = new ConcurrentHashMap<>(32);
}
2.4 config包下增加 RedisConfig類
package com.hahashou.netty.server.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @description: Redis配置* @author: 哼唧獸* @date: 9999/9/21**/
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();// 使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());// 使用StringRedisSerializer來序列化和反序列化redis的keyredisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());// 開啟事務:redisTemplate.setEnableTransactionSupport(true); 我覺得一般用不到(該操作是為了執行一組命令而設置的)redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate;}@Beanpublic ValueOperations<String, Object> redisOperation(RedisTemplate<String, Object> redisTemplate) {return redisTemplate.opsForValue();}public static String NETTY_SERVER_LOCK = "NETTY_SERVER_LOCK";public static String NETTY_SERVER_LIST = "NETTY_SERVER_LIST";public static String OFFLINE_MESSAGE = "OFFLINE_MESSAGE_";
}
2.5 修改 EventLoopGroupConfig類
package com.hahashou.netty.server.config;import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** @description: Netty線程組* @author: 哼唧獸* @date: 9999/9/21**/
@Configuration
public class EventLoopGroupConfig {private int bossNum = 1;private int workerNum = 4;private int businessNum = 1;private int maxPending = 100000;/** ------------------------------ 服務端 ------------------------------ */@Bean("bossGroup")public NioEventLoopGroup bossGroup() {return new NioEventLoopGroup(bossNum);}@Bean("workerGroup")public NioEventLoopGroup workerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("businessGroup")public EventExecutorGroup businessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(),maxPending, RejectedExecutionHandlers.reject());}/** ------------------------------ 客戶端 ------------------------------ */@Bean("clientWorkerGroup")public NioEventLoopGroup clientWorkerGroup() {return new NioEventLoopGroup(workerNum);}@Bean("clientBusinessGroup")public EventExecutorGroup clientBusinessGroup() {return new DefaultEventExecutorGroup(businessNum, new BusinessThreadFactory(), maxPending, RejectedExecutionHandlers.reject());}static class BusinessThreadFactory implements ThreadFactory {private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;BusinessThreadFactory() {SecurityManager securityManager = System.getSecurityManager();group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();namePrefix = "netty-server-";}@Overridepublic Thread newThread(Runnable runnable) {Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}
}
2.6 config包下增加 SpringBean類
package com.hahashou.netty.server.config;import io.netty.util.HashedWheelTimer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;/*** @description: Spring Bean管理* @author: 哼唧獸* @date: 9999/9/21**/
@Configuration
public class SpringBean {@Beanpublic PasswordEncoder passwordEncoder() {return new BCryptPasswordEncoder();}/*** 最多能new64個, private static final int INSTANCE_COUNT_LIMIT = 64;* @return*/@Beanpublic HashedWheelTimer hashedWheelTimer() {// 默認tick間隔100毫秒, 輪子大小為512return new HashedWheelTimer();}
}
2.7 server包下增加 ApplicationInitial類
package com.hahashou.netty.server;import com.hahashou.netty.server.config.NettyServer;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** @description: 應用初始化* @author: 哼唧獸* @date: 9999/9/21**/
@Component
@Slf4j
public class ApplicationInitial implements ApplicationRunner {@Resourceprivate HashedWheelTimer hashedWheelTimer;@Resourceprivate NettyServer nettyServer;@Overridepublic void run(ApplicationArguments args) {hashedWheelTimer.newTimeout(nettyServer, 1 , TimeUnit.SECONDS);}
}
2.8 修改 Message類
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.Getter;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;/*** @description:* @author: 哼唧獸* @date: 9999/9/21**/
@Data
public class Message {/** 廣播秘鑰 */private String secretKey;/** 發送者用戶code */private String userCode;/** 中轉的服務端Id */private String serverId;/** 接收者用戶code */private String friendUserCode;/** 連接時專用 */private String channelId;/** 消息類型 */private Integer type;public enum TypeEnum {TEXT(0, "文字", "", new ArrayList<>()),IMAGE(1, "圖片", "image", Arrays.asList("bmp", "gif", "jpeg", "jpg", "png")),VOICE(2, "語音", "voice", Arrays.asList("mp3", "amr", "flac", "wma", "aac")),VIDEO(3, "視頻", "video", Arrays.asList("mp4", "avi", "rmvb", "flv", "3gp", "ts", "mkv")),;@Getterprivate Integer key;@Getterprivate String describe;@Getterprivate String bucketName;@Getterprivate List<String> formatList;TypeEnum(int key, String describe, String bucketName, List<String> formatList) {this.key = key;this.describe = describe;this.bucketName = bucketName;this.formatList = formatList;}public static TypeEnum select(String format) {TypeEnum result = null;for (TypeEnum typeEnum : TypeEnum.values()) {if (typeEnum.getFormatList().contains(format)) {result = typeEnum;break;}}return result;}}/** 文字或文件的全路徑名稱 */private String text;public static ByteBuf transfer(Message message) {return Unpooled.copiedBuffer(JSON.toJSONString(message), CharsetUtil.UTF_8);}/*** 生成指定長度的隨機字符串* @param length* @return*/public static String randomString (int length) {if (length > 64) {length = 64;}List<String> list = new ArrayList<>();for (int i = 0; i < 10; i++) {list.add(i + "");}for (char i = 'A'; i <= 'Z'; i++) {list.add(String.valueOf(i));}for (char i = 'a'; i <= 'z'; i++) {list.add(String.valueOf(i));}list.add("α");list.add("ω");Collections.shuffle(list);String string = list.toString();return string.replace("[", "").replace("]", "").replace(", ", "").substring(0, length);}
}
2.9 config包下增加 NettyClientHandler類
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;/*** @description:* @author: 哼唧獸* @date: 9999/9/21**/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Getter@Setterprivate String userCode;@Getter@Setterprivate String hostName;@Getter@Setterprivate int port;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {log.info("{}, 作為客戶端, 與其他服務端連接", LocalDateTime.now());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {NettyStatic.CHANNEL.remove(ctx.channel().id().asLongText());NettyClientHandler nettyClientHandler = NettyStatic.NETTY_CLIENT_HANDLER.remove(hostName + "@" + port);NettyClient nettyClient = NettyStatic.NETTY_CLIENT.remove(nettyClientHandler);nettyClient = null;nettyClientHandler = null;System.gc();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String channelId = message.getChannelId(),text = message.getText();if (StringUtils.hasText(channelId)) {Channel channel = ctx.channel();message.setUserCode(userCode);NettyStatic.USER_CHANNEL.put(hostName, channelId);NettyStatic.CHANNEL.put(channelId, channel);channel.writeAndFlush(Message.transfer(message));} else if (StringUtils.hasText(text)) {String friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(message.getServerId())) {String queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}// 此時, 已不需要serverIdmessage.setServerId(null);channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}}}}/*** 離線消息存儲Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {}
}
2.10 config包下增加 NettyClient類
package com.hahashou.netty.server.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;import javax.annotation.PreDestroy;
import java.net.*;
import java.nio.charset.Charset;/*** @description: Netty-客戶端TCP服務* @author: 哼唧獸* @date: 9999/9/21**/
@Slf4j
public class NettyClient {@Getter@Setterprivate NioEventLoopGroup clientWorkerGroup;@Getter@Setterprivate EventExecutorGroup clientBusinessGroup;public void createClient(NettyClientHandler nettyClientHandler) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorkerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(clientBusinessGroup, nettyClientHandler);}});try {InetAddress inetAddress = InetAddress.getByName(nettyClientHandler.getHostName());SocketAddress socketAddress = new InetSocketAddress(inetAddress, nettyClientHandler.getPort());bootstrap.connect(socketAddress).sync().channel();} catch (UnknownHostException exception) {log.error("請檢查hosts文件是否配置正確 : {}", exception.getMessage());} catch (InterruptedException exception) {log.error("客戶端中斷異常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {clientWorkerGroup.shutdownGracefully().syncUninterruptibly();log.info("客戶端關閉成功");}
}
2.11 修改 NettyServer類
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @description: Netty-服務端TCP服務* @author: 哼唧獸* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyServer implements TimerTask {@Value("${netty.server.id}")private String serverId;@Value("${netty.server.port}")private int port;@Resourceprivate NioEventLoopGroup bossGroup;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyServerHandler nettyServerHandler;@Resourceprivate NioEventLoopGroup clientWorkerGroup;@Resourceprivate EventExecutorGroup clientBusinessGroup;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate HashedWheelTimer hashedWheelTimer;@Overridepublic void run(Timeout timeout) {Object nettyServerLock = redisOperation.get(RedisConfig.NETTY_SERVER_LOCK);if (nettyServerLock != null) {hashedWheelTimer.newTimeout(this, 10, TimeUnit.SECONDS);return;}try {redisOperation.set(RedisConfig.NETTY_SERVER_LOCK, true);//String hostAddress = InetAddress.getLocalHost().getHostAddress();ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyServerHandler);}})// 服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)// 此處有個大坑, 詳見文章脫坑指南.bind(port).sync();if (channelFuture.isSuccess()) {log.info("{} 啟動成功", serverId);redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}thisNodeHandle(port);channelFuture.channel().closeFuture().sync();} catch (InterruptedException exception) {log.error("{} 啟動失敗: {}", serverId, exception.getMessage());} finally {redisTemplate.delete(RedisConfig.NETTY_SERVER_LOCK);}}private void thisNodeHandle(int port) {Set<String> nodeList = new HashSet<>();Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);if (nettyServerList != null) {nodeList = new HashSet<>(JSON.parseArray(nettyServerList.toString(), String.class));for (String hostAndPort : nodeList) {String[] split = hostAndPort.split("@");String connectHost = split[0];int connectPort = Integer.parseInt(split[1]);NettyClient nettyClient = new NettyClient();nettyClient.setClientWorkerGroup(clientWorkerGroup);nettyClient.setClientBusinessGroup(clientBusinessGroup);NettyClientHandler nettyClientHandler = new NettyClientHandler();nettyClientHandler.setUserCode(serverId);nettyClientHandler.setHostName(connectHost);nettyClientHandler.setPort(connectPort);nettyClient.createClient(nettyClientHandler);NettyStatic.NETTY_CLIENT_HANDLER.put(connectHost + "@" + connectPort, nettyClientHandler);NettyStatic.NETTY_CLIENT.put(nettyClientHandler, nettyClient);}}nodeList.add(serverId + "@" + port);redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(nodeList));}public void stop() {bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();log.info("TCP服務關閉成功");Object nettyServerList = redisOperation.get(RedisConfig.NETTY_SERVER_LIST);List<String> hostList = JSON.parseArray(nettyServerList.toString(), String.class);hostList.remove(serverId + "@" + port);if (CollectionUtils.isEmpty(hostList)) {redisTemplate.delete(RedisConfig.NETTY_SERVER_LIST);} else {redisOperation.set(RedisConfig.NETTY_SERVER_LIST, JSON.toJSONString(hostList));}}@PreDestroypublic void destroy() {stop();}
}
2.12 修改 NettyServerHandler類
package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧獸* @date: 9999/9/21**/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Value("${netty.server.id}")private String serverId;public static String SERVER_PREFIX = "netty-server-";@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Resourceprivate ValueOperations<String, Object> redisOperation;@Overridepublic void channelActive(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asLongText();log.info("有客戶端連接, channelId : {}", channelId);NettyStatic.CHANNEL.put(channelId, channel);Message message = new Message();message.setChannelId(channelId);channel.writeAndFlush(Message.transfer(message));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {String channelId = ctx.channel().id().asLongText();log.info("有客戶端斷開連接, channelId : {}", channelId);NettyStatic.CHANNEL.remove(channelId);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {if (entry.getValue().equals(channelId)) {redisTemplate.delete(entry.getKey());break;}}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg != null) {Message message = JSON.parseObject(msg.toString(), Message.class);String userCode = message.getUserCode(),channelId = message.getChannelId(),friendUserCode = message.getFriendUserCode();if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {connect(userCode, channelId);} else if (StringUtils.hasText(message.getText())) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();message.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(message);} else {sendAdmin(ctx.channel(), message);}} else {offlineMessage(friendUserCode, message);}}}}/*** 建立連接* @param userCode* @param channelId*/private void connect(String userCode, String channelId) {log.info("{} 連接", userCode);NettyStatic.USER_CHANNEL.put(userCode, channelId);if (!userCode.startsWith(SERVER_PREFIX)) {redisOperation.set(userCode, serverId);}}/*** 發送給其他客戶端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 進行轉發");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 離線消息存儲Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {// 1條message在redis中大概是100B, 1萬條算1M, redis.conf的maxmemory設置的是256MList<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}/*** 發送給服務端* @param channel* @param message*/private void sendAdmin(Channel channel, Message message) {message.setUserCode("ADMIN");message.setText(LocalDateTime.now().toString());channel.writeAndFlush(Message.transfer(message));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("有客戶端發生異常, channelId : {}", ctx.channel().id().asLongText());}
}
2.13 新建service包, 并新增 ServerService接口
package com.hahashou.netty.server.service;import com.hahashou.netty.server.config.Message;/*** @description:* @author: 哼唧獸* @date: 9999/9/21**/
public interface ServerService {/*** 發送消息* @param dto*/void send(Message dto);/*** 停止服務(為后續斷線重連做準備)*/void stop();
}
2.14 service包下新建impl包, 并新增 ServerServiceImpl類
package com.hahashou.netty.server.service.impl;import com.alibaba.fastjson.JSON;
import com.hahashou.netty.server.config.*;
import com.hahashou.netty.server.service.ServerService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @description:* @author: 哼唧獸* @date: 9999/9/21**/
@Service
@Slf4j
public class ServerServiceImpl implements ServerService {@Value("${netty.server.id}")private String serverId;@Resourceprivate PasswordEncoder passwordEncoder;@Resourceprivate ValueOperations<String, Object> redisOperation;@Resourceprivate NettyServer nettyServer;@Overridepublic void send(Message dto) {String friendUserCode = dto.getFriendUserCode();if (StringUtils.hasText(friendUserCode)) {Object code = redisOperation.get(friendUserCode);if (code != null) {String queryServerId = code.toString();dto.setServerId(serverId.equals(queryServerId) ? null : queryServerId);if (StringUtils.hasText(friendUserCode)) {sendOtherClient(dto);}} else {offlineMessage(friendUserCode, dto);}} else {// 全體廣播, 需要校驗秘鑰(inputSecretKey應該是一個動態值, 通過手機+驗證碼每次廣播時獲取, 自行實現)String inputSecretKey = dto.getSecretKey();// encodedPassword生成見main方法String encodedPassword = "$2a$10$J/UEqtme/w2D0TWB4gJKFeSsyc3s8pepr6ahzOsORkC9zpaLSvZbG";if (StringUtils.hasText(inputSecretKey) && passwordEncoder.matches(inputSecretKey, encodedPassword)) {dto.setSecretKey(null);for (Map.Entry<String, String> entry : NettyStatic.USER_CHANNEL.entrySet()) {String key = entry.getKey();if (key.startsWith(NettyServerHandler.SERVER_PREFIX)) {// 這里可以用http調用其他服務端, 自行補充(信息redis都有)continue;}// 只處理連接本端的客戶端String value = entry.getValue();Channel channel = NettyStatic.CHANNEL.get(value);if (channel == null) {offlineMessage(friendUserCode, dto);return;}channel.writeAndFlush(Message.transfer(dto));}}}}public static void main(String[] args) {String text = "uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU";PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();String encode = passwordEncoder.encode(text);log.info(encode);if (passwordEncoder.matches(text, encode)) {log.info("秘鑰正確");}}/*** 發送給其他客戶端* @param message*/private void sendOtherClient(Message message) {String friendUserCode = message.getFriendUserCode(),serverId = message.getServerId();String queryChannelId;if (StringUtils.hasText(serverId)) {log.info("向" + serverId + " 進行轉發");queryChannelId = NettyStatic.USER_CHANNEL.get(serverId);} else {queryChannelId = NettyStatic.USER_CHANNEL.get(friendUserCode);}if (StringUtils.hasText(queryChannelId)) {Channel channel = NettyStatic.CHANNEL.get(queryChannelId);if (channel == null) {offlineMessage(friendUserCode, message);return;}channel.writeAndFlush(Message.transfer(message));} else {offlineMessage(friendUserCode, message);}}/*** 離線消息存儲Redis* @param friendUserCode* @param message*/public void offlineMessage(String friendUserCode, Message message) {List<Message> messageList = new ArrayList<>();Object offlineMessage = redisOperation.get(RedisConfig.OFFLINE_MESSAGE + friendUserCode);if (offlineMessage != null) {messageList = JSON.parseArray(offlineMessage.toString(), Message.class);}messageList.add(message);redisOperation.set(RedisConfig.OFFLINE_MESSAGE + friendUserCode, JSON.toJSONString(messageList));}@Overridepublic void stop() {nettyServer.stop();}
}
2.15 修改 ServerController類
package com.hahashou.netty.server.controller;import com.hahashou.netty.server.config.Message;
import com.hahashou.netty.server.service.ServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @description:* @author: 哼唧獸* @date: 9999/9/21**/
@RestController
@RequestMapping("/server")
@Slf4j
public class ServerController {@Resourceprivate ServerService serverService;/*** 秘鑰記錄: uTωAoJIGBcy7piYCFgQntVvEh8RH6WMU* @param dto* @return*/@PostMapping("/send")public String send(@RequestBody Message dto) {serverService.send(dto);return "success";}@GetMapping("/stop")public String stop() {serverService.stop();return "stop netty success";}
}
3. 脫坑指南, 針對 NettyServer類
工具
yum -y install net-tools
netstat -tunlp
防火墻打開時, 當使用 bind(String inetHost, int inetPort) 方法時, 因為inetHost是127.0.0.1, 所以只有本機可以訪問35000, 要想讓其他機器可以連接到, 需使用 bind(int inetPort) 方法, 下圖是前后兩次端口占用情況
結論
當使用bind(String inetHost, int inetPort)方法時, 無論防火墻關閉以及啟動, 虛擬機均有問題; 但當機器有公網IP, 且防火墻關閉或端口開放時, 通過DNS解析映射是沒有問題的, 建議還是用bind(int inetPort)方法
4. 服務端準備
4.1 打包3個服務端的jar包, id分別為netty-server-1、netty-server-2、netty-server-3, 分別放在161到163上
4.2 161、162、163端口開放
firewall-cmd --zone=public --add-port=35000/tcp --permanent
firewall-cmd --zone=public --add-port=32000/tcp --permanent
firewall-cmd --reload
4.3 161、162、163修改hosts
vi /etc/hosts
追加內容
192.168.109.161 netty-server-1
192.168.109.162 netty-server-2
192.168.109.163 netty-server-3
4.4 依次啟動161、162、163
java -Dfile.encoding=UTF-8 -jar server-1.0-SNAPSHOT.jar
161
162
163
redis中記錄的服務列表
5. 客戶端改造
5.1 修改 application.yml
server:port: 32001logging:level:com.hahashou.netty: infospring:servlet:multipart:max-file-size: 128MBmax-request-size: 256MBuserCode: Aa
host: 192.168.109.161minio:endpoint: http://192.168.109.160:9000accessKey: rootsecretKey: root123456
5.2 修改 NettyClient類
package com.hahashou.netty.client.config;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.Charset;/*** @description: Netty-TCP服務* @author: 哼唧獸* @date: 9999/9/21**/
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ApplicationStartedEvent> {@Value("${host}")private String host;public static int PORT = 35000;@Resourceprivate NioEventLoopGroup workerGroup;@Resourceprivate EventExecutorGroup businessGroup;@Resourceprivate NettyClientHandler nettyClientHandler;public static Channel CHANNEL;@SneakyThrows@Overridepublic void onApplicationEvent(ApplicationStartedEvent event) {createClient(workerGroup, businessGroup, nettyClientHandler, host, PORT);}public void createClient(NioEventLoopGroup workerGroup, EventExecutorGroup businessGroup,NettyClientHandler nettyClientHandler, String host, int port) {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));pipeline.addLast(businessGroup, nettyClientHandler);}});try {CHANNEL = bootstrap.connect(host, port).sync().channel();} catch (InterruptedException exception) {log.error("客戶端中斷異常 : {}", exception.getMessage());}}@PreDestroypublic void destroy() {workerGroup.shutdownGracefully().syncUninterruptibly();log.info("客戶端關閉成功");}
}
6. 客戶端準備
6.1 準備6個jar包, 修改application.yml, 并根據下述規則放到對應機器上
Aa放在163上, Bb放在164上, Cc放在165上, Dd放在166上, Ee放在161上, Ff放在162上
userCode: Aa
host: 192.168.109.161
userCode: Bb
host: 192.168.109.161
userCode: Cc
host: 192.168.109.162
userCode: Dd
host: 192.168.109.162
userCode: Ee
host: 192.168.109.163
userCode: Ff
host: 192.168.109.163
6.2 161到166端口開放
firewall-cmd --zone=public --add-port=32001/tcp --permanent
firewall-cmd --reload
6.3 啟動所有客戶端
7. 測試
7.1 兩個客戶端連同一服務端, 不會出現轉發
Aa向Bb發送消息, 且Bb收到后回復Aa
7.2 兩個客戶端連不同服務端
Aa向Cc發送消息(通過服務端1轉發到服務端2), 且Cc收到后回復Aa(通過服務端2轉發到服務端1)
Aa向Ee發送消息, 且Ee收到后回復Aa
7.3 廣播