[netty5: WebSocketServerHandshaker WebSocketServerHandshakerFactory]-源碼分析

在閱讀這篇文章前,推薦先閱讀以下內容:

  1. [netty5: WebSocketFrame]-源碼分析
  2. [netty5: WebSocketFrameEncoder & WebSocketFrameDecoder]-源碼解析

WebSocketServerHandshakerFactory

WebSocketServerHandshakerFactory 用于根據客戶端請求中的 WebSocket 版本構造對應的 WebSocketServerHandshaker 實例,完成握手協議版本的協商與支持判斷。

public class WebSocketServerHandshakerFactory {private final String webSocketURL;private final String subprotocols;private final WebSocketDecoderConfig decoderConfig;// ...public WebSocketServerHandshakerFactory(String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {this.webSocketURL = webSocketURL;this.subprotocols = subprotocols;this.decoderConfig = Objects.requireNonNull(decoderConfig, "decoderConfig");}public WebSocketServerHandshaker newHandshaker(HttpRequest req) {return resolveHandshaker0(req, webSocketURL, subprotocols, decoderConfig);}public static WebSocketServerHandshaker resolveHandshaker(HttpRequest req, String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {Objects.requireNonNull(decoderConfig, "decoderConfig");return resolveHandshaker0(req, webSocketURL, subprotocols, decoderConfig);}private static WebSocketServerHandshaker resolveHandshaker0(HttpRequest req, String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);if (version != null && AsciiString.contentEqualsIgnoreCase(version, WebSocketVersion.V13.toAsciiString())) {// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).return new WebSocketServerHandshaker13(webSocketURL, subprotocols, decoderConfig);}return null;}public static Future<Void> sendUnsupportedVersionResponse(Channel channel) {HttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.UPGRADE_REQUIRED, channel.bufferAllocator().allocate(0));res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());HttpUtil.setContentLength(res, 0);return channel.writeAndFlush(res);}
}

WebSocketServerHandshaker13

WebSocketServerHandshaker13 負責基于 RFC 6455 實現 WebSocket 版本 13 的服務端握手處理流程,包括請求校驗、響應生成、子協議協商和幀編解碼器的安裝。

public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {public WebSocketServerHandshaker13(String webSocketURL, String subprotocols, WebSocketDecoderConfig decoderConfig) {super(WebSocketVersion.V13, webSocketURL, subprotocols, decoderConfig);}/*** <p>* Handle the web socket handshake for the web socket specification <a href=* "https://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17">HyBi versions 13-17</a>. Versions 13-17* share the same wire protocol.* </p>** <p>* Browser request to the server:* </p>** <pre>* GET /chat HTTP/1.1* Host: server.example.com* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==* Origin: http://example.com* Sec-WebSocket-Protocol: chat, superchat* Sec-WebSocket-Version: 13* </pre>** <p>* Server response:* </p>** <pre>* HTTP/1.1 101 Switching Protocols* Upgrade: websocket* Connection: Upgrade* Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=* Sec-WebSocket-Protocol: chat* </pre>*/@Overrideprotected FullHttpResponse newHandshakeResponse(BufferAllocator allocator, FullHttpRequest req, HttpHeaders headers) {HttpMethod method = req.method();if (!HttpMethod.GET.equals(method)) {throw new WebSocketServerHandshakeException("Invalid WebSocket handshake method: " + method, req);}HttpHeaders reqHeaders = req.headers();if (!reqHeaders.contains(HttpHeaderNames.CONNECTION) || !reqHeaders.containsIgnoreCase(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)) {throw new WebSocketServerHandshakeException("not a WebSocket request: a |Connection| header must includes a token 'Upgrade'", req);}if (!reqHeaders.containsIgnoreCase(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET)) {throw new WebSocketServerHandshakeException("not a WebSocket request: a |Upgrade| header must containing the value 'websocket'", req);}CharSequence key = reqHeaders.get(HttpHeaderNames.SEC_WEBSOCKET_KEY);if (key == null) {throw new WebSocketServerHandshakeException("not a WebSocket request: missing key", req);}FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS,allocator.allocate(0));if (headers != null) {res.headers().add(headers);}String accept = WebSocketUtil.calculateV13Accept(key.toString());res.headers().set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE).set(HttpHeaderNames.SEC_WEBSOCKET_ACCEPT, accept);CharSequence subprotocols = reqHeaders.get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);if (subprotocols != null) {String selectedSubprotocol = selectSubprotocol(subprotocols.toString());if (selectedSubprotocol == null) {if (logger.isDebugEnabled()) {logger.debug("Requested subprotocol(s) not supported: {}", subprotocols);}} else {res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, selectedSubprotocol);}}return res;}@Overrideprotected WebSocketFrameDecoder newWebsocketDecoder() {return new WebSocket13FrameDecoder(decoderConfig());}@Overrideprotected WebSocketFrameEncoder newWebSocketEncoder() {return new WebSocket13FrameEncoder(false);}
}

