在閱讀這篇文章前,推薦先閱讀以下內容:
- [netty5: WebSocketFrame]-源碼分析
- [netty5: WebSocketFrameEncoder & WebSocketFrameDecoder]-源碼解析
WebSocketClientHandshakerFactory
WebSocketClientHandshakerFactory
是用于根據 URI 和協議版本創建對應 WebSocket 握手器(Handshaker)的工廠類,簡化客戶端握手流程。
public final class WebSocketClientHandshakerFactory {private WebSocketClientHandshakerFactory() {}// ...// new WebSocketClientProtocolHandler(config)public static WebSocketClientHandshaker newHandshaker(URI webSocketURL, WebSocketVersion version, String subprotocol,boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,boolean performMasking, boolean allowMaskMismatch, long forceCloseTimeoutMillis,boolean absoluteUpgradeUrl, boolean generateOriginHeader) {return new WebSocketClientHandshaker13(webSocketURL, subprotocol, allowExtensions, customHeaders,maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis,absoluteUpgradeUrl, generateOriginHeader);}
}
WebSocketClientHandshaker13
WebSocketClientHandshaker13
是實現 WebSocket 協議 RFC 6455(版本13)的客戶端握手器,負責構造握手請求、驗證響應并完成協議升級。
public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {private final boolean allowExtensions;private final boolean performMasking;private final boolean allowMaskMismatch;private volatile String sentNonce;// WebSocketClientHandshakerFactory.newHandshakerWebSocketClientHandshaker13(URI webSocketURL, String subprotocol,boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,boolean performMasking, boolean allowMaskMismatch,long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl,boolean generateOriginHeader) {super(webSocketURL, WebSocketVersion.V13, subprotocol, customHeaders, maxFramePayloadLength,forceCloseTimeoutMillis, absoluteUpgradeUrl, generateOriginHeader);this.allowExtensions = allowExtensions;this.performMasking = performMasking;this.allowMaskMismatch = allowMaskMismatch;}/*** /*** <p>* Sends the opening request to the server:* </p>** <pre>* GET /chat HTTP/1.1* Host: server.example.com* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==* Sec-WebSocket-Protocol: chat, superchat* Sec-WebSocket-Version: 13* </pre>**/@Overrideprotected FullHttpRequest newHandshakeRequest(BufferAllocator allocator) {URI wsURL = uri();FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, upgradeUrl(wsURL),allocator.allocate(0));HttpHeaders headers = request.headers();if (customHeaders != null) {headers.add(customHeaders);if (!headers.contains(HttpHeaderNames.HOST)) {headers.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));}} else {headers.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));}String nonce = createNonce();headers.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE).set(HttpHeaderNames.SEC_WEBSOCKET_KEY, nonce);if (generateOriginHeader && !headers.contains(HttpHeaderNames.ORIGIN)) {headers.set(HttpHeaderNames.ORIGIN, websocketHostValue(wsURL));}sentNonce = nonce;String expectedSubprotocol = expectedSubprotocol();if (!StringUtil.isNullOrEmpty(expectedSubprotocol)) {headers.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, expectedSubprotocol);}headers.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, version().toAsciiString());return request;}/*** <p>* Process server response:* </p>** <pre>* HTTP/1.1 101 Switching Protocols* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=* Sec-WebSocket-Protocol: chat* </pre>** @param response* HTTP response returned from the server for the request sent by beginOpeningHandshake00().* @throws WebSocketHandshakeException if handshake response is invalid.*/@Overrideprotected void verify(FullHttpResponse response) {HttpResponseStatus status = response.status();if (!HttpResponseStatus.SWITCHING_PROTOCOLS.equals(status)) {throw new WebSocketClientHandshakeException("Invalid handshake response status: " + status, response);}HttpHeaders headers = response.headers();CharSequence upgrade = headers.get(HttpHeaderNames.UPGRADE);if (!HttpHeaderValues.WEBSOCKET.contentEqualsIgnoreCase(upgrade)) {throw new WebSocketClientHandshakeException("Invalid handshake response upgrade: " + upgrade, response);}if (!headers.containsIgnoreCase(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)) {throw new WebSocketClientHandshakeException("Invalid handshake response connection: " + headers.get(HttpHeaderNames.CONNECTION), response);}CharSequence accept = headers.get(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT);if (accept == null) {throw new WebSocketClientHandshakeException("Invalid handshake response sec-websocket-accept: null", response);}String expectedAccept = WebSocketUtil.calculateV13Accept(sentNonce);if (!AsciiString.contentEquals(expectedAccept, AsciiString.trim(accept))) {throw new WebSocketClientHandshakeException("Invalid handshake response sec-websocket-accept: " + accept + ", expected: " + expectedAccept, response);}}@Overrideprotected WebSocketFrameDecoder newWebsocketDecoder() {return new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength(), allowMaskMismatch);}@Overrideprotected WebSocketFrameEncoder newWebSocketEncoder() {return new WebSocket13FrameEncoder(performMasking);}@Overridepublic WebSocketClientHandshaker13 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);return this;}// 生成一個符合 WebSocket 協議要求的 16 字節 Base64 編碼的隨機值,用作 Sec-WebSocket-Keyprivate static String createNonce() {var nonce = WebSocketUtil.randomBytes(16);return WebSocketUtil.base64(nonce);}
}
WebSocketClientHandshaker
public abstract class WebSocketClientHandshaker {protected static final int DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS = 10000;// 代表握手時的目標地址, 例如 ws://example.com/chatprivate final URI uri;// 控制握手請求和數據幀的格式, 比如 RFC 6455 標準版本private final WebSocketVersion version;// 標記握手是否完成,volatile 保證多線程訪問時的可見性private volatile boolean handshakeComplete;// 握手完成后,如果關閉 WebSocket 連接時等待超時,會觸發強制關閉。private volatile long forceCloseTimeoutMillis;// 用于標記強制關閉流程是否初始化, 通過 AtomicIntegerFieldUpdater 原子更新private volatile int forceCloseInit;private static final AtomicIntegerFieldUpdater<WebSocketClientHandshaker> FORCE_CLOSE_INIT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WebSocketClientHandshaker.class, "forceCloseInit");// 標記強制關閉流程是否完成。private volatile boolean forceCloseComplete;// 握手時客戶端希望協商的子協議(Subprotocol), 例如視頻、聊天子協議名稱等private final String expectedSubprotocol;// 握手后服務器協商確認的子協議,握手成功后才有值。private volatile String actualSubprotocol;// 握手請求時使用,方便傳遞用戶自定義信息。protected final HttpHeaders customHeaders;// 最大單個 WebSocket 幀負載長度限制, 防止收到超大數據導致內存溢出。private final int maxFramePayloadLength;// 是否在握手請求中使用絕對 URI 作為 Upgrade URL, 一般用于特殊代理或協議場景private final boolean absoluteUpgradeUrl;// 是否自動生成 Origin 請求頭protected final boolean generateOriginHeader;protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,HttpHeaders customHeaders, int maxFramePayloadLength,long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl, boolean generateOriginHeader) {this.uri = uri;this.version = version;expectedSubprotocol = subprotocol;this.customHeaders = customHeaders;this.maxFramePayloadLength = maxFramePayloadLength;this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;this.absoluteUpgradeUrl = absoluteUpgradeUrl;this.generateOriginHeader = generateOriginHeader;}// WebSocketClientProtocolHandshakeHandler.channelActivepublic Future<Void> handshake(Channel channel) {requireNonNull(channel, "channel");ChannelPipeline pipeline = channel.pipeline();// 檢查管道中解碼器HttpResponseDecoder decoder = pipeline.get(HttpResponseDecoder.class);if (decoder == null) {HttpClientCodec codec = pipeline.get(HttpClientCodec.class);if (codec == null) {return channel.newFailedFuture(new IllegalStateException("ChannelPipeline does not contain " + "an HttpResponseDecoder or HttpClientCodec"));}}// 檢查 URI 和 Header 相關的 Host 與 Originif (uri.getHost() == null) {if (customHeaders == null || !customHeaders.contains(HttpHeaderNames.HOST)) {return channel.newFailedFuture(new IllegalArgumentException("Cannot generate the 'host' header value," + " webSocketURI should contain host or passed through customHeaders"));}if (generateOriginHeader && !customHeaders.contains(HttpHeaderNames.ORIGIN)) {final String originName = HttpHeaderNames.ORIGIN.toString();return channel.newFailedFuture(new IllegalArgumentException("Cannot generate the '" + originName + "' header" + " value, webSocketURI should contain host or disable generateOriginHeader or pass value" + " through customHeaders"));}}// 創建握手請求FullHttpRequest request = newHandshakeRequest(channel.bufferAllocator());// 創建 Promise,異步寫出請求Promise<Void> promise = channel.newPromise();channel.writeAndFlush(request).addListener(channel, (ch, future) -> {// 如果寫操作成功if (future.isSuccess()) {ChannelPipeline p = ch.pipeline();//找出管道中 HTTP 請求編碼器 HttpRequestEncoder 或者 HttpClientCodec,ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);if (ctx == null) {ctx = p.context(HttpClientCodec.class);}if (ctx == null) {promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " + "an HttpRequestEncoder or HttpClientCodec"));return;}// 然后在其后面動態添加 WebSocket 專用的編碼器 ws-encoder(由 newWebSocketEncoder() 創建)p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());promise.setSuccess(null);} else {promise.setFailure(future.cause());}});return promise.asFuture();}// WebSocketClientProtocolHandshakeHandler.channelReadpublic final void finishHandshake(Channel channel, FullHttpResponse response) {verify(response);// 服務器返回的子協議CharSequence receivedProtocol = response.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);receivedProtocol = receivedProtocol != null ? AsciiString.trim(receivedProtocol) : null;// 客戶端期望的子協議String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";boolean protocolValid = false;// 如果客戶端沒指定預期協議,且服務器也沒返回協議,視為通過。if (expectedProtocol.isEmpty() && receivedProtocol == null) {protocolValid = true;setActualSubprotocol(expectedSubprotocol);} else if (!expectedProtocol.isEmpty() && receivedProtocol != null && receivedProtocol.length() > 0) {// 如果客戶端有期望協議且服務器返回了協議,則判斷服務器返回的協議是否在客戶端允許的列表中for (String protocol : expectedProtocol.split(",")) {if (AsciiString.contentEquals(protocol.trim(), receivedProtocol)) {protocolValid = true;setActualSubprotocol(receivedProtocol.toString());break;}}}// 如果子協議校驗失敗,拋出握手異常。if (!protocolValid) {throw new WebSocketClientHandshakeException(String.format("Invalid subprotocol. Actual: %s. Expected one of: %s",receivedProtocol, expectedSubprotocol), response);}// 標記握手完成。setHandshakeComplete();final ChannelPipeline p = channel.pipeline();// 移除 HTTP 消息解壓處理器(如 gzip 解壓),以及 HTTP 聚合器,WebSocket 不需要這些HttpContentDecompressor decompressor = p.get(HttpContentDecompressor.class);if (decompressor != null) {p.remove(decompressor);}HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class);if (aggregator != null) {p.remove(aggregator);}// 查找 HTTP 解碼器上下文:// 1. 若是 HttpClientCodec,先調用 removeOutboundHandler(),然后添加 WebSocket 解碼器,最后異步移除 HTTP Codec。// 2. 若是單獨的 HttpResponseDecoder,先移除對應的請求編碼器,再添加 WebSocket 解碼器,異步移除響應解碼器。// 新加入的 ws-decoder 是 WebSocket 的解碼器,處理 WebSocket 幀。ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);if (ctx == null) {ctx = p.context(HttpClientCodec.class);if (ctx == null) {throw new IllegalStateException("ChannelPipeline does not contain " +"an HttpRequestEncoder or HttpClientCodec");}final HttpClientCodec codec = (HttpClientCodec) ctx.handler();codec.removeOutboundHandler();p.addAfter(ctx.name(), "ws-decoder", newWebsocketDecoder());channel.executor().execute(() -> p.remove(codec));} else {if (p.get(HttpRequestEncoder.class) != null) {p.remove(HttpRequestEncoder.class);}final ChannelHandlerContext context = ctx;p.addAfter(context.name(), "ws-decoder", newWebsocketDecoder());channel.executor().execute(() -> p.remove(context.handler()));}}// ...protected abstract FullHttpRequest newHandshakeRequest(BufferAllocator allocator);protected abstract void verify(FullHttpResponse response);protected abstract WebSocketFrameDecoder newWebsocketDecoder();protected abstract WebSocketFrameEncoder newWebSocketEncoder();
}