c++ websocket客戶端_websocket使用

websocket使用

一、介紹

????在項目開發過程中,很多時候,我們不可避免的需要實現的一個功能:??

服務端實時發送信息給客戶端。比如實時公告、實時訂單通知、實時報警推送等等,登錄后的客戶端需要知道與它相關的實時信息,以便進行下一步處理。

????從事服務端開發的特別是C/C++開發的技術人員都知道,客戶端可以通過套接口與服務端保持套接口長連接。這樣就服務端就可以實時給客戶端推送信息了,但是這是針對TCP的長連接,如果是針對HTTP協議(在TCP層之上的實現了超文本協議的短鏈接--一般情況下短鏈接),實現服務端與客戶端通知一般有一下兩種方式:

1、HTTP輪詢

一般情況下,http是短鏈接,也就是請求響應式的,每一次請求都對應一次回復,回復完成后連接斷開,這樣做的好處就是不需要保持與服務端的長連接,因為HTTP協議底層還是TCP協議,服務端根據機器性能都有一個最大的套接口連接數限制。以windows為例子,如windows下TCP連接數受多個參數影響:

  • 最大tcp連接數

[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
TcpNumConnections = 0x00fffffe (Default = 16,777,214)

以上注冊表信息配置單機的最大允許的TCP連接數,默認為 16M。這個數值看似很大,這個并不是限制最大連接數的唯一條件,還有其他條件會限制到TCP 連接的最大連接數。

  • 最大動態端口數
    TCP客戶端和服務器連接時,客戶端必須分配一個動態端口,默認情況下這個動態端口的分配范圍為 1024-5000 ,也就是說默認情況下,客戶端最多可以同時發起3977個Socket連接。我們可以修改如下注冊表來調整這個動態端口的范圍.

[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
MaxUserPort = 5000 (Default = 5000, Max = 65534)

最大調整值65535,也就是最大能有6w多tcp連接

  • 最大TCB數量
    系統為每個TCP連接分配一個TCP控制塊(TCP control block or TCB),這個控制塊用于緩存TCP連接的一些參數,每個TCB需要分配 0.5 KB的pagepool 和 0.5KB 的Non-pagepool,也就說,每個TCP連接會占用 1KB 的系統內存。換句話說TCP的連接數也受到系統的內存的限制。系統的最大TCB數量由如下注冊表設置決定:

[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
MaxFreeTcbs = 2000 (Default = RAM dependent, but usual Pro = 1000, Srv=2000)

非Server版本,MaxFreeTcbs 的默認值為1000(64M 以上物理內存),Server 版本,這個的默認值為 2000。也就是說,默認情況下,Server版本最多同時可以建立并保持2000個TCP連接

  • 最大TCB Hash table數量
    TCB是通過Hash table來管理的,下面注冊表設置決定了這個Hash table的大小

HKEY_LOCAL_MACHINE \System \CurrentControlSet \services \Tcpip \Parameters]
MaxHashTableSize = 512 (Default = 512, Range = 64-65536)

指明分配pagepool內存的數量,也就是說,如果MaxFreeTcbs = 1000, 則pagepool的內存數量為500KB那么 MaxHashTableSize應大于500才行。這個數量越大,則Hash table的冗余度就越高,每次分配和查找TCP.這里 MaxHashTableSize被配置為比MaxFreeTcbs大4倍,這樣可以大大增加TCP建立的速度。

????知道了底層TCP限制之后,我們可以知道實際上長連接在普通的windows機器上最多大概是1000路左右,也就并發是1000個http長連接。如果將長連接改為http短鏈接,http請求完成后立即釋放,那么服務端的并發就會大大增加,如果請求速度不太耗時,服務端的并發量有可能達到1w或者更大!!!c
????使用HTTP輪詢,就是使用HTTP短鏈接模式,定期與服務端進行通信主動獲取服務端信息的方式實現服務端“推送”信息至客戶端,它有如下特點:

  • 避免與服務端的長連接,減低服務端壓力,提升服務端的并發訪問能力

  • 客戶端主動與服務端通信,需要定期與服務端進行輪詢查詢獲取信息,但對客戶端而言存在延遲,延遲時間最大為輪詢時間。

  • 服務端需要做額外的工作包保存一些實時數據,等待客戶端拉取。

2、websocket

??? http長輪詢因為存在信息延遲的問題,有時候,我們需要實時收到服務端推送的信息就無法避免使用websocket了。在前面我已經說到,websocket實際上也是http升級upgrade之后的tcp長連接,長連接的數量限制經過調整后最大能有(65525-1024)= 64501個長連接(在內存、句柄數等不設限情況下)。但實際測試,可能服務端的websocket連接數可能維持的2w左右(經過實際測試),如果改為云主機,連接數可能達到6w左右。如果需要更多了連接,我們可以考慮集群的方式,如n臺高性能機器能支持最大n*6w的websocket連接!!它有如下優點:

  • WebSocket一次握手就可以使客戶端和服務端建立長連接,并進行雙向數據傳輸

  • 服務端可主動向客戶端發送信息,實時性很高

  • 與HTTP協議比起來,WebSocket協議每次數據傳輸的頭信息都較小,節約帶寬

針對瀏覽器本身,連接后臺最大的websocket數量也是有限制的,以下是我搜索到的各個瀏覽器支持的最大websocket連接數:

IE        6個
chrome 256個
Firefox 200個
safari 1273個(MAC版本)

超過各個瀏覽器最大數,后臺就收不到請求。

二、 websocket實現

說明了原理之后,接下來就是如何實現websocket,這里我提供了幾種實現方式

1、J2EE7自帶的原始實現

服務端實現
WebSocket是JavaEE7新支持的,Javax.websocket.server包含注解、類、接口用于創建和配置服務端點;Javax.websocket包則包含服務端點和客戶斷電公用的注解、類、接口、異常,創建一個注解式的端點,將自己的寫的類以及類中的一些方法用前面提到的包中的注解裝飾,這里我提供了一個基本的websocket實現接口,提供了連接、關閉、接收消息、發送消息接口:

package com.easystudy.websocket;

import java.io.IOException;
import javax.websocket.Session;
import lombok.extern.slf4j.Slf4j;

/**
* @歡迎加入群聊,一起分享,一起合作,一起進步
* QQ交流群:961179337
* 微信賬號:lixiang6153
* 微信公眾號:IT技術快餐
* 電子郵箱:lixx2048@163.com
*/
@Slf4j
public abstract class BaseWS {
/**
* 終端初次連接
* @param userId userId
* @param session session
* @throws IOException IOException
*/
abstract void onOpen(Session session, Long userId) throws IOException;

/**
* 終端斷開連接
*/
abstract void onClose();

/**
* 終端傳遞參數
* @param session session
* @param message message
*/
abstract void onMessage(String message, Session session);

/**
* 報錯
* @param session session
* @param error error
*/
abstract void onError(Session session, Throwable error);

/**
* 向終端發送
* @param message message
* @throws IOException IOException
*/
abstract void sendMessage(String message) throws IOException;

void heartBeat(Long user, String signal, Session session) {
if ("ping".equalsIgnoreCase(signal)) {
try {
log.info("heart beat=====> {},user:{}, sessionId:{}", signal, user, session.getId());
session.getBasicRemote().sendText("pong");
log.info("heart beat<====> {},user:{}, sessionId:{}", "pong", user, session.getId());
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

websocket端點實現:

package com.easystudy.websocket;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

/**
* @ServerEndpoint 該注解可以將類定義成一個WebSocket服務器端,
* @OnOpen 表示有瀏覽器鏈接過來的時候被調用
* @OnClose 表示瀏覽器發出關閉請求的時候被調用
* @OnMessage 表示瀏覽器發消息的時候被調用
* @OnError 表示報錯了
* @歡迎加入群聊,一起分享,一起合作,一起進步
* QQ交流群:961179337
* 微信賬號:lixiang6153
* 微信公眾號:IT技術快餐
* 電子郵箱:lixx2048@163.com
*/
@Component
@ServerEndpoint("/ws/msg/{userid}")
public class MessageEndPoint extends BaseWS {
// concurrent包下線程安全的Set
private static final CopyOnWriteArraySet SESSIONS = new CopyOnWriteArraySet<>();
private Session session;
@Override
@OnOpen
public void onOpen(Session session, @PathParam("userid") Long userid) {
this.session = session;
SESSIONS.add(this);
System.out.println(String.format(userid + "成功建立連接~ 當前總連接數為:%s", SESSIONS.size()));
System.out.println(this);
}
@Override
@OnClose
public void onClose() {
SESSIONS.remove(this);
System.out.println(String.format("成功關閉連接~ 當前總連接數為:%s", SESSIONS.size()));
}
@Override
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("收到客戶端【" +session.getId()+ "】消息:" + message);
}
@Override
@OnError
public void onError(Session session, Throwable error) {
System.out.println("發生錯誤");
error.printStackTrace();
}
/**
* 指定發消息
* @param message
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群發消息: 靜態方法
* @param message
*/
public static void fanoutMessage(String message) {
SESSIONS.forEach(ws -> ws.sendMessage(message));
}
}

我們監聽的端點是

/ws/msg/{userid}

端點攜帶了一個參數userid,表示長連接的用戶id,我們在對應的方法實現中通過注解引用對應參數即可。

我們使用J2EE7標準注解,必須注入相應的ServerEndpointExporter類:

@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

最后,我們提供一個controller用于測試服務端往發送客戶端信息:

package com.easystudy.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.easystudy.websocket.MessageEndPoint;

@RestController
@RequestMapping("/response")
public class TestController {

@GetMapping("/send")
public String reponseMsgToClient(@RequestParam(name="content", required = true)String content){
System.out.println("發送消息:[" + content + "]給客戶端!");

MessageEndPoint.fanoutMessage(content);

return "消息【" +content+ "】發送成功!";
}
}

客戶端實現
在實現websocket服務端之后,我們就需要實現websocket客戶端,連接到服務器,接收服務端消息,實現代碼如下所示:





websocket測試





WebSocket Demo





服務器回復內容:

發送



啟動瀏覽器,打開控制臺,可以看到連接到服務器字樣,輸入內容,點擊發送,服務端后臺打印接收到客戶端信息并廣播到客戶端,客戶端控制臺也打印了相同字樣。

2、springboot實現

除了J2EE原始實現之外,使用springboot之后,功能就更強大了,它提供了一個核心的配置類WebSocketConfigurer用于注冊各種websocket端點、攔截器、處理器信息。如下我們通過繼承WebSocketConfigurer配置對應端點、處理器和攔截器:

package com.easystudy.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import com.easystudy.websocket.MsgWebSocketHandler;
import com.easystudy.websocket.MsgWebSocketInterceptor;

@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 設置端點連接路徑和處理器
registry.addHandler(new MsgWebSocketHandler(), "/ws/msg/{userid}")
.setAllowedOrigins("*")
// 設置攔截器
.addInterceptors(new MsgWebSocketInterceptor());
}

}

我們配置了自己的處理器處理對應端點、配置了攔截器進行信息攔截。

我這里的攔截器主要攔截請求參數,限定請求參數必須攜帶用戶名作為連接的唯一標識:

package com.easystudy.websocket;

import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

/**
* 自定義攔截器攔截WebSocket請求
* @author Administrator
* QQ交流群:961179337
* 微信賬號:lixiang6153
* 微信公眾號:IT技術快餐
* 電子郵箱:lixx2048@163.com
*/
public class MsgWebSocketInterceptor implements HandshakeInterceptor{

/**
* 前置攔截一般用來注冊用戶信息,綁定 WebSocketSession
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map attributes) throws Exception {
System.out.println("前置攔截~~");
if (!(request instanceof ServletServerHttpRequest))
return true;
// 獲取用戶名信息
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
String path = servletRequest.getServletPath();
System.out.println("path:" + path);
String userName = servletRequest.getParameter("userName");
//String userName = (String) servletRequest.getSession().getAttribute("userName");
if (null == userName) {
userName = "lixx";
}
// 保存屬性到session屬性信息中
attributes.put("userName", userName);
return true;
}
/**
* 后置攔截器
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
System.out.println("后置攔截~~");
}
}

攔截器獲取到對應屬性之后存入到session的會話屬性之中,連接之后可以通過session獲取會話屬性。

處理器實現:

package com.easystudy.websocket;

import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* QQ交流群:961179337
* 微信賬號:lixiang6153
* 微信公眾號:IT技術快餐
* 電子郵箱:lixx2048@163.com
*/
public class MsgWebSocketHandler implements WebSocketHandler{
private static final Map SESSIONS = new ConcurrentHashMap<>();
/**
* 建立新的 socket 連接后回調的方法
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userName = session.getAttributes().get("userName").toString();
SESSIONS.put(userName, session);
System.out.println(String.format("成功建立連接~ userName: %s", userName));
}
/**
* 連接關閉時,回調的方法
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
System.out.println("連接已關閉,status:" + closeStatus);
}
/**
* 接收客戶端發送的 Socket
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {
String msg = message.getPayload().toString();
System.out.println("接收到消息:" + msg);
}
/**
* 連接出錯時,回調的方法
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("連接出錯");
if (session.isOpen()) {
session.close();
}
}
/**
* 這個是 WebSocketHandler是否處理部分消息,返回 false就完事了
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 指定發消息
* @param userName
* @param message
*/
public static void sendMessage(String userName, String message) {
WebSocketSession webSocketSession = SESSIONS.get(userName);
if (webSocketSession == null || !webSocketSession.isOpen())
return;
try {
webSocketSession.sendMessage(new TextMessage(message));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群發消息
* @param message
*/
public static void fanoutMessage(String message) {
SESSIONS.keySet().forEach(us -> sendMessage(us, message));
}
}

在建立連接之后,我們通過:session.getAttributes().get("userName").toString();獲取到連接時候提供的用戶參數,用于后續指定用戶P2P發送信息。

客戶端實現代碼如下:





測試





WebSocket Demo





服務器回復內容:

發送



最后測試發送信息如下:

b7c9a0528e483319964c8fa1134d7fdd.png

3、socketJS實現

一些瀏覽器中缺少對WebSocket的支持,因此,回退選項是必要的,而Spring框架提供了基于SockJS協議的透明的回退選項。SockJS的一大好處在于提供了瀏覽器兼容性。優先使用原生WebSocket,如果在不支持websocket的瀏覽器中,會自動降為輪詢的方式。

SockJS是一個瀏覽器JavaScript庫,它提供了一個類似于網絡的對象。SockJS提供了一個連貫的、跨瀏覽器的Javascript API,它在瀏覽器和web服務器之間創建了一個低延遲、全雙工、跨域通信通道。除此之外,spring也對socketJS提供了支持。此處實現與springboot實現相似,這里不具體介紹,只給出對應代碼,實現如下(后續提供的代碼是實際項目上的代碼,請各位保持修改個更新,謝謝!!!)。

端點、攔截器、通道等配置如下

package com.donwait.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

/**
* 同 HTTP 在 TCP 套接字上添加 請求-響應 模型層一樣,STOMP 在 WebSocket 之上提供了一個基于 幀的線路格式層,用來定義消息語義;
* (STOMP在 WebSocket 之上提供了一個基于 幀的線路格式層,用來定義消息語義)
* STOMP 幀:該幀由命令,一個或多個 頭信息 以及 負載所組成。如下就是發送 數據的一個 STOMP幀:
* SEND
* destination:/app/marco
* content-length:20
*
* {\"message\":\"Marco!\"}
*
* 分析:
* A1)SEND:STOMP命令,表明會發送一些內容;
* A2)destination:頭信息,用來表示消息發送到哪里;
* A3)content-length:頭信息,用來表示 負載內容的 大小;
* A4)空行:
* A5)幀內容(負載)內容:
*/
@Configuration
@EnableWebSocketMessageBroker // 能夠在 WebSocket 上啟用 STOMP
public class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {
/*
* 將 "/dys" 路徑 注冊為 STOMP 端點,即客戶端在訂閱或發布消息 到目的地址前,要連接該端點,
* 就是說用戶發送請求 url='/項目名/dys'與 STOMP server進行連接,之后再轉發到訂閱url
* 端點的作用:客戶端在訂閱或發布消息 到目的地址前,要連接該端點
* 備注:client連接地址和發送地址是不同的,以本例為例,前者是/項目名/dys, 后者是/項目名/app/XX,先連接后發送
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 在網頁上我們就可以通過這個鏈接 /demon/websocket ==來和服務器的WebSocket連接
// 連接:new SockJS("http://127.0.0.1:7019/websocket/dys");
registry.addEndpoint("/dys") // 開啟 /dys端點
.setAllowedOrigins("*") // 允許跨域訪問
.setHandshakeHandler(new HandshakeHandler()) // 握手處理器
.addInterceptors(new HandshakeInterceptor()) // 握手攔截器
.withSockJS(); // 允許使用socketJs方式訪問
}
/*
* 消息傳輸參數配置
*/
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(8192) // 設置消息字節數大小
.setSendBufferSizeLimit(8192) // 設置消息緩存大小
.setSendTimeLimit(10000); // 設置消息發送時間限制毫秒
}
/*
* 輸入通道參數設置
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(8) // 設置消息輸入通道的線程池線程數
.maxPoolSize(16) // 最大線程數
.keepAliveSeconds(60); // 線程活動時間
registration.interceptors(createUserInterceptor()); // 注入用戶入站通道攔截器
}
/*
* 輸出通道參數設置
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(8)
.maxPoolSize(16);
}
/*
* 配置broker:
* 配置了一個 簡單的消息代理。如果不重載,默認case下,會自動配置一個簡單的內存消息代理,
* 用來處理 "/topic"為前綴的消息。但經過重載后,消息代理將會處理前綴為 "/topic" and "/queue"消息
* 分析:
* (1)應用程序的目的地 以 "/app" 為前綴,而代理的目的地以 "/topic" 和 "/queue" 作為前綴
* (2)以應用程序為目的地的消息將會直接路由到 帶有 @MessageMapping注解的控制器方法中
* (3)而發送到代理上的消息,包括 @MessageMapping注解方法的返回值所形成的消息,將會路由到代理上,并最終發送到訂閱這些目的地客戶端
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 代理的目的地址為topic或queque(代理目的地以 /topic為前綴)
// 廣播消息訂閱:stompClient.subscribe('/topic/alarm', function (response)
registry.enableSimpleBroker("/topic", "/queue");
// 全局使用的消息前綴(客戶端訂閱路徑上會體現出來):應用程序前綴:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注釋的方法.
// 客戶端發送端點前綴:stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
registry.setApplicationDestinationPrefixes("/app");
// 點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/
// registry.setUserDestinationPrefix("/user/");
/*
// 啟用了STOMP代理中繼功能,并將其代理目的地前綴設置為 /topic and /queue,并將所有目的地前綴為 "/topic" or "/queue"的消息都會發送到STOMP代理中[真正消息代理activeMQ或RabbitMQ]
registry.enableStompBrokerRelay("/topic", "/queue") // 設置可以訂閱的地址,也就是服務器可以發送的地址
.setRelayHost("192.168.12.18")
.setRelayPort(5672)
.setClientLogin("admin")
.setClientPasscode("admin")
.setSystemHeartbeatReceiveInterval(2000) // 設置心跳信息接收時間間隔
.setSystemHeartbeatSendInterval(2000); // 設置心跳信息發送時間間隔
// 應用程序前綴:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注釋的方法.
registry.setApplicationDestinationPrefixes("/app");
*/
}
/**
*
* @Title: createUserInterceptor
* @Description: 將客戶端渠道攔截器加入spring ioc容器
* @return
*/
@Bean
public UserInterceptor createUserInterceptor() {
return new UserInterceptor();
}
}

握手攔截器配置:

package com.donwait.websocket;

import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {

@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map attributes) throws Exception {
log.info("============握手前===========");
/*
// 解決The extension [x-webkit-deflate-frame] is not supported問題
if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) {
request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate");
}
// 檢查session的值是否存在
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession(false);
String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID);
//把session和accountId存放起來
attributes.put(Constants.SESSIONID, session.getId());
attributes.put(Constants.SKEY_ACCOUNT_ID, accountId);
}
*/
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Exception ex) {
log.info("============握手后===========");
super.afterHandshake(request, response, wsHandler, ex);
}
}

用戶攔截器配置:

package com.donwait.websocket;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;

import com.donwait.amqp.RabbitMQ;
import com.donwait.model.RtmpInviteInfo;
import com.donwait.model.User;
import com.donwait.protobuf.RTMP_INVITE_PARAM;
import com.donwait.redis.RtmpInviteService;

/**
* @ClassName: UserInterceptor
* @Description: 客戶端渠道攔截適配器
*/
@SuppressWarnings("deprecation")
public class UserInterceptor extends ChannelInterceptorAdapter {
@Autowired
private RtmpInviteService redisRtmpInviteService;
@Autowired
private RabbitMQ rabbitMQ;
//@Autowired
//private UserCacheService userCacheService;

/**
* 獲取包含在stomp中的用戶信息
*/
@SuppressWarnings("rawtypes")
@Override
public Message> preSend(Message> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
Object name = ((Map) raw).get("name");
if (name instanceof LinkedList) {
// 設置當前訪問器的認證用戶
accessor.setUser(new User(((LinkedList) name).get(0).toString()));
}
}
}
return message;
}

@Override
public void postSend(Message> message, MessageChannel channel, boolean sent) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);
// ignore non-STOMP messages like heartbeat messages
if(sha.getCommand() == null) {
return;
}

//這里的sessionId和accountId對應HttpSessionIdHandshakeInterceptor攔截器的存放key
//String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString();
//String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString();
//判斷客戶端的連接狀態
switch(sha.getCommand()) {
case CONNECT:
connect(sha);
break;
case CONNECTED:
break;
case DISCONNECT:
disconnect(sha);
break;
default:
break;
}
}