WebSocketServerHandshaker

WebSocketServerHandshaker 是 WebSocket 握手處理的抽象基類,定義了服務端握手響應、子協議選擇和編解碼器安裝等通用邏輯,供具體版本(如 V13)實現。

public abstract class WebSocketServerHandshaker {protected static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandshaker.class);private final String uri;private final String[] subprotocols;private final WebSocketVersion version;private final WebSocketDecoderConfig decoderConfig;private String selectedSubprotocol;public static final String SUB_PROTOCOL_WILDCARD = "*";protected WebSocketServerHandshaker(WebSocketVersion version, String uri, String subprotocols, WebSocketDecoderConfig decoderConfig) {this.version = version;this.uri = uri;if (subprotocols != null) {String[] subprotocolArray = subprotocols.split(",");for (int i = 0; i < subprotocolArray.length; i++) {subprotocolArray[i] = subprotocolArray[i].trim();}this.subprotocols = subprotocolArray;} else {this.subprotocols = EmptyArrays.EMPTY_STRINGS;}this.decoderConfig = requireNonNull(decoderConfig, "decoderConfig");}// 將當前 Handshaker 支持的子協議數組轉換為有序去重的 Set 返回,用于后續子協議協商。public Set<String> subprotocols() {Set<String> ret = new LinkedHashSet<>();Collections.addAll(ret, subprotocols);return ret;}// WebSocketServerProtocolHandshakeHandler.channelRead// 執行 WebSocket 握手響應、替換或插入編解碼器并清理不兼容的 HTTP 處理器,最終完成協議切換。public Future<Void> handshake(Channel channel, FullHttpRequest req) {return handshake(channel, req, null);}public final Future<Void> handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders) {if (logger.isDebugEnabled()) {logger.debug("{} WebSocket version {} server handshake", channel, version());}//  WebSocketServerHandshaker13.newHandshakeResponseFullHttpResponse response = newHandshakeResponse(channel.bufferAllocator(), req, responseHeaders);// 移除 HttpObjectAggregator 和 HttpContentCompressorChannelPipeline p = channel.pipeline();if (p.get(HttpObjectAggregator.class) != null) {p.remove(HttpObjectAggregator.class);}if (p.get(HttpContentCompressor.class) != null) {p.remove(HttpContentCompressor.class);}ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);final String encoderName;if (ctx == null) {// this means the user use an HttpServerCodecctx = p.context(HttpServerCodec.class);if (ctx == null) {response.close();return channel.newFailedFuture(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));}p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = ctx.name();} else {p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());encoderName = p.context(HttpResponseEncoder.class).name();p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());}return channel.writeAndFlush(response).addListener(channel, (ch, future) -> {if (future.isSuccess()) {ChannelPipeline p1 = ch.pipeline();p1.remove(encoderName);}});}// 處理非 FullHttpRequest 的 WebSocket 握手場景,通過臨時注入 ChannelHandler 聚合請求數據并完成協議切換public Future<Void> handshake(Channel channel, HttpRequest req) {return handshake(channel, req, null);}// 在沒有使用 HttpObjectAggregator 的情況下,// 動態地通過臨時注入一個 ChannelHandler 來手動聚合 HTTP 請求的各個部分// 最終組裝成一個 FullHttpRequest,完成 WebSocket 握手的流程public final Future<Void> handshake(final Channel channel, HttpRequest req, final HttpHeaders responseHeaders) {// 如果傳進來的 req 已經是 FullHttpRequest,直接調用已有的 handshake(Channel, FullHttpRequest, HttpHeaders) 方法處理。// 否則,說明請求是分段的(HttpRequest + HttpContent),需要手動聚合。if (req instanceof FullHttpRequest) {return handshake(channel, (FullHttpRequest) req, responseHeaders);}ChannelPipeline pipeline = channel.pipeline();//  先在 ChannelPipeline 里找 HttpRequestDecoder 的 ChannelHandlerContext。// 如果沒找到,再找 HttpServerCodec。// 如果都沒找到,直接失敗,返回異常。ChannelHandlerContext ctx = pipeline.context(HttpRequestDecoder.class);if (ctx == null) {// This means the user use a HttpServerCodecctx = pipeline.context(HttpServerCodec.class);if (ctx == null) {return channel.newFailedFuture(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));}}// 動態注入一個臨時的 ChannelHandlerAdapter,名字叫 "handshaker"// 它的職責是監聽接下來流入的 HttpObject 消息,把 HttpRequest、HttpContent、LastHttpContent 等部分組裝成一個完整的 FullHttpRequest// 當完整請求組裝完成后:// 	1. 立刻移除自己(ctx.pipeline().remove(this)),避免繼續攔截后續消息。// 	2. 調用真正的 handshake(Channel, FullHttpRequest, HttpHeaders) 繼續 WebSocket 握手。// 	3. 把握手的 Future 結果關聯到當前的 promise 上。final Promise<Void> promise = channel.newPromise();pipeline.addAfter(ctx.name(), "handshaker", new ChannelHandlerAdapter() {private FullHttpRequest fullHttpRequest;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpObject) {try {handleHandshakeRequest(ctx, (HttpObject) msg);} finally {Resource.dispose(msg);}} else {super.channelRead(ctx, msg);}}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.pipeline().remove(this);promise.tryFailure(cause);super.channelExceptionCaught(ctx, cause);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {try {// Fail promise if Channel was closedif (!promise.isDone()) {promise.tryFailure(new ClosedChannelException());}ctx.fireChannelInactive();} finally {releaseFullHttpRequest();}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {releaseFullHttpRequest();}private void handleHandshakeRequest(ChannelHandlerContext ctx, HttpObject httpObject) {if (httpObject instanceof FullHttpRequest) {ctx.pipeline().remove(this);handshake(channel, (FullHttpRequest) httpObject, responseHeaders).cascadeTo(promise);return;}if (httpObject instanceof LastHttpContent) {assert fullHttpRequest != null;try (FullHttpRequest handshakeRequest = fullHttpRequest) {fullHttpRequest = null;ctx.pipeline().remove(this);handshake(channel, handshakeRequest, responseHeaders).cascadeTo(promise);}return;}if (httpObject instanceof HttpRequest) {HttpRequest httpRequest = (HttpRequest) httpObject;fullHttpRequest = new DefaultFullHttpRequest(httpRequest.protocolVersion(), httpRequest.method(),httpRequest.uri(), ctx.bufferAllocator().allocate(0),httpRequest.headers(), HttpHeaders.emptyHeaders());if (httpRequest.decoderResult().isFailure()) {fullHttpRequest.setDecoderResult(httpRequest.decoderResult());}}}private void releaseFullHttpRequest() {if (fullHttpRequest != null) {fullHttpRequest.close();fullHttpRequest = null;}}});try {ctx.fireChannelRead(ReferenceCountUtil.retain(req));} catch (Throwable cause) {promise.setFailure(cause);}return promise.asFuture();}public Future<Void> close(Channel channel, CloseWebSocketFrame frame) {requireNonNull(channel, "channel");return close0(channel, frame);}public Future<Void> close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {requireNonNull(ctx, "ctx");return close0(ctx, frame);}private static Future<Void> close0(ChannelOutboundInvoker invoker, CloseWebSocketFrame frame) {return invoker.writeAndFlush(frame).addListener(invoker, ChannelFutureListeners.CLOSE);}// WebSocketServerHandshaker13.newHandshakeResponse// 服務端從客戶端請求的子協議中選出一個自己支持的返回給客戶端的過程protected String selectSubprotocol(String requestedSubprotocols) {if (requestedSubprotocols == null || subprotocols.length == 0) {return null;}String[] requestedSubprotocolArray = requestedSubprotocols.split(",");for (String p : requestedSubprotocolArray) {String requestedSubprotocol = p.trim();for (String supportedSubprotocol : subprotocols) {if (SUB_PROTOCOL_WILDCARD.equals(supportedSubprotocol) || requestedSubprotocol.equals(supportedSubprotocol)) {selectedSubprotocol = requestedSubprotocol;return requestedSubprotocol;}}}// No match foundreturn null;}protected abstract FullHttpResponse newHandshakeResponse(BufferAllocator allocator, FullHttpRequest req,HttpHeaders responseHeaders);protected abstract WebSocketFrameDecoder newWebsocketDecoder();protected abstract WebSocketFrameEncoder newWebSocketEncoder();
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89991.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89991.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89991.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據挖掘:深度解析與實戰應用

