Springboot整合Netty簡單實現1對1聊天(vx小程序服務端)

本文功能實現較為簡陋,demo內容僅供參考,有不足之處還請指正。

背景

一個小項目,用于微信小程序的服務端,需要實現小程序端可以和他人1對1聊天

實現功能

Websocket、心跳檢測、消息持久化、離線消息存儲

Netty配置類

/*** @author Aseubel*/
@Component
@Slf4j
@EnableConfigurationProperties(NettyServerConfigProperties.class)
public class NettyServerConfig {private ChannelFuture serverChannelFuture;// 心跳間隔(秒)private static final int HEARTBEAT_INTERVAL = 15;// 讀超時時間private static final int READ_TIMEOUT = HEARTBEAT_INTERVAL * 2;// 使用線程池管理private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);private final EventLoopGroup workerGroup = new NioEventLoopGroup();private final NettyServerConfigProperties properties;// 由于在后面的handler中有依賴注入類,所以要通過springboot的ApplicationContext來獲取Bean實例@Autowiredprivate ApplicationContext applicationContext;public NettyServerConfig(NettyServerConfigProperties properties) {this.properties = properties;}@PostConstructpublic void startNettyServer() {// 使用獨立線程啟動Netty服務new Thread(() -> {try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();SSLContext sslContext = SslUtil.createSSLContext("PKCS12",properties.getSslPath(), properties.getSslPassword());// SSLEngine 此類允許使用ssl安全套接層協議進行安全通信SSLEngine engine = sslContext.createSSLEngine();engine.setUseClientMode(false);pipeline.addLast(new SslHandler(engine)); // 設置SSLpipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024 * 1024));// 最大10MBpipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpHandler());// 只有text和binarytext的幀能經過WebSocketServerProtocolHandler,所以心跳檢測這兩個都得放前面pipeline.addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HeartbeatHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024 * 1024));pipeline.addLast(applicationContext.getBean(MessageHandler.class));pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 統一處理所有未被前面handler捕獲的異常log.error("全局異常捕獲: {}", cause.getMessage());ctx.channel().close();}});}});serverChannelFuture = bootstrap.bind(properties.getPort()).sync();// 保持通道開放serverChannelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}@PreDestroypublic void stopNettyServer() {// 優雅關閉if (serverChannelFuture != null) {serverChannelFuture.channel().close();}bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}

Handler

心跳檢測

/*** @author Aseubel*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {private static final int HEARTBEAT_INTERVAL = 15; // 心跳間隔(秒)private static final int MAX_MISSED_HEARTBEATS = 2; // 允許丟失的心跳次數// 記錄每個連接的丟失心跳次數private final Map<ChannelId, Integer> missedHeartbeats = new ConcurrentHashMap<>();@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 添加 IdleStateHandler 觸發讀空閑事件ctx.pipeline().addLast(new IdleStateHandler(HEARTBEAT_INTERVAL * MAX_MISSED_HEARTBEATS, 0, 0));scheduleHeartbeat(ctx);}private void scheduleHeartbeat(ChannelHandlerContext ctx) {ctx.executor().scheduleAtFixedRate(() -> {if (ctx.channel().isActive()) {ctx.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)));// 記錄丟失的心跳次數missedHeartbeats.compute(ctx.channel().id(), (k, v) -> v == null ? 1 : v + 1);}}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof PongWebSocketFrame) {// 收到 Pong 后重置丟失計數missedHeartbeats.remove(ctx.channel().id());ctx.fireChannelRead(msg); // 傳遞消息給后續處理器} else {ctx.fireChannelRead(msg);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {int missed = missedHeartbeats.getOrDefault(ctx.channel().id(), 0);if (missed >= MAX_MISSED_HEARTBEATS) {// 超過最大丟失次數,關閉連接System.out.println("連接超時,關閉連接" + ctx.channel().id().asLongText());ctx.close();cleanOfflineResources(ctx.channel());}}}private void cleanOfflineResources(Channel channel) {MessageHandler.removeUserChannel(channel);missedHeartbeats.remove(channel.id());}
}

處理http請求,建立連接

/*** @author Aseubel* @description 處理websocket連接請求,將code參數存入channel的attribute中* @date 2025-02-21 15:34*/
public class HttpHandler extends ChannelInboundHandlerAdapter {public static final AttributeKey<String> WS_TOKEN_KEY = AttributeKey.valueOf("code");@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 判斷是否是連接請求if (msg instanceof FullHttpRequest) {FullHttpRequest request = (FullHttpRequest) msg;try {QueryStringDecoder decoder = new QueryStringDecoder(request.uri());ctx.channel().attr(WS_TOKEN_KEY).set(decoder.parameters().get("code").get(0));} catch (Exception e) {throw new AppException("非法的websocket連接請求");}// 將 FullHttpRequest 轉發到 MessageHandlerctx.fireChannelRead(request);// 重新設置 uri,將請求轉發到 websocket handler,否則無法成功建立連接request.setUri("/ws");}// 消息直接交給下一個 handlersuper.channelRead(ctx, msg);}}

?消息處理

/*** @author Aseubel* @description 處理 WebSocket 消息* @date 2025-02-21 15:33*/
@Component
@Slf4j
@Sharable
public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {public static final AttributeKey<String> WS_TOKEN_KEY = AttributeKey.valueOf("code");public static final AttributeKey<String> WS_USER_ID_KEY = AttributeKey.valueOf("userId");private static final Map<String, Queue<WebSocketFrame>> OFFLINE_MSGS = new ConcurrentHashMap<>();private static final Map<String, Channel> userChannels = new ConcurrentHashMap<>();@Autowiredprivate ThreadPoolTaskExecutor threadPoolExecutor;@Resourceprivate IMessageRepository messageRepository;// 提供受控的訪問方法public static void removeUserChannel(Channel channel) {userChannels.values().remove(channel);}public static boolean containsUser(String userId) {return userChannels.containsKey(userId);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object req) throws Exception {if (req instanceof FullHttpRequest) {String code = getCodeFromRequest(ctx); // 從請求中提取 codeString userId = getOpenid(APPID, SECRET, code);    // 驗證 code 獲取 openiduserChannels.put(userId, ctx.channel());ctx.channel().attr(WS_USER_ID_KEY).set(userId);System.out.println("客戶端連接成功,用戶id:" + userId);// 由于這里還在處理握手請求也就是建立連接,所以需要延遲發送離線消息new Thread(() -> {try {Thread.sleep(50);OFFLINE_MSGS.getOrDefault(userId, new LinkedList<>()).forEach(ctx::writeAndFlush);OFFLINE_MSGS.remove(userId);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();} else if (req instanceof TextWebSocketFrame ) {this.channelRead0(ctx, (TextWebSocketFrame) req);} else {ctx.fireChannelRead(req);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {if (frame instanceof TextWebSocketFrame) {MessageEntity message = validateMessage(ctx.channel().attr(WS_USER_ID_KEY).get(), (TextWebSocketFrame) frame);saveMessage(message);sendOrStoreMessage(message.getToUserId(), frame);} else {ctx.close();}}// 處理連接斷開@Overridepublic void channelInactive(ChannelHandlerContext ctx) {System.out.println("客戶端斷開連接,用戶id:" + ctx.channel().attr(WS_USER_ID_KEY).get());Channel channel = ctx.channel();for (Map.Entry<String, Channel> entry : userChannels.entrySet()) {if (entry.getValue() == channel) {userChannels.remove(entry.getKey());break;}}}private MessageEntity validateMessage(String userId, TextWebSocketFrame textFrame) {String message = textFrame.text();try {JsonObject json = JsonParser.parseString(message).getAsJsonObject();String toUserId = json.get("toUserId").getAsString();String content = json.get("content").getAsString();String type = json.get("type").getAsString();if (type.equals("text") || type.equals("image")) {return new MessageEntity(userId, toUserId, content, type);} else {throw new AppException("非法的消息類型!");}} catch (Exception e) {throw new AppException("非法的消息格式!");}}private void sendOrStoreMessage(String toUserId, WebSocketFrame message) {if (isUserOnline(toUserId)) {Channel targetChannel = userChannels.get(toUserId);if (targetChannel != null && targetChannel.isActive()) {targetChannel.writeAndFlush(message.retain());}} else {// 存儲原始WebSocketFrame(需保留引用)OFFLINE_MSGS.computeIfAbsent(toUserId, k -> new LinkedList<>()).add(message.retain());}}private void saveMessage(MessageEntity message) {threadPoolExecutor.execute(() -> {messageRepository.saveMessage(message);});}private boolean isUserOnline(String userId) {return userChannels.containsKey(userId);}private String getCodeFromRequest(ChannelHandlerContext ctx) {String code = ctx.channel().attr(WS_TOKEN_KEY).get();// 檢查 code 參數是否存在且非空if (code == null || code.isEmpty()) {throw new IllegalArgumentException("WebSocket token  is missing or empty");}return code;}private String getOpenid(String appid, String secret, String code) {Map<String, String> paramMap = new HashMap<>();paramMap.put("appid", appid);paramMap.put("secret", secret);paramMap.put("js_code", code);paramMap.put("grant_type", "authorization_code");String result = HttpClientUtil.doGet(WX_LOGIN, paramMap);//獲取請求結果JSONObject jsonObject = JSON.parseObject(result);String openid = jsonObject.getString("openid");//判斷openid是否存在if (StringUtils.isEmpty(openid)) {throw new WxException(jsonObject.getString("errcode"), jsonObject.getString("errmsg"));}return openid;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof AppException appCause) {log.error("AppException caught: {}", appCause.getInfo());} else if (cause instanceof WxException wxCause) {log.error("WxException caught: {}", wxCause.getMessage());} else {log.error("Exception caught: {}", cause.getMessage(), cause);}ctx.close(); // 建議關閉發生異常的連接}}

連接及消息格式:

wss://127.0.0.1:21611/ws?code=xxxxxx{"toUserId": "1001","type": "text","content": "Hello World!"
}

規定了type只有text和image兩種,text為文本content,image則為Base64編碼格式

本文功能實現較為簡陋,demo內容僅供參考,可能有注釋錯誤或設計不合理的地方

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/75607.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/75607.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/75607.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

GitLab 中文版17.10正式發布,27項重點功能解讀【二】

GitLab 是一個全球知名的一體化 DevOps 平臺&#xff0c;很多人都通過私有化部署 GitLab 來進行源代碼托管。極狐GitLab 是 GitLab 在中國的發行版&#xff0c;專門為中國程序員服務。可以一鍵式部署極狐GitLab。 學習極狐GitLab 的相關資料&#xff1a; 極狐GitLab 官網極狐…

好消息!軟航文檔控件(NTKO WebOffice)在Chrome 133版本上提示擴展已停用的解決方案

軟航文檔控件現有版本依賴Manifest V2擴展技術支持才能正常運行&#xff0c;然而這個擴展技術到2025年6月在Chrome高版本上就徹底不支持了&#xff0c;現在Chrome 133開始的版本已經開始彈出警告&#xff0c;必須手工開啟擴展支持才能正常運行。那么如何解決這個技術難題呢&…

字典樹與01trie

字典樹簡介 當我們通過字典查一個字或單詞的時候&#xff0c;我們會通過前綴或關鍵字的來快速定位一個字的位置&#xff0c;進行快速查找。 字典樹就是類似字典中索引表的一種數據結構&#xff0c;能夠幫助我們快速定位一個字符串的位置。 字典樹是一種存儲字符串的數據結構…

二十五、實戰開發 uni-app x 項目(仿京東)- 前后端輪播圖

定義了一個名為 Swiper 的Java類,用于表示一個輪播圖實體。它使用了 Jakarta Persistence API (JPA) 來映射數據庫表,并使用了 Lombok 庫來簡化代碼。以下是對代碼的詳細講解: 1. 包聲明 package com.jd.jdmall.model; 這行代碼聲明了該類所在的包路徑為 com.jd.jdmall.mode…

游戲搖桿開發:利用 Windows API 實現搖桿輸入捕獲

在現代游戲開發中&#xff0c;游戲搖桿&#xff08;Joystick&#xff09;作為一種重要的輸入設備&#xff0c;能夠為玩家提供更加沉浸式的游戲體驗。Windows 操作系統提供了一系列 API 函數&#xff0c;允許開發者輕松地捕獲和處理游戲搖桿的輸入。本文將介紹如何使用 Windows …

Ceph集群2025(Squid版)快速對接K8S cephFS文件存儲

ceph的塊存儲太簡單了。所以不做演示 查看集群 創建一個 CephFS 文件系統 # ceph fs volume create cephfs01 需要創建一個子卷# ceph fs subvolume create cephfs01 my-subvol -----------------#以下全部自動創建好 # ceph fs ls name: cephfs01, metadata pool: c…

Python中數據結構元組詳解

在Python中&#xff0c;元組&#xff08;Tuple&#xff09;是一種不可變的序列類型&#xff0c;常用于存儲一組有序的數據。與列表&#xff08;List&#xff09;不同&#xff0c;元組一旦創建&#xff0c;其內容無法修改。本文將詳細介紹元組的基本操作、常見運算、內置函數以及…

游戲引擎學習第183天

回顧和今天的計劃 我對接下來的進展感到非常興奮。雖然我們可能會遇到一些問題&#xff0c;但昨天我們差不多完成了將所有內容遷移到新的日志系統的工作&#xff0c;我們正在把一些內容整合進來&#xff0c;甚至是之前通過不同方式記錄時間戳的舊平臺層部分&#xff0c;現在也…

Spring 如何處理循環依賴

在 Spring 框架里&#xff0c;循環依賴指的是多個 Bean 之間相互依賴&#xff0c;從而形成一個閉環。例如&#xff0c;Bean A 依賴 Bean B&#xff0c;而 Bean B 又依賴 Bean A。Spring 主要通過三級緩存機制來處理循環依賴&#xff0c;下面詳細介紹相關內容。 1. 三級緩存的定…

Android開發layer-list

Android開發layer-list 它的用處可以在drawable上進行多圖拼接&#xff0c;比如啟動頁&#xff0c;不想圖片被拉伸就這么做。還有做某些線突出來。 示例代碼&#xff1a; <?xml version"1.0" encoding"utf-8"?> <layer-list xmlns:android&q…

手機測試,工作中學習

要學習各種機型的截圖方式、開發模式在哪。 榮耀機型&#xff1a;截圖&#xff1a;關節快速敲兩下。開發者模式在“系統和更新”里。 1.出現缺陷&#xff0c;需要獲取日志。 學習adb生成日志&#xff1a;當測試中出現缺陷的&#xff0c;使用adb logcat -d > d:/log.txt …

OBS虛擬背景深度解析:無需綠幕也能打造專業教學視頻(附插件對比)

想要錄制教學視頻卻苦于背景雜亂&#xff1f;本文將手把手教你用OBS實現專業級虛擬背景效果&#xff0c;無需綠幕也能輕松營造沉浸式教學場景。文末附6個提升畫面質感的免費背景資源&#xff01; 一、虛擬背景的核心價值&#xff1a;從「教師宿舍」到「虛擬講堂」的蛻變 我們調…

零基礎搭建智能法律知識庫!騰訊云HAI實戰教程

為什么需要法律知識庫&#xff1f; 想象一下&#xff0c;你的所有法律文件都在手邊&#xff0c;隨時可以搜索和分析。這就是法律知識庫的魅力所在。對于法律專業人士、處理大量法律文檔的企業&#xff0c;甚至是希望了解法律事項的普通人來說&#xff0c;法律知識庫都是一個不…

Rust從入門到精通之進階篇:19.Rust 生態系統

Rust 生態系統 Rust 擁有一個豐富而活躍的生態系統&#xff0c;提供了各種庫和框架來支持不同領域的開發。在本章中&#xff0c;我們將探索 Rust 生態系統中的主要組件&#xff0c;了解常用的庫和工具&#xff0c;以及如何在項目中有效地使用它們。 Rust 包管理&#xff1a;C…

前端面試:如何去衡量用戶操作過程中否卡頓?

衡量用戶在應用中的操作是否卡頓是前端開發中的一個關鍵任務&#xff0c;涉及用戶體驗的各個方面。以下是一些常用的方法和工具&#xff0c;可以幫助你有效地評估用戶操作中的卡頓情況&#xff1a; 1. 使用性能分析工具 瀏覽器開發者工具&#xff1a;大多數現代瀏覽器&#xf…

Python技術棧與數據可視化創意實踐詳解(三)

Python在數據可視化領域憑借豐富的庫和靈活的生態系統&#xff0c;能夠實現從基礎圖表到復雜交互式可視化的全場景覆蓋。以下從技術選型、創意實現到實戰優化進行系統化解析&#xff0c;并提供可直接落地的代碼示例。 一、Python數據可視化技術棧 1. 基礎與統計可視化 Matplotl…

訂票系統|基于Java+vue的火車票訂票系統(源碼+數據庫+文檔)

訂票系統目錄 基于Springbootvue的火車票訂票系統 一、前言 二、系統設計 三、系統功能設計 1會員信息管理 2 車次信息管理 3訂票訂單管理 4留言板管理 四、數據庫設計 五、核心代碼 六、論文參考 七、最新計算機畢設選題推薦 八、源碼獲取&#xff1a; 博主介紹…

Snowflake 算法的實現

snowflake(雪花算法)是一個開源的分布式 ID 生成算法&#xff0c;結果是一個 long 型的 ID。snowflake 算法將 64bit 劃分為多段&#xff0c;分開來標識機器、時間等信息&#xff0c;具體組成結構如下圖所示&#xff1a; snowflake 算法的核心思想是使用 41bit 作為毫秒數&…

C 語言中, scanf 函數在哪些情況下會結束輸入讀取:

在 C 語言中&#xff0c; scanf 函數在以下幾種情況下會結束輸入讀取&#xff1a; &#xff1a; 1. 遇到指定格式匹配失敗&#xff1a; scanf 按照格式字符串要求讀取輸入。當輸入數據格式與格式字符串不匹配時&#xff0c;就會結束讀取。例如 scanf(“%d”, &num) 要求輸…

括號合法題

一、括號合法題 2116. 判斷一個括號字符串是否有效 //采用從左往右和從右往左遍歷的貪心算法&#xff0c;分別保證前綴合法&#xff0c;后綴合法。public boolean canBeValid(String s, String locked) {int ns.length();if (n%21) return false;int num0;// 從左到右掃描&…