原始狀態的 activemq-client sdk 集成非常方便,也更適合定制。就是有些同學,可能對原始接口會比較陌生,會希望有個具體的示例。
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-client</artifactId><version>${activemq.version}</version>
</dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>${activemq.version}</version>
</dependency>
希望更加簡化使用的同學,可以使用:
activemq-solon-cloud-plugin (使用更簡單,定制性弱些)
1、添加集成配置
先使用 Solon 初始器 先生成一個 Solon Web 模板項目,然后添加上面的 activemq-client 依賴。再做個配置約定(也可按需定義):
- “solon.activemq”,作為配置前綴
- “properties”,作為公共配置
- “producer”,作為生態者專屬配置(估計用不到)
- “consumer”,作為消費者專屬配置(估計用不到)
具體的配置屬性,參考自:ActiveMQConnectionFactory
solon.app:name: "demo-app"group: "demo"# 配置可以自由定義,與 @Bean 代碼對應起來即可(以下為參考)
solon.activemq:properties: #公共配置(配置項,參考:ActiveMQConnectionFactory)brokerURL: "failover:tcp://localhost:61616"redeliveryPolicy:initialRedeliveryDelay: 5000backOffMultiplier: 2useExponentialBackOff: truemaximumRedeliveries: -1maximumRedeliveryDelay: 3600_000
添加 java 配置器
@Configuration
public class ActivemqConfig {@Bean(destroyMethod = "stop")public Connection client(@Inject("${solon.activemq.properties}") Props common) throws Exception {String brokerURL = (String) common.remove("brokerURL");String userName = (String) common.remove("userName");String password = (String) common.remove("password");ActiveMQConnectionFactory factory;if (Utils.isEmpty(userName)) {factory = new ActiveMQConnectionFactory(brokerURL);} else {factory = new ActiveMQConnectionFactory(brokerURL, userName, password);}//綁定額外的配置并創建連接Connection connection = common.bindTo(factory).createConnection();connection.start();return connection;}@Beanpublic IProducer producer(Connection connection) throws Exception {return new IProducer(connection);}@Beanpublic void consumer(Connection connection,MessageListener messageListener) throws Exception {Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);Destination destination = session.createTopic("topic.test");MessageConsumer consumer = session.createConsumer(destination);consumer.setMessageListener(messageListener);}
}
activemq 的消息發送的代碼比較復雜,所以我們可以做個包裝處理(用于上面的配置構建),臨時命名為 IProducer:
public class IProducer {private Connection connection;public IProducer(Connection connection) {this.connection = connection;}public void send(String topic, MessageBuilder messageBuilder) throws JMSException {Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);Destination destination = session.createTopic(topic);MessageProducer producer = session.createProducer(destination);producer.send(destination, messageBuilder.build(session));}@FunctionalInterfacepublic static interface MessageBuilder {Message build(Session session) throws JMSException;}
}
3、代碼應用
發送(或生產),這里代控制器由用戶請求再發送消息(僅供參考):
@Controller
public class DemoController {@Injectprivate IProducer producer;@Mapping("/send")public void send(String msg) throws Exception {//發送producer.send("topic.test", s -> s.createTextMessage("test"));}
}
監聽(或消費),這里采用訂閱回調的方式:(僅供參考)
@Component
public class DemoMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {System.out.println(message);RunUtil.runAndTry(message::acknowledge);}
}