SpringCloud Stream
技術興起的原因:為了解決系統中不同中間件的適配問題,出現了cloud stream,采用適配綁定的方式,自動給不同的MQ之間進行切換。
屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。
官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架。
應用程序通過inputs(消費者)或者outputs(生產者)來與Spring Cloud Stream中binder對象交互。通過我們配置來綁定,而Spring Cloud Stream的binder對象負責與消息中間件交互。
Spring Cloud Stream為一些供應商的消息中間件產品提供了個性化的自動配置,引用了發布、訂閱、消費、分區的三個核心概念。
官方版本目前僅僅支持RabbitMQ和Kafka。
MQ相關術語
Message:生產者/消費者之間靠消息媒介傳遞信息內容
MessageChannel:消息必須走特定的通道
消息通道的子接口SubscribableChannel,由MessageHandle消息處理器所訂閱。
相關注解
Middleware:中間件,目前只支持RabbitMQ和Kafka
Binder:應用層和消息中間件之間的封裝,實現了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型,這些可以通過配置文件修改。
Input:表示輸入通道,消息進入該通道傳到應用程序。
Output:注解標識輸出通道,發布的消息將通過該通道離開應用程序。
StreamListener:監聽隊列,用于消費者的隊列的消息接收。
EnableBinding:將信道channel和exchange綁定在一起。
首先創建一個provider,服務提供者rabbitmq-provider8801
導入依賴
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-actuator
org.springframework.cloud
spring-cloud-starter-netflix-eureka-client
org.springframework.cloud
spring-cloud-starter-stream-rabbit
org.springframework.boot
spring-boot-devtools
runtime
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
編寫配置文件application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.168.31.52 #rabbitmq服務啟動所在機器的IP地址
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain”
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2# 設置心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5# 如果現在超過了5秒的間隔(默認是90秒)
instance-id: send-8801.com # 在信息列表時顯示主機名稱
prefer-ip-address: true# 訪問的路徑變為IP地址
編寫一個發送數據的接口IMessageProvider
public interface IMessageProvider {
String sendMessage();
}
接口的實現類IMessageProviderImpl
@EnableBinding(Source.class) //定義消息的推送管道
public class IMessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息發送管道
@Override
public String sendMessage()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
controller層下的SendMessageController
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider iMessageProvider;
@GetMapping(value = "/sendMessage")
public String send(){
return iMessageProvider.sendMessage();
}
}
啟動Eureka7001,啟動服務提供者8801.啟動虛擬機上的RabbitMQ
記得把虛擬機防火墻關了。
[hadoop@centos7 bin]$ systemctl stop firewalld
[hadoop@centos7 bin]$ systemctl status firewalld
然后測試一下服務提供者是否正常運行。
控制臺輸出UUID。
然后再創建一個服務消費者,在MQ的另一端進行消費消息。
創建另一個模塊,cloud-stream-rabbitmq-consumer8802
導入依賴
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-netflix-eureka-client
org.springframework.cloud
spring-cloud-starter-stream-rabbit
org.springframework.boot
spring-boot-starter-actuator
org.springframework.boot
spring-boot-devtools
runtime
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
和上一個服務提供者的依賴一樣。
寫配置文件application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: 192.168.31.52
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
eureka:
client: # 客戶端進行Eureka注冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2# 設置心跳的時間間隔(默認是30秒)
lease-expiration-duration-in-seconds: 5# 如果現在超過了5秒的間隔(默認是90秒)
instance-id: receive-8802.com # 在信息列表時顯示主機名稱
prefer-ip-address: true# 訪問的路徑變為IP地址
創建一個消費者的ReceiveMessageController
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message){
System.out.println("message = "+message.getPayload()+"\t"+"serverPort= "+serverPort);
}
}
如果消費者成功接收消息,則在控制臺輸出產生的UUID和端口號。
啟動Eureka7001,啟動服務提供者8801,啟動服務消費者8802,還有MQ。
在Eureka中可以看到兩個服務已經啟動。
每次請求http://localhost:8801/sendMessage;消費者都能輸出結果,輸出的UUID與提供者的一致。
登錄RabbitMQ的web管理,可以看到我們新建的exchange,并且可以查看消息隊列中的請求次數的情況。
發送的消息除了可以是字符串類型還可以發送對象,在消費者接受數據的時候,會將實體轉換成JSON字符串。
配置文件中,如果你使用的消息中間件是kafka,type: kafka;environment是設置消息中間件的配置信息,端口,主機地址,用戶名,密碼等,可以設置多個binder,適應不同的場景。
重復消費問題
默認情況下,每個消費者的分組名都是隨機的,不同的,對于不同的組會引起重復消費的問題,例如:消息提供者只向消息隊列中發送了一個消息,正常情況下,消費者A從隊列中拿走之后,消費者B不能再獲得相同的消息,但是由于AB是不同的組,所以A和B都會獲取相同的消息,這就導致了資源被重復消費。
微服務應用放置到同一個group中,就能夠保證消息只會被其中應用消費一次,不同的組是可以消費的,同一個組內會發生競爭關系,只有其中一個可以消費。
同一個應用的不同微服務,只用在配置文件中指定相同的group。
再次發送消息時,只有消費者其中一個能消費。避免了重復消費。
消息持久化
當兩個消費者A和B,A設置了group屬性值,B沒有設置,這時,消費者全部宕機,但是消息生產者一直響MQ中生產消息,這時候重啟A和B兩者有什么區別呢?
正因為B沒有這時分組,B再次啟動后不會再向MQ中取數據,而A啟動成功后可以正常消費消息隊列中的消息。
因此設置了group的消費者,可以保證消息隊列中的消息持久化,group對于消費者來講很重要,既能避免重復消費,又能在消費者重啟后依然可以消費消息隊列中未消費的消息。