在當今數字化時代&#xff0c;數據挖掘已經成為企業獲取競爭優勢的關鍵技術之一。通過從大量數據中提取有價值的信息&#xff0c;企業可以更好地理解客戶需求、優化業務流程、提高運營效率。本文將深入探討數據挖掘的核心技術、實際應用案例以及如何在企業中實施數據挖掘項目。…

LLM面試題14

算法崗面試題 介紹下Transformer模型。 Transformer本身是一個典型的encoder-decoder模型&#xff0c;Encoder端和Decoder端均有6個Block,Encoder端的Block包括兩個模塊&#xff0c;多頭self-attention模塊以及一個前饋神經網絡模塊&#xff1b;Decoder端的Block包括三個模塊&…

Java金融場景中為什么金額字段禁止使用浮點類型(float/double)?

引言 Java金融場景中為什么金額字段禁止使用浮點類型&#xff1f;這是一篇你不能忽視的“爆雷”警告&#xff01; 在金融、電商、支付、清結算等業務系統中&#xff0c;浮點類型是絕對禁區&#xff01; &#x1f6a8;一、核心警告&#xff1a;浮點類型不是十進制數&#xff01;…

SVN下載與拉取

大家好我是蘇麟&#xff0c;今天聊一聊SVN。 SVN官網&#xff1a;下載 TortoiseSVN - TortoiseSVN 軟件 根據系統選擇32位還是64位 打開文件 安裝&#xff0c;下一步&#xff0c;下一步 安裝成功后&#xff0c;右鍵找到SVNcheck 輸入地址 輸入用戶名和密碼就OK了 這期就到這里…