// 連接成功
private void connect(StompHeaderAccessor sha){
System.out.println(" STOMP 連接成功:" + sha.getUser().getName());
}

// 斷開連接
private void disconnect(StompHeaderAccessor sha){
System.out.println(" STOMP 連接斷開" + sha.getUser().getName());

// 移除用戶信息
//userCacheService.delete(sha.getUser().getName());

String strKey = String.format("rtmp_invite_info::%s_*", sha.getUser().getName());
List invite_list = redisRtmpInviteService.findByKeyEx(strKey);
if (invite_list != null) {
for(RtmpInviteInfo rtmpInviteInfo : invite_list){
// 通知接入服務器
RTMP_INVITE_PARAM.Builder builder = RTMP_INVITE_PARAM.newBuilder();
builder.setRtmpIP(rtmpInviteInfo.getRtmpIp());
builder.setRtmpPort(rtmpInviteInfo.getRtmpPort());
builder.setDevID(rtmpInviteInfo.getDevId());
builder.setProtocolType(rtmpInviteInfo.getProtoType());
builder.setStreamType(rtmpInviteInfo.getStreamType());
rabbitMQ.send(rtmpInviteInfo.getExchangeName(), rtmpInviteInfo.getRouteKey(), builder.build().toByteArray());
strKey = String.format("%s::%s_%s_%d_%d_%d", rtmpInviteInfo.getCacheName(), sha.getUser().getName(), rtmpInviteInfo.getDevId(), rtmpInviteInfo.getChannelNum().longValue(), rtmpInviteInfo.getProtoType().longValue(), rtmpInviteInfo.getStreamType().longValue());
redisRtmpInviteService.deleteByKey(strKey);
}
}
}
}

