文章目錄
- 需求描述
- 創建項目
- 訂單系統(生產者)
- 完善配置
- 聲明隊列
- 下單接口
- 啟動服務
- 物流系統(消費者)
- 完善配置
- 監聽隊列
- 啟動服務
- 格式化發送消息對象
- SimpleMessageConverter
- 定義一個對象
- 生產者代碼
- 消費者
- 運行程序
- JSON
- 定義一個對象
- 生產者代碼
- 定義轉換器
- 消費者代碼
- 運行程序
需求描述
用戶下單成功之后,通知物流系統,進行發貨(只涉及到應用通信,不做具體功能實現)
- 訂單系統——生產者
- 物流系統——消費者
創建項目
通常情況下,訂單系統和物流系統是不同團隊來開發的,是兩個獨立的應用
- 為了方便演示,就把兩個項目創建到一個文件夾下
- 圖標沒有發生變化,啟動類也沒有被識別出來
- 因為
Maven
沒有被識別出來 - 我們手動加入
Maven
- 在項目目錄中右鍵點擊
pom.xml
文件,選擇:Add as Maven Project
- 在項目目錄中右鍵點擊
- 因為
訂單系統(生產者)
完善配置
spring.application.name=order-service
server.port=8080
#amqp://username:password@Ip:port/virtual-host
spring.rabbitmq.addresses=amqp://order:order@127.0.0.1:5672/order
聲明隊列
package org.example.order.config; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { // 使用簡單模式來完成消息發送 @Bean public Queue orderQueue(){ return QueueBuilder.durable("order.create").build(); }
}
下單接口
package org.example.order.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RequestMapping("/order")
@RestController
public class OrderController { // 注入RabbitMQ的客戶端 @Autowired private RabbitTemplate rabbitTemplate; // 下單的接口 @RequestMapping("/create") public String create(){ // 參數校驗、數據庫保存等等...業務代碼省略 // 發送消息 // 交換機、隊列是什么routingKey就是什么、字符串信息 String orderId = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("", "order.create", "訂單信息,訂單ID: " + orderId); return "下單成功"; }
}
- 下單成功之后,發送訂單消息
啟動服務
- 訪問接口,模擬下單請求: http://127.0.0.1:8080/order/create
查看消息:
物流系統(消費者)
從 RabbitMQ
中接收消息
完善配置
8080
端口號已經被訂單系統占用了,修改物流系統的端口號為 9090
spring.application.name=logistics-service
# 兩邊的端口號不能一樣,他們是同時運行的
server.port=9090
#amqp://username:password@Ip:port/virtual-host
spring.rabbitmq.addresses=amqp://guest:guest@127.0.0.1:5672/order
監聽隊列
package org.example.logistics.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class OrderListener { @RabbitListener(queues = "order.create") // 指出需要監聽的隊列的名稱 public void handMessage(String orderInfo) { System.out.println("接收到訂單消息: " + orderInfo); // 收到消息后的處理,代碼省略}
}
啟動服務
訪問訂單系統的接口,模擬下單請求: http://127.0.0.1:8080/order/create
在物流系統的日志中,可以觀察到,通過 RabbitMQ
,成功把下單信息傳遞給了物流系統
格式化發送消息對象
如果通過 RabbitTemplate
發送?個對象作為消息, 我們需要對該對象進?序列化
SimpleMessageConverter
默認使用的是 SimpleMessageConverter
進行序列化
定義一個對象
package org.example.order.model; import lombok.Data; import java.io.Serializable; @Data
public class OrderInfo implements Serializable { private String orderId; private String name;
}
- 在發送消息的時候,信息需要經過
MessageConverter
進行轉換 (默認是SimpleMessageConverter
) SimpleMessageConverter
只支持String
、byte[]
、Serializable
- 此接口將
OrderInfo
序列化,之后才能被正常接收 (OrderInfo
類型不被支持,會報500
錯誤)
生產者代碼
package org.example.order.controller; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.Random;
import java.util.UUID; @RequestMapping("/order")
@RestController
public class OrderController { // 注入RabbitMQ的客戶端 @Autowired private RabbitTemplate rabbitTemplate; // 下單的接口 @RequestMapping("/create2") public String create2(){ // 發送對象 OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(UUID.randomUUID().toString()); orderInfo.setName("商品" + new Random().nextInt(100)); rabbitTemplate.convertAndSend("", "order.create", orderInfo); return "下單成功"; }
}
消費者
package org.example.logistics.listener; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(queues = "order.create") // 指出需要監聽的隊列的名稱
public class OrderListener { @RabbitHandler public void handMessage(String orderInfo) { System.out.println("接收到訂單消息String: " + orderInfo); // 收到消息后的處理,代碼省略 }
}
運行程序
訪問訂單系統的接口,模擬下單請求: http://127.0.0.1:8080/order/create2
觀察發送的消息
- 可以看到消息的可讀性太差
- 所以我們使用
JSON
序列化
JSON
使用 SimpleMessageConverter
序列化可讀性太差,Spring AMQP
推薦使用 JSON
序列化
Spring AMQP
提供了Jsckson2JsonMessageConverter
和MappingJackson2MessageConverter
等轉換器- 我們需要把一個
MessageConverter
設置到RabbitTemplate
中
定義一個對象
package org.example.order.model; import lombok.Data; @Data
public class OrderInfo { private String orderId; private String name;
}
生產者代碼
package org.example.order.controller; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.Random;
import java.util.UUID; @RequestMapping("/order")
@RestController
public class OrderController { // 注入RabbitMQ的客戶端 @Autowired private RabbitTemplate rabbitTemplate; // 下單的接口 @RequestMapping("/create2") public String create2(){ // 發送對象 OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(UUID.randomUUID().toString()); orderInfo.setName("商品" + new Random().nextInt(100)); rabbitTemplate.convertAndSend("", "order.create", orderInfo); return "下單成功"; }
}
- 和前面的使用默認轉換器代碼一樣
定義轉換器
package org.example.order.config; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { // 使用簡單模式來完成消息發送 @Bean public Queue orderQueue(){ return QueueBuilder.durable("order.create").build(); } /** * 創建一個 rabbitTemplate 對象 * @return */ @Bean public Jackson2JsonMessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jsonMessageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter); return rabbitTemplate; }
}
- 創建出一個
rabbitTemplate
對象進行使用 - 生產者(
order-service
)和消費者(logistics-service
)都需要- 不然還是拿不到消息
消費者代碼
package org.example.logistics.listener; import org.example.order.model.OrderInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(queues = "order.create") // 指出需要監聽的隊列的名稱
public class OrderListener { @RabbitHandler public void handMessage2(OrderInfo orderInfo) { System.out.println("接收到訂單消息OrderInfo: " + orderInfo); // 收到消息后的處理,代碼省略 }
}
@RabbitListener(queues = "order.create")
可以加在類上,也可以加在方法上,用于定義一個類或者方法作為消息的監聽器
@RabbitHandler
是一個方法級別的注解,當使用@RabbitHandler
注解時,這個方法將被調用處理特定的消息
- 根據調用的類型,調用相關方法
運行程序
訪問訂單系統的接口,模擬下單請求: http://127.0.0.1:8080/order/create2
前面的 String
也還可以接收到,只要模擬下單請求: http://127.0.0.1:8080/order/create 即可
- 歸功于
@RabbitHandler
,指哪打哪