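Outline
- RabbitMQ
  - Enabling STOMP support
- Server side
  - Dependencies
  - Configuration properties
  - Properties mapping class
  - Configuration class
  - Message-handling class
- Testing
  - Test page
  - Controller
  - Test case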
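In the article "Websocket in Java in Practice: A Minimal Demo of STOMP Communication", we used enableSimpleBroker to enable a built-in, in-memory message broker. In this article we use RabbitMQ as the message broker instead, so that the service can be deployed in a distributed fashion.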
RabbitMQ
Enabling STOMP support
Run the following command on the machine that hosts RabbitMQ:
sudo -H -u rabbitmq bash -c "/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_stomp"
Then start RabbitMQ:
sudo service rabbitmq-server start
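Once the plugin is enabled and the server is running, it can be worth confirming that the STOMP listener is actually reachable before moving on to the Spring side. The following is only a minimal sketch using plain JDK sockets; the class name StompPortCheck, the default host localhost, and the 2-second timeout are assumptions made for illustration, and a successful TCP connect says nothing about credentials.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of the original project. */
final class StompPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or unknown host: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: RabbitMQ runs on localhost unless a host is passed as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        // 61613 is the port that rabbitmq_stomp listens on.
        System.out.println("STOMP port reachable: " + isReachable(host, 61613, 2000));
    }
}
```

If this prints false, the plugin may not be enabled or a firewall may be blocking the port.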
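Server side
Dependencies
spring-boot-starter-websocket provides the WebSocket service.
spring-boot-starter-amqp and spring-rabbit-stream are both used to work with RabbitMQ.
reactor-netty supplies the TCP client that the STOMP broker relay uses to connect to RabbitMQ.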
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-stream</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>1.1.20</version>
</dependency>
Configuration properties
src/main/resources/application.properties
Note that once rabbitmq_stomp is enabled, it listens on port 61613.
spring.rabbitmq.host=172.30.254.255
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=fangliang
spring.rabbitmq.stomp.port=61613
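One more thing to note: many articles say to log in as the guest user. However, guest can only be used on the machine where RabbitMQ itself runs; connecting from another machine produces the error below. This has nothing to do with whether guest has been granted global permissions. So we use the admin account instead.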
Received ERROR {message=[Bad CONNECT], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[26]} session=system text/plain payload=non-loopback access denied
spring.rabbitmq.stomp.port is a custom property; it is used only by the broker relay to connect to RabbitMQ.
spring.rabbitmq.port is not used in this article's example.
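Because spring.rabbitmq.stomp.port is a custom key, nothing in Spring Boot validates it, so a typo only shows up when the relay tries to connect. As a rough sketch of an early sanity check using only the JDK (the class name RelaySettingsCheck and its methods are hypothetical and not part of the original project), the value can be read and range-checked like this:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper for sanity-checking the custom STOMP port property. */
final class RelaySettingsCheck {

    /** Parses properties-file text (same syntax as application.properties). */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads spring.rabbitmq.stomp.port and checks that it is a valid TCP port. */
    static int stompPort(Properties props) {
        String raw = props.getProperty("spring.rabbitmq.stomp.port");
        if (raw == null) {
            throw new IllegalStateException("spring.rabbitmq.stomp.port is not set");
        }
        int port = Integer.parseInt(raw.trim());
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        String text = "spring.rabbitmq.host=172.30.254.255\n"
                + "spring.rabbitmq.stomp.port=61613\n";
        System.out.println("STOMP relay port: " + stompPort(parse(text)));
    }
}
```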
Properties mapping class
This class simply maps the properties above so they are easy to use later.
src/main/java/com/nyctlc/stomprbmq/component/RabbitMQProperties.java
package com.nyctlc.stomprbmq.component;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProperties {

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    public String getRabbitmqPassword() {
        return rabbitmqPassword;
    }

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    public String getRabbitmqUsername() {
        return rabbitmqUsername;
    }

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    public String getRabbitmqHost() {
        return rabbitmqHost;
    }

    @Value("${spring.rabbitmq.port}")
    private String rabbitmqPort;

    public String getRabbitmqPort() {
        return rabbitmqPort;
    }

    @Value("${spring.rabbitmq.stomp.port}")
    private String rabbitmqStompPort;

    public String getRabbitmqStompPort() {
        return rabbitmqStompPort;
    }
}
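Configuration class
/handshake is the endpoint on which STOMP performs the WebSocket handshake.
enableStompBrokerRelay("/topic") relays destinations that start with /topic to RabbitMQ, where subscriptions use queues bound to RabbitMQ's default topic exchange, amq.topic. (This is why so many articles use "topic" as the subscription prefix rather than some other word.)
setRelayPort takes RabbitMQ's STOMP port, i.e. 61613.
setClientLogin, setClientPasscode, setSystemLogin and setSystemPasscode must all be set to the admin account and its password; otherwise an error is reported.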
src/main/java/com/nyctlc/stomprbmq/config/WebSocketConfig.java
package com.nyctlc.stomprbmq.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import com.nyctlc.stomprbmq.component.RabbitMQProperties;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/handshake");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/send");
        registry.enableStompBrokerRelay("/topic")
                .setRelayHost(rabbitMQProperties.getRabbitmqHost())
                .setRelayPort(Integer.parseInt(rabbitMQProperties.getRabbitmqStompPort()))
                .setClientLogin(rabbitMQProperties.getRabbitmqUsername())
                .setClientPasscode(rabbitMQProperties.getRabbitmqPassword())
                .setSystemLogin(rabbitMQProperties.getRabbitmqUsername())
                .setSystemPasscode(rabbitMQProperties.getRabbitmqPassword());
    }
}
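The login and passcode configured on the relay are sent to RabbitMQ as headers of a STOMP CONNECT frame on port 61613, which is why wrong credentials surface as a "Bad CONNECT" error. To make that concrete, here is a hand-built frame following the STOMP 1.2 frame layout. It is an illustrative sketch, not Spring's internal code: the class name StompConnectFrame, the virtual host "/" and the password "secret" are assumptions.

```java
/** Hypothetical illustration of the CONNECT frame a STOMP client sends to the broker. */
final class StompConnectFrame {

    /**
     * Builds a STOMP 1.2 CONNECT frame.
     * Header values in CONNECT frames are not escaped, so they are written as-is.
     */
    static String build(String virtualHost, String login, String passcode) {
        return "CONNECT\n"
                + "accept-version:1.2\n"
                + "host:" + virtualHost + "\n"   // RabbitMQ reads this header as the virtual host
                + "login:" + login + "\n"
                + "passcode:" + passcode + "\n"
                + "\n"                           // blank line ends the headers
                + "\0";                          // NUL byte ends the frame
    }

    public static void main(String[] args) {
        // Assumption: default virtual host "/" and placeholder credentials.
        String frame = build("/", "admin", "secret");
        // Show the NUL terminator as ^@ so the output stays printable.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```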
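Message-handling class
The handle method of this class receives messages sent to the /send/msg-from-user destination and forwards them to the queue bound to RabbitMQ's amq.topic exchange under the msg-to-user routing key. The broker relay created by the configuration above keeps listening on this queue and, whenever a message arrives, sends it to the client.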
src/main/java/com/nyctlc/stomprbmq/controller/WebSocketController.java
package com.nyctlc.stomprbmq.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class WebSocketController {

    @MessageMapping("/msg-from-user")
    @SendTo("/topic/msg-to-user")
    public String handle(String msg) {
        System.out.println("Received message: " + msg);
        return msg;
    }
}
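The two prefixes play different roles: /send destinations are handled inside the application, while /topic destinations are handed to RabbitMQ. The sketch below is a deliberately simplified model of that split and is not how Spring implements it; the class name DestinationRouting and the "app:" / "relay:" result strings are invented for illustration.

```java
/** Hypothetical, simplified model of how the two destination prefixes are treated. */
final class DestinationRouting {

    private static final String APP_PREFIX = "/send";
    private static final String RELAY_PREFIX = "/topic/";

    /** Describes where a message sent to the given destination ends up. */
    static String describe(String destination) {
        if (destination.startsWith(APP_PREFIX + "/")) {
            // The application prefix is stripped; the rest is matched against @MessageMapping.
            return "app:" + destination.substring(APP_PREFIX.length());
        }
        if (destination.startsWith(RELAY_PREFIX)) {
            // RabbitMQ maps /topic/<key> onto the amq.topic exchange with routing key <key>.
            return "relay:amq.topic:" + destination.substring(RELAY_PREFIX.length());
        }
        return "unhandled";
    }

    public static void main(String[] args) {
        System.out.println(describe("/send/msg-from-user"));
        System.out.println(describe("/topic/msg-to-user"));
    }
}
```

In this model, a client publishing to /send/msg-from-user reaches the handle method, and its return value travels on to /topic/msg-to-user.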
Testing
Test page
src/main/resources/static/index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>STOMP over WebSocket Example with StompJs.Client</title>
    <script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs"></script>
</head>
<body>
    <h2>STOMP over WebSocket Example with StompJs.Client</h2>
    <button id="connectButton">Connect</button>
    <form id="messageForm">
        <input type="text" id="messageInput" placeholder="Type a message..."/>
        <button type="submit">Send</button>
    </form>
    <div id="messages"></div>
    <script>
        var client = null;

        function connect() {
            client = new StompJs.Client({
                brokerURL: 'ws://localhost:8080/handshake', // WebSocket endpoint
                connectHeaders: {},
                debug: function (str) {
                    console.log(str);
                },
                reconnectDelay: 5000,
                heartbeatIncoming: 4000,
                heartbeatOutgoing: 4000,
            });
            client.onConnect = function(frame) {
                console.log('Connected: ' + frame);
                client.subscribe('/topic/msg-to-user', function(message) { // subscription destination
                    showMessageOutput(JSON.parse(message.body).content);
                });
            };
            client.onStompError = function(frame) {
                console.error('Broker reported error: ' + frame.headers['message']);
                console.error('Additional details: ' + frame.body);
            };
            client.activate();
        }

        function sendMessage(event) {
            event.preventDefault(); // prevent the default form submission
            var messageContent = document.getElementById('messageInput').value.trim();
            if(messageContent && client && client.connected) {
                var chatMessage = { content: messageContent };
                client.publish({destination: "/send/msg-from-user", body: JSON.stringify(chatMessage)}); // send destination
                document.getElementById('messageInput').value = '';
            }
        }

        function showMessageOutput(message) {
            var messagesDiv = document.getElementById('messages');
            var messageElement = document.createElement('div');
            messageElement.appendChild(document.createTextNode(message));
            messagesDiv.appendChild(messageElement);
        }

        document.getElementById('messageForm').addEventListener('submit', sendMessage);
        document.getElementById('connectButton').addEventListener('click', connect);
    </script>
</body>
</html>
Controller
This controller exists mainly so that the HTML page above can be reached through a URL.
src/main/java/com/nyctlc/stomprbmq/controller/FileController.java
package com.nyctlc.stomprbmq.controller;

import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;

@Controller
public class FileController {

    @GetMapping("/")
    public String index() {
        // Forward to the static index.html; a bare view name such as "index"
        // only resolves when a view suffix or template engine is configured.
        return "forward:/index.html";
    }

    @RequestMapping(value = "/favicon.ico")
    @ResponseStatus(value = HttpStatus.NO_CONTENT)
    public void favicon() {
        // No operation. Just to avoid 404 error for favicon.ico
    }
}
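Test case
If we publish a message directly to this queue from the management console, the front-end page receives it as well. For example, we send {"content":"message from management"}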
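The payload has to be a JSON object with a content field because the test page reads JSON.parse(message.body).content. The same message could also be produced by a program that speaks STOMP directly to RabbitMQ. As a sketch of what such a SEND frame would look like (hand-built from the STOMP 1.2 frame layout; the class name StompSendFrame is hypothetical, and the frame would only be accepted after a successful CONNECT on port 61613):

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of a STOMP SEND frame carrying the test payload. */
final class StompSendFrame {

    /**
     * Builds a STOMP 1.2 SEND frame with a JSON body.
     * Assumes the destination contains no colon, newline or backslash,
     * so no header escaping is applied.
     */
    static String build(String destination, String jsonBody) {
        // content-length counts bytes of the UTF-8 encoded body, not characters.
        int length = jsonBody.getBytes(StandardCharsets.UTF_8).length;
        return "SEND\n"
                + "destination:" + destination + "\n"
                + "content-type:application/json\n"
                + "content-length:" + length + "\n"
                + "\n"          // blank line ends the headers
                + jsonBody
                + "\0";         // NUL byte ends the frame
    }

    public static void main(String[] args) {
        String body = "{\"content\":\"message from management\"}";
        String frame = build("/topic/msg-to-user", body);
        // Show the NUL terminator as ^@ so the output stays printable.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```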