處理器代碼:

package com.donwait.websocket;

import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HandshakeHandler extends DefaultHandshakeHandler{

public HandshakeHandler(){
log.debug("new HandshakeHandler");
}
}

配置完成之后,需要封裝一個消息服務實現點對點和廣播形式發送:

package com.donwait.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Service;

/**
* websocket廣播推送服務
* @author Administrator
*
*/
@Service
public class MessageService {
@Autowired
SimpMessageSendingOperations sendOperation; // 消息發送模板
@Autowired
private SimpUserRegistry userRegistry; // 用戶列表【連接的客戶端信息】

/**
* 廣播形式發送報警信息
* @param
*/
public void broadcast(String destination,String message) {
sendOperation.convertAndSend(destination, message);
System.out.println("路由:"+ destination + " 推送消息:" + message);
}

/**
* 單獨發送信息給某用戶
* 客戶端發起連接時候必須攜帶用戶名參數
* stompClient.connect(
* {
* name: 'lixx' // 攜帶客戶端信息
* }
* @param
*/
public void send(String destination,String username, String message) {
for (SimpUser user : userRegistry.getUsers()) {
if (user.getName().equals(username)){
sendOperation.convertAndSendToUser(username, destination, message);
System.out.println("路由:"+ destination + " 推送消息:" + message);
break;
}
}
}
}

