[netty5: WebSocketClientHandshaker WebSocketClientHandshakerFactory]-源碼分析

在閱讀這篇文章前,推薦先閱讀以下內容:

  1. [netty5: WebSocketFrame]-源碼分析
  2. [netty5: WebSocketFrameEncoder & WebSocketFrameDecoder]-源碼解析

WebSocketClientHandshakerFactory

WebSocketClientHandshakerFactory 是用于根據 URI 和協議版本創建對應 WebSocket 握手器(Handshaker)的工廠類,簡化客戶端握手流程。

public final class WebSocketClientHandshakerFactory {private WebSocketClientHandshakerFactory() {}// ...// new WebSocketClientProtocolHandler(config)public static WebSocketClientHandshaker newHandshaker(URI webSocketURL, WebSocketVersion version, String subprotocol,boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,boolean performMasking, boolean allowMaskMismatch, long forceCloseTimeoutMillis,boolean absoluteUpgradeUrl, boolean generateOriginHeader) {return new WebSocketClientHandshaker13(webSocketURL, subprotocol, allowExtensions, customHeaders,maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis,absoluteUpgradeUrl, generateOriginHeader);}
}

WebSocketClientHandshaker13

WebSocketClientHandshaker13 是實現 WebSocket 協議 RFC 6455(版本13)的客戶端握手器,負責構造握手請求、驗證響應并完成協議升級。

public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {private final boolean allowExtensions;private final boolean performMasking;private final boolean allowMaskMismatch;private volatile String sentNonce;// WebSocketClientHandshakerFactory.newHandshakerWebSocketClientHandshaker13(URI webSocketURL, String subprotocol,boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,boolean performMasking, boolean allowMaskMismatch,long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl,boolean generateOriginHeader) {super(webSocketURL, WebSocketVersion.V13, subprotocol, customHeaders, maxFramePayloadLength,forceCloseTimeoutMillis, absoluteUpgradeUrl, generateOriginHeader);this.allowExtensions = allowExtensions;this.performMasking = performMasking;this.allowMaskMismatch = allowMaskMismatch;}/*** /*** <p>* Sends the opening request to the server:* </p>** <pre>* GET /chat HTTP/1.1* Host: server.example.com* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==* Sec-WebSocket-Protocol: chat, superchat* Sec-WebSocket-Version: 13* </pre>**/@Overrideprotected FullHttpRequest newHandshakeRequest(BufferAllocator allocator) {URI wsURL = uri();FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, upgradeUrl(wsURL),allocator.allocate(0));HttpHeaders headers = request.headers();if (customHeaders != null) {headers.add(customHeaders);if (!headers.contains(HttpHeaderNames.HOST)) {headers.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));}} else {headers.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));}String nonce = createNonce();headers.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE).set(HttpHeaderNames.SEC_WEBSOCKET_KEY, nonce);if (generateOriginHeader && !headers.contains(HttpHeaderNames.ORIGIN)) {headers.set(HttpHeaderNames.ORIGIN, websocketHostValue(wsURL));}sentNonce = nonce;String expectedSubprotocol = expectedSubprotocol();if (!StringUtil.isNullOrEmpty(expectedSubprotocol)) {headers.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, expectedSubprotocol);}headers.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, version().toAsciiString());return request;}/*** <p>* Process server response:* </p>** <pre>* HTTP/1.1 101 Switching Protocols* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=* Sec-WebSocket-Protocol: chat* </pre>** @param response*            HTTP response returned from the server for the request sent by beginOpeningHandshake00().* @throws WebSocketHandshakeException if handshake response is invalid.*/@Overrideprotected void verify(FullHttpResponse response) {HttpResponseStatus status = response.status();if (!HttpResponseStatus.SWITCHING_PROTOCOLS.equals(status)) {throw new WebSocketClientHandshakeException("Invalid handshake response status: " + status, response);}HttpHeaders headers = response.headers();CharSequence upgrade = headers.get(HttpHeaderNames.UPGRADE);if (!HttpHeaderValues.WEBSOCKET.contentEqualsIgnoreCase(upgrade)) {throw new WebSocketClientHandshakeException("Invalid handshake response upgrade: " + upgrade, response);}if (!headers.containsIgnoreCase(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)) {throw new WebSocketClientHandshakeException("Invalid handshake response connection: " + headers.get(HttpHeaderNames.CONNECTION), response);}CharSequence accept = headers.get(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT);if (accept == null) {throw new WebSocketClientHandshakeException("Invalid handshake response sec-websocket-accept: null", response);}String expectedAccept = WebSocketUtil.calculateV13Accept(sentNonce);if (!AsciiString.contentEquals(expectedAccept, AsciiString.trim(accept))) {throw new WebSocketClientHandshakeException("Invalid handshake response sec-websocket-accept: " + accept + ", expected: " + expectedAccept, response);}}@Overrideprotected WebSocketFrameDecoder newWebsocketDecoder() {return new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength(), allowMaskMismatch);}@Overrideprotected WebSocketFrameEncoder newWebSocketEncoder() {return new WebSocket13FrameEncoder(performMasking);}@Overridepublic WebSocketClientHandshaker13 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);return this;}// 生成一個符合 WebSocket 協議要求的 16 字節 Base64 編碼的隨機值,用作 Sec-WebSocket-Keyprivate static String createNonce() {var nonce = WebSocketUtil.randomBytes(16);return WebSocketUtil.base64(nonce);}
}

