Rabbitmq的初步使用
隨著微服務概念發展,大應用逐步拆分為小應用,提高開發效率,專門的人做專門的事情,逐漸的流行起來。
在微服務上實現通信的方式大部分是采用rpc方式,也有升級版本的grpc。
還有另外一種實現就是使用mq來進行解耦。
今天初識mq,快速入門先,準備一個環境實現案例,該文涉及以下內容:
- 安裝rabbitmq
- mq能解決的問題
- 實戰演練
安裝
rabbitmq的安裝我們采用docker的方式,docker方便我們快速的實現rabbitmq的安裝,不需要再對安裝mq進行頭疼。
docker 的兩種方式
docker方式
//拉取mq鏡像
docker pull rabbitmq
//啟動mq
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
說明:
- -d 后臺運行容器;
- --name 指定容器名;
- -p 指定服務運行的端口(5672:應用訪問端口;15672:控制臺Web端口號);
- -v 映射目錄或文件;
- --hostname 主機名(RabbitMQ的一個重要注意事項是它根據所謂的 “節點名稱” 存儲數據,默認為主機名);
- -e 指定環境變量;(RABBITMQDEFAULTVHOST:默認虛擬機名;RABBITMQDEFAULTUSER:默認的用戶名;RABBITMQDEFAULTPASS:默認用戶名的密碼)
docker-compose 方式
version
下載的rabbitmq內置管理界面,ip:15672 用戶名與密碼是我們在啟動是寫入的。

mq能解決什么?
通俗的來說,主要使用MQ來解決以下三個問題。
異步消息
在業務中,經常會遇到同時發送郵件,短信或者其他通知內容服務。業務初期,采用同步或者異步處理方式都需要等發送完畢后再返回給客戶端。中間有一定的延遲

業務增長后,此方式系統性能就會造成很大的浪費。采用消息隊列,將這幾個服務進行解耦,只需將消息內容發送到消息隊列中,降低用戶的等待時間,體驗效果比原先好很多。

應用間解耦
同一個服務中可能需要其他服務的配合才能完成一項業務操作.還是拿常見的購物案例來說明。
在京東下單支付后,消息要通知到商家,郵件通知用戶已經購買某商品。
如果這兩種操作都采用同步執行,用戶等待時間會變長。
采用mq方式之后,訂單系統將消息持久化到mq上,返回給用戶下單成功。
- 商家接收到用戶的下單信息,進行處理,如果有庫存管理那么需要進行庫存處理。
- 郵件通知用戶,告知用戶下單成功。
mq保證消息的可靠投遞,不會導致消息丟失,保證消息的高可靠性。如果庫存出現失敗也不會導致用戶下單失敗的情況,可以重新進行投遞。
流量削峰
流量削峰,一般是同一時間涌進來很多請求,后臺處理不過來。那么需要采用削峰方式來處理。
簡單來說是通過一個隊列承接瞬時過來流量洪峰,在消費端平滑的將消息推送出去,如果消費者消費不及時可以將消息內容持久化在隊列中,消息不存在丟失。
- 消費端不及時進行消費,還可以動態的擴增消費者數量,提高消費速度。
- 設定相關的閥值,多余的消息直接丟棄,告知用戶秒殺失敗等業務消息內容。

