在閱讀這篇文章前,推薦先閱讀以下內容:
- [netty5: WebSocketFrame]-源碼分析
- [netty5: WebSocketFrameEncoder & WebSocketFrameDecoder]-源碼解析
WebSocketServerHandshakerFactory
WebSocketServerHandshakerFactory
用于根據客戶端請求中的 WebSocket 版本構造對應的 WebSocketServerHandshaker
實例,完成握手協議版本的協商與支持判斷。
public class WebSocketServerHandshakerFactory {private final String webSocketURL;private final String subprotocols;private final WebSocketDecoderConfig decoderConfig;// ...public WebSocketServerHandshakerFactory(String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {this.webSocketURL = webSocketURL;this.subprotocols = subprotocols;this.decoderConfig = Objects.requireNonNull(decoderConfig, "decoderConfig");}public WebSocketServerHandshaker newHandshaker(HttpRequest req) {return resolveHandshaker0(req, webSocketURL, subprotocols, decoderConfig);}public static WebSocketServerHandshaker resolveHandshaker(HttpRequest req, String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {Objects.requireNonNull(decoderConfig, "decoderConfig");return resolveHandshaker0(req, webSocketURL, subprotocols, decoderConfig);}private static WebSocketServerHandshaker resolveHandshaker0(HttpRequest req, String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);if (version != null && AsciiString.contentEqualsIgnoreCase(version, WebSocketVersion.V13.toAsciiString())) {// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).return new WebSocketServerHandshaker13(webSocketURL, subprotocols, decoderConfig);}return null;}public static Future<Void> sendUnsupportedVersionResponse(Channel channel) {HttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.UPGRADE_REQUIRED, channel.bufferAllocator().allocate(0));res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());HttpUtil.setContentLength(res, 0);return channel.writeAndFlush(res);}
}
WebSocketServerHandshaker13
WebSocketServerHandshaker13
負責基于 RFC 6455 實現 WebSocket 版本 13 的服務端握手處理流程,包括請求校驗、響應生成、子協議協商和幀編解碼器的安裝。
public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {public WebSocketServerHandshaker13(String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {super(WebSocketVersion.V13, webSocketURL, subprotocols, decoderConfig);}/*** <p>* Handle the web socket handshake for the web socket specification <a href=* "https://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17">HyBi versions 13-17</a>. Versions 13-17* share the same wire protocol.* </p>** <p>* Browser request to the server:* </p>** <pre>* GET /chat HTTP/1.1* Host: server.example.com* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==* Origin: http://example.com* Sec-WebSocket-Protocol: chat, superchat* Sec-WebSocket-Version: 13* </pre>** <p>* Server response:* </p>** <pre>* HTTP/1.1 101 Switching Protocols* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=* Sec-WebSocket-Protocol: chat* </pre>*/@Overrideprotected FullHttpResponse newHandshakeResponse(BufferAllocator allocator, FullHttpRequest req, HttpHeaders headers) {HttpMethod method = req.method();if (!HttpMethod.GET.equals(method)) {throw new WebSocketServerHandshakeException("Invalid WebSocket handshake method: " + method, req);}HttpHeaders reqHeaders = req.headers();if (!reqHeaders.contains(HttpHeaderNames.CONNECTION) || !reqHeaders.containsIgnoreCase(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)) {throw new WebSocketServerHandshakeException("not a WebSocket request: a |Connection| header must includes a token 'Upgrade'", req);}if (!reqHeaders.containsIgnoreCase(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET)) {throw new WebSocketServerHandshakeException("not a WebSocket request: a |Upgrade| header must containing the value 'websocket'", req);}CharSequence key = reqHeaders.get(HttpHeaderNames.SEC_WEBSOCKET_KEY);if (key == null) {throw new WebSocketServerHandshakeException("not a WebSocket request: missing key", req);}FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS,allocator.allocate(0));if (headers != null) {res.headers().add(headers);}String accept = WebSocketUtil.calculateV13Accept(key.toString());res.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE).set(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT, accept);CharSequence subprotocols = reqHeaders.get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);if (subprotocols != null) {String selectedSubprotocol = selectSubprotocol(subprotocols.toString());if (selectedSubprotocol == null) {if (logger.isDebugEnabled()) {logger.debug("Requested subprotocol(s) not supported: {}", subprotocols);}} else {res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, selectedSubprotocol);}}return res;}@Overrideprotected WebSocketFrameDecoder newWebsocketDecoder() {return new WebSocket13FrameDecoder(decoderConfig());}@Overrideprotected WebSocketFrameEncoder newWebSocketEncoder() {return new WebSocket13FrameEncoder(false);}
}
WebSocketServerHandshaker
WebSocketServerHandshaker
是 WebSocket 握手處理的抽象基類,定義了服務端握手響應、子協議選擇和編解碼器安裝等通用邏輯,供具體版本(如 V13)實現。
public abstract class WebSocketServerHandshaker {protected static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandshaker.class);private final String uri;private final String[] subprotocols;private final WebSocketVersion version;private final WebSocketDecoderConfig decoderConfig;private String selectedSubprotocol;public static final String SUB_PROTOCOL_WILDCARD = "*";protected WebSocketServerHandshaker(WebSocketVersion version, String uri, String subprotocols, WebSocketDecoderConfig decoderConfig) {this.version = version;this.uri = uri;if (subprotocols != null) {String[] subprotocolArray = subprotocols.split(",");for (int i = 0; i < subprotocolArray.length; i++) {subprotocolArray[i] = subprotocolArray[i].trim();}this.subprotocols = subprotocolArray;} else {this.subprotocols = EmptyArrays.EMPTY_STRINGS;}this.decoderConfig = requireNonNull(decoderConfig, "decoderConfig");}// 將當前 Handshaker 支持的子協議數組轉換為有序去重的 Set 返回,用于后續子協議協商。public Set<String> subprotocols() {Set<String> ret = new LinkedHashSet<>();Collections.addAll(ret, subprotocols);return ret;}// WebSocketServerProtocolHandshakeHandler.channelRead// 執行 WebSocket 握手響應、替換或插入編解碼器并清理不兼容的 HTTP 處理器,最終完成協議切換。public Future<Void> handshake(Channel channel, FullHttpRequest req) {return handshake(channel, req, null);}public final Future<Void> handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders) {if (logger.isDebugEnabled()) {logger.debug("{} WebSocket version {} server handshake", channel, version());}// WebSocketServerHandshaker13.newHandshakeResponseFullHttpResponse response = newHandshakeResponse(channel.bufferAllocator(), req, responseHeaders);// 移除 HttpObjectAggregator 和 HttpContentCompressorChannelPipeline p = channel.pipeline();if (p.get(HttpObjectAggregator.class) != null) {p.remove(HttpObjectAggregator.class);}if (p.get(HttpContentCompressor.class) != null) {p.remove(HttpContentCompressor.class);}ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);final String encoderName;if (ctx == null) {// this means the user use an HttpServerCodecctx = p.context(HttpServerCodec.class);if (ctx == null) {response.close();return channel.newFailedFuture(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));}p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = ctx.name();} else {p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = p.context(HttpResponseEncoder.class).name();p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());}return channel.writeAndFlush(response).addListener(channel, (ch, future) -> {if (future.isSuccess()) {ChannelPipeline p1 = ch.pipeline();p1.remove(encoderName);}});}// 處理非 FullHttpRequest 的 WebSocket 握手場景,通過臨時注入 ChannelHandler 聚合請求數據并完成協議切換public Future<Void> handshake(Channel channel, HttpRequest req) {return handshake(channel, req, null);}// 在沒有使用 HttpObjectAggregator 的情況下,// 動態地通過臨時注入一個 ChannelHandler 來手動聚合 HTTP 請求的各個部分// 最終組裝成一個 FullHttpRequest,完成 WebSocket 握手的流程public final Future<Void> handshake(final Channel channel, HttpRequest req, final HttpHeaders responseHeaders) {// 如果傳進來的 req 已經是 FullHttpRequest,直接調用已有的 handshake(Channel, FullHttpRequest, HttpHeaders) 方法處理。// 否則,說明請求是分段的(HttpRequest + HttpContent),需要手動聚合。if (req instanceof FullHttpRequest) {return handshake(channel, (FullHttpRequest) req, responseHeaders);}ChannelPipeline pipeline = channel.pipeline();// 先在 ChannelPipeline 里找 HttpRequestDecoder 的 ChannelHandlerContext。// 如果沒找到,再找 HttpServerCodec。// 如果都沒找到,直接失敗,返回異常。ChannelHandlerContext ctx = pipeline.context(HttpRequestDecoder.class);if (ctx == null) {// This means the user use a HttpServerCodecctx = pipeline.context(HttpServerCodec.class);if (ctx == null) {return channel.newFailedFuture(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));}}// 動態注入一個臨時的 ChannelHandlerAdapter,名字叫 "handshaker"// 它的職責是監聽接下來流入的 HttpObject 消息,把 HttpRequest、HttpContent、LastHttpContent 等部分組裝成一個完整的 FullHttpRequest// 當完整請求組裝完成后:// 1. 立刻移除自己(ctx.pipeline().remove(this)),避免繼續攔截后續消息。// 2. 調用真正的 handshake(Channel, FullHttpRequest, HttpHeaders) 繼續 WebSocket 握手。// 3. 把握手的 Future 結果關聯到當前的 promise 上。final Promise<Void> promise = channel.newPromise();pipeline.addAfter(ctx.name(), "handshaker", new ChannelHandlerAdapter() {private FullHttpRequest fullHttpRequest;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpObject) {try {handleHandshakeRequest(ctx, (HttpObject) msg);} finally {Resource.dispose(msg);}} else {super.channelRead(ctx, msg);}}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.pipeline().remove(this);promise.tryFailure(cause);super.channelExceptionCaught(ctx, cause);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {try {// Fail promise if Channel was closedif (!promise.isDone()) {promise.tryFailure(new ClosedChannelException());}ctx.fireChannelInactive();} finally {releaseFullHttpRequest();}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {releaseFullHttpRequest();}private void handleHandshakeRequest(ChannelHandlerContext ctx, HttpObject httpObject) {if (httpObject instanceof FullHttpRequest) {ctx.pipeline().remove(this);handshake(channel, (FullHttpRequest) httpObject, responseHeaders).cascadeTo(promise);return;}if (httpObject instanceof LastHttpContent) {assert fullHttpRequest != null;try (FullHttpRequest handshakeRequest = fullHttpRequest) {fullHttpRequest = null;ctx.pipeline().remove(this);handshake(channel, handshakeRequest, responseHeaders).cascadeTo(promise);}return;}if (httpObject instanceof HttpRequest) {HttpRequest httpRequest = (HttpRequest) httpObject;fullHttpRequest = new DefaultFullHttpRequest(httpRequest.protocolVersion(), httpRequest.method(),httpRequest.uri(), ctx.bufferAllocator().allocate(0),httpRequest.headers(), HttpHeaders.emptyHeaders());if (httpRequest.decoderResult().isFailure()) {fullHttpRequest.setDecoderResult(httpRequest.decoderResult());}}}private void releaseFullHttpRequest() {if (fullHttpRequest != null) {fullHttpRequest.close();fullHttpRequest = null;}}});try {ctx.fireChannelRead(ReferenceCountUtil.retain(req));} catch (Throwable cause) {promise.setFailure(cause);}return promise.asFuture();}public Future<Void> close(Channel channel, CloseWebSocketFrame frame) {requireNonNull(channel, "channel");return close0(channel, frame);}public Future<Void> close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {requireNonNull(ctx, "ctx");return close0(ctx, frame);}private static Future<Void> close0(ChannelOutboundInvoker invoker, CloseWebSocketFrame frame) {return invoker.writeAndFlush(frame).addListener(invoker, ChannelFutureListeners.CLOSE);}// WebSocketServerHandshaker13.newHandshakeResponse// 服務端從客戶端請求的子協議中選出一個自己支持的返回給客戶端的過程protected String selectSubprotocol(String requestedSubprotocols) {if (requestedSubprotocols == null || subprotocols.length == 0) {return null;}String[] requestedSubprotocolArray = requestedSubprotocols.split(",");for (String p : requestedSubprotocolArray) {String requestedSubprotocol = p.trim();for (String supportedSubprotocol : subprotocols) {if (SUB_PROTOCOL_WILDCARD.equals(supportedSubprotocol) || requestedSubprotocol.equals(supportedSubprotocol)) {selectedSubprotocol = requestedSubprotocol;return requestedSubprotocol;}}}// No match foundreturn null;}protected abstract FullHttpResponse newHandshakeResponse(BufferAllocator allocator, FullHttpRequest req,HttpHeaders responseHeaders);protected abstract WebSocketFrameDecoder newWebsocketDecoder();protected abstract WebSocketFrameEncoder newWebSocketEncoder();
}