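Netty's Future extends the JDK's Future. The JDK Future is limited: the only way to obtain the result is to block synchronously on get().
Netty's Future improves on it: addListener() lets you receive the result asynchronously, and isSuccess() tells you whether the task succeeded or failed.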
- JDK Future
  - get() : block until the result is available
  - isDone() : check whether the task has finished
  - cancel() : cancel the current task
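// imports: java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
// `log` is assumed to be an SLF4J logger (for example via Lombok's @Slf4j)
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(2);
    Future<Integer> future = service.submit(() -> {
        log.debug("running...");
        Thread.sleep(2000);
        return 2;
    });
    // get() blocks the calling thread until the task finishes
    Integer i = future.get();
    log.debug("i = " + i);
    // Shut the pool down so its non-daemon threads do not keep the JVM alive
    service.shutdown();
}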
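The other two JDK methods in the list, isDone() and cancel(), can be tried out with a small JDK-only sketch. The class name `JdkFutureDemo` and the method `pollThenCancel` are made up for illustration; the task sleeps for 5 seconds, so it is assumed to still be running when it gets cancelled.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class JdkFutureDemo {
    /** Submits a slow task, polls isDone(), cancels it, and reports what happened. */
    static String pollThenCancel() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = service.submit(() -> {
                Thread.sleep(5000); // simulate slow work
                return 2;
            });
            boolean doneBefore = future.isDone();    // false: the task has not finished yet
            boolean cancelled = future.cancel(true); // true: cancellation succeeded, worker is interrupted
            boolean doneAfter = future.isDone();     // true: a cancelled future counts as done
            try {
                future.get();
                return "unexpected result";
            } catch (CancellationException e) {
                // get() on a cancelled future throws instead of returning a value
                return doneBefore + "," + cancelled + "," + doneAfter + ",CancellationException";
            } catch (InterruptedException | ExecutionException e) {
                return "unexpected: " + e;
            }
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollThenCancel()); // false,true,true,CancellationException
    }
}
```

Note that isDone() only says the future has completed in some way (normally, with an exception, or by cancellation); it does not say whether the task succeeded.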
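- Netty Future
  - isSuccess() : check whether the task succeeded
  - sync() : wait synchronously; throws if the task failed
  - getNow() : get the result without blocking; returns null if there is no result yet
  - await() : wait synchronously; does not throw if the task failed, so check isSuccess() afterwards
  - addListener() : callback invoked when the task completes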
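// imports: io.netty.channel.EventLoop, io.netty.channel.nio.NioEventLoopGroup,
//          io.netty.util.concurrent.Future, io.netty.util.concurrent.GenericFutureListener
public static void main(String[] args) {
    // NioEventLoopGroup is Netty's thread pool; each EventLoop in it has exactly one thread
    NioEventLoopGroup group = new NioEventLoopGroup(2);
    EventLoop eventLoop = group.next();
    Future<String> future = eventLoop.submit(() -> {
        Thread.sleep(2000);
        return "hehe";
    });

    // The task is still sleeping at this point, so getNow() returns null and isSuccess() is false
    String now = future.getNow();
    System.out.println("now = " + now);
    boolean success = future.isSuccess();
    System.out.println("success = " + success);

    // The listener is called back once the task has completed
    future.addListener(new GenericFutureListener<Future<? super String>>() {
        @Override
        public void operationComplete(Future<? super String> f) throws Exception {
            Object now1 = f.getNow();
            System.out.println("now1 = " + now1);
            System.out.println("success = " + f.isSuccess());
            // Release the event loop threads so the JVM can exit
            group.shutdownGracefully();
        }
    });
}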
- Netty Promise
  Extends Netty's Future.
  A Promise lets you set success or failure yourself, without waiting for a task to finish.
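// imports: io.netty.channel.EventLoop, io.netty.channel.EventLoopGroup,
//          io.netty.channel.nio.NioEventLoopGroup, io.netty.util.concurrent.DefaultPromise,
//          java.util.concurrent.ExecutionException
public static void main(String[] args) throws ExecutionException, InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup(2);
    EventLoop eventLoop = group.next();
    // Create the promise ourselves: it is a container for the result
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    new Thread(() -> {
        System.out.println("start computing");
        try {
            int i = 1 / 0; // deliberately throws ArithmeticException to show the failure path
            Thread.sleep(1000);
            promise.setSuccess(1000);
        } catch (Exception e) {
            // Catch Exception, not only InterruptedException; otherwise the promise is
            // never completed and get() below blocks forever
            e.printStackTrace();
            promise.setFailure(e);
        }
    }).start();
    try {
        // get() blocks until the promise is completed; on failure it throws ExecutionException
        Integer i = promise.get();
        System.out.println("i = " + i);
    } finally {
        group.shutdownGracefully();
    }
}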
ChannelHandler
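Netty handlers fall into two categories:
- ChannelInboundHandler : inbound, reads data; the handlers run in the order they were added
- ChannelOutboundHandler : outbound, sends data; the handlers run in reverse order
channel.write() : starts at the tail and runs the outbound handlers in reverse order
ctx.write() : starts at the current handler and searches backwards (towards the head) for ChannelOutboundHandlers to run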
// Requires Netty on the classpath. `log` is assumed to be an SLF4J logger and `User`
// a simple POJO with a setName(String) method; neither is shown in these notes.
public static void main(String[] args) {
    new ServerBootstrap()
        .group(new NioEventLoopGroup(2))
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Netty automatically adds two handlers, head and tail;
                // underneath, the pipeline is a doubly linked list
                pipeline.addLast("handle01", new ChannelInboundHandlerAdapter() {
                    // Inbound handlers mostly care about read events
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("msg:{}", msg);
                        ByteBuf byteBuf = (ByteBuf) msg;
                        String s = byteBuf.toString(Charset.defaultCharset());
                        // Invoke the next handler (equivalent to ctx.fireChannelRead(s)),
                        // passing the processed result along
                        super.channelRead(ctx, s);
                    }
                });
                pipeline.addLast("handle02", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("msg222:{}", msg);
                        User user = new User();
                        user.setName((String) msg);
                        super.channelRead(ctx, user);
                    }
                });
                pipeline.addLast("handle03", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("msg333:{}", msg);
                        super.channelRead(ctx, msg);
                        // Writing through the channel starts at the tail,
                        // so handle05 and then handle04 are triggered
                        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                    }
                });
                // Outbound handlers run from back to front,
                // and only when something is actually written
                pipeline.addLast("handle04", new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        log.debug("msg444:{}", msg);
                        super.write(ctx, msg, promise);
                    }
                });
                pipeline.addLast("handle05", new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        log.debug("msg555:{}", msg);
                        super.write(ctx, msg, promise);
                    }
                });
            }
        })
        .bind(new InetSocketAddress(8080));
}
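The ordering rules can also be checked without starting a server. The sketch below is a toy model in plain Java, not Netty's implementation: `PipelineOrderDemo` and all of its methods are hypothetical names, and a handler is reduced to a name plus an inbound/outbound flag. It only mimics the traversal order of a pipeline holding handle01 to handle03 (inbound) and handle04, handle05 (outbound).

```java
import java.util.ArrayList;
import java.util.List;

class PipelineOrderDemo {
    /** One entry of the toy pipeline: a name and a direction flag. */
    static final class Handler {
        final String name;
        final boolean inbound;
        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    PipelineOrderDemo addLast(String name, boolean inbound) {
        handlers.add(new Handler(name, inbound));
        return this;
    }

    /** Inbound event: visit inbound handlers from head to tail, i.e. in order of addition. */
    List<String> fireRead() {
        List<String> visited = new ArrayList<>();
        for (Handler h : handlers) {
            if (h.inbound) visited.add(h.name);
        }
        return visited;
    }

    /** Models channel.write(): start at the tail and walk towards the head. */
    List<String> writeFromTail() {
        return writeFrom(handlers.size());
    }

    /** Models ctx.write() called inside the named handler: start just before that handler. */
    List<String> writeFromHandler(String name) {
        for (int i = 0; i < handlers.size(); i++) {
            if (handlers.get(i).name.equals(name)) return writeFrom(i);
        }
        throw new IllegalArgumentException("unknown handler: " + name);
    }

    /** Walks backwards from the given position (exclusive), collecting outbound handlers. */
    private List<String> writeFrom(int exclusiveStart) {
        List<String> visited = new ArrayList<>();
        for (int i = exclusiveStart - 1; i >= 0; i--) {
            if (!handlers.get(i).inbound) visited.add(handlers.get(i).name);
        }
        return visited;
    }

    /** Three inbound handlers followed by two outbound handlers. */
    static PipelineOrderDemo sample() {
        return new PipelineOrderDemo()
                .addLast("handle01", true)
                .addLast("handle02", true)
                .addLast("handle03", true)
                .addLast("handle04", false)
                .addLast("handle05", false);
    }

    public static void main(String[] args) {
        PipelineOrderDemo p = sample();
        System.out.println(p.fireRead());                   // [handle01, handle02, handle03]
        System.out.println(p.writeFromTail());              // [handle05, handle04]
        System.out.println(p.writeFromHandler("handle03")); // []
    }
}
```

The empty last result is the practical point: in this layout, a write issued through the context of handle03 finds no outbound handler in front of it, while a write issued through the channel reaches handle05 and handle04.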
EmbeddedChannel: simulating channel execution
public static void main(String[] args) {
    ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("msg = " + msg);
            super.channelRead(ctx, msg);
        }
    };
    ChannelOutboundHandlerAdapter h2 = new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println(4444);
            // Forward the message so the write is passed on instead of being swallowed
            super.write(ctx, msg, promise);
        }
    };
    EmbeddedChannel channel = new EmbeddedChannel(h1, h2);
    // channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));
    // Call writeOutbound once; nesting the calls would pass the inner call's boolean result as a message
    channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hehe".getBytes()));
}