WebSocketClientHandshaker

public abstract class WebSocketClientHandshaker {protected static final int DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS = 10000;// 代表握手時的目標地址, 例如 ws://example.com/chatprivate final URI uri;// 控制握手請求和數據幀的格式, 比如 RFC 6455 標準版本private final WebSocketVersion version;// 標記握手是否完成,volatile 保證多線程訪問時的可見性private volatile boolean handshakeComplete;// 握手完成后,如果關閉 WebSocket 連接時等待超時,會觸發強制關閉。private volatile long forceCloseTimeoutMillis;// 用于標記強制關閉流程是否初始化, 通過 AtomicIntegerFieldUpdater 原子更新private volatile int forceCloseInit;private static final AtomicIntegerFieldUpdater<WebSocketClientHandshaker> FORCE_CLOSE_INIT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(WebSocketClientHandshaker.class, "forceCloseInit");// 標記強制關閉流程是否完成。private volatile boolean forceCloseComplete;// 握手時客戶端希望協商的子協議(Subprotocol), 例如視頻、聊天子協議名稱等private final String expectedSubprotocol;// 握手后服務器協商確認的子協議,握手成功后才有值。private volatile String actualSubprotocol;// 握手請求時使用,方便傳遞用戶自定義信息。protected final HttpHeaders customHeaders;// 最大單個 WebSocket 幀負載長度限制, 防止收到超大數據導致內存溢出。private final int maxFramePayloadLength;// 是否在握手請求中使用絕對 URI 作為 Upgrade URL, 一般用于特殊代理或協議場景private final boolean absoluteUpgradeUrl;// 是否自動生成 Origin 請求頭protected final boolean generateOriginHeader;protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,HttpHeaders customHeaders, int maxFramePayloadLength,long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl, boolean generateOriginHeader) {this.uri = uri;this.version = version;expectedSubprotocol = subprotocol;this.customHeaders = customHeaders;this.maxFramePayloadLength = maxFramePayloadLength;this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;this.absoluteUpgradeUrl = absoluteUpgradeUrl;this.generateOriginHeader = generateOriginHeader;}// WebSocketClientProtocolHandshakeHandler.channelActivepublic Future<Void> handshake(Channel channel) {requireNonNull(channel, "channel");ChannelPipeline pipeline = channel.pipeline();// 檢查管道中解碼器HttpResponseDecoder decoder = pipeline.get(HttpResponseDecoder.class);if (decoder == null) {HttpClientCodec codec = pipeline.get(HttpClientCodec.class);if (codec == null) {return channel.newFailedFuture(new IllegalStateException("ChannelPipeline does not contain " + "an HttpResponseDecoder or HttpClientCodec"));}}// 檢查 URI 和 Header 相關的 Host 與 Originif (uri.getHost() == null) {if (customHeaders == null || !customHeaders.contains(HttpHeaderNames.HOST)) {return channel.newFailedFuture(new IllegalArgumentException("Cannot generate the 'host' header value," + " webSocketURI should contain host or passed through customHeaders"));}if (generateOriginHeader && !customHeaders.contains(HttpHeaderNames.ORIGIN)) {final String originName = HttpHeaderNames.ORIGIN.toString();return channel.newFailedFuture(new IllegalArgumentException("Cannot generate the '" + originName + "' header" + " value, webSocketURI should contain host or disable generateOriginHeader or pass value" + " through customHeaders"));}}// 創建握手請求FullHttpRequest request = newHandshakeRequest(channel.bufferAllocator());// 創建 Promise,異步寫出請求Promise<Void> promise = channel.newPromise();channel.writeAndFlush(request).addListener(channel, (ch, future) -> {// 如果寫操作成功if (future.isSuccess()) {ChannelPipeline p = ch.pipeline();//找出管道中 HTTP 請求編碼器 HttpRequestEncoder 或者 HttpClientCodec,ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);if (ctx == null) {ctx = p.context(HttpClientCodec.class);}if (ctx == null) {promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " + "an HttpRequestEncoder or HttpClientCodec"));return;}// 然后在其后面動態添加 WebSocket 專用的編碼器 ws-encoder(由 newWebSocketEncoder() 創建)p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());promise.setSuccess(null);} else {promise.setFailure(future.cause());}});return promise.asFuture();}// WebSocketClientProtocolHandshakeHandler.channelReadpublic final void finishHandshake(Channel channel, FullHttpResponse response) {verify(response);// 服務器返回的子協議CharSequence receivedProtocol = response.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);receivedProtocol = receivedProtocol != null ? AsciiString.trim(receivedProtocol) : null;// 客戶端期望的子協議String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";boolean protocolValid = false;// 如果客戶端沒指定預期協議,且服務器也沒返回協議,視為通過。if (expectedProtocol.isEmpty() && receivedProtocol == null) {protocolValid = true;setActualSubprotocol(expectedSubprotocol);} else if (!expectedProtocol.isEmpty() && receivedProtocol != null && receivedProtocol.length() > 0) {// 如果客戶端有期望協議且服務器返回了協議,則判斷服務器返回的協議是否在客戶端允許的列表中for (String protocol : expectedProtocol.split(",")) {if (AsciiString.contentEquals(protocol.trim(), receivedProtocol)) {protocolValid = true;setActualSubprotocol(receivedProtocol.toString());break;}}}// 如果子協議校驗失敗,拋出握手異常。if (!protocolValid) {throw new WebSocketClientHandshakeException(String.format("Invalid subprotocol. Actual: %s. Expected one of: %s",receivedProtocol, expectedSubprotocol), response);}// 標記握手完成。setHandshakeComplete();final ChannelPipeline p = channel.pipeline();// 移除 HTTP 消息解壓處理器(如 gzip 解壓),以及 HTTP 聚合器,WebSocket 不需要這些HttpContentDecompressor decompressor = p.get(HttpContentDecompressor.class);if (decompressor != null) {p.remove(decompressor);}HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class);if (aggregator != null) {p.remove(aggregator);}// 查找 HTTP 解碼器上下文:// 		1. 若是 HttpClientCodec,先調用 removeOutboundHandler(),然后添加 WebSocket 解碼器,最后異步移除 HTTP Codec。// 		2. 若是單獨的 HttpResponseDecoder,先移除對應的請求編碼器,再添加 WebSocket 解碼器,異步移除響應解碼器。// 新加入的 ws-decoder 是 WebSocket 的解碼器,處理 WebSocket 幀。ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);if (ctx == null) {ctx = p.context(HttpClientCodec.class);if (ctx == null) {throw new IllegalStateException("ChannelPipeline does not contain " +"an HttpRequestEncoder or HttpClientCodec");}final HttpClientCodec codec =  (HttpClientCodec) ctx.handler();codec.removeOutboundHandler();p.addAfter(ctx.name(), "ws-decoder", newWebsocketDecoder());channel.executor().execute(() -> p.remove(codec));} else {if (p.get(HttpRequestEncoder.class) != null) {p.remove(HttpRequestEncoder.class);}final ChannelHandlerContext context = ctx;p.addAfter(context.name(), "ws-decoder", newWebsocketDecoder());channel.executor().execute(() -> p.remove(context.handler()));}}// ...protected abstract FullHttpRequest newHandshakeRequest(BufferAllocator allocator);protected abstract void verify(FullHttpResponse response);protected abstract WebSocketFrameDecoder newWebsocketDecoder();protected abstract WebSocketFrameEncoder newWebSocketEncoder();
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87870.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87870.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87870.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

4.2 如何訓練?個 LLM

?般??&#xff0c;訓練?個完整的 LLM 需要經過圖1中的三個階段——Pretrain、SFT 和 RLHF。 4.2.1 Pretrain 預訓練任務與架構 任務類型&#xff1a;采用因果語言模型&#xff08;CLM&#xff09;&#xff0c;通過預測下一個 token 進行訓練&#xff0c;與傳統預訓練模型…

Qt中的QObject::moveToThread方法詳解

一、QObject::moveToThread方法QObject::moveToThread()是Qt框架中一個非常重要的功能&#xff0c;它允許改變QObject及其子對象的線程關聯性。這個功能在多線程編程中特別有用&#xff0c;可以將耗時操作移到工作線程執行&#xff0c;避免阻塞主線程/GUI線程。基本用法void QO…

【9】用戶接入與認證配置

本文旨在幫助網絡管理員在 SD-WAN 環境中實現安全、穩定的用戶接入與認證策略,涵蓋本地/遠程認證、權限管理、密碼策略、SSH、會話控制等關鍵配置要素。 1.密碼策略與賬戶安全 從 IOS XE SD-WAN 17.3.1 起,Cisco 引入密碼強化功能,用于統一用戶密碼的復雜度與有效性要求。密…

第十六節:第三部分:多線程:線程安全問題、取錢問題的模擬

線程安全問題介紹&#xff1a;取錢的線程安全問題 取錢的線程安全問題 取錢案例需求分析 線程安全問題出現的原因 代碼&#xff1a;模擬線程安全問題&#xff08;上述取錢案例&#xff09; Account類&#xff08;賬戶類&#xff09; package com.itheima.day3_thread_safe;pu…

APE:大語言模型具有人類水平的提示工程能力

摘要 通過以自然語言指令作為條件輸入&#xff0c;大型語言模型&#xff08;LLMs&#xff09;展現出令人印象深刻的通用計算能力。然而&#xff0c;任務表現嚴重依賴于用于引導模型的提示&#xff08;prompt&#xff09;質量&#xff0c;而最有效的提示通常是由人類手工設計的…

X86 CPU 工作模式

1.概述 1.實模式 實模式又稱實地址模式&#xff0c;實&#xff0c;即真實&#xff0c;這個真實分為兩個方面&#xff0c;一個方面是運行真實的指令&#xff0c;對指令的動作不作區分&#xff0c;直接執行指令的真實功能&#xff0c;另一方面是發往內存的地址是真實的&#xff…

Java設計模式之行為型模式(策略模式)介紹與說明

一、策略模式簡介 策略模式&#xff08;Strategy Pattern&#xff09;是一種行為型設計模式&#xff0c;它定義了一系列算法&#xff0c;并將每個算法封裝起來&#xff0c;使它們可以相互替換&#xff0c;且算法的變化不會影響使用算法的客戶。策略模式讓算法獨立于使用它的客…

【BIOS+MBR 微內核手寫實現】

本文基于BIOS+MBR的架構,從四部分講解微內核是如何實現的: 1)搭建微內核編譯調試環境 2)梳理微內核的代碼結構:偽指令講解 3)手寫實現微內核框架,輸出簡單的字符串 4)講解微內核啟動階段的具體運行過程 先完成內核工程創建,如下圖 我們這里使用nasm風格的匯編編寫,…

