springcloud 相同服務名_SpringCloud系列之SpringCloud Stream

SpringCloud Stream

技術興起的原因:為了解決系統中不同中間件的適配問題,出現了cloud stream,采用適配綁定的方式,自動給不同的MQ之間進行切換。

屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。

官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架。

應用程序通過inputs(消費者)或者outputs(生產者)來與Spring Cloud Stream中binder對象交互。通過我們配置來綁定,而Spring Cloud Stream的binder對象負責與消息中間件交互。

Spring Cloud Stream為一些供應商的消息中間件產品提供了個性化的自動配置,引用了發布、訂閱、消費、分區的三個核心概念。

官方版本目前僅僅支持RabbitMQ和Kafka。

MQ相關術語

Message:生產者/消費者之間靠消息媒介傳遞信息內容

MessageChannel:消息必須走特定的通道

消息通道的子接口SubscribableChannel,由MessageHandle消息處理器所訂閱。

相關注解

Middleware:中間件,目前只支持RabbitMQ和Kafka

Binder:應用層和消息中間件之間的封裝,實現了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型,這些可以通過配置文件修改。

Input:表示輸入通道,消息進入該通道傳到應用程序。

Output:注解標識輸出通道,發布的消息將通過該通道離開應用程序。

StreamListener:監聽隊列,用于消費者的隊列的消息接收。

EnableBinding:將信道channel和exchange綁定在一起。

首先創建一個provider,服務提供者rabbitmq-provider8801

導入依賴

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-actuator

org.springframework.cloud

spring-cloud-starter-netflix-eureka-client

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-devtools

runtime

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

編寫配置文件application.yml

server:

port: 8801

spring:

application:

name: cloud-stream-provider

cloud:

stream:

binders: # 在此處配置要綁定的rabbitmq的服務信息;

defaultRabbit: # 表示定義的名稱,用于于binding整合

type: rabbit # 消息組件類型

environment: # 設置rabbitmq的相關的環境配置

spring:

rabbitmq:

host: 192.168.31.52 #rabbitmq服務啟動所在機器的IP地址

port: 5672

username: guest

password: guest

bindings: # 服務的整合處理

output: # 這個名字是一個通道的名稱

destination: studyExchange # 表示要使用的Exchange名稱定義

content-type: application/json # 設置消息類型,本次為json,文本則設置“text/plain”

binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:

client: # 客戶端進行Eureka注冊的配置

service-url:

defaultZone: http://localhost:7001/eureka

instance:

lease-renewal-interval-in-seconds: 2# 設置心跳的時間間隔(默認是30秒)

lease-expiration-duration-in-seconds: 5# 如果現在超過了5秒的間隔(默認是90秒)

instance-id: send-8801.com # 在信息列表時顯示主機名稱

prefer-ip-address: true# 訪問的路徑變為IP地址

編寫一個發送數據的接口IMessageProvider

public interface IMessageProvider {

String sendMessage();

}

接口的實現類IMessageProviderImpl

@EnableBinding(Source.class) //定義消息的推送管道

public class IMessageProviderImpl implements IMessageProvider

{

@Resource

private MessageChannel output; // 消息發送管道

@Override

public String sendMessage()

{

String serial = UUID.randomUUID().toString();

output.send(MessageBuilder.withPayload(serial).build());

System.out.println("*****serial: "+serial);

return null;

}

}

controller層下的SendMessageController

@RestController

public class SendMessageController {

@Autowired

private IMessageProvider iMessageProvider;

@GetMapping(value = "/sendMessage")

public String send(){

return iMessageProvider.sendMessage();

}

}

啟動Eureka7001,啟動服務提供者8801.啟動虛擬機上的RabbitMQ

記得把虛擬機防火墻關了。

[hadoop@centos7 bin]$ systemctl stop firewalld

[hadoop@centos7 bin]$ systemctl status firewalld

然后測試一下服務提供者是否正常運行。

控制臺輸出UUID。

然后再創建一個服務消費者,在MQ的另一端進行消費消息。

創建另一個模塊,cloud-stream-rabbitmq-consumer8802

導入依賴

org.springframework.boot

spring-boot-starter-web

org.springframework.cloud

spring-cloud-starter-netflix-eureka-client

org.springframework.cloud

spring-cloud-starter-stream-rabbit

org.springframework.boot

spring-boot-starter-actuator

org.springframework.boot

spring-boot-devtools

runtime

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

和上一個服務提供者的依賴一樣。

寫配置文件application.yml

server:

port: 8802

spring:

application:

name: cloud-stream-consumer

cloud:

stream:

binders: # 在此處配置要綁定的rabbitmq的服務信息;

defaultRabbit: # 表示定義的名稱,用于于binding整合

type: rabbit # 消息組件類型

environment: # 設置rabbitmq的相關的環境配置

spring:

rabbitmq:

