?? 目錄
?? ActiveMQ簡介
什么是ActiveMQ?
核心概念
??? 基礎架構組件
?? 重要概念解釋
ActiveMQ vs 其他消息中間件
?? 環境搭建
1. ActiveMQ服務端安裝
Docker方式(推薦初學者)
手動安裝方式
2. 驗證安裝
訪問Web管理界面
連接參數
測試連接
??? Spring Boot集成配置
1. 添加依賴
2. 配置文件
3. ActiveMQ配置類
?? 基礎消息收發
1. 創建消息實體類
2. 消息生產者服務
3. 消息消費者服務
4. 測試控制器
?? 消息模式詳解
1. 點對點模式(Queue)
特點
實現示例
2. 發布訂閱模式(Topic)
特點
實現示例
?? 高級特性
1. 消息選擇器(Message Selector)
基于消息屬性過濾
2. 事務消息
JMS事務配置
3. 消息確認機制
手動確認模式
4. 死信隊列(DLQ)
死信隊列配置
??? Web管理界面
ActiveMQ Web Console使用
1. 訪問管理界面
2. 主要功能
隊列管理
主題管理
連接監控
消息瀏覽
2. 自定義監控頁面
創建監控控制器
創建監控頁面
?? 監控和管理
1. Spring Boot Actuator集成
添加Actuator依賴
配置監控端點
自定義健康指示器
2. 性能監控
消息處理性能監控
3. 日志配置
logback-spring.xml
?? 實戰案例
? 性能優化
?? 最佳實踐
? 常見問題解決
???????
?? ActiveMQ簡介
什么是ActiveMQ?
ActiveMQ是Apache軟件基金會開發的開源消息中間件,是Java消息服務(JMS)的完整實現,具有高性能、可靠性強、易于使用的特點。
核心概念
??? 基礎架構組件
Producer(生產者):負責發送消息
Consumer(消費者):負責接收和處理消息
Broker(代理):消息服務器,負責存儲和轉發消息
Destination(目的地):消息的目標,包括Queue和Topic
?? 重要概念解釋
- Queue(隊列):點對點模式,一條消息只能被一個消費者消費
- Topic(主題):發布訂閱模式,一條消息可以被多個消費者消費
- JMS(Java Message Service):Java消息服務API標準
- Message(消息):傳輸的數據單元
- Session(會話):生產和消費消息的上下文
- Connection(連接):客戶端與消息服務器的網絡連接
ActiveMQ vs 其他消息中間件
特性 | ActiveMQ | RabbitMQ | RocketMQ |
---|---|---|---|
開發語言 | Java | Erlang | Java |
協議支持 | JMS、AMQP、STOMP | AMQP | 自定義協議 |
管理界面 | Web Console | Management UI | Console |
集群支持 | ? | ? | ? |
事務支持 | ? | ? | ? |
消息持久化 | ? | ? | ? |
?? 環境搭建
1. ActiveMQ服務端安裝
Docker方式(推薦初學者)
# 1. 拉取ActiveMQ鏡像
docker pull webcenter/activemq:latest# 2. 啟動ActiveMQ容器
docker run -d \--name activemq \-p 61616:61616 \-p 8161:8161 \webcenter/activemq:latest# 3. 查看容器狀態
docker ps | grep activemq# 4. 查看日志
docker logs activemq
手動安裝方式
# 1. 下載ActiveMQ
wget https://archive.apache.org/dist/activemq/5.17.3/apache-activemq-5.17.3-bin.tar.gz# 2. 解壓
tar -zxvf apache-activemq-5.17.3-bin.tar.gz# 3. 啟動ActiveMQ
cd apache-activemq-5.17.3
./bin/activemq start# 4. 停止ActiveMQ
./bin/activemq stop# 5. 查看狀態
./bin/activemq status
2. 驗證安裝
訪問Web管理界面
URL: http://localhost:8161/admin
默認用戶名: admin
默認密碼: admin
連接參數
JMS連接URL: tcp://localhost:61616
Web管理端口: 8161
JMX端口: 1099
測試連接
# 使用ActiveMQ自帶的測試工具
cd apache-activemq-5.17.3# 啟動消費者
./bin/activemq consumer# 啟動生產者(新開終端)
./bin/activemq producer
??? Spring Boot集成配置
1. 添加依賴
<dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot ActiveMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!-- ActiveMQ連接池 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId></dependency><!-- JSON處理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 開發工具 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency>
</dependencies>
2. 配置文件
application.yml
# ActiveMQ配置
spring:activemq:broker-url: tcp://localhost:61616 # ActiveMQ服務器地址user: admin # 用戶名password: admin # 密碼in-memory: false # 不使用內存模式pool:enabled: true # 啟用連接池max-connections: 50 # 最大連接數idle-timeout: 30000 # 空閑超時時間(毫秒)packages:trust-all: true # 信任所有包(開發環境)trusted: com.example.model # 信任的包(生產環境推薦)# JMS配置jms:pub-sub-domain: false # false=Queue模式,true=Topic模式template:default-destination: default-queue # 默認目的地delivery-mode: persistent # 消息持久化模式priority: 100 # 消息優先級(0-255)time-to-live: 36000000 # 消息存活時間(毫秒)receive-timeout: 10000 # 接收超時時間(毫秒)# 應用配置
server:port: 8080# 日志配置
logging:level:org.apache.activemq: INFOorg.springframework.jms: DEBUGcom.example: DEBUG
application.properties(可選)
# ActiveMQ連接配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false# 連接池配置
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring.activemq.pool.idle-timeout=30000# JMS配置
spring.jms.pub-sub-domain=false
spring.jms.template.default-destination=default-queue
spring.jms.template.delivery-mode=persistent
3. ActiveMQ配置類
基礎配置
package com.example.config;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageType;@Configuration
@EnableJms // 啟用JMS
public class ActiveMQConfig {@Value("${spring.activemq.broker-url}")private String brokerUrl;@Value("${spring.activemq.user}")private String username;@Value("${spring.activemq.password}")private String password;/*** ActiveMQ連接工廠*/@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory() {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();factory.setBrokerURL(brokerUrl);factory.setUserName(username);factory.setPassword(password);// 信任所有包(開發環境)factory.setTrustAllPackages(true);// 生產環境推薦指定信任的包// factory.setTrustedPackages(Arrays.asList("com.example.model"));return factory;}/*** 連接池工廠*/@Beanpublic PooledConnectionFactory pooledConnectionFactory() {PooledConnectionFactory pooledFactory = new PooledConnectionFactory();pooledFactory.setConnectionFactory(activeMQConnectionFactory());pooledFactory.setMaxConnections(50); // 最大連接數pooledFactory.setIdleTimeout(30000); // 空閑超時return pooledFactory;}/*** JmsTemplate - 用于發送消息*/@Beanpublic JmsTemplate jmsTemplate() {JmsTemplate template = new JmsTemplate();template.setConnectionFactory(pooledConnectionFactory());template.setMessageConverter(jacksonJmsMessageConverter());template.setDeliveryPersistent(true); // 消息持久化template.setSessionTransacted(true); // 啟用事務return template;}/*** Queue模式的監聽器工廠*/@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(pooledConnectionFactory());factory.setMessageConverter(jacksonJmsMessageConverter());factory.setPubSubDomain(false); // Queue模式factory.setSessionTransacted(true); // 啟用事務factory.setConcurrency("3-10"); // 并發消費者數量return factory;}/*** Topic模式的監聽器工廠*/@Beanpublic DefaultJmsListenerContainerFactory topicListenerContainerFactory() {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(pooledConnectionFactory());factory.setMessageConverter(jacksonJmsMessageConverter());factory.setPubSubDomain(true); // Topic模式factory.setSessionTransacted(true);factory.setConcurrency("3-10");return factory;}/*** 消息轉換器 - 支持對象序列化*/@Beanpublic MappingJackson2MessageConverter jacksonJmsMessageConverter() {MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();converter.setTargetType(MessageType.TEXT); // 使用文本消息converter.setTypeIdPropertyName("_type"); // 類型標識屬性return converter;}
}
?? 基礎消息收發
1. 創建消息實體類
package com.example.model;import java.io.Serializable;
import java.time.LocalDateTime;/*** 用戶消息實體*/
public class UserMessage implements Serializable {private static final long serialVersionUID = 1L;private Long id;private String username;private String email;private String action;private LocalDateTime timestamp;private String description;// 無參構造函數(JSON反序列化需要)public UserMessage() {}// 全參構造函數public UserMessage(Long id, String username, String email, String action, String description) {this.id = id;this.username = username;this.email = email;this.action = action;this.description = description;this.timestamp = LocalDateTime.now();}// Getter和Setter方法public Long getId() { return id; }public void setId(Long id) { this.id = id; }public String getUsername() { return username; }public void setUsername(String username) { this.username = username; }public String getEmail() { return email; }public void setEmail(String email) { this.email = email; }public String getAction() { return action; }public void setAction(String action) { this.action = action; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }@Overridepublic String toString() {return "UserMessage{" +"id=" + id +", username='" + username + '\'' +", email='" + email + '\'' +", action='" + action + '\'' +", timestamp=" + timestamp +", description='" + description + '\'' +'}';}
}/*** 訂單消息實體*/
public class OrderMessage implements Serializable {private static final long serialVersionUID = 1L;private String orderId;private String userId;private String productName;private Integer quantity;private Double totalPrice;private String status;private LocalDateTime createTime;// 構造函數public OrderMessage() {}public OrderMessage(String orderId, String userId, String productName, Integer quantity, Double totalPrice, String status) {this.orderId = orderId;this.userId = userId;this.productName = productName;this.quantity = quantity;this.totalPrice = totalPrice;this.status = status;this.createTime = LocalDateTime.now();}// Getter和Setter方法省略...@Overridepublic String toString() {return "OrderMessage{" +"orderId='" + orderId + '\'' +", userId='" + userId + '\'' +", productName='" + productName + '\'' +", quantity=" + quantity +", totalPrice=" + totalPrice +", status='" + status + '\'' +", createTime=" + createTime +'}';}
}
2. 消息生產者服務
package com.example.service;import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;import javax.jms.Queue;
import javax.jms.Topic;/*** 消息生產者服務*/
@Service
public class MessageProducerService {@Autowiredprivate JmsTemplate jmsTemplate;/*** 發送簡單文本消息到Queue*/public void sendTextMessage(String queueName, String message) {try {jmsTemplate.convertAndSend(queueName, message);System.out.println("? 文本消息發送成功到Queue: " + queueName);System.out.println(" 消息內容: " + message);} catch (Exception e) {System.err.println("? 文本消息發送失敗: " + e.getMessage());throw new RuntimeException("消息發送失敗", e);}}/*** 發送用戶消息對象到Queue*/public void sendUserMessage(String queueName, UserMessage userMessage) {try {jmsTemplate.convertAndSend(queueName, userMessage);System.out.println("? 用戶消息發送成功到Queue: " + queueName);System.out.println(" 消息內容: " + userMessage);} catch (Exception e) {System.err.println("? 用戶消息發送失敗: " + e.getMessage());throw new RuntimeException("用戶消息發送失敗", e);}}/*** 發送訂單消息到Queue*/public void sendOrderMessage(String queueName, OrderMessage orderMessage) {try {jmsTemplate.convertAndSend(queueName, orderMessage);System.out.println("? 訂單消息發送成功到Queue: " + queueName);System.out.println(" 訂單ID: " + orderMessage.getOrderId());} catch (Exception e) {System.err.println("? 訂單消息發送失敗: " + e.getMessage());throw new RuntimeException("訂單消息發送失敗", e);}}/*** 發送消息到Topic(發布訂閱模式)*/public void sendMessageToTopic(String topicName, Object message) {try {// 臨時設置為Topic模式jmsTemplate.setPubSubDomain(true);jmsTemplate.convertAndSend(topicName, message);// 恢復Queue模式jmsTemplate.setPubSubDomain(false);System.out.println("? 消息發布成功到Topic: " + topicName);System.out.println(" 消息內容: " + message);} catch (Exception e) {System.err.println("? Topic消息發送失敗: " + e.getMessage());throw new RuntimeException("Topic消息發送失敗", e);}}/*** 發送帶優先級的消息*/public void sendPriorityMessage(String queueName, String message, int priority) {try {jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {messagePostProcessor.setJMSPriority(priority);return messagePostProcessor;});System.out.println("? 優先級消息發送成功: 優先級=" + priority);System.out.println(" 消息內容: " + message);} catch (Exception e) {System.err.println("? 優先級消息發送失敗: " + e.getMessage());}}/*** 發送延時消息*/public void sendDelayMessage(String queueName, String message, long delayTime) {try {jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {messagePostProcessor.setLongProperty("AMQ_SCHEDULED_DELAY", delayTime);return messagePostProcessor;});System.out.println("? 延時消息發送成功: 延時=" + delayTime + "毫秒");System.out.println(" 消息內容: " + message);} catch (Exception e) {System.err.println("? 延時消息發送失敗: " + e.getMessage());}}/*** 批量發送消息*/public void sendBatchMessages(String queueName, String messagePrefix, int count) {try {for (int i = 1; i <= count; i++) {String message = messagePrefix + " #" + i;jmsTemplate.convertAndSend(queueName, message);}System.out.println("? 批量消息發送成功: " + count + " 條消息");} catch (Exception e) {System.err.println("? 批量消息發送失敗: " + e.getMessage());}}
}
3. 消息消費者服務
package com.example.service;import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;/*** 消息消費者服務*/
@Service
public class MessageConsumerService {/*** 消費文本消息 - Queue模式*/@JmsListener(destination = "text.queue")public void receiveTextMessage(String message) {try {System.out.println("?? 接收到文本消息: " + message);// 模擬業務處理processTextMessage(message);System.out.println("? 文本消息處理完成");} catch (Exception e) {System.err.println("? 文本消息處理失敗: " + e.getMessage());throw new RuntimeException("消息處理失敗", e);}}/*** 消費用戶消息對象 - Queue模式*/@JmsListener(destination = "user.queue")public void receiveUserMessage(UserMessage userMessage) {try {System.out.println("?? 接收到用戶消息: " + userMessage);// 根據用戶行為進行不同處理switch (userMessage.getAction().toLowerCase()) {case "register":handleUserRegistration(userMessage);break;case "login":handleUserLogin(userMessage);break;case "logout":handleUserLogout(userMessage);break;case "update":handleUserUpdate(userMessage);break;default:System.out.println("?? 未知的用戶行為: " + userMessage.getAction());}System.out.println("? 用戶消息處理完成");} catch (Exception e) {System.err.println("? 用戶消息處理失敗: " + e.getMessage());throw new RuntimeException("用戶消息處理失敗", e);}}/*** 消費訂單消息 - Queue模式*/@JmsListener(destination = "order.queue")public void receiveOrderMessage(OrderMessage orderMessage) {try {System.out.println("?? 接收到訂單消息: " + orderMessage);// 根據訂單狀態進行處理switch (orderMessage.getStatus().toLowerCase()) {case "created":handleOrderCreated(orderMessage);break;case "paid":handleOrderPaid(orderMessage);break;case "shipped":handleOrderShipped(orderMessage);break;case "completed":handleOrderCompleted(orderMessage);break;case "cancelled":handleOrderCancelled(orderMessage);break;default:System.out.println("?? 未知的訂單狀態: " + orderMessage.getStatus());}System.out.println("? 訂單消息處理完成");} catch (Exception e) {System.err.println("? 訂單消息處理失敗: " + e.getMessage());throw new RuntimeException("訂單消息處理失敗", e);}}/*** 消費Topic消息 - 發布訂閱模式* 多個消費者可以同時接收到同一條消息*/@JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")public void receiveNewsFromTopic(String news) {try {System.out.println("?? [新聞訂閱者1] 接收到新聞: " + news);// 處理新聞消息processNews(news, "訂閱者1");System.out.println("? [新聞訂閱者1] 新聞處理完成");} catch (Exception e) {System.err.println("? [新聞訂閱者1] 新聞處理失敗: " + e.getMessage());}}/*** 另一個Topic消費者*/@JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")public void receiveNewsFromTopic2(String news) {try {System.out.println("?? [新聞訂閱者2] 接收到新聞: " + news);// 處理新聞消息processNews(news, "訂閱者2");System.out.println("? [新聞訂閱者2] 新聞處理完成");} catch (Exception e) {System.err.println("? [新聞訂閱者2] 新聞處理失敗: " + e.getMessage());}}/*** 消費原始JMS消息(可以獲取更多消息屬性)*/@JmsListener(destination = "raw.message.queue")public void receiveRawMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;String content = textMessage.getText();// 獲取消息屬性String messageId = message.getJMSMessageID();int priority = message.getJMSPriority();long timestamp = message.getJMSTimestamp();System.out.println("