實戰案例
本文是按照Java語言進行,使用Spring boot搭建,包管理工具Gradle。
導入rabbitmq jar包
compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE")
配置mq
yaml 文件配置
spring:rabbitmq:host: 192.168.110.5port: 5672username: gutstpassword: guest
準備好模板類,供后面直接使用
package com.infervision.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: fruiqi* @date: 19-2-18 下午2:42* @version:1.0 rabbit配置**/
@Configuration
public class RabbitConfig {
/*** 日志**/private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);@Value("${spring.rabbitmq.username}")String userName;@Value("${spring.rabbitmq.password}")String userPassword;@Value("${spring.rabbitmq.host}")String host;@Value("${spring.rabbitmq.port}")Integer port;
/*** 注入** @param* @return com.rabbitmq.client.Connection* @author fruiqi* @date 19-1-22 下午5:41**/@Beanpublic ConnectionFactory getConnection() throws Exception {CachingConnectionFactory factory = new CachingConnectionFactory();factory.setUsername(userName);factory.setPassword(userPassword);factory.setHost(host);factory.setPort(port);return factory;}/*** 創建制定的 監聽容器** @param queueName 監聽的隊列名字* @param listenerChannel 設置是否將監聽的頻道 公開給已注冊的* @param PrefetchCount 告訴代理一次請求多少條消息過來* @param ConcurrentConsumers 制定創建多少個并發的消費者數量* @param acknowledgeMode 消息確認模式* @param listener 監聽器* @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer**/public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel,int PrefetchCount, int ConcurrentConsumers,AcknowledgeMode acknowledgeMode,ChannelAwareMessageListener listener) throws Exception {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());container.setQueueNames(queueName);container.setExposeListenerChannel(listenerChannel);container.setPrefetchCount(PrefetchCount);container.setConcurrentConsumers(ConcurrentConsumers);container.setAcknowledgeMode(acknowledgeMode);container.setMessageListener(listener);return container;}
}package com.infervision.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author: fruiqi* @date: 19-2-18 下午2:51* @version:1.0**/
@Component
public class MsgSender {private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);@Autowiredprivate RabbitTemplate rabbitTemplate;/*** @param exchange 交換機名稱* @param routingKey 路由名稱* @param message 消息內容* @return void* @description //TODO 發送消息到消息隊列中**/public void sendMsg(String exchange, String routingKey, Object message) {try {rabbitTemplate.convertAndSend(exchange,routingKey,message);}catch (Exception e){logger.error("[ERROR] send statistic message error ",e);}}}
實例鏈接mq
在使用rabbitmq 有的時候需要自己客戶端創建queue,但有的時候并不是自己創建,在rabbitmq頁面上進行創建queue,其他消費者直接引用。
客戶端創建mq
//初始化隊列,如果隊列已存在,則不作任何處理 如果有權限控制如下操作并不能實現@Beanpublic Queue dicomQueue() {return new Queue(getMacPreStr(DICOM_QUEUE_NAME));}//初始化交換機@Beanpublic Exchange topicExchange() {return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build();}// 將隊列與交換機按照路由規則進行綁定@BeanBinding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);}
使用
隊列的使用:一個是發送,屬于生產者;一個是監聽,屬于消費者.
生產者實現
在mq配置模板類中,專門實現了一個發送類,發送文件內容,直接調用發送接口即可。
@AutowiredRabbitService rabbitService;/*** 練習 發送數據到 mq中* 1. 發送的數據會到 mq中* 2. 我們配置的 listener 是用來消費消息的* 3. 客戶端配置 可以參考 RabbitClientConfig* @param name 名字編號* @param vo 實體內容* @return: com.infervision.model.NameVo*/@ApiOperation(value = "增加name信息", notes = "實體信息")@PostMapping(value = "/{name}")@ApiImplicitParam(paramType = "query", name = "name", value = "用戶名字", required = true, dataType = "string")public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));return vo;}@Service
public class RabbitServiceImpl implements RabbitService {@AutowiredMsgSender msgSender;/*** 嘗試發送 message 到mq中* @param message* @return: void*/@Overridepublic void sendMessage(String exchange, String routingKey,String message) {msgSender.sendMsg(exchange, routingKey, message);}
}
消費者實現
消費者實現有兩種方式,一種通過注解的方式監聽,一種是實現ChannelAwareMessageListener類來實現消費。
注解實現監聽
//在方法上進行注入。配置工廠幫助提高單個消費者一次性消費的消息數量,設置多少個消費者,用來提高程序的性能
@RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory")public void processDicomMessage(Message message, Channel channel) {logger.info(message);}// 工廠可以在配置模板類中中配置好。
@Bean("multipleConsumerContainerFactory")public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setPrefetchCount(50);factory.setConcurrentConsumers(10);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);configurer.configure(factory, connectionFactory);return factory;}
實現接口方式
/**
總結
以上內容就完成了rabbitmq 從搭建到使用全部的流程。當然里面還有更多的可以讓我們去探討,比如mq的隊列模式,一個系統配置多個mq等等內容。敬請期待我們下一篇mq系列內容。
大家在系統中使用過mq嗎?你們使用的mq是什么樣的?可以在留言區我們一起探討哦。
·END·
路雖遠,行則必至
本文原發于 同名微信公眾號「胖琪的升級之路」,回復「1024」你懂得,給個贊唄。
微信ID:YoungRUIQ