??、MQ簡介
MQ:MessageQueue,消息隊列。是在互聯?中使??常?泛的—系列服務中間件。 這個詞可以分兩個部分來看,
—是Message:消息。消息是在不同進程之間傳遞的數據。這些進程可以部署在同—臺機器上,也可以?分布在不同機器上。
?是Queue:隊列。隊列原意是指—種具有FI?FO(先進先出)特性的數據結構,是?來緩存??數據的。對于消息中間件產品來說,能不能保證FI?FO特性,?尚值得考量。但是,所有消息隊列都是需要具備存?儲消息,讓消息排隊的能??。
?義上來說,?只要能夠實現消息跨進程傳輸以及隊列數據緩存,就可以稱之為消息隊列。例如我們常?的QQ?、微信?、阿?旺旺等就都具備了這樣的功能。只不過他們對接的使?對象是??,?我們這?討論的MQ產品?需要對接的使?對象是應?程序。
1、MQ的作?主要有以下三個??:
? (1)異步
例?:?快遞員發快遞,直接到客戶家效率會很低。引?菜?驛站后,快遞員只需要把快遞放到菜?驛站,?就可以繼續發其他快遞去了。客戶再按??的時間安排去菜?驛站取快遞。
作? :異步能提?系統的響應速度?、吞吐量。
?(2)解耦
例?:《Thinking in JAVA》很經典,但是都是英??,我們看不懂,所以需要編輯社,將?章翻譯成其他?語?,?這樣就可以完成英語與其他語?的交流。
作??:
1 、服務之間進?解耦,才可以減少服務之間的影響。提?系統整體的穩定性以及可擴展性。
2?、另外,解耦后可以實現數據分發。?產者發送—個消息后,可以由—個或者多個消費者進?消費,并?且消費者的增加或者減少對?產者沒有影響。
?
(3)削峰
例?:??江每年都會漲??,但是下游出??的速度是基本穩定的,所以會漲??。引?三峽?壩后,可以把??儲存起來,下游慢慢排?。
作??:?以穩定的系統資源應對突發的流量沖擊。
?
??、Rocket?MQ產品特點
?1 、Rocket MQ介紹
????????Rocket?MQ是阿里巴巴開源的一個消息中間件,在阿里內部歷經了雙十一等很多高并發場景的考驗,能夠處理億萬級別的消息。2016年開源后捐贈給Apache,現在是Apache的—個頂級項目。
????????早期阿里使用ActiveMQ ,但是,當消息開始逐漸增多后,ActiveMQ的IO性能很快達到了瓶頸。于是, 阿?開始關注Kafka?。但是Kafka是針對日志收集場景設計的,他的高級功能并不是很貼合阿里的業務場景。尤其當他的Topic過多時, 由于Partition文件也會過多,這就會加大文件索引的耗時,會嚴重影響IO性能。于是阿里才決定自研中間件,?最早叫做MetaQ ,后來改名成為Rocket MQ?。最早他所希望解決的最大問題就是多Topic下的IO 性能壓力?。但是產品在阿里內部的不斷改進, Rocket MQ開始體現出一些不一樣的優勢。
2 、Rocket MQ特點
當今互聯?MQ產品眾多,其中,影響?和使?范圍最?的當數Apache?Kafka?、RabbitMQ?、Apache?Rocket?MQ以及Apache?Plusar。這??產品雖然都是典型的MQ產品,但是由于設計和實現上的—些差異,造成他們適合于不同的細分場景。
優點 | 缺點 | 適合場景 | |
Apache?Kafka | 吞吐量?常??,性能?常好,集群?可??。 | 會有丟數據的可 能,功能?較單— | ?志分析、 ?數據采集 |
Rabbit?MQ | 消息可靠性??,功能全??。 | erlang語?不好定?制。吞吐量?較????低。 | 企業內部? 規模服務調? |
Apache?Pulsar | 基于Bookeeper構建,?消息可靠性?常??。 | 周邊?態還有差 距,??前使?的公?司?較少。 | 企業內部? 規模服務調? |
Apache? Rocket?MQ | ?吞吐?、?性能?、?可??。功能全??。 客戶端協議豐富。使?java語?開發,???便定制。 | 服務加載?較慢。 | ?乎全場 景,特別適 合?融場景 |
其中Rocket?MQ?,孵化?阿?巴巴。歷經阿?多年雙十一的嚴格考驗,?Rocket?MQ可以說是從全世界最嚴苛的??并發場景中摸爬滾打出來的過硬產品,也是少數?個在?融場景?較適?的MQ產品。從橫向對?來看,Rocket?MQ與Kafka和RabbitMQ相??。Rocket?MQ的消息吞吐量雖然和Kafka相?還是稍有差距,但是卻?RabbitMQ?很多。在阿?內部,?Rocket?MQ集群每天處理的請求數超過5萬億次,?持的核?應?超過3000??個。?Rocket?MQ最?的優勢就是他天?就為?融互聯????。他的消息可靠性相?Kafka也有了很?的提升,??消息吞吐量相?RabbitMQ也有很?的提升。另外,?Rocket?MQ的?級功能也越來越全??,?播消費?、延遲隊列?、死信隊列等等?級功能一應俱全,?甚?某些業務功能?如事務消息,?已經呈現出領先潮流的趨勢。
三、RocketMQ快速實戰
1.快速搭建RocketMQ
RocketMQ的官網地址:?http://rocketmq.apache.org?。在下載頁面可以獲取RocketMQ的源碼包以及運行包。下載頁面地址:https://rocketmq.apache.org/download。
當前最新的版本是5.x,這是一個著眼于云原生的新版本,給 RocketMQ 帶來了非常多很亮眼的新特性。但是目前來看,企業中用得還比較少。因此,我們這里采用的還是更為穩定的4.9.5版本。??
注:在2020年下半年,RocketMQ新推出了5.0的大版本,這對于RocketMQ來說,是一個里程碑式的大版本。在這個大版本中,RocketMQ對整體功能做了一次大的升級。增加了很多非常有用的新特性,也對已有功能重新做了升級。
? 比如在具體功能方面,在4.x版本中,對于定時消息,只能設定幾個固定的延遲級別,而5.0版本中,已經可以指定具體的發送時間了。在客戶端語言方面,4.x版本,RocketMQ原生只支持基于Netty框架的Java客戶端。而在5.0版本中,增加了對Grpc協議的支持,這基本上就解除了對客戶端語言的限制。在服務端架構方面,4.x版本只支持固定角色的普通集群和可以動態切換角色的Dledger集群,而在5.0版本中,增加了Dledger Controller混合集群模式,即可以混合使用Dledger的集群機制以及 Broker 本地的文件管理機制。
? 但是功能強大,同時也意味著問題會很多。所以目前來看,企業中直接用新版本的還比較少。小部分使用新版本的企業,也大都是使用內部的改造優化版本。
?運?只需要下載Binary運?版本就可以了。 當然,源碼包也建議下載下來,后續會進?解讀。運?包下載下??來后,就可以直接解壓,上傳到服務器上。我們這?會上傳到/app/rocketmq?錄。解壓后?個重要的?錄如下:
?
默認情況下,?Rocket?MQ建議的運?環境需要?少12G的內存,?這是?產環境?較理想的資源配置。但是,?學?習階段,如果你的服務器沒有這么?的內存空間,那么就需要做—下調整。進?bin??錄,對其中的runserver.sh和runbroker.sh兩個腳本進?—下修改。
使?vi runserver.sh指令,編輯這個腳本,找到下?的—?配置,調整Java進程的內存??。?
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m - XX:MaxMetaspaceSize=320m"
接下來,?同樣調整runbroker.sh中的內存??。?
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改為:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g"
?修改配置時, 注意要根據你的JDK版本調整對應的配置? 。Rocket MQ是—個典型的Java應? ,所以需要 提前安裝JDK 。我們這?采?的是1 .8版本。JDK的安裝過程略。
?產環境不建議調整。這—系列參數實際上就是Rocket MQ的JVM調優結果。
?Rocket?MQ的后端服務分為nameserver和broker兩個服務,關于他們的作??,后?會給你分享。接下來我們 先將這兩個服務啟動起來。
第?步:啟動nameserver服務。?
?
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
nohup bin/mqnames rv &
指令執?后,會?成—個nohup.out的?志?件。在這個?志?件?如果看到下?這—條關鍵?志,就表示?nameserver服務啟動成功了。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and
will likely be removed in a future release.
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
?接下來,可以通過jsp指令進?驗證。使?jps指令后,可以看到有—個NamesrvStartup的進程運??,也表示?nameserver服務啟動完成。
第?步:啟動broker服務
broker也是—個Java服務,?只需要調整conf?錄下的broker.conf?件,?進?—些定制。然后就可以啟動了。
具體配置項參?官??檔,?這?盡量?默認配置。
如果你的服務器配置了多張?卡,?建議配置brokerIP1屬性。?如阿?云,騰訊云這樣的云服務器,他們?通常有內??卡和外??卡兩張?卡,那么需要增加配置brokerIP1屬性,指向服務器的外?IP 地址,?這?樣才能確保從其他服務器上訪問到Rocket?MQ 服務。
在啟動broker服務前,需要先指定NameServer的服務地址。Rocket?MQ可以使?—個NAMESRV_ADDR的環?境變量指定NameServer服務地址。?
export NAMESRV_ADDR= I localhost:9876 I
?9876是nameserver的默認服務端??。
然后也可以?之前的?式啟動broker服務。啟動broker服務的指令是mqbroker。?
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
nohup bin/mqbroker &
?啟動完成后,?同樣檢查nohup.out?志?件,有如下—條關鍵?志,就表示broker服務啟動正常了。
The broker [xxxxx] boot success. serializeType=JSON and name server is localhost:9876
?
注?:1、在實際服務部署時,通常會將Rocket?MQ的部署地址添加到環境變量當中。例如使?vi?~/.bash_profile指令,添加以下內容。
export?ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-5.3.0-bin-release?PATH=$ROCKETMQ_HOME/bin:$PATH
export?PATH
這樣就不必每次進?Rocket?MQ的安裝?錄了。直接可以使?mqnamesrv?和mqbroker指令。
2?、停?Rocket?MQ服務可以通過mqshutdown指令進?
mqshutdown?namesrv?# 關閉nameserver服務mqshutdown?broker?# 關閉broker服務
同樣使?jps指令可以檢查服務的啟動狀態。使?jps指令后,可以看到—個名為BrokerStartup的進程,則表示?broker服務啟動完成。?
2?、快速實現消息收發
Rocket?MQ后端服務啟動完成后,就可以啟動客戶端的消息?產者和消息消費者進?消息轉發了。接下來,我?們會先通過Rocket?MQ提供的命令??具快速體驗—下Rocket?MQ消息收發的功能。然后,再動?搭建—個Maven項? ,在項?中使?Rocket?MQ進?消息收發。
(1)命令?快速實現消息收發
1)?:通過指令啟動Rocket?MQ的消息?產者發送消息。
cd /app/rocketmq/rocketmq-all-5.3.0-bin-release
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
這個指令會默認往Rocket?MQ中發送1000條消息。在命令?窗?可以看到發送消息的?志:?
?
.....
SendResult [sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31103E6,
offsetMsgId=C0A8417000002A9F000000000003AEFE, messageQueue=MessageQueue [topic=TopicTest,
brokerName=192-168-65-112, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31203E7,
offsetMsgId=C0A8417000002A9F000000000003AFF0, messageQueue=MessageQueue [topic=TopicTest,
brokerName=192-168-65-112, queueId=2], queueOffset=249]
?這部分?志中,并沒有打印出發送了什么消息。上?Send?Result開頭部分是消息發送到Broker后的結果。最 后兩??志表示消息?產者發完消息后,服務正常關閉了。
2):可以啟動消息消費者接收之前發送的消息?
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
?消費者啟動完成后,可以看到消費到的消息
......
ConsumeMessageThread_please_rename_unique_group_name_4_18 Receive New Messages:
[MessageExt [brokerName=192-168-65-112, queueId=1, storeSize=242, queueOffset=211,
sysFlag=0, bornTimestamp=1725004967502, bornHost=/192.168.65.112:52748,
storeTimestamp=1725004967502, storeHost=/192.168.65.112:10911,
msgId=C0A8417000002A9F0000000000031F4E, commitLogOffset=204622, bodyCRC=47888112,
reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic= ITopicTest I,
flag=0, properties={CONSUME_START_TIME=1725005058184, MSG_REGION=DefaultRegion,
UNIQ_KEY=C0A841708122246B179D98C9E24E034E, CLUSTER=DefaultCluster, MIN_OFFSET=0,
TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=250}, body= [72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 52, 54], transactionId= Inull I}]]
?每—條這樣的?志信息就表示消費者接收到了—條消息。
這個Consumer消費者的指令并不會主動結束,他會繼續掛起,等待消費新的消息。我們可以使?CTRL+C停??該進程。?
注:在Rocket?MQ提供的這個簡單示例中并沒有打印出傳遞的消息內容,?是打印出了消息相關的很多重?要的屬性。
其中有?個?較重要的屬性: brokerId,brokerName,queueId,msgId,topic,cluster。這些屬性的作?會在?后續—起分享,?這?你不妨先找—下這些屬性是什么,消費者與?產者之間有什么樣的對應關系。
3?、搭建Java客戶端項?
之前的步驟實際上是在服務器上快速驗證Rocket?MQ的服務狀態,接下來我們動?搭建—個Rocket?MQ的客戶?端應??,在實際應?中集成使?Rocket?MQ。
第?步 :創建—個標準的maven項? ,在pom.xml中引?以下核?依賴
?
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>
第?步:**就可以直接創建—個簡單的消息?產者
?