在閱讀這篇文章前,推薦先閱讀:[netty5: MessageToMessageCodec & MessageToMessageEncoder & MessageToMessageDecoder]-源碼分析
WebSocketProtocolHandler
WebSocketProtocolHandler
是 WebSocket 處理的基礎抽象類,負責管理 WebSocket 幀的解碼、關閉流程及通用協議邏輯。
abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocketFrame> {private final boolean dropPongFrames;private final WebSocketCloseStatus closeStatus;private final long forceCloseTimeoutMillis;private Promise<Void> closeSent;WebSocketProtocolHandler() {this(true);}WebSocketProtocolHandler(boolean dropPongFrames) {this(dropPongFrames, null, 0L);}WebSocketProtocolHandler(boolean dropPongFrames, WebSocketCloseStatus closeStatus, long forceCloseTimeoutMillis) {this.dropPongFrames = dropPongFrames;this.closeStatus = closeStatus;this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;}@Overrideprotected void decode(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {throw new UnsupportedOperationException("WebSocketProtocolHandler use decodeAndClose().");}@Overrideprotected void decodeAndClose(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {if (frame instanceof PingWebSocketFrame) {try (frame) {ctx.writeAndFlush(new PongWebSocketFrame(frame.binaryData().send()));}readIfNeeded(ctx);return;}if (frame instanceof PongWebSocketFrame && dropPongFrames) {frame.close();readIfNeeded(ctx);return;}ctx.fireChannelRead(frame);}private static void readIfNeeded(ChannelHandlerContext ctx) {if (!ctx.channel().getOption(ChannelOption.AUTO_READ)) {ctx.read();}}@Overridepublic Future<Void> close(final ChannelHandlerContext ctx) {if (closeStatus == null || !ctx.channel().isActive()) {return ctx.close();}final Future<Void> future = closeSent == null ?write(ctx, new CloseWebSocketFrame(ctx.bufferAllocator(), closeStatus)) : closeSent.asFuture();flush(ctx);applyCloseSentTimeout(ctx);Promise<Void> promise = ctx.newPromise();future.addListener(f -> ctx.close().cascadeTo(promise));return promise.asFuture();}@Overridepublic Future<Void> write(final ChannelHandlerContext ctx, Object msg) {if (closeSent != null) {Resource.dispose(msg);return ctx.newFailedFuture(new ClosedChannelException());}if (msg instanceof CloseWebSocketFrame) {Promise<Void> 
promise = ctx.newPromise();closeSent(promise);ctx.write(msg).cascadeTo(closeSent);return promise.asFuture();}return ctx.write(msg);}void closeSent(Promise<Void> promise) {closeSent = promise;}private void applyCloseSentTimeout(ChannelHandlerContext ctx) {if (closeSent.isDone() || forceCloseTimeoutMillis < 0) {return;}Future<?> timeoutTask = ctx.executor().schedule(() -> {if (!closeSent.isDone()) {closeSent.tryFailure(buildHandshakeException("send close frame timed out"));}}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);closeSent.asFuture().addListener(future -> timeoutTask.cancel());}protected WebSocketHandshakeException buildHandshakeException(String message) {return new WebSocketHandshakeException(message);}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireChannelExceptionCaught(cause);ctx.close();}
}
WebSocketServerProtocolHandler
WebSocketServerProtocolHandler
負責在服務器端管理 WebSocket 握手、幀的解碼與關閉處理,並支持協議校驗與異常處理。
public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {private static final AttributeKey<WebSocketServerHandshaker> HANDSHAKER_ATTR_KEY = AttributeKey.valueOf(WebSocketServerHandshaker.class, "HANDSHAKER");private final WebSocketServerProtocolConfig serverConfig;public WebSocketServerProtocolHandler(WebSocketServerProtocolConfig serverConfig) {super(Objects.requireNonNull(serverConfig, "serverConfig").dropPongFrames(),serverConfig.sendCloseFrame(),serverConfig.forceCloseTimeoutMillis());this.serverConfig = serverConfig;}// `handlerAdded` 方法負責在 ChannelPipeline 中動態添加握手處理器和 UTF-8 校驗器,確保 WebSocket 握手和數據幀合法性校驗功能生效。@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {ChannelPipeline cp = ctx.pipeline();if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {// Add the WebSocketHandshakeHandler before this one.cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),new WebSocketServerProtocolHandshakeHandler(serverConfig));}if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {// Add the UFT8 checking before this one.cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),new Utf8FrameValidator(serverConfig.decoderConfig().closeOnProtocolViolation()));}}@Overrideprotected void decodeAndClose(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {// 當收到關閉幀時,優先通過已綁定的 WebSocketServerHandshaker 進行優雅關閉,否則直接關閉連接;非關閉幀則繼續正常處理。if (serverConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) {WebSocketServerHandshaker handshaker = getHandshaker(ctx.channel());if (handshaker != null) {Promise<Void> promise = ctx.newPromise();closeSent(promise);handshaker.close(ctx, (CloseWebSocketFrame) frame).cascadeTo(promise);} else {frame.close();ctx.writeAndFlush(ctx.bufferAllocator().allocate(0)).addListener(ctx, ChannelFutureListeners.CLOSE);}return;}super.decodeAndClose(ctx, frame);}@Overrideprotected WebSocketServerHandshakeException 
buildHandshakeException(String message) {return new WebSocketServerHandshakeException(message);}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof WebSocketHandshakeException) {final byte[] bytes = cause.getMessage().getBytes();FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST,ctx.bufferAllocator().allocate(bytes.length).writeBytes(bytes));ctx.channel().writeAndFlush(response).addListener(ctx, ChannelFutureListeners.CLOSE);} else {ctx.fireChannelExceptionCaught(cause);ctx.close();}}static WebSocketServerHandshaker getHandshaker(Channel channel) {return channel.attr(HANDSHAKER_ATTR_KEY).get();}static void setHandshaker(Channel channel, WebSocketServerHandshaker handshaker) {channel.attr(HANDSHAKER_ATTR_KEY).set(handshaker);}
}
WebSocketClientProtocolHandler
WebSocketClientProtocolHandler
是 Netty 中用於處理 WebSocket 客戶端協議升級與幀處理,並自動注入握手處理器與 UTF-8 校驗器的核心 ChannelHandler。
public class WebSocketClientProtocolHandler extends WebSocketProtocolHandler {private final WebSocketClientHandshaker handshaker;private final WebSocketClientProtocolConfig clientConfig;public WebSocketClientHandshaker handshaker() {return handshaker;}public WebSocketClientProtocolHandler(WebSocketClientProtocolConfig clientConfig) {super(Objects.requireNonNull(clientConfig, "clientConfig").dropPongFrames(),clientConfig.sendCloseFrame(), clientConfig.forceCloseTimeoutMillis());this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(clientConfig.webSocketUri(),clientConfig.version(),clientConfig.subprotocol(),clientConfig.allowExtensions(),clientConfig.customHeaders(),clientConfig.maxFramePayloadLength(),clientConfig.performMasking(),clientConfig.allowMaskMismatch(),clientConfig.forceCloseTimeoutMillis(),clientConfig.absoluteUpgradeUrl(),clientConfig.generateOriginHeader());this.clientConfig = clientConfig;}@Overrideprotected void decodeAndClose(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {if (clientConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) {Resource.dispose(frame);ctx.close();return;}super.decodeAndClose(ctx, frame);}@Overrideprotected WebSocketClientHandshakeException buildHandshakeException(String message) {return new WebSocketClientHandshakeException(message);}// `handlerAdded` 方法會在當前 Handler 加入 pipeline 時,// 自動向其前方插入握手處理器和(可選的)UTF-8 校驗器,以確保 WebSocket 客戶端協議的正確初始化與安全性。@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {ChannelPipeline cp = ctx.pipeline();if (cp.get(WebSocketClientProtocolHandshakeHandler.class) == null) {// Add the WebSocketClientProtocolHandshakeHandler before this one.ctx.pipeline().addBefore(ctx.name(), WebSocketClientProtocolHandshakeHandler.class.getName(),new WebSocketClientProtocolHandshakeHandler(handshaker, clientConfig.handshakeTimeoutMillis()));}if (clientConfig.withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {// Add the UFT8 checking before this 
one.ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),new Utf8FrameValidator());}}
}