從淺入深 學習 SpringCloud 微服務架構(十六)
一、SpringCloudStream:自定義消息通道
1、在子工程 stream_product (子模塊)中,創建 自定義的消息通道類 MyProcessor.java
/*** spring_cloud_demo\stream_product\src\main\java\djh\it\stream\channel\MyProcessor.java** 2024-5-11 創建 自定義的消息通道類 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生產者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消費者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
2、在子工程 stream_product (子模塊)中,修改 消息發送的工具類 MessageSender.java 使用自定義消息通道。
/*** spring_cloud_demo\stream_product\src\main\java\djh\it\stream\producer\MessageSender.java** 2024-5-10 抽取一個消息發送的工具類 MessageSender.java*/package djh.it.stream.producer;import djh.it.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
//import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Source.class)
@EnableBinding(MyProcessor.class)
public class MessageSender {
// @Autowired
// private MessageChannel output;
//
// //發送消息
// public void send(Object obj){
// output.send(MessageBuilder.withPayload((obj)).build());
// }@Autowired@Qualifier(value = "myoutput")private MessageChannel myoutput;//發送消息public void send(Object obj){myoutput.send(MessageBuilder.withPayload((obj)).build());}
}
3、在子工程 stream_product (子模塊)中,修改 application.yml 配置文件, 添加自定義消息配置。
## spring_cloud_demo\stream_product\src\main\resources\application.ymlserver:port: 7001 #服務端口
spring:application:nmae: stream_product #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output: #管道交互destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myoutput: # 自定義消息通道destination: djh-custom-outputbinders: #配置綁定器defaultRabbit:type: rabbit
4、在子工程 stream_consumer (子模塊)中,創建 自定義的消息通道類 MyProcessor.java
/*** spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\channel\MyProcessor.java** 2024-5-11 創建 自定義的消息通道類 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生產者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消費者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
5、在子工程 stream_consumer (子模塊)中,修改 獲取消息工具類 MessageListener.java 使用自定義消息通道。
/*** spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\consumer\MessageListener.java** 2024-5-10 創建一個獲取消息工具類 MessageListener.java*/package djh.it.stream.consumer;import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {// //監聽 binding 中的消息
// @StreamListener(Sink.INPUT)
// public void input(String message) {
// System.out.println("獲取到的消息: " + message);
// }//監聽 binding 中的消息@StreamListener(MyProcessor.MYINPUT)public void input(String message) {System.out.println("獲取到的消息: " + message);}
}
6、在子工程 stream_consumer (子模塊)中,修改 application.yml 配置文件, 添加自定義消息配置。
## spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002 #服務端口
spring:application:nmae: stream_consumer #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #管道交互,內置的獲取消息的通道,從 djh-default 中獲取消息。destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myinput: #自定義消息通道destination: djh-custom-outputbinders: #配置綁定器defaultRabbit:type: rabbit
7、在子工程 stream_product (子模塊)中,運行 啟動類 ProducerApplication.java 進行測試
/*** spring_cloud_demo\stream_product\src\main\java\djh\it\stream\ProducerApplication.java** 2024-5-9 SpringCloudStream 入門案例:啟動類 ProducerApplication.java* 1)引入依賴。* 2)配置 application.yml 配置文件。* 3)發送消息的話,定義一個通道接口,通過接口中內置的 messagechannel,(sprngcloudtream 中內置接口 Source)* 4)@EnableBinding 注解 :綁定對應通道。* 5)發送消息的話,通過 MessageChannel 發送消息,如果需要 MessageChannel --> 通過綁定內置接口獲取。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class);}
}
8、在子工程 stream_consumer (子模塊)中,運行 啟動類 ConsumerApplication.java 進行測試。
/*** spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\ConsumerApplication.java** 2024-5-9 SpringCloudStream 入門案例:啟動類 ConsumerApplication.java* 1)引入依賴。* 2)配置 application.yml 配置文件。* 3)定義一個通道接口,通過內置獲取消息的接口:Sink* 4)綁定對應通道。* 5)配置一個監聽方法 :當程序從中間件獲取數據之后,執行的業務邏輯方法,需要在監聽方法上配置 @StreamListener 注解。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class);}
}
9、在子工程 stream_product (子模塊)中,運行 一個測試類 ProducterTest.java 進行測試。
/*** spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java** 2024-5-10 創建一個測試類 ProducterTest.java*/package djh.it.stream;import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {@Autowiredprivate MessageSender messageSender;@Testpublic void testSend(){messageSender.send("hello 測試 工具類");}
}
10、啟動 rabbitmqctl-server.bat 服務,并運行 測試類 ProducterTest 和 ConsumerApplication 啟動類,在 idea Run Dashboard 控制面板,
同樣會輸出 “獲取到的消息: hello 測試 工具類”
二、SpringCloudStream:消息分組
1、SpringCloudStream:消息分組
-
通常在生產環境,我們的每個服務都不會以單節點的方式運行在生產環境,當同一個服務啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認情況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每個消費者實例接收和處理,但是有些業務場景之下,我們希望生產者產生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設置消費組來實現這樣的功能。
-
實現的方式非常簡單,我們只需要在服務消費者端設置 spring.c1oud.stream.bindings.input.group 屬性即可。
2、在子工程 stream_consumer (子模塊),復制一個更名為:在子工程 stream_consumer_2 (子模塊),并把 application.yml 配置文件中的端口號改為:7003
1)子工程 stream_consumer_2 (子模塊)中的 pom.xml 文件。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>spring_cloud_demo</artifactId><groupId>djh.it</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>stream_consumer_2</artifactId><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency></dependencies>
</project>
<!-- spring_cloud_demo\stream_consumer_2\pom.xml -->
2)子工程 stream_consumer_2 (子模塊)中的 application.yml 文件。
## spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003 #服務端口
spring:application:nmae: stream_consumer_2 #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #管道交互,內置的獲取消息的通道,從 djh-default 中獲取消息。destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myinput: #自定義消息通道destination: djh-custom-outputbinders: #配置綁定器defaultRabbit:type: rabbit
3)子工程 stream_consumer_2 (子模塊)中的 自定義的消息通道類 MyProcessor.java
/*** spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\channel\MyProcessor.java** 2024-5-11 創建 自定義的消息通道類 MyProcessor.java*/package djh.it.stream.channel;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyProcessor {//消息生產者的配置String MYOUTPUT = "myoutput";@Output("myoutput")MessageChannel myoutput();//消息消費者的配置String MYINPUT = "myinput";@Input("myinput")SubscribableChannel myinput();
}
4)子工程 stream_consumer_2 (子模塊)中的 獲取消息工具類 MessageListener.java
/*** spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\consumer\MessageListener.java** 2024-5-11 創建一個獲取消息工具類 MessageListener.java*/package djh.it.stream.consumer;import djh.it.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
//@EnableBinding(Sink.class)
@EnableBinding(MyProcessor.class)
public class MessageListener {// //監聽 binding 中的消息
// @StreamListener(Sink.INPUT)
// public void input(String message) {
// System.out.println("獲取到的消息: " + message);
// }//監聽 binding 中的消息@StreamListener(MyProcessor.MYINPUT)public void input(String message) {System.out.println("獲取到的消息: " + message);}
}
5)子工程 stream_consumer_2 (子模塊)中的 啟動類 ConsumerApplication_2.java
/*** spring_cloud_demo\stream_consumer_2\src\main\java\djh\it\stream\ConsumerApplication_2.java** 2024-5-11 SpringCloudStream 入門案例:啟動類 ConsumerApplication_2.java* 1)引入依賴。* 2)配置 application.yml 配置文件。* 3)定義一個通道接口,通過內置獲取消息的接口:Sink* 4)綁定對應通道。* 5)配置一個監聽方法 :當程序從中間件獲取數據之后,執行的業務邏輯方法,需要在監聽方法上配置 @StreamListener 注解。*/package djh.it.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApplication_2 {public static void main(String[] args) {SpringApplication.run(ConsumerApplication_2.class);}
}
3、啟動 rabbitmqctl-server.bat 服務,并運行 測試類 ProducterTest 和 ConsumerApplication 啟動類 和 ConsumerApplication_2 啟動類,
在 idea Run Dashboard 控制面板,兩個消費都啟動類都會輸出 “獲取到的消息: hello 測試 工具類”
4、在子工程 stream_consumer (子模塊)的 application.yml 配置文件中,添加 消息分組配置。
## spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002 #服務端口
spring:application:nmae: stream_consumer #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #管道交互,內置的獲取消息的通道,從 djh-default 中獲取消息。destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myinput: #自定義消息通道destination: djh-custom-outputgroup: group1 #消息分組(同一組只能有一個消息者獲取消息)binders: #配置綁定器defaultRabbit:type: rabbit
5、在子工程 stream_consumer_2 (子模塊)的 application.yml 配置文件中,也添加 消息分組配置。
## spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003 #服務端口
spring:application:nmae: stream_consumer_2 #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:input: #管道交互,內置的獲取消息的通道,從 djh-default 中獲取消息。destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myinput: #自定義消息通道destination: djh-custom-outputgroup: group1 #消息分組(同一組只能有一個消息者獲取消息)binders: #配置綁定器defaultRabbit:type: rabbit
6、重新啟動 rabbitmqctl-server.bat 服務,并運行 測試類 ProducterTest 和 ConsumerApplication 啟動類 和 ConsumerApplication_2 啟動類,
在 idea Run Dashboard 控制面板,發現只有一個消費都啟動類都會輸出 “獲取到的消息: hello 測試 工具類”
三、SpringCloudStream:消息分區
1、消息分區
有一些場景需要滿足,同一個特征的數據被同一個實例消費,比如同一個id的傳感器監測數據必須被同-個實例統計計算分析,否則可能無法獲取全部的數據。又比如部分異步任務,首次請求啟動task,二次請求取消task,此場景就必須保證兩次請求至同一實例.
2、在子工程 stream_producer (子模塊)的 application.yml 配置文件中,添加 消息分區配置。
## spring_cloud_demo\stream_product\src\main\resources\application.ymlserver:port: 7001 #服務端口
spring:application:nmae: stream_product #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:bindings:output: #管道交互destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myoutput: # 自定義消息通道destination: djh-custom-outputproducer: # 配置分區partition-key-expression: payload # 分區關鍵字,對象中的 id 或 對象。partition-count: 2 # 分區大小binders: #配置綁定器defaultRabbit:type: rabbit
3、在子工程 stream_consumer (子模塊)的 application.yml 配置文件中,也添加 消息分區配置。
## spring_cloud_demo\stream_consumer\src\main\resources\application.ymlserver:port: 7002 #服務端口
spring:application:nmae: stream_consumer #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:instanceCount: 2 # 消費者總數。instanceIndex: 0 # 當前消費者的索引,從 0 開始。bindings:input: #管道交互,內置的獲取消息的通道,從 djh-default 中獲取消息。destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myinput: #自定義消息通道destination: djh-custom-outputgroup: group1 #消息分組(同一組只能有一個消息者獲取消息)consumer:partitioned: true # 開啟分區支持binders: #配置綁定器defaultRabbit:type: rabbit
3、在子工程 stream_consumer_2 (子模塊)的 application.yml 配置文件中,也添加 消息分區配置。
## spring_cloud_demo\stream_consumer_2\src\main\resources\application.ymlserver:port: 7003 #服務端口
spring:application:nmae: stream_consumer_2 #指定服務名rabbitmq:addresses: 127.0.0.1username: guestpassword: guestcloud:stream:instanceCount: 2 # 消費者總數。instanceIndex: 1 # 當前消費者的索引,從 0 開始。bindings:input: #管道交互,內置的獲取消息的通道,從 djh-default 中獲取消息。destination: djh-default #指定消息發送的目的地,在 rabbitmq 中,發送到一個 djh-default 的交換機 exchange。myinput: #自定義消息通道destination: djh-custom-outputgroup: group2 #消息分組(同一組只能有一個消息者獲取消息)consumer:partitioned: true # 開啟分區支持binders: #配置綁定器defaultRabbit:type: rabbit
4、修改 子工程 stream_producer (子模塊)的 測試類 ProducterTest 進行測試。
/*** spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java** 2024-5-10 創建一個測試類 ProducterTest.java*/package djh.it.stream;import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {@Autowiredprivate MessageSender messageSender;@Testpublic void testSend(){
// messageSender.send("hello 測試 工具類");for(int i=0;i<5;i++){messageSender.send("0");}}
}
5、重新啟動 rabbitmqctl-server.bat 服務,并運行 測試類 ProducterTest 和 ConsumerApplication 啟動類 和 ConsumerApplication_2 啟動類,
在 idea Run Dashboard 控制面板,發現只有 ConsumerApplication 一個消費者啟動類都會輸出 “獲取到的消息: 0”
上一節關聯鏈接請點擊:
# 從淺入深 學習 SpringCloud 微服務架構(十五)