最后,送上測試的html客戶端頁面:




stomp



Welcome

發送消息
訂閱用戶消息/user/queue/message
訂閱報警消息/topic/alarm







至此,websocket的具體介紹與實例都已送上,如果需要源碼或者技術交流或者合作請聯系一下方式

源碼獲取、合作、技術交流請獲取如下聯系方式:

QQ交流群:961179337

91030e818df566455654448910741f74.png

微信賬號:lixiang6153
公眾號:IT技術快餐
電子郵箱:lixx2048@163.com

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/542563.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/542563.shtml
英文地址,請注明出處:http://en.pswp.cn/news/542563.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

漢子編碼比字母編碼長_字母/博客作者編碼問題(使用動態編程)

漢子編碼比字母編碼長Problem statement: 問題陳述&#xff1a; Shivang is a blog writer and he is working on two websites simultaneously. He has to write two types of blogs which are: Shivang是一位博客作家&#xff0c;他同時在兩個網站上工作。 他必須寫兩種類型…

php parent報錯,mac brew 安裝php擴展報錯:parent directory is world writable but not sticky

$ brew install php70-mcrypt報錯&#xff1a;Error: parent directory is world writable but not sticky搜索到github的答案https://github.com/Homebrew/legacy-homebrew/issues/40345原因&#xff1a;/tmp目錄權限不對$ ls -ld /private/tmp打印出來 /private/tmp 被標黃了…