數據結構筆記8:堆

目錄 滿二叉樹&#xff1a; 完全二叉樹&#xff1a; 堆是一種特殊的完全二叉樹&#xff1a; 我們可以以數組的方式存儲堆。 父節點和子節點下標關系的推導&#xff1a; 1.使用數學歸納法證明n2 1 n0&#xff1a; 2.使用邊和節點的關系證明n2 1 n0&#xff1a; 我們…

3. lvgl 9.3 vscode 模擬環境搭建 lv_port_pc_vscode-release-v9.3

文章目錄1. 資源下載1. 1 lv_port_pc_vscode1.2 cmake 和 mingw 環境搭建1.3 sdl 下載1.4 下載lvgl_v9.32. 環境搭建2.1 拷貝lvgl 源碼到工程2.2 添加SDL2 依賴2.3 執行工程3. 運行示例1. 資源下載 1. 1 lv_port_pc_vscode 那么多模擬器&#xff0c;為什么選擇這個&#xff1…

【牛客刷題】小紅的爆炸串(二)

一、題目介紹 本題鏈接為:小紅的爆炸串(二) 小紅定義一個字符串會爆炸,當且僅當至少有k對相鄰的字母不同。 例如,當 k k k=2時,"arc"會爆炸,而"aabb"則不會爆炸。 小紅拿到了一個長度為

【實戰】如何訓練一個客服語音對話場景VAD模型

1. 引言:客服場景下的VAD模型 在客服中心,每天都會產生海量的通話錄音。對這些錄音進行有效分析,可以用于服務質量監控、客戶意圖洞察、流程優化等。VAD在其中扮演著“預處理器”和“過濾器”的關鍵角色: 提升ASR效率與準確性:只將檢測到的語音片段送入ASR引擎,可以避免…

在 Dokploy 中為 PostgreSQL 搭建 PgBouncer 數據庫連接池(圖文)

前言&#xff1a;為什么你需要一個連接池&#xff1f; 如果你正在使用 Node.js (尤其是像 Next.js 這樣的框架) 配合 Prisma 操作 PostgreSQL 數據庫&#xff0c;你很可能在某個階段會遇到那個令人頭疼的錯誤&#xff1a;“Error: Too many clients already”。這通常發生在應…

Mac獲取終端歷史

在 macOS 中&#xff0c;歷史記錄文件的位置取決于你使用的 shell。以下是針對不同 shell 的歷史記錄文件的默認位置&#xff1a;對于 Bash 用戶&#xff1a; 歷史記錄文件通常位于 ~/.bash_history。對于 Zsh 用戶&#xff08;macOS Catalina及以后版本默認使用的shell&#x…

