What is Netty?
Netty is an asynchronous, event-driven network application framework built on Java NIO, designed for the rapid development of high-performance, highly reliable network servers and clients. It hides much of the complexity of network programming, ships with rich protocol support, and is widely used in high-performance network applications.
Why choose Netty?
- High performance: built on NIO, with asynchronous, non-blocking I/O
- High concurrency: handles large numbers of concurrent connections
- Ease of use: a clean API that lowers the complexity of network programming
- Extensibility: modular design, easy to extend and customize
- Stability: battle-tested in many production environments
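The first two bullets rest on java.nio. As a JDK-only reference point (no Netty; the class name and setup are illustrative), this is the kind of non-blocking selector plumbing that Netty manages for you:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class NioSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);                   // non-blocking mode
        server.bind(new InetSocketAddress(0));             // bind an ephemeral port
        server.register(selector, SelectionKey.OP_ACCEPT); // interest: new connections
        int ready = selector.selectNow();                  // poll without blocking
        System.out.println("channels ready: " + ready);    // nobody has connected yet
        server.close();
        selector.close();
    }
}
```

A real event loop would call select() in a loop and dispatch each ready SelectionKey; Netty wraps exactly this mechanism, plus the handler pipeline described below, behind its EventLoop abstraction.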
Core concepts
Channel
A Channel is the basis of all network operations in Netty. It represents a connection to an entity such as a hardware device, a file, or a network socket.
EventLoop
An EventLoop handles the I/O operations on Channels; a single EventLoop can serve multiple Channels.
ChannelHandler
A ChannelHandler processes I/O events such as connection establishment, data reads, and data writes.
Pipeline
A Pipeline is the container for ChannelHandlers and defines the order in which they process events.
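The ordering idea can be shown without Netty. In this JDK-only sketch (names are illustrative), the "handlers" run in the order they were added, which mirrors how addLast() builds a pipeline for inbound events:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PipelineSketch {
    public static void main(String[] args) {
        // Handlers fire in insertion order, like ChannelPipeline.addLast()
        List<Function<String, String>> pipeline = new ArrayList<>();
        pipeline.add(String::trim);          // "decoder" stage: clean up the raw input
        pipeline.add(s -> "echo: " + s);     // business stage: build the reply
        String msg = "  hello  ";
        for (Function<String, String> handler : pipeline) {
            msg = handler.apply(msg);        // each stage sees the previous stage's output
        }
        System.out.println(msg);
    }
}
```

In Netty the analogous rule is that inbound events flow through handlers in addLast() order, while outbound operations traverse the pipeline in the reverse direction.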
Basic code walkthrough
1. Maven dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.94.Final</version>
</dependency>
2. A simple Echo server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // Boss group: accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker group: handles I/O on accepted connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Server startup helper
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String codec
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Custom business handler
                            pipeline.addLast(new EchoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind the port and start the server
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Echo server started, listening on port: " + port);

            // Wait until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop groups gracefully
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8080).start();
    }
}
3. Server handler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println("Server received: " + message);
        // Echo the message back to the client
        ctx.writeAndFlush("Server reply: " + message + "\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
4. Client implementation
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new EchoClientHandler());
                        }
                    });

            // Connect to the server
            ChannelFuture future = bootstrap.connect(host, port).sync();
            Channel channel = future.channel();
            System.out.println("Connected to server: " + host + ":" + port);

            // Read console input and send it
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if ("quit".equals(line)) {
                    break;
                }
                channel.writeAndFlush(line + "\n");
            }

            // Close the connection
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 8080).start();
    }
}
5. Client handler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println("Client received: " + message);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected successfully");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Running the example
- First run the EchoServer class to start the server
- Then run the EchoClient class to start the client
- Type a message in the client console; the server echoes it back
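The same request/reply shape can also be exercised with the JDK alone. This self-contained sketch (class name and message are illustrative, and it uses blocking sockets rather than Netty) runs a tiny echo server and client in one process:

```java
import java.io.*;
import java.net.*;

public class EchoSmokeTest {
    public static void main(String[] args) throws Exception {
        // Minimal blocking echo server on an ephemeral port (JDK only, no Netty)
        ServerSocket server = new ServerSocket(0);
        Thread t = new Thread(() -> {
            try (Socket s = server.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
                 PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                out.println("Server reply: " + in.readLine()); // echo one line back
            } catch (IOException ignored) {
            }
        });
        t.start();
        // Client side: send one line, print the reply
        try (Socket c = new Socket("localhost", server.getLocalPort());
             BufferedReader in = new BufferedReader(new InputStreamReader(c.getInputStream()));
             PrintWriter out = new PrintWriter(c.getOutputStream(), true)) {
            out.println("hello");
            System.out.println(in.readLine());
        }
        t.join();
        server.close();
    }
}
```

The Netty version does the same work, but the event loop, codecs, and handlers replace the hand-rolled threads and stream plumbing.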
Output of running EchoServer:
09:53:01.748 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
... (further Netty DEBUG startup output truncated) ...
Echo server started, listening on port: 8080
Run EchoClient and type messages in its console; output:
09:55:09.541 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
... (same Netty DEBUG startup output as the server, truncated) ...
Connected to server: localhost:8080
Client connected successfully
... (Netty Recycler and ByteBuf DEBUG output truncated) ...
Client received: Server reply: hello
Client received: Server reply: hellotestNettyServer
Client received: Server reply: testNettyServer
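The second reply ("hellotestNettyServer") shows two messages glued together: TCP is a byte stream with no message boundaries, so StringDecoder alone gives no guarantee of one channelRead per writeAndFlush. The usual fix is to place a frame decoder such as Netty's LineBasedFrameDecoder ahead of StringDecoder in the pipeline. Its accumulate-and-split behaviour can be sketched in plain Java (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class LineFrameSketch {
    // Accumulate incoming chunks and emit only complete newline-terminated frames,
    // analogous to what LineBasedFrameDecoder does with a ByteBuf
    public static List<String> decode(StringBuilder buf, String chunk) {
        buf.append(chunk);
        List<String> frames = new ArrayList<>();
        int nl;
        while ((nl = buf.indexOf("\n")) >= 0) {
            frames.add(buf.substring(0, nl)); // one complete message
            buf.delete(0, nl + 1);            // keep the unread remainder
        }
        return frames;
    }

    public static void main(String[] args) {
        StringBuilder buf = new StringBuilder();
        System.out.println(decode(buf, "hel"));         // incomplete: no frame yet
        System.out.println(decode(buf, "lo\nworld\n")); // two complete frames
    }
}
```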
Code analysis
Server-side key points:
- EventLoopGroup: two groups are created; bossGroup accepts connections, workerGroup handles their I/O
- ServerBootstrap: the server startup helper that holds all configuration
- ChannelInitializer: initializes each Channel, adding codecs and handlers
- Pipeline: the handler chain that defines the order of message processing
Client key points:
- Bootstrap: the client startup helper
- Connection setup: connect() establishes the connection to the server
- Sending messages: messages are written through the Channel
Practical tips
- Size thread pools sensibly: tune them to the number of CPU cores and the characteristics of your workload
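When no thread count is passed, NioEventLoopGroup defaults to twice the number of available cores (overridable via the io.netty.eventLoopThreads system property). A quick JDK-only way to compute that baseline for your machine (class name is illustrative):

```java
public class ThreadSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Netty's default event loop count is 2 * cores; a common starting point for tuning
        int defaultWorkers = Math.max(1, cores * 2);
        System.out.println(defaultWorkers >= 2);
    }
}
```

Treat this as a starting point, not a rule: CPU-bound handler work argues for fewer threads per core, while handlers that block (which should really be moved off the event loop) distort any simple formula.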