從C/C++遷移到Go:內存管理思維轉變

一、引言 在當今高速發展的軟件開發世界中&#xff0c;語言遷移已成為技術進化的常態。作為一名曾經的C/C開發者&#xff0c;我經歷了向Go語言轉變的全過程&#xff0c;其中最大的認知挑戰來自內存管理模式的根本性差異。 我記得第一次接觸Go項目時的困惑&#xff1a;沒有析構函…

正確設置 FreeRTOS 與 STM32 的中斷優先級

在裸機開發&#xff08;非 RTOS&#xff09;時&#xff0c;大多數 STM32 外設的中斷優先級通常不需要手動配置&#xff0c;原因如下&#xff1a; ? 裸機開發中默認中斷優先級行為 特點說明默認中斷優先級為 0如果你不設置&#xff0c;STM32 HAL 默認設置所有外設中斷為 0&…

EasyExcel之SheetWriteHandler:解鎖Excel寫入的高階玩法

引言在 EasyExcel 強大的功能體系中&#xff0c;SheetWriteHandler 接口是一個關鍵的組成部分。它允許開發者在寫入 Excel 的 Sheet 時進行自定義處理&#xff0c;為實現各種復雜的業務需求提供了強大的支持。通過深入了解和運用 SheetWriteHandler 接口&#xff0c;我們能夠更…