高頻交易服務器篇

在 Binance 進行高頻交易&#xff08;HFT&#xff09;時&#xff0c;服務器的低延遲、高穩定性和快速網絡是關鍵。亞馬遜云&#xff08;AWS&#xff09; 提供了多種適合高頻交易的方案&#xff0c;以下是推薦的配置和優化策略&#xff1a;1. 選擇 AWS 區域&#xff08;Region&a…

MVC與MVVM架構模式詳解:原理、區別與JavaScript實現

Hi&#xff0c;我是布蘭妮甜 &#xff01;在當今復雜的前端開發領域&#xff0c;如何組織代碼結構一直是開發者面臨的核心挑戰。MVC和MVVM作為兩種經典的架構模式&#xff0c;為前端應用提供了清晰的責任劃分和可維護的代碼組織方案。本文將深入探討這兩種模式的原理、實現差異…

從小白到進階:解鎖linux與c語言高級編程知識點嵌入式開發的任督二脈(2)

【硬核揭秘】Linux與C高級編程&#xff1a;從入門到精通&#xff0c;你的全棧之路&#xff01; 第三部分&#xff1a;Shell腳本編程——自動化你的Linux世界&#xff0c;讓效率飛起來&#xff01; 嘿&#xff0c;各位C語言的“卷王”們&#xff01; 在Linux的世界里&#xf…

鎖和事務的關系

事務的4大特性(ACID) 原子性&#xff08;Atomicity&#xff09;&#xff1a;事務被視為一個單一的、不可分割的工作單元一致性&#xff08;Consistency&#xff09;&#xff1a;事務執行前后&#xff0c;數據庫從一個一致狀態轉變為另一個一致狀態&#xff0c;并且強制執行所有…

電動車信用免押小程序免押租賃小程序php方案

電動車信用免押租賃小程序&#xff0c;免押租小程序&#xff0c;信用免押接口申請、對接開發&#xff0c;可源碼搭建&#xff0c;可二開或定制。開發語言后端php&#xff0c;前端uniapp。可二開定制 在線選擇門店&#xff0c;選擇車輛類型&#xff0c;選擇租賃方式&#xff08…

機器學習在智能安防中的應用:視頻監控與異常行為檢測

隨著人工智能技術的飛速發展&#xff0c;智能安防領域正經歷著一場深刻的變革。智能安防通過整合先進的信息技術&#xff0c;如物聯網&#xff08;IoT&#xff09;、大數據和機器學習&#xff0c;能夠實現從傳統的被動防御到主動預防的轉變。機器學習技術在智能安防中的應用尤為…

MySQL中DROP、DELETE與TRUNCATE的深度解析

在MySQL數據庫操作中&#xff0c;DROP、DELETE和TRUNCATE是三個常用的數據操作命令&#xff0c;它們都可以用于刪除數據&#xff0c;但在功能、執行效率、事務處理以及對表結構的影響等方面存在顯著差異。本文將從多個維度對這三個命令進行詳細對比和解析&#xff0c;幫助讀者更…

一條 SQL 語句的內部執行流程詳解(MySQL為例)

當執行如下 SQL&#xff1a; SELECT * FROM users WHERE id 1;在數據庫內部&#xff0c;其實會經歷多個復雜且有序的階段。以下是 MySQL&#xff08;InnoDB 引擎&#xff09;中 SQL 查詢語句從發送到結果返回的完整執行流程。 客戶端連接階段 客戶端&#xff08;如 JDBC、My…

超詳細yolo8/11-detect目標檢測全流程概述:配置環境、數據標注、訓練、驗證/預測、onnx部署(c++/python)詳解

文章目錄 一、配置環境二、數據標注三、模型訓練四、驗證預測五、onnx部署c 版python版本 一、配置環境 我的都是在Linux系統下&#xff0c;訓練部署的&#xff1b;模型訓練之前&#xff0c;需要配置好環境&#xff0c;Anaconda、顯卡驅動、cuda、cudnn、pytorch等&#xff1b…

阿里云Flink:開啟大數據實時處理新時代

走進阿里云 Flink 在大數據處理的廣袤領域中&#xff0c;阿里云 Flink 猶如一顆璀璨的明星&#xff0c;占據著舉足輕重的地位。隨著數據量呈指數級增長&#xff0c;企業對數據處理的實時性、高效性和準確性提出了前所未有的挑戰 。傳統的數據處理方式逐漸難以滿足這些嚴苛的需…