引言
關于Netty和Websocket的介紹我就不多講了,網上一搜一大片。現如今AI的趨勢發展很熱門,長連接對話也是會經常接觸到的,使用Websocket實現長連接,那么很多人為了快速開發快速集成就會使用spring-boot-starter-websocket依賴快速實現,但是注意該實現是基于tomcat的,有性能瓶頸的,那么就又有人說了那我使用spring-webflux(底層容器就是netty),但是用的人很少,那能不能單獨一個項目來處理長連接呢?
那肯定有的,基于netty自己實現
怎么使用?
其實怎么說呢,netty實現的websocket網上也是一大把,但是終究是個demo,網上也是很多人問:怎么實現動態多路由,像mvc一樣多個路由呢?用過spring-boot-starter-websocket都知道,搭配onOpen、onMesssage、onClose注解輕松使用,使用@ServerEndpoint實現多路由,那么netty怎么實現呢(netty本身是不支持的,都是需要自己去實現)?
我們要明白netty的定位,高性能、異步事件驅動的網絡應用框架??,主要用于快速開發可維護的高性能協議服務器和客戶端,提供底層網絡 I/O 的抽象,全是抽象,需要自己去自定義實現。
正題開始
廢話就不多說了,直接上代碼,如果文章對你有幫助,請3連!!!
maven依賴,只用netty和spring,不需要web容器:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.65.Final</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
1、核心接口WebSocketHandler
//Websocket處理器接口,入口WebSocketGlobalIntercept將會封裝連接處理邏輯和事件通知邏輯,專注實現業務
public interface WebSocketHandler {/*** 當首次握手連接成功后(升級為websocket時)將會觸發,可用于連接合法性處理** @param session 會話對象*/void onOpen(WebSocketSession session);/*** 當觸發websocket消息幀時,將會通知該方法** @param session 會話對象* @param message 消息對象:文本、二進制數據等*/void onMessage(WebSocketSession session, WebSocketFrame message);/*** 當連接關閉時將通知該方法,需要釋放資源并且清理session** @param session 會話對象*/void onClose(WebSocketSession session);/*** 當連接過程中、通信過程中出現異常將通知該方法** @param session 會話對象* @param error 異常信息*/void onError(WebSocketSession session, Throwable error);
}
2、會話Session類
public class WebSocketSession {/*** netty channelContext 對象,注意此對象不可序列化*/private ChannelHandlerContext channelContext;/*** 請求路由路徑*/private String path;/*** 擴展參數map,如需自定義攜帶參數時即可用于存入*/private Map<String, Object> attributes = new ConcurrentHashMap<>();/*** 只提供一個有參構造方法,channelContext和 path不能為空** @param channelContext channel上下文* @param path 請求路徑* @param attributes 擴展參數map*/public WebSocketSession(ChannelHandlerContext channelContext, String path, Map<String, Object> attributes) {this.channelContext = channelContext;this.path = path;this.attributes = attributes;}/*** 提供一個靜態方法獲取對象** @param channelContext channel上下文* @param path 請求路徑* @param attributes 擴展參數map* @return*/public static WebSocketSession of(ChannelHandlerContext channelContext, String path, Map<String, Object> attributes) {return new WebSocketSession(channelContext, path, attributes);}/*** 發送TextWebSocketFrame消息** @param text 消息文本*/public void sendText(String text) {this.channelContext.writeAndFlush(new TextWebSocketFrame(text));}/*** 發送BinaryWebSocketFrame 二進制消息** @param data*/public void sendBinary(ByteBuf data) {this.channelContext.writeAndFlush(new BinaryWebSocketFrame(data));}/*** 處理心跳檢測ping消息,響應pong** @param frame pong消息幀*/public void sendPong(PongWebSocketFrame frame) {this.channelContext.writeAndFlush(frame);}/*** 強制關閉連接*/public void close() {this.channelContext.close();}/*** 優雅關閉連接,其實就是發送了關閉協議幀** @param frame 關閉幀*/public void close(CloseWebSocketFrame frame) {this.channelContext.writeAndFlush(frame.retain()).addListener(ChannelFutureListener.CLOSE);}/*** 優雅關閉連接,其實就是發送了關閉協議幀** @param reason 關閉原因*/public void close(String reason) {CloseWebSocketFrame frame = new CloseWebSocketFrame(WebSocketCloseStatus.SERVICE_RESTART,reason);close(frame);}/*** set自定義擴展值** @param name 名稱* @param value 值*/public void setAttribute(String name, Object value) {this.attributes.put(name