支持協議
- TCP/UDP
- HTTP/HTTPS
- WebSocket
- SPDY/HTTP2
- MQTT/CoAP
服務端
常用類
ServerBootstrap
服務端配置類
//設置線程組、parentGroup處理連接、childGroup處理I/O
group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
//Channel通過何種方式獲取新的連接(NioServerSocketChannel、NioSocketChannel)
channel(Class<? extends C> channelClass)
//ServerChannel一些配置項、ChannelOption.SO_BACKLOG
option(ChannelOption<T> option, T value)
//ServerChannel的子Channel的選項
childOption(ChannelOption<T> childOption, T value)
//自定義ChannelInboundHandlerAdapter、編碼解碼器等
childHandler(ChannelHandler childHandler)
NioServerSocketChannel
服務端通道
Bootstrap
客戶端配置類
NioSocketChannel
客戶端通道
EventLoopGroup
事件循環組:本質上是一個線程組,其中每個 EventLoop 負責處理所分配 Channel 的 I/O 事件,也可以執行普通任務和定時任務。
NioEventLoopGroup
ChannelFuture
代表一個異步 I/O 操作的結果;該操作可能尚未完成,可通過 sync() 等待或添加監聽器獲取結果。
ChannelInboundHandlerAdapter
在收到入站事件(如連接建立、收到數據)時進行回調;這個類定義了一系列回調方法。常見的回調如下:
channelRead(ChannelHandlerContext ctx, Object msg) // 收到消息調用
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) // 異常調用
channelInactive(ChannelHandlerContext ctx) throws Exception //連接斷開
channelActive(ChannelHandlerContext ctx) throws Exception //連接建立
ChannelOutboundHandlerAdapter
在對請求進行響應(出站操作)時進行回調;這個類定義了一系列適配方法。常見的適配方法如 write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)。
代碼示例
1、新建通道處理類處理每一次I/O
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;/*** Handles a server-side channel.*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)// Discard the received data silently.((ByteBuf) msg).release(); // (3)}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)// Close the connection when an exception is raised.cause.printStackTrace();ctx.close();}
}
2、創建一個netty服務端
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Discards any incoming data.*/
public class DiscardServer {private int port;public DiscardServer(int port) {this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// Bind and start to accept incoming connections.// 服務端綁定端口ChannelFuture f = b.bind(port).sync(); // (7)// Wait until the server socket is closed.// In this example, this does not happen, but you can do that to gracefully// shut down your server.// 一直阻塞知道通道關閉、再優雅的關閉線程組shutdownGracefully()f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {int port = 8080;if (args.length > 0) {port = Integer.parseInt(args[0]);}new DiscardServer(port).run();}
}
客戶端
代碼示例
public static void main(String[] args) throws Exception {String host = "192.168.32.29";int port = 8009;EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new TimeClientHandler());}});// Start the client.ChannelFuture f = b.connect(host, port).sync(); // (5)// Wait until the connection is closed.f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}
數據解析
1、為了正確解析傳遞的數據,避免粘包、半包等問題,需要明確包的固定長度或者固定的解析方法,如長度字段、以特殊字符結尾等。
2、編寫編碼、解碼類。注意:新的編碼器、解碼器需要在 ChannelInitializer 中加入通道的 pipeline 才會生效。
public class TimeDecoder extends ByteToMessageDecoder { // (1)@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)if (in.readableBytes() < 4) {return; // (3)}//自定義UnixTime的POJO、從通道讀取四個字節、做一次轉換out.add(new UnixTime(in.readUnsignedInt())); //(4)}
}
編碼器
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {@Overrideprotected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {out.writeInt((int)msg.value());}
}
也可以通過 ChannelOutboundHandler 實現將 UnixTime POJO 轉換回 ByteBuf。這比編寫解碼器要簡單得多,因為在對消息進行編碼時無需處理數據包的碎片和重組。
public class TimeEncoder extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {UnixTime m = (UnixTime) msg;ByteBuf encoded = ctx.alloc().buffer(4);encoded.writeInt((int)m.value());ctx.write(encoded, promise); // (1)}
}
將 TimeDecoder 加入通道的 pipeline,並放在 TimeClientHandler 之前:
// Install the decoder ahead of the client handler on every new channel.
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new TimeDecoder())
                .addLast(new TimeClientHandler());
    }
});