JavaWebsocket-demo

Websocket客戶端

pom依賴

		<dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.4.0</version></dependency>

客戶端代碼片段


@Component
@Slf4j
public class PositionAlarmListener {@Autowiredprivate BigScreenService bigScreenService;@Autowiredprivate ConfigurationSystemService configurationSystemService;@Beanpublic WebSocketClient webSocketClient() {WebSocketClient wsc = null;ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();Map<String, String> httpHeaders = new HashMap<>();try {
//            String reqUrl = String.format("ws://%s%s?apikey=%s", servicePort, SOCKET_URL, apikey);String reqUrl = thirdPartConfDto.getAlarmWebsocketUrl();log.info("websocketclient.position.reqUrl:{}",reqUrl);wsc = new WebSocketClient(new URI(reqUrl), httpHeaders) {@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("UnmannedPlane==connect==build");}@Overridepublic void onMessage(String message) {log.info("websocketclient.position.receive.message:{}", message);CompletableFuture.runAsync(() -> {try {if (StringUtils.isEmpty(message)) {return;}
//                            JSONObject parse = JSONObject.parseObject(message);ThirdPositionAlarmDto thirdPositionAlarmDto = JSONObject.parseObject(message,ThirdPositionAlarmDto.class);String type = thirdPositionAlarmDto.getType();log.info("websocketclient.position.receive.message-type:{}", type);if (StringUtils.isEmpty(type)) {log.error("websocket.type.is null");return;}if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){log.error("websocket.type.is not tag");return;}boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);} catch (Exception e) {log.error("websocketclient.position.error:", e);}});}@Overridepublic void onClose(int i, String s, boolean b) {log.info("websocketclient.position.close code:{} reason:{} {}", i, s, b);}@Overridepublic void onError(Exception e) {log.info("websocketclient.position.connect==error:", e);}};wsc.connect();return wsc;} catch (Exception e) {log.error("websocketclient.position==webSocketClient:", e);}return wsc;}}

客戶端代碼片段(新增心跳檢測、重連)

客戶端代碼片段–配置新增

# websocketclient config
#websocket.client.config.wsUrl=ws://10.212.188.45:8880/position
websocket.client.config.wsUrl=ws://10.81.12.100:8090/websocket
websocket.client.config.enableHeartbeat=true
websocket.client.config.heartbeatInterval=20000
websocket.client.config.enableReconnection=true
@Configuration
@ConfigurationProperties(prefix="websocket.client.config")
public class WebsocketClientConfiguration {/*** websocket server ws://ip:port*/private String wsUrl;/*** 是否啟用心跳監測 默認開啟*/private Boolean enableHeartbeat;/*** 心跳監測間隔 默認20000毫秒*/private Integer heartbeatInterval;/*** 是否啟用重連接 默認啟用*/private Boolean enableReconnection;public String getWsUrl() {return wsUrl;}public void setWsUrl(String wsUrl) {this.wsUrl = wsUrl;}public Boolean getEnableHeartbeat() {return enableHeartbeat;}public void setEnableHeartbeat(Boolean enableHeartbeat) {this.enableHeartbeat = enableHeartbeat;}public Integer getHeartbeatInterval() {return heartbeatInterval;}public void setHeartbeatInterval(Integer heartbeatInterval) {this.heartbeatInterval = heartbeatInterval;}public Boolean getEnableReconnection() {return enableReconnection;}public void setEnableReconnection(Boolean enableReconnection) {this.enableReconnection = enableReconnection;}
}

客戶端代碼片段–客戶端創建

@Slf4j
@Configuration
public class WebsocketClientBeanConfig {/*** 系統配置實現類*/@Autowiredprivate ConfigurationSystemService configurationSystemService;@Beanpublic WebsocketRunClient websocketRunClient(WebsocketClientConfiguration websocketClientConfiguration){String wsUrl = websocketClientConfiguration.getWsUrl();Boolean enableHeartbeat = websocketClientConfiguration.getEnableHeartbeat();Integer heartbeatInterval = websocketClientConfiguration.getHeartbeatInterval();Boolean enableReconnection = websocketClientConfiguration.getEnableReconnection();try {WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl));websocketRunClient.connect();websocketRunClient.setConnectionLostTimeout(0);new Thread(()->{while (true){try {Thread.sleep(heartbeatInterval);if(enableHeartbeat){websocketRunClient.send("[websocketclient] 心跳檢測");log.info("[websocketclient] 心跳檢測");}} catch (Exception e) {log.error("[websocketclient] 發生異常{}",e.getMessage());try {if(enableReconnection){log.info("[websocketclient] 重新連接");websocketRunClient.reconnect();websocketRunClient.setConnectionLostTimeout(0);}}catch (Exception ex){log.error("[websocketclient] 重連異常,{}",ex.getMessage());}}}}).start();return websocketRunClient;} catch (URISyntaxException ex) {log.error("[websocketclient] 連接異常,{}",ex.getMessage());}return null;}}

客戶端代碼片段–客戶端心跳檢測

@Slf4j
@Component
public class WebsocketRunClient extends WebSocketClient {/*** 大屏推送地址*/@Value("${thirdpart.bigscreen.positionhttpurl}")private String httpUrl;/*** 位置檢測距離*/@Value("${thirdpart.positionrange:100}")private Double positionrange;/*** 大屏接口推送實現*/@Autowiredprivate BigScreenService bigScreenService;public WebsocketRunClient(URI serverUri) {super(serverUri);}@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("[websocketclient] Websocket客戶端連接成功");}@Overridepublic void onMessage(String message) {log.info("[websocketclient.receive] 收到消息:{}",message);
//        ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();ThirdPartConfDto thirdPartConfDto = ThirdPartConfDto.builder().bigScreenHttpUrl(httpUrl).positionRange(positionrange).build();CompletableFuture.runAsync(() -> {try {if (StringUtils.isEmpty(message.trim())) {return;}if(message.contains("心跳檢測")){return;}List<ThirdPositionAlarmDto> thirdPositionAlarmDtoList = JSONObject.parseArray(message,ThirdPositionAlarmDto.class);for(ThirdPositionAlarmDto thirdPositionAlarmDto: thirdPositionAlarmDtoList){String type = thirdPositionAlarmDto.getType();log.info("websocketclient.position.receive.message-type:{}", type);if (StringUtils.isEmpty(type)) {log.error("websocket.type.is null");return;}if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){log.error("websocket.type.is not tag");return;}boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);}} catch (Exception e) {log.error("websocketclient.position.error:", e);}});}@Overridepublic void onClose(int code, String reason, boolean remote) {log.info("[websocketclient] Websocket客戶端關閉");System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);}@Overridepublic void onError(Exception e) {log.info("[websocketclient] Websocket客戶端出現異常, 異常原因為:{}",e.getMessage());}

Websocket 服務端

服務端pom依賴

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

服務端代碼片段,websocket-配置

websocket-服務配置
@Configuration
public class WebSocketConfig {/*** ServerEndpointExporter注入* 該Bean會自動注冊使用@ServerEndpoint注解申明的WebSocket endpoint** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

服務端代碼片段,websocket-服務端廣播消息

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;/*** @author 謫居馬道* @describe:websocket,消息廣播* @date 2025/5/21*/
@Component
@ServerEndpoint("/websocket")
public class WebSocket {private final Logger log = LoggerFactory.getLogger(WebSocket.class);//與某個客戶端的連接會話,需要通過它來給客戶端發送數據private Session session;//concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();/**         * 連接建立成功調用的方法         */@OnOpenpublic void onOpen(Session session){this.session=session;webSocketSet.add(this);//加入set中log.info("【WebSocket消息】有新的連接,總數:{}",webSocketSet.size());}/**         * 連接關閉調用的方法         */@OnClosepublic void onClose(){webSocketSet.remove(this);//從set中刪除log.info("【WebSocket消息】連接斷開,總數:{}",webSocketSet.size());        }/**         * 收到客戶端消息后調用的方法         * @param message 客戶端發送過來的消息         */@OnMessagepublic void onMessage(String message ){log.info("【WebSocket消息】收到客戶端發來的消息:{}",message);sendMessage(message);}public void sendMessage(String message){for (WebSocket webSocket:webSocketSet) {log.info("【webSocket消息】廣播消息,message={}",message);try {webSocket.session.getBasicRemote ().sendText(message);} catch (Exception e) {e.printStackTrace ();}            }}}

服務端代碼片段,websocket-服務端一對一消息

@Component
@ServerEndpoint("/websocket/{terminalId}")
public class WebSocketService {private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);/*** 保存連接信息*/private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();private static final Map<String, AtomicInteger> TERMINAL_IDS = new HashMap<>();/*** 需要注入的Service聲明為靜態,讓其屬于類*/private static TerminalService terminalService;/*** 注入的時候,給類的Service注入*/@Autowiredpublic void setMchDeviceInfoService(TerminalService terminalService) {WebSocketService.terminalService = terminalService;}@OnOpenpublic void onOpen(@PathParam("terminalId") String terminalId, Session session) throws Exception {logger.info(session.getRequestURI().getPath() + ",打開連接開始:" + session.getId());//當前連接已存在,關閉if (CLIENTS.containsKey(terminalId)) {onClose(CLIENTS.get(terminalId));}TERMINAL_IDS.put(terminalId, new AtomicInteger(0));CLIENTS.put(terminalId, session);logger.info(session.getRequestURI().getPath() + ",打開連接完成:" + session.getId());terminalService.terminal();}@OnClosepublic void onClose(@PathParam("terminalId") String terminalId, Session session) throws Exception {logger.info(session.getRequestURI().getPath() + ",關閉連接開始:" + session.getId());CLIENTS.remove(terminalId);TERMINAL_IDS.remove(terminalId);logger.info(session.getRequestURI().getPath() + ",關閉連接完成:" + session.getId());}@OnMessagepublic void onMessage(String message, Session session) {logger.info("前臺發送消息:" + message);if ("心跳".equals(message)) {//重置當前終端心跳次數TERMINAL_IDS.get(message).set(0);return;}sendMessage("收到消息:" + message, session);}@OnErrorpublic void onError(Session session, Throwable error) {logger.error(error.toString());}public void onClose(Session session) {//判斷當前連接是否在線
//        if (!session.isOpen()) {
//            return;
//        }try {session.close();} catch (IOException e) {logger.error("金斗云關閉連接異常:" + e);}}public void heartbeat() {//檢查所有終端心跳次數for (String key : TERMINAL_IDS.keySet()) {//心跳3次及以上的主動斷開if ((TERMINAL_IDS.get(key).intValue() >= 3)) {logger.info("心跳超時,關閉連接:" + key);onClose(CLIENTS.get(key));}}for (String key : CLIENTS.keySet()) {//記錄當前終端心跳次數TERMINAL_IDS.get(key).incrementAndGet();sendMessage("心跳", CLIENTS.get(key));}}public void sendMessage(String message, Session session) {try {session.getAsyncRemote().sendText(message);logger.info("推送成功:" + message);} catch (Exception e) {logger.error("推送異常:" + e);}}public boolean sendMessage(String terminalId, String message) {try {Session session = CLIENTS.get(terminalId);session.getAsyncRemote().sendText(message);logger.info("推送成功:" + message);return true;} catch (Exception e) {logger.error("推送異常:" + e);return false;}}
}

Websocket測試工具

postman-測試

參考:
site1: https://maimai.cn/article/detail?fid=1747304025&efid=p7JdUMG2Gi0PrMX7xSXpXw
site2: https://blog.csdn.net/weixin_46768610/article/details/128711019

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/82444.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/82444.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/82444.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java Collection(集合) 接口

Date: 2025-05-21 20:21:32 author: lijianzhan Java 集合框架提供了一組接口和類&#xff0c;以實現各種數據結構和算法。 以下是關于 Java 集合的核心內容說明&#xff1a; /*** Java Collection Framework 說明&#xff1a;** 在 Java 中&#xff0c;集合&#xff08;Collec…

讓MySQL更快:EXPLAIN語句詳盡解析

前言 在數據庫性能調優中&#xff0c;SQL 查詢的執行效率是影響系統整體性能的關鍵因素之一。MySQL 提供了強大的工具——EXPLAIN 語句&#xff0c;幫助開發者和數據庫管理員深入分析查詢的執行計劃&#xff0c;從而發現潛在的性能瓶頸并進行針對性優化。 EXPLAIN 語句能夠模…

Java基礎 Day20

一、HashSet 集合類 1、簡介 HashSet 集合底層采取哈希表存儲數據 底層是HashMap 不能使存取有序 JDK8之前的哈希表是數組和鏈表&#xff0c;頭插法 JDK8之后的哈希表是數組、鏈表和紅黑樹&#xff0c;尾插法 2、存儲元素 &#xff08;1&#xff09;如果要保證元素的唯…

2505C++,32位轉64位

原文 假設有個想要將一個32位值傳遞給一個帶64位值的函數的函數.你不關心高32位的內容,因為該值是傳遞給回調函數的直通值,回調函數會把它截斷為32位值. 因此,你都擔心編譯器一般生成的將32位值擴展到64位值的那條指令的性能影響. 我懷疑這條指令不是程序中的性能瓶頸. 我想出…

光伏電站及時巡檢:守護清潔能源的“生命線”

在“雙碳”目標驅動下&#xff0c;光伏電站作為清潔能源的主力軍&#xff0c;正以年均20%以上的裝機增速重塑全球能源格局。然而&#xff0c;這些遍布荒漠、屋頂的“光伏矩陣”并非一勞永逸的能源提款機&#xff0c;其穩定運行高度依賴精細化的巡檢維護。山東棗莊觸電事故、衢州…

C++初階-list的使用2

目錄 1.std::list::splice的使用 2.std::list::remove和std::list::remove_if的使用 2.1remove_if函數的簡單介紹 基本用法 函數原型 使用函數對象作為謂詞 使用普通函數作為謂詞 注意事項 復雜對象示例 2.2remove與remove_if的簡單使用 3.std::list::unique的使用 …

OpenHarmony平臺驅動使用(一),ADC

OpenHarmony平臺驅動使用&#xff08;一&#xff09; ADC 概述 功能簡介 ADC&#xff08;Analog to Digital Converter&#xff09;&#xff0c;即模擬-數字轉換器&#xff0c;可將模擬信號轉換成對應的數字信號&#xff0c;便于存儲與計算等操作。除電源線和地線之外&#…

CSS【詳解】彈性布局 flex

適用場景 一維&#xff08;行或列&#xff09;布局 基本概念 包裹所有被布局元素的父元素為容器 所有被布局的元素為項目 項目的排列方向&#xff08;垂直/水平&#xff09;為主軸 與主軸垂直的方向交交叉軸 容器上啟用 flex 布局 將容器的 display 樣式設置為 flex 或 i…

基于MATLAB實現傳統譜減法以及兩種改進的譜減法(增益函數譜減法、多帶譜減法)的語音增強

基于MATLAB實現傳統譜減法以及兩種改進的譜減法&#xff08;增益函數譜減法、多帶譜減法&#xff09;的語音增強代碼示例&#xff1a; 傳統譜減法 function enhanced traditional_spectral_subtraction(noisy, fs, wlen, inc, NIS, a, b)% 參數說明&#xff1a;% noisy - 帶…

symbol【ES6】

你一閉眼世界就黑了&#xff0c;你不是主角是什么&#xff1f; 目錄 什么是Symbol&#xff1f;?Symbol特點?&#xff1a;創建方法&#xff1a;注意點&#xff1a;不能進行運算&#xff1a;顯示調用toString() --沒有意義隱式轉換boolean 如果屬性名沖突了怎么辦&#xff1f;o…

LeetCode 649. Dota2 參議院 java題解

https://leetcode.cn/problems/dota2-senate/description/ 貪心。不會寫。 class Solution {public String predictPartyVictory(String senate) {boolean rtrue,dtrue;int flag0;//flag>0,d前面有r;flag<0,r前面有dchar[] senatessenate.toCharArray();//每一輪while(r…

機器學習第二十二講:感知機 → 模仿大腦神經元的開關系統

機器學習第二十二講&#xff1a;感知機 → 模仿大腦神經元的開關系統 資料取自《零基礎學機器學習》。 查看總目錄&#xff1a;學習大綱 關于DeepSeek本地部署指南可以看下我之前寫的文章&#xff1a;DeepSeek R1本地與線上滿血版部署&#xff1a;超詳細手把手指南 感知機詳解…

maven快速上手

之前我們項目如果要用到其他額外的jar包&#xff0c;需要自己去官網下載并且導入。但是有maven后&#xff0c;直接在maven的pom.xml文件里用代碼配置即可&#xff0c;配置好后maven會自動幫我們聯網下載并且會自動導入該jar包 在右邊的maven中&#xff0c;我們可以看到下載安裝…

科學養生指南:解鎖健康生活密碼

健康是人生最寶貴的財富&#xff0c;在快節奏的現代生活中&#xff0c;科學養生成為保持良好狀態的關鍵。遵循現代醫學與營養學的研究成果&#xff0c;無需依賴傳統中醫理論&#xff0c;我們也能找到適合自己的養生之道。? 均衡飲食是健康的基石。現代營養學強調 “食物多樣&…

Qt狀態機QStateMachine

QStateMachine QState 提供了一種強大且靈活的方式來表示狀態機中的狀態&#xff0c;通過與狀態機類(QStateMachine)和轉換類(QSignalTransition&#xff0c; QEventTransition)結合&#xff0c;可以實現復雜的狀態邏輯和用戶交互。合理使用嵌套狀態機、信號轉換、動作與動畫、…

C++八股 —— 原子操作

文章目錄 1. 什么是原子操作2. 原子操作的特點3. 原子操作的底層原理4. 內存序內存屏障 5. 原子操作和互斥鎖的對比6. 常用的原子操作7. 相關問題討論 參考&#xff1a; C atomic 原子操作_c 原子操作-CSDN博客DeepSeek 1. 什么是原子操作 原子操作&#xff08;Atomic Opera…

雙紫擒龍紫紫紅指標源碼學習,2025升級版紫紫紅指標公式-重點技術

VAR1:MA((LOWHIGHCLOSE)/3,5); VAR2:CLOSEHHV(C,4) AND REF(C,1)LLV(C,4); 雙紫擒龍:REF(C,1)LLV(C,4) AND C>REF(C,2) OR REF(C,2)LLV(C,4) AND REF(C,1)<REF(C,3) AND REF(C,2)<REF(C,4) AND C>REF(C,1); VAR4:VAR1>REF(VAR1,1) AND REF(VAR1,1)<REF(VAR1,…

NeuralRecon技術詳解:從單目視頻中實現三維重建

引言 三維重建是計算機視覺領域中的一項關鍵技術&#xff0c;它能夠從二維圖像中恢復出三維形狀和結構。隨著深度學習的發展&#xff0c;基于學習的方法已經成為三維重建的主流。NeuralRecon是一種先進的三維重建方法&#xff0c;它能夠從單目視頻中實時生成高質量的三維模型。…

Ubuntu 上開啟 SSH 服務、禁用密碼登錄并僅允許密鑰認證

1. 安裝 OpenSSH 服務 如果尚未安裝 SSH 服務&#xff0c;運行以下命令&#xff1a; sudo apt update sudo apt install openssh-server2. 啟動 SSH 服務并設置開機自啟 sudo systemctl start ssh sudo systemctl enable ssh3. 生成 SSH 密鑰對&#xff08;本地機器&#xf…

MySQL 索引的增刪改查

MySQL 索引的增刪改查 1 建表時創建索引 [UNIQUE|FULLTEXT|SPATIAL] INDEX|KEY [別名] (字段名 [(長度)] [ASC|DESC] )主鍵直接寫&#xff1a; PRIMARY KEY (Id)例如&#xff1a; CREATE TABLE people (id int NOT NULL PRIMARY KEY AUTO_INCREMENT,last_name varchar(10)…