記錄spring4中websocket的使用方式
pom jar包配置
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>${spring.version}</version>
</dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>${spring.version}</version>
</dependency>
其中spring.version的配置是:
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><spring.version>4.0.0.RELEASE</spring.version><java.version>1.8</java.version><druid.version>1.1.6</druid.version></properties>
涉及到json消息的支持jar用的是alibaba提供的:
<!-- json --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version> </dependency>
配置websocket服務
在spring webscoket中有兩種方式配置webscoket服務,一種是xml中配置,一種是使用代碼繼承WebSocketConfigurer,這里使用第二種:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;/*** spring websocket配置* @author ThatWay* 2018-5-8*/
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {//注冊webscoket處理類、webscocket的訪問地址、過濾處理類registry.addHandler(webSocketHandler(), "/ws").addInterceptors(webSocketInterceptor());}/*** websocket請求處理* @return*/@Beanpublic WebSocketHandler webSocketHandler() {return new WebScoketHandler();}/*** websocket攔截器* @return*/@Beanpublic WebSocketInterceptor webSocketInterceptor(){return new WebSocketInterceptor();}}
webscoket請求過濾
在上一步的服務配置中,使用的webSocketInterceptor是實現了HandshakeInterceptor接口的過濾處理類,它將攔截所有到達服務端的websocket請求,可websocket消息處理前和處理后插入動作。
這里面主要做的事是,客戶端創建連接時傳遞的參數可以取出來,放入到創建連接后產生的session中,在服務端下發消息時可以通過參數來區分session,下面代碼中作為session標識的是pageFlag參數。客戶端請求的地址是這樣的:ws://localhost:8080/integrate_pipe/ws?pageFlag=p1&actionFlag=simple
import javax.servlet.http.HttpServletRequest;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;/*** websocket請求過濾器* @author ThatWay* 2018-5-8*/
public class WebSocketInterceptor implements HandshakeInterceptor {private static Logger logger = LoggerFactory.getLogger(WebSocketInterceptor.class); @Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {logger.info("webscoket處理后過濾回調觸發");}@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {logger.info("webscoket處理前過濾回調觸發");boolean flag = true;//在調用handler前處理方法if (request instanceof ServletServerHttpRequest) {ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request; HttpServletRequest req = serverHttpRequest.getServletRequest();// 從請求中獲取頁面標志String pageFlag = req.getParameter("pageFlag");// 獲取初始化需要的數據String actionFlag = req.getParameter("actionFlag");if(StringUtils.isEmpty(pageFlag) || StringUtils.isEmpty(actionFlag) ){flag = false;logger.info("webscoket連接請求,頁面標志pageFlag:"+pageFlag+",動作標志:"+actionFlag+",參數不正確,請求拒絕");} else {logger.info("webscoket連接請求,頁面標志pageFlag:"+pageFlag+",動作標志:"+actionFlag);// 將頁面標識放入參數中,之后的session將根據這個值來區分attributes.put("pageFlag", pageFlag.trim());attributes.put("actionFlag", actionFlag.trim());}} else {flag = false;}return flag;}
}
消息處理
在服務配置中,使用的WebSocketHandler是繼承了TextWebSocketHandler的消息處理類,將由這個類來處理消息,spring中將webscoket相關的生命周期回調也封裝到了這里。另外,通過@Service將此類注解為服務,在其他業務controller中就可以使用此類方法觸發消息下發了。
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;import cn.qingk.entity.User;/*** 消息處理類* @author ThatWay* 2018-5-5*/
@Service
public class WebScoketHandler extends TextWebSocketHandler {private static Logger logger = LoggerFactory.getLogger(WebSocketHandler.class); // 頁面標識名稱private final String CLIENT_ID = "pageFlag";// 初始化動作標識名稱private final String ACTION_INIT = "actionFlag";// 頁面集合private static Map<String, WebSocketSession> clients = new ConcurrentHashMap<String, WebSocketSession>(); // 靜態變量,用來記錄當前在線連接數private static final AtomicInteger connectCount = new AtomicInteger(0);/***********/*** 連接建立成功后的回調*/@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.info("wescoket成功建立連接"); // 頁面標識String pageFlag = getAttributeFlag(session,this.CLIENT_ID);// 初始化動作標識String reqAction = getAttributeFlag(session,this.ACTION_INIT);// 返回結果int code = WebSocketStatus.CODE_FAIL;String msg = WebSocketStatus.MSG_FAIL;String returnJson = "";if (!StringUtils.isEmpty(pageFlag)) {// 連接數加一,為了保證多個同頁面標識的請求能被處理addOnlineCount();int onlineCount = getOnlineCount();String key = pageFlag+"_"+onlineCount;//管理已連接的sessionclients.put(key, session);logger.info("在線屏數:"+onlineCount);// 從數據庫里查詢需要信息返回code = WebSocketStatus.CODE_SUCCESS;msg = WebSocketStatus.MSG_SUCCESS;// 查詢數據庫得到typeString type = WebSocketStatus.TYPE_BDXW;if (reqAction.toLowerCase().equals(WebSocketStatus.ACTION_SIMPLE)) {// DB基本數據logger.info("數據庫查詢【"+pageFlag+"】的基本數據");Map<String, Object> infoMap = new HashMap<String, Object>();infoMap.put("type", "qwzx");infoMap.put("title", "全網資訊");returnJson = this.makeInfoResponseJson(code, type,reqAction, msg, infoMap);} else if (reqAction.toLowerCase().equals(WebSocketStatus.ACTION_DETAIL)) {// DB數據列表logger.info("數據庫查詢【"+pageFlag+"】的列表數據");int totalCount = 1;List<Object> userList = new ArrayList<Object>();User user1 = new User();user1.setAddress("address 1");user1.setAge(18);user1.setId(1);user1.setName("name 1");userList.add(user1);returnJson = this.makeListResponseJson(code, type,reqAction, msg, totalCount,userList);} else {code = WebSocketStatus.CODE_FAIL;msg = WebSocketStatus.MSG_FAIL;logger.error("客戶端請求的action為:"+reqAction);}// 返回信息TextMessage returnMessage = new TextMessage(returnJson); session.sendMessage(returnMessage); } else {session.sendMessage(new TextMessage("無頁面標識,連接關閉!")); session.close();}} /*** 接收消息處理* 客戶端發送消息需遵循的格式:{"pageFlag": "p1","actionFlag": "simple/detail"}*/@Override public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { long start = System.currentTimeMillis();// 返回結果int code = WebSocketStatus.CODE_FAIL;String msg = WebSocketStatus.MSG_FAIL;String returnJson = "";//接收終端發過來的消息String reqMsg = message.getPayload();// 根據頁面標識進行邏輯處理,提取需要的數據if (!StringUtils.isEmpty(msg)) {JSONObject terminalMsg = JSONObject.parseObject(reqMsg);if (!terminalMsg.isEmpty()) {if (terminalMsg.containsKey("pageFlag") && terminalMsg.containsKey("actionFlag")) {//pageFlagString reqPageFlag = terminalMsg.getString("pageFlag");String reqAction = terminalMsg.getString("actionFlag");// 從數據庫里查詢需要信息返回code = WebSocketStatus.CODE_SUCCESS;msg = WebSocketStatus.MSG_SUCCESS;// 查詢數據庫得到typeString type = WebSocketStatus.TYPE_BDXW;if (reqAction.toLowerCase().equals(WebSocketStatus.ACTION_SIMPLE)) {// DB基本數據logger.info("數據庫查詢【"+reqPageFlag+"】的基本數據");Map<String, Object> infoMap = new HashMap<String, Object>();infoMap.put("type", "qwzx");infoMap.put("title", "全網資訊");returnJson = this.makeInfoResponseJson(code, type,reqAction, msg, infoMap);} else if (reqAction.toLowerCase().equals(WebSocketStatus.ACTION_DETAIL)) {// DB數據列表logger.info("數據庫查詢【"+reqPageFlag+"】的列表數據");int totalCount = 1;List<Object> userList = new ArrayList<Object>();User user1 = new User();user1.setAddress("address 1");user1.setAge(18);user1.setId(1);user1.setName("name 1");userList.add(user1);returnJson = this.makeListResponseJson(code, type,reqAction, msg, totalCount,userList);} else {code = WebSocketStatus.CODE_FAIL;msg = WebSocketStatus.MSG_FAIL;logger.error("客戶端請求的action為:"+reqAction);}}} else {logger.error("客戶端請求的消息轉換json為空");}} else {logger.error("客戶端請求的消息為空");}// 返回信息TextMessage returnMessage = new TextMessage(returnJson); long pass = System.currentTimeMillis() - start;logger.info("接收終端請求返回:" + returnMessage.toString()+",耗時:"+pass+"ms");// 向終端發送信息session.sendMessage(returnMessage); } /*** 出現異常時的回調*/@Override public void handleTransportError(WebSocketSession session, Throwable thrwbl) throws Exception { if(session.isOpen()){ session.close(); } logger.info("websocket 連接出現異常準備關閉");} /*** 連接關閉后的回調*/@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus cs) throws Exception { // 連接數減1for (Entry<String, WebSocketSession> entry : clients.entrySet()) {String clientKey = entry.getKey();WebSocketSession closeSession = entry.getValue();if(closeSession == session){logger.info("移除clientKey:"+clientKey);clients.remove(clientKey);decOnlineCount();int leftOnlineCount = getOnlineCount();logger.info("剩余在線屏數:"+leftOnlineCount);}}logger.info("websocket 連接關閉了"); } @Override public boolean supportsPartialMessages() { return false; } /*** 發送信息給指定頁面* @param clientId* @param message* @return*/public boolean sendMessageToPage(String pageFlag, TextMessage message) {boolean flag = false;int all_counter = 0;int send_counter = 0;long start = System.currentTimeMillis();if(!StringUtils.isEmpty(pageFlag)){for (Entry<String, WebSocketSession> entry : clients.entrySet()) {String clientKey = entry.getKey();// 給所有以此id標識開頭的終端發送消息if(clientKey.startsWith(pageFlag)){all_counter++;WebSocketSession session = entry.getValue();if (!session.isOpen()) {flag = false;} else {try {session.sendMessage(message);send_counter++;flag = true;logger.info("sendMessageToPage:[clientKey:"+clientKey+"],flag:"+flag);} catch (IOException e) {e.printStackTrace();flag = false;}}}}}long pass = System.currentTimeMillis() - start;logger.info("sendMessageToPage:"+pageFlag+",flag:"+flag+",all_counter:"+all_counter+",send_counter:"+send_counter+",pass:"+pass+"ms"); return flag;}/*** 發送信息給所有頁面* @param clientId* @param message* @return*/public boolean sendMessageToAll(TextMessage message) {boolean flag = false;int all_counter = 0;int send_counter = 0;long start = System.currentTimeMillis();for (Entry<String, WebSocketSession> entry : clients.entrySet()) { all_counter++;String clientKey = entry.getKey();WebSocketSession session = entry.getValue();if (!session.isOpen()) {flag = false;} else {try {session.sendMessage(message);flag = true;send_counter++;logger.info("sendMessageToAll:[clientKey:"+clientKey+"],flag:"+flag);} catch (IOException e) {e.printStackTrace();flag = false;}} } long pass = System.currentTimeMillis() - start;logger.info("sendMessageToAll,flag:"+flag+",all_counter:"+all_counter+",send_counter:"+send_counter+",pass:"+pass+"ms"); return flag;}/*** 給指定的精準發送消息* @param message* @param toUser* @throws IOException*/public boolean sendMessageToId(String clientId,TextMessage message) throws IOException { boolean flag = false;int all_counter = 0;int send_counter = 0;long start = System.currentTimeMillis();if(!StringUtils.isEmpty(clientId)){all_counter++;WebSocketSession session = clients.get(clientId);if (!session.isOpen()) {flag = false;} else {try {session.sendMessage(message);flag = true;send_counter++;} catch (IOException e) {e.printStackTrace();flag = false;}} }long pass = System.currentTimeMillis() - start;logger.info("sendMessageToId:"+clientId+",flag:"+flag+",all_counter:"+all_counter+",send_counter:"+send_counter+",pass:"+pass+"ms");return flag;} /*** 獲取參數標識* @param session* @return*/private String getAttributeFlag(WebSocketSession session,String flagName) {String flag = null;try {flag = (String) session.getHandshakeAttributes().get(flagName);} catch (Exception e) {logger.error(e.getMessage());}return flag;}/*** 當前連接數* @return*/private synchronized int getOnlineCount() { return connectCount.get(); } /*** 新增連接數*/private synchronized void addOnlineCount() { connectCount.getAndIncrement();} /*** 減連接數*/private synchronized void decOnlineCount() { connectCount.getAndDecrement();} /*** 生成列表響應json* @param code 狀態碼* @param type 數據類型* @param action 操作類選* @param msg 提示信息* @param totalCount 總數量* @param dataList 數據列表* @return json*/public synchronized String makeListResponseJson(int code,String type,String action,String msg,int totalCount,List<Object> dataList){JSONObject jsonObj = new JSONObject();jsonObj.put("code", code);jsonObj.put("type", type);jsonObj.put("action", action);jsonObj.put("msg", msg);JSONObject contentObj = new JSONObject();contentObj.put("totalCount", totalCount);JSONArray listArray = new JSONArray(dataList);contentObj.put("list", listArray);jsonObj.put("body", contentObj);logger.info("生成list json:" + jsonObj.toString());return jsonObj.toString();}/*** 生成詳情響應json* @param code 狀態* @param type 數據類型* @param action 操作類型* @param msg 提示消息* @param info 數據詳情* @return json*/public synchronized String makeInfoResponseJson(int code,String type,String action,String msg,Object info){JSONObject jsonObj = new JSONObject();jsonObj.put("code", code);jsonObj.put("type", type);jsonObj.put("action", action);jsonObj.put("msg", msg);jsonObj.put("body", info);logger.info("生成info json:" + jsonObj.toString());return jsonObj.toString();}}
狀態輔助類
在消息處理類中用到了一些狀態碼、下發消息等靜態變量主要是為了和客戶端交互時定義好消息格式的。這個類不一定需要。
public class WebSocketStatus {/*********************狀態碼 開始**********************///需要根據業務具體情況擴展狀態碼// 處理成功public static final int CODE_SUCCESS = 200;// 處理失敗public static final int CODE_FAIL = 200;/*********************狀態碼 結束**********************//*********************信息 開始**********************///需要根據業務具體情況擴展信息// 處理成功public static final String MSG_SUCCESS = "OK";// 處理失敗public static final String MSG_FAIL = "FAIL";/*********************信息 結束**********************//*********************數據類型 開始**********************/// 全網熱點public static final String TYPE_QWRD = "qwrd";// 本地新聞public static final String TYPE_BDXW = "bdxw";// 網絡熱搜public static final String TYPE_WLRS = "wlrs";// 地方輿論public static final String TYPE_DFYL = "dfyl";// 新聞選題public static final String TYPE_XWXT = "xwxt";// 外采調度public static final String TYPE_WCDD = "wcdd";// 生產力統計public static final String TYPE_SCLTJ = "scltj";// 影響力統計public static final String TYPE_YXLTJ = "yxltj";// 任務統計public static final String TYPE_RWTJ = "rwtj";// 資訊熱榜public static final String TYPE_ZXRB = "zxrb";// 視頻熱榜public static final String TYPE_SPRB = "sprb";// 列表自定義public static final String TYPE_LBZDY = "lbzdy";// 圖表自定義public static final String TYPE_TBZDY = "tbzdy";/*********************數據類型 結束**********************//*********************動作類型 開始**********************/// 基本信息public static final String ACTION_SIMPLE = "simple";// 詳情信息public static final String ACTION_DETAIL = "detail";/*********************動作類型 開始**********************/}
控制器中調用
這里主要是模擬了控制器中由于某個動作需要觸發給指定的session發送消息。
@Controller
@RequestMapping("/testController")
public class TestController {public static final Logger LOGGER = Logger.getLogger(TestController.class);@Autowiredprivate TestService testService;@Autowiredprivate WebScoketHandler handler;@RequestMapping("/test")public void test(HttpServletRequest request, HttpServletResponse response) {try {Map<String, Object> infoMap = new HashMap<String, Object>();infoMap.put("type", "qwzx");infoMap.put("title", "全網資訊");TextMessage infoMessage = new TextMessage(handler.makeInfoResponseJson(WebSocketStatus.CODE_SUCCESS, WebSocketStatus.TYPE_QWRD,WebSocketStatus.ACTION_SIMPLE, WebSocketStatus.MSG_SUCCESS, infoMap));int totalCount = 3;User user1 = new User();user1.setAddress("address 1");user1.setAge(18);user1.setId(1);user1.setName("name 1");User user2 = new User();user2.setAddress("address 2");user2.setAge(18);user2.setId(1);user2.setName("name 2");User user3 = new User();user3.setAddress("address 3");user3.setAge(18);user3.setId(1);user3.setName("name 3");List<Object> userList = new ArrayList<Object>();userList.add(user1);userList.add(user2);userList.add(user3);TextMessage listMessage = new TextMessage(handler.makeListResponseJson(WebSocketStatus.CODE_SUCCESS, WebSocketStatus.TYPE_QWRD,WebSocketStatus.ACTION_DETAIL, WebSocketStatus.MSG_SUCCESS, totalCount,userList));String pageFlag = "p1";//向所有打開P1的瀏覽器發送消息boolean sendFlag1 = this.handler.sendMessageToPage(pageFlag, infoMessage);System.out.println("sendFlag1:"+sendFlag1);response.getWriter().print(sendFlag1);boolean sendFlag2 = this.handler.sendMessageToPage(pageFlag, listMessage);System.out.println("sendFlag1:"+sendFlag2);response.getWriter().print(sendFlag2);} catch (IOException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}
}