Python單例模式魔法方法or屬性

1.單例模式概念定義:單例模式(Singleton Pattern)是一種創建型設計模式&#xff0c;它確保一個類只能有一個實例&#xff0c;并提供一個全局訪問點來獲取該實例。這種模式在需要控制資源訪問、配置管理或協調系統操作時特別有用。核心特點:私有構造函數&#xff1a;防止外部通過…

【Kubernetes系列】Kubernetes 資源請求(Requests)

博客目錄 引言一、資源請求的基本概念1.1 什么是資源請求1.2 請求與限制的區別 二、CPU 請求的深入解析2.1 CPU 請求的單位與含義2.2 CPU 請求的調度影響2.3 CPU 請求與限制的關系 三、內存請求的深入解析3.1 內存請求的單位與含義3.2 內存請求的調度影響3.3 內存請求的特殊性 …

大型語言模型中的自動化思維鏈提示

摘要 大型語言模型&#xff08;LLMs&#xff09;能夠通過生成中間推理步驟來執行復雜的推理任務。為提示演示提供這些步驟的過程被稱為思維鏈&#xff08;CoT&#xff09;提示。CoT提示有兩種主要范式。一種使用簡單的提示語&#xff0c;如“讓我們一步一步思考”&#xff0c;…

Private Set Generation with Discriminative Information(2211.04446v1)