在cordova中使用HTML5的多文件上傳

2019獨角獸企業重金招聘Python工程師標準>>> 我們先看看linkface給開放的接口&#xff1a; 字段類型必需描述api_idstring是API 賬戶api_secretstring是API 密鑰selfie_filefile見下方注釋需上傳的圖片文件 1&#xff0c;上傳本地圖片進行檢測時選取此參數selfie_ur…

python dataframe切片_python pandas dataframe 行列選擇,切片操作方法

SQL中的select是根據列的名稱來選取&#xff1b;Pandas則更為靈活&#xff0c;不但可根據列名稱選取&#xff0c;還可以根據列所在的position&#xff08;數字&#xff0c;在第幾行第幾列&#xff0c;注意pandas行列的position是從0開始&#xff09;選取。相關函數如下&#xf…

php根據設備判斷訪問,PHP判斷設備訪問來源

/*** 判斷用戶請求設備是否是移動設備* return bool*/function isMobile() {//如果有HTTP_X_WAP_PROFILE則一定是移動設備if (isset($_SERVER[HTTP_X_WAP_PROFILE])) {return true;}//如果via信息含有wap則一定是移動設備,部分服務商會屏蔽該信息if (isset($_SERVER[HTTP_VIA])…

機器學習 深度學習 ai_如何學習機器學習和人工智能?

機器學習 深度學習 aiSTRATEGY 戰略 Learn theory practical aspects. 學習理論和實踐方面的知識。 (At first get an overview of what you are going to learn). (首先獲得要學習的內容的概述)。 Gain a good hold/insight on each concept. 掌握/理解每個概念。 If you …

linux常用命令和配置

2019獨角獸企業重金招聘Python工程師標準>>> 啟動php&#xff1a; /etc/init.d/php-fpm restart 查看PHP運行目錄&#xff1a; which php /usr/bin/php 查看php-fpm進程數&#xff1a; ps aux | grep -c php-fpm 查看運行內存 /usr/bin/php -i|grep mem iptables如…

centos7時間同步_centos 8.x系統配置chrony時間同步服務

centos 8.x系統配置chrony時間同步服務CentOS 7.x默認使用的時間同步服務為ntp服務&#xff0c;但是CentOS 8開始在官方的倉庫中移除了ntp軟件&#xff0c;換成默認的chrony進行時間同步的服務&#xff0c;chrony既可以作為客戶端向其他時間服務器發送時間同步請求&#xff0c;…

php可以用scanf,C/C++中 使用scanf和printf如何讀入輸出double型數據。

黃舟2017-04-17 13:47:232樓注意scanf函數和printf函數是不同尋常的函數&#xff0c;因為它們都沒有將函數的參數限制為固定數量。scanf函數和printf函數又可變長度的參數列表。當調用帶可變長度參數列表的函數時&#xff0c;編譯器會安排float參數自動轉換成為double類型&…

ICWAI和ICWA的完整形式是什么?

ICWAI / ICWA&#xff1a;印度成本與工程會計師協會/印度兒童福利法 (ICWAI / ICWA: Institute of Cost and Works Accountants of India / Indian Child Welfare Act) 1)ICWAI&#xff1a;印度成本與工程會計師協會 (1) ICWAI: Institute of Cost and Works Accountants of In…

深入研究java.lang.Runtime類【轉】

轉自&#xff1a;http://blog.csdn.net/lastsweetop/article/details/3961911 目錄(?)[-] javalang 類 RuntimegetRuntimeexitaddShutdownHookremoveShutdownHookhaltrunFinalizersOnExitexecexecexecexecexecexecavailableProcessorsfreeMemorytotalMemorymaxMemorygcrunFina…

java隊列實現限流,java中應對高并發的兩種策略

目的&#xff1a;提高可用性通過ExecutorService實現隊列泄洪//含有20個線程的線程池private ExecutorService executorService Executors.newFixedThreadPool(20);將有并發壓力的下游代碼放入到線程池的submit方法中&#xff0c;如下&#xff1a;//同步調用線程池的submit方法…

crontab 日志_liunx 中定時清理過期日志文件

問題描述經常遇到日志文件過多&#xff0c;占用大量磁盤空間&#xff0c;需要定期刪除過期日志。問題涉及方面刪除過期日志的腳本。定時任務刪除任務腳本先查詢到過期的日志文件&#xff0c;然后刪除。語法find path -option [ -print ] [ -exec -ok command ] …

JavaScript | 數組的常用屬性和方法

JavaScript的通用屬性和數組方法 (Common properties and methods of array in JavaScript ) Properties/MethodsDescriptionsarray.lengthReturns the length of the array/total number of elements of the array array[index]Returns the item name stored at “index” pos…

php dbutils 使用,dbutilsapi

相對lisp?而?言,可以使?用.net的強?大api,實現各種酷炫功能。相對c#及arx?...文件utils.py的模塊名分別是mycompany.utils和 mycompany.web.utils。 mycompany......CouchDB -b 關閉后臺運行的 CouchDB : CouchDB -d web 訪問:http://127.0.0.1:5984/_utils/index.html E-…

html 導航欄

<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>lvnian學習(http://lvnian.blog.51cto.com/)</title> <style> ul {list-style-type:none;margin:0;padding:0; }a:link,a:visited{display:block;font-weigh…

將搜索二叉樹轉換為鏈表_將給定的二叉樹轉換為雙鏈表(DLL)

將搜索二叉樹轉換為鏈表Given a Binary tree and we have to convert it to a Doubly Linked List (DLL). 給定二叉樹&#xff0c;我們必須將其轉換為雙鏈表(DLL)。 Algorithm: 算法&#xff1a; To solve the problem we can follow this algorithm: 為了解決這個問題&#…

cuda編程_CUDA刷新器:CUDA編程模型

CUDA刷新器&#xff1a;CUDA編程模型 CUDA Refresher: The CUDA Programming Model CUDA&#xff0c;CUDA刷新器&#xff0c;并行編程 這是CUDA更新系列的第四篇文章&#xff0c;它的目標是刷新CUDA中的關鍵概念、工具和初級或中級開發人員的優化。 CUDA編程模型提供了GPU體系結…

php curl_error源碼,PHP curl_error函數

PHP curl_error函數(PHP 4 > 4.0.3, PHP 5)curl_error — 返回一個保護當前會話最近一次錯誤的字符串說明string curl_error ( resource $ch )返回一條最近一次cURL操作明確的文本的錯誤信息。參數ch由 curl_init() 返回的 cURL 句柄。返回值返回錯誤信息或 (空字符串) 如果…

SQL中Where與Having的區別

“Where” 是一個約束聲明&#xff0c;使用Where來約束來之數據庫的數據&#xff0c;Where是在結果返回之前起作用的&#xff0c;且Where中不能使用聚合函數。“Having”是一個過濾聲明&#xff0c;是在查詢返回結果集以后對查詢結果進行的過濾操作&#xff0c;在Having中可以使…