一、消息隊列
消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合、異步消息、流量削鋒等問題,實現高性能、高可用、可伸縮和最終一致性架構,是大型分布式系統不可缺少的中間件。
目前在生產環境中使用較多的消息隊列有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。
A、特性
- 異步性:將耗時的同步操作以消息的方式進行異步化處理,減少了同步等待的時間;
- 松耦合:消息隊列減少了服務之間的耦合性,不同的服務可以通過消息隊列進行通信,而不用關心彼此的實現細節,只要定義好消息的格式就行;
- 分布式:通過對消費者的橫向擴展,降低了消息隊列阻塞的風險,以及單個消費者產生單點故障的可能性;
- 可靠性:消息隊列一般會把接收到的消息存儲到本地硬盤上,這樣即使應用掛掉或者消息隊列本身掛掉,消息也能夠重新加載。
B、JMS規范
JMS即Java消息服務(Java Message Service)應用程序接口,是Java面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
JMS的消息機制有2種模型,一種是Point to Point,表現為隊列的形式,發送的消息,只能被一個接收者取走;另一種是Topic,可以被多個訂閱者訂閱,類似于群發。
ActiveMQ就是JMS的一個實現。
二、ActiveMQ介紹
ActiveMQ是Apache軟件基金下的一個開源軟件,它遵循JMS 1.1規范,是消息驅動中間件軟件。它為企業消息傳遞提供高可用、出色性能、可擴展、穩定和安全保障。ActiveMQ使用Apache許可協議,因此,任何人都可以使用和修改它而不必反饋任何改變。
ActiveMQ的目標是在盡可能多的平臺和語言上提供一個標準的,消息驅動的應用集成。ActiveMQ實現JMS規范并在此之上提供大量額外的特性。ActiveMQ支持隊列和訂閱兩種模式的消息發送。
Spring Boot提供了ActiveMQ組件spring-boot-starter-activemq,用來支持ActiveMQ在Spring Boot體系內使用。
A、ActiveMQ安裝
1、下載安裝啟動
# 安裝JDK并配置環境# 下載activemq
wget http://archive.apache.org/dist/activemq/5.12.2/apache-activemq-5.12.2-bin.tar.gz# 解壓安裝
cd /usr/local/apache-activemq-5.12.2/bin# 啟動ActiveMQ
./activemq start# web控制臺
http://192.168.240.131:8161
admin/admin
2、安全配置
安裝完成ActiveMQ后,任何連接到ActiveMQ的程序都可以創建和消費隊列,可以通過修改配置文件conf/activemq.xml來加入身份驗證,在文件的borker標簽中加入:
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="admin" password="admin" groups="users,admins"/> </users> </simpleAuthenticationPlugin>
</plugins>
控制臺賬號密碼,修改conf/jetty.xml,確保authenticate的值是true。
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="admin" /> <property name="authenticate" value="true" />
</bean>
登陸管控臺的帳號和密碼在conf/jetty-realm.properties文件。
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
重啟生效。
A、相關依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
B、配置文件
在使用ActiveMQ時有兩種使用方式,一種是使用獨立安裝的ActiveMQ,在生產環境推薦使用這種;另一種是使用基于內存ActiveMQ ,在調試階段建議使用這種方式。
# 基于內存
ActiveMQspring.activemq.in-memory=true# 不適應連接池
spring.activemq.pool.enabled=false# 獨立安裝ActiveMQ
#spring.activemq.broker-url=tcp://10.255.242.168:61616
#spring.activemq.user=admin
#spring.activemq.password=admin
三、隊列(Queue)
隊列發送的消息,只能被一個消費者接收。
A、創建隊列
@Configuration
public class ActiveMqConfig
{@Beanpublic Queue queue(){return new ActiveMQQueue("isisiwish.test.queue");}
}
使用定義了隊列queue命名為isisiwish.test.queue。
B、消息生產者
@Slf4j
@Component
public class Producer
{@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queue;public void sendQueue(String msg){log.info("send queue msg : {}", msg);this.jmsMessagingTemplate.convertAndSend(this.queue, msg);}
}
JmsMessagingTemplate是Spring提供發送消息的工具類,使用JmsMessagingTemplate和創建好的queue對消息進行發送。
C、消息消費者
@Slf4j
@Component
public class ConsumerA
{@JmsListener(destination = "isisiwish.test.queue")public void receiveQueue(String text){log.info("ConsumerA queue msg : {}", text);}
}@Slf4j
@Component
public class ConsumerB
{@JmsListener(destination = "isisiwish.test.queue")public void receiveQueue(String text){log.info("ConsumerB queue msg : {}", text);}
}
使用注解@JmsListener(destination = "isisiwish.test.queue"),表示此方法監控了名為isisiwish.test.queue的隊列。當隊列isisiwish.test.queue中有消息發送時會觸發此方法的執行,text為消息內容。
D、測試
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqActivemqQueueApplicationTests
{@Autowiredprivate Producer producer;@Testpublic void sendSimpleQueueMessage() throws InterruptedException{this.producer.sendQueue("Test queue message");}@Testpublic void send100QueueMessage() throws InterruptedException{for (int i = 0; i < 100; i++){this.producer.sendQueue("Test queue message " + i);}Thread.sleep(1000L);}
}
當有多個消費者監聽一個隊列時,消費者會自動均衡負載的接收消息,并且每個消息只能有一個消費者所接收。
PS:控制臺輸出javax.jms.JMSException: peer (vm://localhost#1) stopped.報錯信息可以忽略。
四、廣播(Topic)
廣播發送的消息,可以被多個消費者接收。
A、創建Topic
@Configuration
public class ActiveMqConfig
{@Beanpublic Topic topic(){return new ActiveMQTopic("isisiwish.test.topic");}
}
B、消息生產者
@Slf4j
@Component
public class ConsumerA
{@JmsListener(destination = "isisiwish.test.topic")public void receiveTopic(String text){log.info("ConsumerA topic msg : {}", text);}
}@Slf4j
@Component
public class ConsumerB
{@JmsListener(destination = "isisiwish.test.topic")public void receiveTopic(String text){log.info("ConsumerB topic msg : {}", text);}
}
C、消息消費者
@Slf4j
@Component
public class Producer
{@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Topic topic;public void sendTopic(String msg){log.info("send queue msg : {}", msg);this.jmsMessagingTemplate.convertAndSend(this.topic, msg);}
}
D、測試
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqActivemqTopicApplicationTests
{@Autowiredprivate Producer producer;@Testpublic void sendSimpleTopicMessage() throws InterruptedException{this.producer.sendTopic("Test Topic message");Thread.sleep(1000L);}
}
廣播(Topic)是一個發送者多個消費者的模式,兩個消費者都收到了發送的消息。
五、同時支持隊列(Queue)和廣播(Topic)
Spring Boot集成ActiveMQ的項目默認只支持隊列或者廣播中的一種,通過配置項 spring.jms.pub-sub-domain的值來控制,true為廣播模式,false為隊列模式,默認情況下支持隊列模式。
如果需要在同一項目中既支持隊列模式也支持廣播模式,可以通過DefaultJmsListenerContainerFactory創建自定義的JmsListenerContainerFactory實例,之后在@JmsListener注解中通過containerFactory屬性引用它。
分別創建兩個自定義的JmsListenerContainerFactory實例,通過pubSubDomain來控制是支持隊列模式還是廣播模式。
@Configuration
@EnableJms
public class ActiveMqConfig
{@Bean("queueListenerFactory")public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);return factory;}@Bean("topicListenerFactory")public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(true);return factory;}@Beanpublic Queue queue(){return new ActiveMQQueue("isisiwish.test.queue");}@Beanpublic Topic topic(){return new ActiveMQTopic("isisiwish.test.topic");}
}
在消費者接收的方法中,指明使用containerFactory接收消息。
@Slf4j
@Component
public class ConsumerA
{@JmsListener(destination = "isisiwish.test.queue", containerFactory = "queueListenerFactory")public void receiveQueue(String text){log.info("ConsumerA queue msg : {}", text);}@JmsListener(destination = "isisiwish.test.topic", containerFactory = "topicListenerFactory")public void receiveTopic(String text){log.info("ConsumerA topic msg : {}", text);}
}
常用配置。
# 基于內存的ActiveMQ
#spring.activemq.in-memory=true
#spring.activemq.pool.enabled=false# 獨立安裝的ActiveMQ
spring.activemq.broker-url=tcp://10.255.242.168:61616
spring.activemq.user=admin
spring.activemq.password=admin# 結束之前等待的時間
#spring.activemq.close-timeout=15s# 等待消息發送響應的時間,設置為0永遠等待
spring.activemq.send-timeout=0# 默認情況下ActiveMQ提供的是queue模式,若要使用topic模式需要配置下面配置
#spring.jms.pub-sub-domain=true#賬號
# spring.activemq.user=admin# 密碼
# spring.activemq.password=admin# 是否信任所有包
#spring.activemq.packages.trust-all=# 要信任的特定包(逗號分隔)
#spring.activemq.packages.trusted=# 當連接請求和滿時是否阻塞,設置false會拋出JMSException異常
#spring.activemq.pool.block-if-full=true# 如果池滿,則在拋出異常前阻塞時間
#spring.activemq.pool.block-if-full-timeout=-1ms# 是否在啟動時創建連接,可以在啟動時用于熱加載
#spring.activemq.pool.create-connection-on-startup=true# 是否用Pooledconnectionfactory代替普通的ConnectionFactory
#spring.activemq.pool.enabled=false# 連接過期超時
#spring.activemq.pool.expiry-timeout=0ms# 連接空閑超時
#spring.activemq.pool.idle-timeout=30s# 連接池最大連接數
#spring.activemq.pool.max-connections=1# 每個連接的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500# 當有JMSException時嘗試重新連接
#spring.activemq.pool.reconnect-on-exception=true# 空閑連接清除線程之間運行的時間,當為負數時,沒有空閑連接驅逐線程運行
#spring.activemq.pool.time-between-expiration-check=-1ms# 是否只使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

六、總結
消息中間件廣泛應用在大型互聯網架構中,利用消息中間件隊列和廣播各自的特性可以支持很多業務,比如群發發送短信、給單個用戶發送郵件等。
ActiveMQ是一款非常流行的消息中間件,它的特點是部署簡單、使用方便,比較適合中小型團隊。Spring Boot提供了集成ActiveMQ對應的組件,在Spring Boot中使用ActiveMQ只需要添加相關注解即可。