host: 192.168.31.52

port: 5672

username: guest

password: guest

bindings: # 服務的整合處理

input: # 這個名字是一個通道的名稱

destination: studyExchange # 表示要使用的Exchange名稱定義

content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”

binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:

client: # 客戶端進行Eureka注冊的配置

service-url:

defaultZone: http://localhost:7001/eureka

instance:

lease-renewal-interval-in-seconds: 2# 設置心跳的時間間隔(默認是30秒)

lease-expiration-duration-in-seconds: 5# 如果現在超過了5秒的間隔(默認是90秒)

instance-id: receive-8802.com # 在信息列表時顯示主機名稱

prefer-ip-address: true# 訪問的路徑變為IP地址

創建一個消費者的ReceiveMessageController

@Component

@EnableBinding(Sink.class)

public class ReceiveMessageController {

@Value("${server.port}")

private String serverPort;

@StreamListener(Sink.INPUT)

public void input(Message message){

System.out.println("message = "+message.getPayload()+"\t"+"serverPort= "+serverPort);

}

}

如果消費者成功接收消息,則在控制臺輸出產生的UUID和端口號。

啟動Eureka7001,啟動服務提供者8801,啟動服務消費者8802,還有MQ。

在Eureka中可以看到兩個服務已經啟動。

每次請求http://localhost:8801/sendMessage;消費者都能輸出結果,輸出的UUID與提供者的一致。

登錄RabbitMQ的web管理,可以看到我們新建的exchange,并且可以查看消息隊列中的請求次數的情況。

發送的消息除了可以是字符串類型還可以發送對象,在消費者接受數據的時候,會將實體轉換成JSON字符串。

配置文件中,如果你使用的消息中間件是kafka,type: kafka;environment是設置消息中間件的配置信息,端口,主機地址,用戶名,密碼等,可以設置多個binder,適應不同的場景。

重復消費問題

默認情況下,每個消費者的分組名都是隨機的,不同的,對于不同的組會引起重復消費的問題,例如:消息提供者只向消息隊列中發送了一個消息,正常情況下,消費者A從隊列中拿走之后,消費者B不能再獲得相同的消息,但是由于AB是不同的組,所以A和B都會獲取相同的消息,這就導致了資源被重復消費。

微服務應用放置到同一個group中,就能夠保證消息只會被其中應用消費一次,不同的組是可以消費的,同一個組內會發生競爭關系,只有其中一個可以消費。

同一個應用的不同微服務,只用在配置文件中指定相同的group。

再次發送消息時,只有消費者其中一個能消費。避免了重復消費。

消息持久化

當兩個消費者A和B,A設置了group屬性值,B沒有設置,這時,消費者全部宕機,但是消息生產者一直響MQ中生產消息,這時候重啟A和B兩者有什么區別呢?

正因為B沒有這時分組,B再次啟動后不會再向MQ中取數據,而A啟動成功后可以正常消費消息隊列中的消息。

因此設置了group的消費者,可以保證消息隊列中的消息持久化,group對于消費者來講很重要,既能避免重復消費,又能在消費者重啟后依然可以消費消息隊列中未消費的消息。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/259903.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/259903.shtml
英文地址,請注明出處:http://en.pswp.cn/news/259903.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

計算機意外重啟或遇錯誤無法繼續,計算機意外地重新啟動或遇到錯誤如何解決?...

電腦小白在重裝系統后難免會遇到些問題,有的容易處理,有的會有些棘手。那么,計算機意外地重新啟動或遇到錯誤如何解決?今天快啟動小編為大家分享詳細的計算機意外地重新啟動或遇到錯誤的解決方法,獻給對系統重裝知識不太了解的小…

jqueryui的Tooltip使用方法

http://api.jqueryui.com/tooltip/#option-position,詳細使用方法。 http://jqueryui.com/tooltip/,DEMO。 content使用 $( ".selector" ).tooltip({ content: "Awesome title!" });//div及相關標簽使用樣式,鼠標放上去時…

iOS 開發者賬號共用發布證書 (Distribution)問題

