Netty Learning Examples


Table of Contents

  • simple
    • Server side
      • NettyServer
      • NettyServerHandler
    • Client side
      • NettyClient
      • NettyClientHandler
  • tcp (sticky packets and packet splitting)
    • Server side
      • NettyTcpServer
      • NettyTcpServerHandler
    • Client side
      • NettyTcpClient
      • NettyTcpClientHandler
  • protocol
    • codec
      • CustomMessageDecoder
      • CustomMessageEncoder
    • Server side
      • ProtocolServer
      • ProtocolServerHandler
    • Client side
      • ProtocolClient
      • ProtocolClientHandler
  • http
    • Server side
      • HttpServer
      • HttpServerHandler
    • Client side
      • HttpClient
      • HttpClientHandler
  • ws
    • Server side
      • WsServer
      • WsServerHandler
    • Client side
      • WsClient
      • WebSocketClientHandler
  • protobuf
    • Server side
      • NettyServer
      • NettyServerHandler
      • Student.proto
    • Client side
      • NettyClient
      • NettyClientHandler

simple

Server side

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class NettyServer {

    public static void main(String[] args) {
        // One boss thread accepts connections; the worker group handles channel I/O.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            // Block until the server channel is closed.
            channel.closeFuture().sync();
            log.info("server stop");
        } catch (Exception e) {
            log.error("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // msg is already a String because StringDecoder runs earlier in the pipeline.
        log.info("server received data from client: {}", msg);
        ctx.writeAndFlush("server received client data: " + msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[NettyServerHandler->userEventTriggered]: {}", evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exceptionCaught:", cause);
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("client connected: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("client disconnected: {}", ctx.channel().remoteAddress());
    }
}

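Client side

NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.Scanner;

@Slf4j
public class NettyClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);
            Channel channel = connectFuture.sync().channel();
            System.out.println("client connected");

            // Read lines from stdin and send each one; "exit" closes the channel.
            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.println("Enter a message: ");
                String line = sc.nextLine();
                if (line == null || line.isEmpty()) {
                    continue;
                } else if ("exit".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
            channel.closeFuture().sync();
            System.out.println("client closed");
        } catch (Exception e) {
            log.error("client error: ", e);
        } finally {
            // Without this the event loop thread keeps the JVM alive after "exit".
            group.shutdownGracefully();
        }
    }
}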

NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("[NettyClientHandler->channelRead]: {}", msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[NettyClientHandler->userEventTriggered]: {}", evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exception: {}", cause.getMessage());
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("[NettyClientHandler->handlerAdded]: {}", ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("[NettyClientHandler->handlerRemoved]: {}", ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("[NettyClientHandler->channelRegistered]: {}", ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("[NettyClientHandler->channelUnregistered]: {}", ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("[NettyClientHandler->channelActive]: {}", ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("[NettyClientHandler->channelInactive]: {}", ctx);
    }
}

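tcp (sticky packets and packet splitting)

Server side

NettyTcpServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyTcpServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // No frame decoder on purpose: the handler sees raw reads.
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyTcpServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 9090);
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}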

NettyTcpServerHandler

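import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // Each call delivers whatever bytes one socket read returned, not one logical message.
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            log.info("server received {} bytes, content: {}", bytes.length, content);
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound buffers automatically.
            byteBuf.release();
        }
        ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(buf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exception: {}", cause.getMessage());
        ctx.close();
    }
}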

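Client side

NettyTcpClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class NettyTcpClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyTcpClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));
            Channel channel = connectFuture.sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error: ", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}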

NettyTcpClientHandler

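import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            log.info("client received data: {}", byteBuf.toString(StandardCharsets.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound buffers automatically.
            byteBuf.release();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /*
         Sticky packets:
         1. When a byteBuf is sent 10 times in a row, the server may receive everything in 1 read,
            or in 3 reads, or in 4; the count is not deterministic. TCP is a byte stream,
            so several messages can arrive stuck together.
         2. The server must therefore handle sticky packets itself and identify message boundaries.
         3. Start several clients at once to observe this while testing.
         4. The difference from the simple example above: here the messages are sent in rapid succession.
         */
        /*
        for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));
            ctx.writeAndFlush(byteBuf);
        }
        */

        /*
         Packet splitting:
         1. One message of 10,000 characters (18,000 bytes in UTF-8) is sent in a single write,
            but the server may receive it in several reads: perhaps 2, perhaps 1; it is not deterministic.
         2. If a real packet is this long, the server may need several reads to get all of it.
         3. The server does receive the full length in total, which shows that TCP delivery is reliable.
         4. Because one long message arrives over several reads, the server must handle packet splitting
            and reassemble the pieces into one complete message.
         5. The difference from the simple example above: the message sent in one write is very long.
         */
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("Netty拆包示例|");
        }
        byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
        ctx.writeAndFlush(Unpooled.copiedBuffer(payload));
        // Log the byte count so it can be compared with the byte counts the server logs.
        log.info("client sent {} characters ({} bytes)", sb.length(), payload.length);

        /*
         The core issue behind both sticky packets and packet splitting is that TCP is stream-oriented:
         it guarantees reliable delivery, but the receiver must be able to identify message boundaries
         in order to obtain one complete message.
         */
    }
}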
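One common way to give a byte stream message boundaries is a delimiter. The sketch below is plain Java with no Netty dependency; `DelimiterFramingSketch` and its `feed` method are hypothetical names introduced only for illustration, and it assumes every message ends with a newline. It shows how a receiver could buffer arbitrary chunks and emit only complete messages, whether several arrive stuck together or one arrives in pieces. Netty ships `LineBasedFrameDecoder` and `DelimiterBasedFrameDecoder` for this purpose; this is not their implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Reassembles newline-delimited messages from arbitrarily sized chunks (illustrative only). */
final class DelimiterFramingSketch {
    // Bytes received so far that do not yet form a complete message.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one received chunk and returns every message that the chunk completed. */
    List<String> feed(byte[] chunk) {
        List<String> messages = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                // Delimiter reached: everything buffered so far is one message.
                messages.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        DelimiterFramingSketch framer = new DelimiterFramingSketch();
        // Two messages stuck together, followed by the first half of a third one.
        System.out.println(framer.feed("hello, server 0\nhello, server 1\nhel".getBytes(StandardCharsets.UTF_8)));
        // The second half of the third message arrives in a later read.
        System.out.println(framer.feed("lo, server 2\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

A delimiter only works when the payload can never contain the delimiter byte; for arbitrary binary payloads a length prefix is usually the safer choice.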

protocol

codec

Use a custom protocol with an encoder and a decoder to identify message boundaries and handle sticky and split packets.
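The idea behind the codec in this section is a 4-byte length prefix in front of every payload. The following is a minimal sketch of that idea using only `java.nio.ByteBuffer`, so it can run without Netty; `LengthPrefixedFramingSketch`, `encode`, and `feed` are hypothetical names, and the sketch assumes a big-endian int length and does not validate it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Length-prefixed framing over a byte stream (illustrative only). */
final class LengthPrefixedFramingSketch {
    // Accumulates unread bytes; kept in "write mode" between calls.
    private ByteBuffer buffer = ByteBuffer.allocate(64);

    /** Encodes one message as [4-byte length][UTF-8 body]. */
    static byte[] encode(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /** Feeds one received chunk and returns every complete message now available. */
    List<String> feed(byte[] chunk) {
        if (buffer.remaining() < chunk.length) {
            // Grow the buffer, carrying over the bytes already stored.
            int needed = buffer.position() + chunk.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(buffer.capacity() * 2, needed));
            buffer.flip();
            bigger.put(buffer);
            buffer = bigger;
        }
        buffer.put(chunk);
        buffer.flip(); // switch to read mode

        List<String> messages = new ArrayList<>();
        while (buffer.remaining() >= 4) {
            buffer.mark();
            int len = buffer.getInt();
            if (buffer.remaining() < len) {
                // Body not complete yet: rewind to before the length field and wait for more bytes.
                buffer.reset();
                break;
            }
            byte[] body = new byte[len];
            buffer.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        buffer.compact(); // keep leftover bytes and switch back to write mode
        return messages;
    }

    public static void main(String[] args) {
        LengthPrefixedFramingSketch decoder = new LengthPrefixedFramingSketch();
        byte[] frame = encode("hello, server 1");
        // Deliver the frame in two pieces to simulate a split packet.
        System.out.println(decoder.feed(java.util.Arrays.copyOfRange(frame, 0, 6)));
        System.out.println(decoder.feed(java.util.Arrays.copyOfRange(frame, 6, frame.length)));
    }
}
```

The mark, read, and reset steps correspond to `markReaderIndex`, `readInt`, and `resetReaderIndex` in the Netty decoder; Netty's `ByteToMessageDecoder` takes care of the accumulation that `feed` does by hand here.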

CustomMessageDecoder

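import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class CustomMessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // The 4-byte length field has not fully arrived yet.
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int len = in.readInt();
        // The body is incomplete: rewind to before the length field and wait for more bytes.
        if (in.readableBytes() < len) {
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[len];
        in.readBytes(bytes);
        out.add(CustomMessage.builder().len(len).content(bytes).build());
    }
}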

CustomMessageEncoder

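import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {
        // Wire format: 4-byte length followed by the body.
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}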
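The `CustomMessage` class used by the codec is not listed in this article. Judging only from the calls made on it (`builder()`, `len(...)`, `content(...)`, `build()`, `getLen()`, `getContent()`), it is probably a Lombok `@Data`/`@Builder` class; that is an assumption. A hand-written plain-Java equivalent that satisfies those calls might look like this:

```java
import java.nio.charset.StandardCharsets;

/** Plain-Java stand-in for the unlisted CustomMessage class (assumed shape, not the original source). */
final class CustomMessage {
    private final int len;        // number of bytes in content
    private final byte[] content; // message body

    private CustomMessage(int len, byte[] content) {
        this.len = len;
        this.content = content;
    }

    public int getLen() {
        return len;
    }

    public byte[] getContent() {
        return content;
    }

    public static Builder builder() {
        return new Builder();
    }

    /** Fluent builder mirroring the calls made by the decoder and the client handler. */
    public static final class Builder {
        private int len;
        private byte[] content;

        public Builder len(int len) {
            this.len = len;
            return this;
        }

        public Builder content(byte[] content) {
            this.content = content;
            return this;
        }

        public CustomMessage build() {
            return new CustomMessage(len, content);
        }
    }

    public static void main(String[] args) {
        byte[] body = "hello, server 1".getBytes(StandardCharsets.UTF_8);
        CustomMessage message = CustomMessage.builder().content(body).len(body.length).build();
        System.out.println(message.getLen() + " " + new String(message.getContent(), StandardCharsets.UTF_8));
    }
}
```

Keeping `len` as a separate field follows the article's usage; deriving it from `content.length` inside `build()` would remove the chance of the two disagreeing.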

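Server side

ProtocolServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class ProtocolServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound bytes -> CustomMessage; outbound CustomMessage -> bytes.
                            pipeline.addLast(new CustomMessageDecoder());
                            pipeline.addLast(new CustomMessageEncoder());
                            pipeline.addLast(new ProtocolServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 9090));
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        log.info("server stop");
    }
}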

ProtocolServerHandler

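import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Cast directly; if the cast fails, the decoder earlier in the pipeline is broken.
        CustomMessage customMessage = (CustomMessage) msg;
        log.info("server received message: {}, {}", customMessage.getLen(),
                new String(customMessage.getContent(), StandardCharsets.UTF_8));
        // Echo the message back (this requires the matching encoder in the pipeline).
        ctx.writeAndFlush(customMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("ProtocolServerHandler exception: {}", cause.getMessage());
        ctx.close();
    }
}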

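Client side

ProtocolClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ProtocolClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new CustomMessageEncoder());
                            pipeline.addLast(new CustomMessageDecoder());
                            pipeline.addLast(new ProtocolClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("localhost", 9090);
            Channel channel = connectFuture.sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}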

ProtocolClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Cast directly; if the cast fails, the decoder earlier in the pipeline is broken.
        CustomMessage customMessage = (CustomMessage) msg;
        log.info("client received message: {}, {}", customMessage.getLen(),
                new String(customMessage.getContent(), StandardCharsets.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Send 20 messages back to back; the length prefix keeps them separable on the server.
        for (int i = 1; i <= 20; i++) {
            byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);
            CustomMessage message = CustomMessage.builder().content(bytes).len(bytes.length).build();
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("[ProtocolClientHandler->exceptionCaught]: {}", cause.getMessage());
    }
}

http

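Server side

HttpServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class HttpServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler("[server-boss]"))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("loggingHandler", new LoggingHandler("[server-worker]"));
                            pipeline.addLast("httpServerCodec", new HttpServerCodec());
                            // Aggregate HTTP message parts into one FullHttpRequest (up to 10 MiB).
                            pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
                            pipeline.addLast("httpServerHandler", new HttpServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080));
            channelFuture.sync();
            log.info("HTTP server started; you can visit http://localhost:8080/test");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("server error: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}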

HttpServerHandler

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        log.info("[HttpServerHandler->handle]: {}", msg);
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
            String uri = fullHttpRequest.uri();
            log.info("[uri]: {}", uri);
            HttpMethod method = fullHttpRequest.method();
            log.info("[method]: {}", method);

            // Build the response.
            byte[] bytes = ("Server received the request at " + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);
            DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK,
                    Unpooled.copiedBuffer(bytes));
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
            fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");

            // The promise is completed once the write has finished (or failed).
            ChannelPromise promise = ctx.newPromise();
            promise.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    log.info("operation complete");
                    log.info("isDone: {}", future.isDone());
                    log.info("isSuccess: {}", future.isSuccess());
                    log.info("isCancelled: {}", future.isCancelled());
                    log.info("hasException: {}", future.cause() != null, future.cause());
                }
            });
            ctx.writeAndFlush(fullHttpResponse, promise);
            // The write is asynchronous, so this line may be logged before the listener runs.
            log.info("write issued");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[HttpServerHandler->userEventTriggered]: {}", evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[HttpServerHandler->exceptionCaught]", cause);
        ctx.close();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("[HttpServerHandler->channelRegistered]");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("[HttpServerHandler->channelUnregistered]");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("[HttpServerHandler->handlerAdded]");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("[HttpServerHandler->handlerRemoved]");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("[HttpServerHandler->channelActive]");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("[HttpServerHandler->channelInactive]");
    }
}

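Client side

HttpClient

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;
import java.net.URISyntaxException;

@Slf4j
public class HttpClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));
                            pipeline.addLast("httpClientCodec", new HttpClientCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024));
                            pipeline.addLast("httpClientHandler", new HttpClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);
            channelFuture.sync();
            Channel channel = channelFuture.channel();
            sendGetRequest(channel);
            // Wait for the channel to close.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("client error: ", e);
        } finally {
            // Problem encountered: after this call the client did not shut down normally;
            // switching Netty from 4.1.20.Final to 4.1.76.Final fixed it.
            group.shutdownGracefully();
            log.info("group shut down in finally");
        }
        log.info("client finished");
    }

    private static void sendGetRequest(Channel channel) throws URISyntaxException {
        String url = "http://localhost:8080/test"; // test URL
        URI uri = new URI(url);
        String host = uri.getHost();
        String path = uri.getRawPath() + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());

        // Build the HTTP request.
        FullHttpRequest request = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1,
                HttpMethod.GET,
                path,
                Unpooled.EMPTY_BUFFER);
        // gzip is advertised although no HttpContentDecompressor is installed;
        // this only works because the example server never compresses its response.
        request.headers()
                .set(HttpHeaderNames.HOST, host)
                .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
                .set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);

        // Send the request.
        channel.writeAndFlush(request);
        log.info("Request sent: {}", request);
    }
}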

HttpClientHandler

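import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {
        // Handle the response.
        log.info("response headers: {}", response.headers().toString());
        log.info("response body: {}", response.content().toString(CharsetUtil.UTF_8));
        // Close the connection.
        ctx.channel().close();
        log.info("connection closed");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exception: {}", cause.getMessage());
        ctx.close();
    }
}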

ws

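Server side

WsServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WsServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("httpServerCodec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
                            // checkStartsWith(true) accepts any request path that starts with /ws.
                            WebSocketServerProtocolConfig config = WebSocketServerProtocolConfig.newBuilder()
                                    .websocketPath("/ws")
                                    .checkStartsWith(true)
                                    .build();
                            pipeline.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(config));
                            pipeline.addLast("wsServerHandler", new WsServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 9090);
            channelFuture.sync();
            log.info("ws server started");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("server error: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        log.info("ws server stopped");
    }
}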

WsServerHandler

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    // token -> channel of the client currently logged in with that token
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
    private static final AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");
    private static final AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {
        log.info("[WsServerHandler->handle]: {}", webSocketFrame);
        if (webSocketFrame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
            log.info("[textWebSocketFrame.text()]: {}", textWebSocketFrame.text());
            sendAll(ctx.channel(), textWebSocketFrame.text());
        }
    }

    /** Broadcasts text to every registered channel except the sender. */
    private void sendAll(Channel channel, String text) {
        CHANNELS.forEach((token, ch) -> {
            if (channel != ch) {
                ch.writeAndFlush(new TextWebSocketFrame(text));
            }
        });
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[WsServerHandler->userEventTriggered]: {}", evt);
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete =
                    (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String requestUri = handshakeComplete.requestUri();
            String subprotocol = handshakeComplete.selectedSubprotocol();
            log.info("[requestUri]: {}", requestUri);
            log.info("[subprotocol]: {}", subprotocol);
            handleAuth(requestUri, ctx);
        }
    }

    private void handleAuth(String requestUri, ChannelHandlerContext ctx) {
        try {
            Map<String, String> queryParams = getQueryParams(requestUri);
            String token = queryParams.get("token");
            log.info("[token]: {}", token);
            if (token == null) {
                ctx.close();
                log.info("token is missing, closing channel");
            } else {
                ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);
                Channel oldChannel = CHANNELS.put(token, ctx.channel());
                if (oldChannel != null) {
                    // Same token logged in again: mark the old channel as replaced and close it.
                    oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);
                    oldChannel.close();
                } else {
                    sendAll(ctx.channel(), "Welcome " + token + " to the chat room");
                }
            }
        } catch (Exception e) {
            ctx.close();
        }
    }

    private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {
        URI uri = new URI(requestUri);
        String query = uri.getQuery();
        Map<String, String> queryParams = new HashMap<>();
        if (query != null) {
            String[] params = query.split("&");
            for (String param : params) {
                String[] keyValue = param.split("=");
                String key = keyValue[0];
                String value = keyValue.length > 1 ? keyValue[1] : "";
                queryParams.put(key, value);
            }
        }
        return queryParams;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[WsServerHandler->exceptionCaught]", cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        log.info("[WsServerHandler->handlerAdded]");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        log.info("[WsServerHandler->handlerRemoved]");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("[WsServerHandler->channelRegistered]");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("[WsServerHandler->channelUnregistered]");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("[WsServerHandler->channelActive]");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("[WsServerHandler->channelInactive]");
        Channel channel = ctx.channel();
        boolean isRepeat = Boolean.TRUE.equals(channel.attr(ATTRIBUTE_KEY_REPEAT).get());
        // The token is null when the channel closed before authentication succeeded.
        String token = channel.attr(ATTRIBUTE_KEY_TOKEN).get();
        // Remove the entry only if it still maps to this channel, then announce the departure.
        if (!isRepeat && token != null && CHANNELS.remove(token, channel)) {
            sendAll(channel, token + " left the chat room");
        }
    }
}
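The handler authenticates a connection by reading a `token` query parameter from the handshake URI. The sketch below isolates that parsing step in plain Java so it can be tried without a server; `QueryParamsSketch` and `parse` are hypothetical names. Unlike `getQueryParams` above, it splits the raw query first and URL-decodes each part afterwards, so an encoded `&` or `=` inside a value is not mistaken for a separator. That is a deliberate variation, not the article's code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses the query string of a request URI into a map (illustrative only). */
final class QueryParamsSketch {

    static Map<String, String> parse(String requestUri) {
        Map<String, String> params = new LinkedHashMap<>();
        String rawQuery = URI.create(requestUri).getRawQuery();
        if (rawQuery == null || rawQuery.isEmpty()) {
            return params;
        }
        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate "a=1&&b=2"
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(decode(key), decode(value));
        }
        return params;
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The same URI shape the WsClient in this article connects with.
        System.out.println(parse("/ws/1?token=abc").get("token"));
    }
}
```

A query-string token is easy to demonstrate but tends to end up in access logs; whether that is acceptable depends on the deployment.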

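Client side

WsClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
public class WsClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            // Released by WebSocketClientHandler once the WebSocket handshake completes.
            CountDownLatch connectLatch = new CountDownLatch(1);
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpClientCodec());
                            pipeline.addLast(new HttpObjectAggregator(10 * 1024));
                            WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder()
                                    .handleCloseFrames(false)
                                    .build();
                            WebSocketClientHandshaker webSocketClientHandshaker =
                                    WebSocketClientHandshakerFactory.newHandshaker(
                                            new URI("ws://localhost:9090/ws/1?token=abc"),
                                            WebSocketVersion.V13,
                                            null,
                                            true,
                                            new DefaultHttpHeaders());
                            pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));
                            pipeline.addLast(new WebSocketClientHandler(connectLatch));
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);
            Channel channel = channelFuture.channel();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    System.err.println("Connection failed: " + future.cause());
                    connectLatch.countDown(); // make sure we never wait forever
                }
            });

            // Wait for the connection and handshake to complete (with a timeout).
            if (!connectLatch.await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Connection timed out");
            }
            // The latch is also released on connect failure, so check the channel before using it.
            if (!channel.isActive()) {
                throw new IllegalStateException("Connection failed");
            }

            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.print("Enter a message: ");
                String line = sc.nextLine();
                if (StringUtil.isNullOrEmpty(line)) {
                    continue;
                }
                if ("exit".equals(line)) {
                    channel.close();
                    break;
                } else {
                    // Send the message.
                    WebSocketFrame frame = new TextWebSocketFrame(line);
                    channel.writeAndFlush(frame);
                }
            }
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error: ", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}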

WebSocketClientHandler

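import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;

@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final CountDownLatch connectLatch;

    public WebSocketClientHandler(CountDownLatch connectLatch) {
        this.connectLatch = connectLatch;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Handle the received WebSocket frame.
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            System.out.println("Received: " + text);
        } else if (frame instanceof PingWebSocketFrame) {
            // Reply to the ping frame; retain() because the pong reuses the ping's content.
            ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            System.out.println("Responded to ping");
        } else if (frame instanceof CloseWebSocketFrame) {
            System.out.println("Received close frame");
            ctx.close();
        } else if (frame instanceof BinaryWebSocketFrame) {
            System.out.println("Received binary data: " + frame.content().readableBytes() + " bytes");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Handle the handshake-complete event.
        if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            System.out.println("WebSocket handshake complete event");
            // Initial messages can be sent once the handshake is complete.
            connectLatch.countDown();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("WebSocket error: ");
        cause.printStackTrace();
        ctx.close();
    }
}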

protobuf

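Server side

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class NettyServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Note: no frame decoder is installed, so this relies on each Student
                            // arriving in a single read. Netty's ProtobufVarint32FrameDecoder and
                            // ProtobufVarint32LengthFieldPrepender are the usual way to add framing
                            // (they must be added on both the client and the server).
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));
            Channel channel = bindFuture.sync().channel();
            log.info("server start");
            channel.closeFuture().sync();
            log.info("server stop");
        } catch (Exception e) {
            log.error("server error", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}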

NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server received data from client: {}", msg);
        if (msg instanceof StudentPOJO.Student) {
            StudentPOJO.Student student = (StudentPOJO.Student) msg;
            log.info("data sent by client: {}, {}, {}", student, student.getId(), student.getName());
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[NettyServerHandler->userEventTriggered]: {}", evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("exceptionCaught:", cause);
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive: {}", ctx.channel().remoteAddress());
    }
}

Student.proto

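syntax = "proto3"; // protobuf version
option java_outer_classname = "StudentPOJO"; // name of the generated outer class, which is also the file name
// protobuf uses message to define data
message Student { // generates an inner class Student inside the outer class StudentPOJO; it is the POJO that is actually sent
    int32 id = 1; // a field named id of type int32 (a protobuf type); 1 is the field number, not a value
    string name = 2;
}
// Generate the Java class with: protoc.exe --java_out=<output directory> <path to Student.proto>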

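Client side

NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufEncoder());
                            pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);
            Channel channel = connectFuture.sync().channel();
            log.info("client connected");
            channel.closeFuture().sync();
            log.info("client closed");
        } catch (Exception e) {
            log.error("client error: ", e);
        } finally {
            // Without this the event loop thread keeps the JVM alive after the channel closes.
            group.shutdownGracefully();
        }
    }
}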

NettyClientHandler

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("【NettyClientHandler->channelRead】: {}", msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("【NettyClientHandler->userEventTriggered】: {}", evt);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the full stack trace, then close the connection.
        log.error("exception: ", cause);
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("【NettyClientHandler->handlerAdded】: {}", ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("【NettyClientHandler->handlerRemoved】: {}", ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("【NettyClientHandler->channelRegistered】: {}", ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("【NettyClientHandler->channelUnregistered】: {}", ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("【NettyClientHandler->channelActive】: {}", ctx);
        // Send one Student message as soon as the connection is established.
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder()
                .setId(1)
                .setName("張三san")
                .build();
        ctx.writeAndFlush(student);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("【NettyClientHandler->channelInactive】: {}", ctx);
    }
}
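
The client pipeline above uses ProtobufEncoder and ProtobufDecoder with no length framing, so it relies on each Student message arriving in a single read. Netty ships ProtobufVarint32LengthFieldPrepender and ProtobufVarint32FrameDecoder for this case. Whether to add them here is an assumption on my part, and both ends of the connection would have to agree on the framing. The sketch below is not Netty code: it is a plain-JDK illustration of the varint32 length-prefix idea those two handlers are built on, and the class and method names (Varint32Framing, frame, unframe) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class Varint32Framing {

    // Prefix the payload with its length, encoded as a base-128 varint
    // (7 data bits per byte, high bit set on every byte except the last).
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = payload.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Split a received byte stream into complete frames.
    // Stops at the first incomplete or malformed frame and returns what it has.
    static List<byte[]> unframe(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (pos < stream.length) {
            int len = 0;
            int shift = 0;
            int cursor = pos;
            boolean complete = false;
            // A 32-bit varint occupies at most 5 bytes (5 * 7 = 35 bits).
            while (cursor < stream.length && shift < 35) {
                int b = stream[cursor++] & 0xFF;
                len |= (b & 0x7F) << shift;
                shift += 7;
                if ((b & 0x80) == 0) {
                    complete = true;
                    break;
                }
            }
            if (!complete || len < 0 || stream.length - cursor < len) {
                break; // length prefix or body not fully available yet
            }
            byte[] body = new byte[len];
            System.arraycopy(stream, cursor, body, 0, len);
            frames.add(body);
            pos = cursor + len;
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("netty".getBytes(StandardCharsets.UTF_8));
        // Simulate TCP delivering two messages in one read (sticky packets).
        byte[] wire = new byte[a.length + b.length];
        System.arraycopy(a, 0, wire, 0, a.length);
        System.arraycopy(b, 0, wire, a.length, b.length);
        for (byte[] body : unframe(wire)) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

With a length prefix in place, the receiver can split two messages that arrive in one read and can wait when only part of a message has arrived. Those are the sticky-packet and half-packet cases from the tcp section.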

