RabbitMq基礎
- 1.概念
- 2.數據隔離
- 3.使用控制臺向mq傳遞消息
- 1.創建兩個隊列-“測試隊列”,“測試隊列2”
- 2.創建一個交換機-"測試交換機"
- 3.測試發送消息
- 3.1讓交換機和隊列進行綁定
- 3.2發送消息
- 3.3查看消息
- 4.創建虛擬主機
- 5.java使用rabbitmq
- 5.1 發送消息
- 5.2 消費消息
- 6.任務模型work queues
- 7.交換機
- 7.1.為什么使用交換機
- 7.2.交換機模型
- 7.2.1交換機模型Fanout(廣播)
- 7.2.1.1改造java代碼
- 7.2.2交換機模型Direct(訂閱)
- 7.2.2.1
- 7.2.3交換機模型Topic()
- 7.3.隊列和交換機的申明
- 8.消息轉換器
1.概念
- 消息發送者(publisher):生產消息
- 交換機(exchange):負責路由消息,把消息路由給隊列,可以路由給一個隊列,也可以路由給多個隊列,這取決于交換機的類型
- 隊列(queue):隊列,存儲消息
- 消息消費者(coumsmser):消費消息
- 虛擬主機(virtual-host):虛擬主機,數據隔離作用
2.數據隔離
在實際工作中,公司一般是在一個指定的服務器上去搭建mq,或者多個機器上去搭建集群模式,那一個公司肯定不止一個項目組,多個項目組的情況下,不可能每個項目都搞一套自己的mq,費時費力不說,維護還麻煩,所以mq就有數據隔離,多個項目組用一個環境的mq,數據不一樣而已
3.使用控制臺向mq傳遞消息
1.創建兩個隊列-“測試隊列”,“測試隊列2”
2.創建一個交換機-“測試交換機”
3.測試發送消息
3.1讓交換機和隊列進行綁定
綁定成功之后在指定的"測試隊列"中也可以看到他和交換機的綁定關系
3.2發送消息
3.3查看消息
當然你也可以使用這個交換機同時綁定創建的兩個隊列
4.創建虛擬主機
5.java使用rabbitmq
5.1 發送消息
接著之前的在common里面引入依賴(沒看之前的文章的直接就創建一個單體的springboot項目引入這個依賴就行)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在用戶工程作為消息投遞方,訂單工程作為消費者,不通過交換機投遞消息,并且消費
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
userController
@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendMassage")@ApiOperation(value = "不通過交換機發送消息")public void sendMassage( String queueName ,String msg ){rabbitTemplate.convertAndSend(queueName,msg);}
接口測試
查看消息
5.2 消費消息
order工程加配置
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
創建orderListen
@Component
public class orderListen {@RabbitListener(queues = "測試隊列2")public void listenOrder(String msg){System.out.println("我已經接收到訂單消息:"+msg);}
}
6.任務模型work queues
簡單的說就是多個消費者綁定一個隊列
- 創建一個隊列work.queue
- 生產者(用戶服務)向隊列(work.queue)中發送消息,每秒鐘100條記錄
- 創建兩個消費者(訂單服務)監聽隊列,一個消費者一秒鐘消費20條,一個消費者一秒鐘消費30條記錄
生產者代碼
@GetMapping("/sendWorkQueueMassage")@ApiOperation(value = "發送到任務模型")public void sendWorkQueueMassage() throws InterruptedException {String queueName="work.queue";for (int i = 1; i <=100 ; i++) {String msg="msg_"+i;rabbitTemplate.convertAndSend(queueName,msg);//休眠20毫秒Thread.sleep(20);}}
消費者代碼
@RabbitListener(queues = "work.queue")public void listenWorkQueueOrder(String msg) throws InterruptedException {System.out.println("消費者1已經接收到訂單消息:"+msg);// Thread.sleep(30);}@RabbitListener(queues = "work.queue")public void listenWorkQueueOrder2(String msg) throws InterruptedException {System.err.println("消費者2已經接收到訂單消息:"+msg);// Thread.sleep(40);}
結果:
1.隊列在被多個消費者綁定的時候,隊列會把消息輪詢分配給每一個消費者
2.消息被消費方消費之后就消失了
產生的問題:
問題1.資源浪費:現實生活中,每個服務器的負載能力都是不一樣的,假如B服務器一秒鐘只能處理2個請求,A服務器一秒鐘能處理20個,那在輪詢消費的時候,假設時間過去0.3秒,B服務器還沒消費完一個消息,按照A服務器的性能,他0.3秒都可以處理好幾個了,他應該在0.05秒的時候就處理完畢一個了,但是由于輪詢他只能處理一個,這個時候A就要等著B消費完,這樣就很浪費A的服務器資源。
2.消息積壓,以上代碼,生產方發送消息到隊列,休眠時間為20毫秒,消費者1消費一個消息要30毫秒,B需要40毫秒,時間長了。生產者發的消息消費者就消費不過來
問題1處理方案:
增加配置
spring:rabbitmq:listener:direct:prefetch: 1 #保證同一時刻最多投遞一條消息給消費者
結果,因為消費者1的消費能力比消費者2要快,所有可以看到他沒有等著
問題2處理方案:
很明顯能看到,兩個消費者的消費能力跟不上生產者的生產速度,所有只能再增加多個消費者,直到消費者的消費能力快過生產者的生產能力
7.交換機
7.1.為什么使用交換機
我們上面的代碼,是生產者直接連接隊列,然后消費者消費,實際業務中,你在網購平臺買東西,購買成功你的訂單微服務得知道,積分微服務得知道,購物車微服務得知道,如果按照不用交換機去做,那消息一旦被訂單服務消費了,這條消息在隊列認為就消費完畢了,直接就會刪除,造成的結果就是積分微服務就不知道了。那咋搞,所以就可以用到交換機
7.2.交換機模型
7.2.1交換機模型Fanout(廣播)
把消息放到交換機,然后交換機廣播給多個隊列(積分隊列,購物車隊列,訂單隊列),然后相應得微服務去跟相應得隊列綁定,這種方式叫做廣播
7.2.1.1改造java代碼
- 使用之前創建的交換機,測試交換機,并且綁定"測試隊列","測試隊列2"兩個隊列
- 編寫兩個消費者方法,分別監聽兩個隊列
創建積分微服務
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:direct:prefetch: 1 #保證同一時刻最多投遞一條消息給消費者
@Component
public class PointsFanoutListen {@RabbitListener(queues = "測試隊列2")public void listenPoints(String msg){System.out.println("積分服務已經接收到消息:"+msg);}
}
訂單微服務微服務中監聽另外一個隊列
@Component
public class OrderFanoutListen {@RabbitListener(queues = "測試隊列")public void listenOrder(String msg){System.out.println("訂單服務已經接收到消息:"+msg);}}
- 編寫生產者方法,向交換機發送消息
@GetMapping("/sendFanoutMassage")@ApiOperation(value = "發送消息到廣播交換機")public void sendFanoutMassage() throws InterruptedException {String exchangeName="測試交換機";String msg="用戶成功下單了";rabbitTemplate.convertAndSend(exchangeName,null,msg);}
測試
本地調用接口: loclahost:8001/user/sendFanoutMassage
啟動兩個消費者
7.2.2交換機模型Direct(訂閱)
實際業務中,我可能不需要把消息發送給每個隊列,比如。我訂單交易失敗,我的積分微服務就不需要接收到這種,積分微服務只有在交易成功才做積分減少或者增加的操作,那就是我只訂閱交易成功的訂單消息
7.2.2.1
-
創建交換機
-
創建隊列
-
交換機跟隊列綁定
-
創建消費者
消費者1:訂單服務監聽隊列1
@Component
public class PointsDirectListen {@RabbitListener(queues = "driect.queue2")public void listenPoints(String msg){System.out.println("積分服務已經接收到用戶成功下單消息:"+msg);}
}
消費者2:積分服務監聽隊列2
@Component
public class PointsDirectListen {@RabbitListener(queues = "driect.queue2")public void listenPoints(String msg){System.out.println("積分服務已經接收到用戶成功下單消息:"+msg);}
}
創建生產者用戶服務
@GetMapping("/sendDirectMassage")@ApiOperation(value = "發送消息到訂閱交換機")public void sendDirectMassage() throws InterruptedException {String exchangeName="work.dirice";String msg="用戶成功下單了";rabbitTemplate.convertAndSend(exchangeName,"red",msg);}@GetMapping("/sendDirectMassageFaild")@ApiOperation(value = "發送消息到訂閱交換機")public void sendDirectMassageFaild() throws InterruptedException {String exchangeName="work.dirice";String msg="用戶下單失敗了";rabbitTemplate.convertAndSend(exchangeName,"blue",msg);}@GetMapping("/sendDirectMassageWait")@ApiOperation(value = "發送消息到訂閱交換機")public void sendDirectMassageWait() throws InterruptedException {String exchangeName="work.dirice";String msg="用戶下單但是還未付款";rabbitTemplate.convertAndSend(exchangeName,"yellow",msg);}
分別調用三個接口.結果如下
sendDirectMassage接口兩個消費者都能接收到
sendDirectMassageFaild只有消費者1能接收到
sendDirectMassageWait只有消費者2能接收到
7.2.3交換機模型Topic()
編寫案例
創建綁定關系
7.3.隊列和交換機的申明
在之前我們都是手動在控制臺去創建隊列或者交換機,但是在真實企業中,不可能手動在控制臺去創建,而且這樣創建的,一旦中間件出問題了,所有的隊列和交換機就沒了,一般是用代碼處理。
/*** 注解的方式創建隊列* 一般在消費方創建* 1.創建一個名字叫annotate.work的且類型為TOPIC的交換機* 2.交換機綁定的隊列為annotate.queue,該隊列持久化* 3.交換機綁定的key為"red","yellow"* @param msg* @throws InterruptedException*/@Componentpublic class orderListen {@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="annotate.queue",declare = "true"), //隊列名稱叫annotate.queue,且需要持久化exchange = @Exchange(name = "annotate.work",type = ExchangeTypes.TOPIC),//交換機名稱和類型key={"red","yellow"} //路由key))public void listenWorkAnnotateQueueOrder2(String msg) throws InterruptedException {System.err.println("注解方式生成的隊列收到消息:"+msg);Thread.sleep(50);}}
項目啟動之后就能直接創建相應的隊列和交換機
8.消息轉換器
1.創建一個隊列,名字叫object.queue
2.創建生產者往這個隊列發送一個消息,消息的類型為map或者java對象
import com.threesum.OderApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap;@SpringBootTest(classes = OderApplication.class)
public class orderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendObjectMsg(){HashMap<String, Object> msg = new HashMap<>();msg.put("name","aa");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg);}
}
3.觀察隊列中的消息
結論:會發現變成了一堆亂碼(因為默認采用的是java的jdk序列化)
4.采用java序列化方式處理問題
4.1引入依賴
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></dependency>
4.2發送方和消費方都使用java序列化
package com.threesum.config.rabbit;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class JacksonDada {@Beanpublic MessageConverter JacksonJsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}
4.3再次獲取,就轉換正常了