1. 遇到什么問題&#xff0c;解決了什么遇到的問題現有差分隱私生成模型受限于高維數據分布建模的復雜性&#xff0c;合成樣本實用性不足。深度生成模型訓練依賴大量數據&#xff0c;加入隱私約束后更難優化&#xff0c;且不保證下游任務&#xff08;如分類&#xff09;的最優解…

C++編程語言入門指南

一、C語言概述 C是由丹麥計算機科學家Bjarne Stroustrup于1979年在貝爾實驗室開發的一種靜態類型、編譯式、通用型編程語言。最初被稱為"C with Classes"(帶類的C)&#xff0c;1983年更名為C。它既具有高級語言的抽象特性&#xff0c;又保留了底層硬件操作能力&…

ZED相機與Foxglove集成:加速機器人視覺調試效率的實用方案

隨著機器人技術的發展&#xff0c;實時視覺數據流的高效傳輸和可視化成為提升系統性能的重要因素。通過ZED相機&#xff08;包括ZED 2i和ZED X&#xff09;與Foxglove Studio平臺的結合&#xff0c;開發者能夠輕松訪問高質量的2D圖像、深度圖和點云數據&#xff0c;從而顯著提高…

目標檢測新紀元:DETR到Mamba實戰解析

&#x1f680;【實戰分享】目標檢測的“后 DE?”時代&#xff1a;DETR/DINO/RT-DETR及新型骨干網絡探索&#xff08;含示例代碼&#xff09; 目標檢測從 YOLO、Faster R-CNN 到 Transformer 結構的 DETR&#xff0c;再到 DINO、RT-DETR&#xff0c;近兩年出現了許多新趨勢&am…

【IOS】XCode創建firstapp并運行(成為IOS開發者)

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 這篇文章主要介紹XCode創建firstapp并運行 學其所用&#xff0c;用其所學。——梁啟超 歡迎來到我的博客&#xff0c;一起學習&#xff0c;共同進步。 喜歡的朋友可以關注一下&#xff0c;下次更新不迷路…

class類和style內聯樣式的綁定 + 事件處理 + uniapp創建自定義頁面模板

目錄 一.class類的綁定 1.靜態編寫 2.動態編寫 二.style內聯樣式的綁定 三.事件處理 1.案例1 2.案例2 四.uniapp創建自定義頁面模板 1.為什么要這么做&#xff1f; 2.步驟 ①打開新建頁面的界面 ②在彈出的目錄下&#xff0c;新建模板文件 ③用HBuilderX打開該模板…