蘋果客服回復: 1.第一臺申請發布證書的電腦,從鑰匙串中導出發布證書(Distribution)頒發的request文件?然后在第二臺電腦上用request文件新生成一個Distribution證書,這個是可以共用的?(不理解還是理解錯了&…

JMeter web 測試

2019獨角獸企業重金招聘Python工程師標準>>> JMeter web 測試 http://jmeter.apache.org/usermanual/build-web-test-plan.html 轉載于:https://my.oschina.net/276172622/blog/808957

scala 連接oracle_一分鐘教你學會用java連接Oracle數據庫

package java_jdbc;//java連接Oracle數據庫import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement;public class JdbcOracleTest {public static void main(String[] args) {// TODO Auto-generated method stub//1.…

計算機軟件記不住設置,想知道電腦密碼記不住了怎么辦

635509517回答數:23216 | 被采納數:32017-01-09 17:51:10方法一:(1)啟動電腦,使用DOS啟動盤(比如:Windows 98啟動盤)進入純DOS狀態。(2)在DOS提示符下,根據下面步驟操作:cd\\ (切換到根目錄)c…

vue-cli#2.0 webpack 配置分析

目錄結構: ├── README.md ├── build │ ├── build.js │ ├── check-versions.js │ ├── dev-client.js │ ├── dev-server.js │ ├── utils.js │ ├── webpack.base.conf.js │ ├── webpack.dev.conf.js │ └── webpack.prod.conf.js…

initWithNibName與viewDidLoad的執行關系以及順序

一個ViewController,一般通過init或initWithNibName來加載。二者沒有什么不同,init最終還是要調用initWithNibName方法(除非這個ViewController沒有nib文件)。 我們經常在initWithNibName方法中初始化視圖,變量或者其他…

120xa正反轉參數_你知道變頻器的“正反轉死區時間”嗎?它的“停機方式”有幾種?...

若你我之間有緣,關注作者又何妨?兩情若是久長時,又豈在朝朝暮暮。大家好!我是江郎,一個踏踏實實的維修工。本期我們仍然探討兩個問題,如標題所述,#變頻器#“死區時間”和“停機方式”&#xff0…

【轉】游戲編程中的人工智能技術--神經網絡

原文:http://blog.csdn.net/ecitnet/article/details/1799444 游戲編程中的人工智能技術.>. (連載之一)用平常語言介紹神經網絡(Neural Networks in Plain English)因為我們沒有很好了解大腦,我們經常試圖用最新的技術作為一種模型來解釋它。在我童年…

w8計算機配置要求,win8系統最低配置要求有哪些|win8系統是否有最低配置要求-系統城...

2013-10-17 17:08:08  瀏覽量:5753小編這里要為大家帶來的是win8系統最低配置要求和部分安裝截圖,很多用戶想要將自己的電腦裝上win8,但也不是每一臺電腦都可以安裝win8系統的,為了避免一些低配置的用戶安裝了win8之后卻無法運行…

Session 丟失問題

項目從.NET Framework3.5 升級 .NET Framework4.0后,如果用Response.Redirect();進行頁面的跳轉,服務端會把這個跳轉動作當作是一個“新”的用戶去訪問網頁。 而這個時候,會給這個“新”的用戶一個SessionID,那造成的結果是&#…

財務管理專業應該報計算機二級哪個科目,我是應該報計算機二級還是三級呢

2008-12-01怎樣學好財務管理?“五步”學好財務管理:學習這門課程前,首先就不要認為它“很難”,只要相信“難而不會,會而不難”,充滿信心一定就能學好。我在學習過程中總結了幾條經驗,以供各位學友參考&…

libsvm java 調用說明

libsvm是著名的SVM開源組件,目前有JAVA.C/C,.NET 等多個版本,本人使用的是2.9libsvm命名空間下主要使用類:svm_model 為模型類,通過訓練或加載訓練好的模型文件獲得svm_parameter 為參數類,主要為支持向量機設定參數&a…

java字符串排序_對字符串排序持一種寬容的心態

在Java中一涉及中文處理就會冒出很多問題來,其中排序也是一個讓人頭疼的課題,我們來看下面的代碼:上面的代碼定義一個數組,然后進行升序排序,我們期望的結果是按照拼音升序排列,即為李四、王五、張三&#…

rails開發隨手記-0

helper默認是只在view中可用的,如果在controller中也要使用,要在ApplicationController中 include 如果model中如果有叫做type的列的話,會觸發rails的Single Table Inheritance ,放棄它吧,不好用,還是安心使…

nagios 監控配置介紹(二)

#配置服務端監控客戶端[rootnagios etc]# cd objects/[rootnagios objects]# vi hosts.cfg# Define a host for the local machinedefine host{use linux-serverhost_name 1.3-sambaalias 1.3-sambaaddress …

spoj SUBLEX (Lexicographical Substring Search) RE的歡迎來看看

SPOJ.com - Problem SUBLEX 這么裸的一個SAM,放在了死破OJ上面就是個坑。 注意用SAM做的時候輸出要用一個數組存下來,然后再puts,不然一個一個字符輸出會更慢。 還有一個就是不要多數據輸入,估計最后多了幾個沒用的數字&#xff0…

mt4雙線macd_3年內從虧損90多萬到獲利近760萬,我只堅持我的:60分鐘MACD雙回拉戰法!附選股公式...

MACD指標被普遍認為是最經典實用的技術指標之一。其實并不是因為MACD有多么精妙的算法,而是MACD遵循了最基本的“均線指導原則”,形象的將經典雙均線系統換了一種更加直觀的表達方式。在MT4中,默認應用的是單線MACD指標,而在證券市…