課程來源:002-MQ簡介_嗶哩嗶哩_bilibili (尚硅谷老雷,時長19h)
第1章 RocketMQ概述
1. MQ是什么?
2.?MQ用途有哪些?
? ? ? ? 限流削峰;異步解耦;數據收集。
3. 常見MQ產品有哪些&對比?
4.?MQ常見協議?
JMS、STOMP、AMQP、MQTT;
需要說明的是,Kafka和RocketMQ是沒有遵循這些協議的。RocketMQ有自己的協議。
5.?RocketMQ發展歷程?
RocketMQ是一個統一消息引擎、輕量級數據處理平臺。
RocketMQ是一款阿里巴巴開源的消息中間件。2016年捐獻給了apache軟件基金會。
第2章 RocketMQ的安裝與啟動
一、基礎概念
- 消息(Message):
- 主題(Topic):
- 標簽(Tag):為消息設置的標簽,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。Topic是消息的一級分類,Tag是消息的二級分類。如 Topic:貨物,tag=上海,tag=江蘇。
- 隊列(Queue):存儲消息的物理實體。一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。一個Topic的Queue也被稱為一個Topic中消息的分區(相當于Kafka中Partition的概念)。一個Topic的Queue中的消息只能被一個消費者組中的一個消費者消費。一個Queue中的消息不允許同一個消費者組中的多個消費者同時消費。
在學習參考其它相關資料時,還會看到一個概念:分片(Sharding)。分片不同于分區。在RocketMQ中,分片指的是存放相應Topic的Broker。每個分片中會創建出相應數量的分區,即Queue,每個Queue的大小都是相同的。
- 消息標識(MessageId/Key):RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業務標識的Key,以方便對消息的查詢。不過需要注意的是,MessageId有兩個:在生產者send()消息時會自動生成一個MessageId(msgId),當消息到達Broker后,Broker也會自動生成一個MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標識。
msgId:由producer端生成,其生成規則為:producerIp + 進程pid + MessageClientIDSetter類的ClassLoader的hashCode + 當前時間 + AutomicInteger自增計數器;
offsetMsgId:由broker端生成,其生成規則為:brokerIp + 物理分區的offset(Queue中的偏移量);
key:由用戶指定的業務相關的唯一標識。
二、系統架構
1. Producer:一個生產者組可以同時發送(生產)多個主題的消息。
2. Consumer:消息消費者都是以消費者組(Consumer Group)的形式出現的。消費者組是同一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息。消費者組使得在消息消費方面,實現負載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,并不是將消息負載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接著消費原Consumer消費的Queue)的目標變得非常容易。消費者只能消費一個Topic的消息。
注意:消費者組中Consumer的數量應該小于等于訂閱Topic的Queue數量。如果超出Queue數量,則多出的Consumer將不能消費消息。一個Topic類型的消息可以被多個消費者組同時消費。
3. Name Server:NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態注冊與發現。
RocketMQ的思想來自于Kafka,而Kafka是依賴了Zookeeper的。所以,在RocketMQ的早期版本,即在MetaQ v1.0與v2.0版本中,也是依賴于Zookeeper的。從MetaQ v3.0,即RocketMQ開始去掉了Zookeeper依賴,使用了自己的NameServer。
主要包括兩個功能:
Broker管理:接受Broker集群的注冊信息并且保存下來作為路由信息的基本數據;提供心跳檢測機制,檢查Broker是否還存活。
路由信息管理:每個NameServer中都保存著Broker集群的整個路由信息和用于客戶端查詢的隊列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。
- 路由注冊:NameServer通常也是以集群的方式部署。Broker節點啟動時,輪詢NameServer列表,與每個NameServer節點建立長連接,發起注冊請求。Broker以心跳包的方式,上報給NameServer,以證明自己的存活狀態,每30秒發送一次。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會更新心跳時間戳,記錄這個Broker的最新存活時間。
- 路由剔除:NameServer中有?個定時任務,每隔10秒就會掃描?次Broker表,查看每一個Broker的最新心跳時間戳距離當前時間是否超過120秒,如果超過,則會判定Broker失效,然后將其從Broker列表中剔除。
- 路由發現:RocketMQ的路由發現采用的是Pull模型。當Topic路由信息出現變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取主題最新的路由。默認客戶端每30秒會拉取一次最新的路由。
擴展:
1)Push模型:推送模型。其實時性較好,是一個“發布-訂閱”模型,需要維護一個長連接。而長連接的維護是需要資源成本的。該模型適合于的場景:
- 實時性要求較高
- Client數量不多,Server數據變化較頻繁
2)Pull模型:拉取模型。存在的問題是,實時性較差。
3)Long Polling模型:長輪詢模型。其是對Push與Pull模型的整合,充分利用了這兩種模型的優勢,屏蔽了它們的劣勢。(如Nacos)
以上內容,詳見講義。
4. Broker:Broker充當著消息中轉角色,負責存儲消息、轉發消息。Broker在RocketMQ系統中負責接收并存儲從生產者發送來的消息,同時為消費者的拉取請求作準備。Broker同時也存儲著消息相關的元數據,包括消費者組消費進度偏移offset、主題、隊列等。
Kafka 0.8版本之后,offset是存放在Broker中的,之前版本是存放在Zookeeper中的。
4.1 模塊構成
Remoting Module:整個Broker的實體,負責處理來自clients端的請求。而這個Broker實體則由以下模塊構成。
Client Manager:客戶端管理器。負責接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例如,維護Consumer的Topic訂閱信息。
Store Service:存儲服務。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。
HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
Index Service:索引服務。根據特定的Message key,對投遞到Broker的消息進行索引服務,同時也提供根據Message Key對消息進行快速查詢的功能。
4.2 集群部署
為了增強Broker性能與吞吐量,Broker一般都是以集群形式出現的。各集群節點中可能存放著相同
Topic的不同Queue。不過,這里有個問題,如果某Broker節點宕機,如何保證數據不丟失呢?其解決方案是,將每個Broker集群節點進行橫向擴展,即將Broker節點再建為一個HA集群,解決單點問題。
Broker節點集群是一個主從集群,即集群中具有Master與Slave兩種角色。Master負責處理讀寫操作請求,Slave負責對Master中的數據進行備份。當Master掛掉了,Slave則會自動切換為Master去工作。所以這個Broker集群是主備集群。一個Master可以包含多個Slave,但一個Slave只能隸屬于一個Master。Master與Slave 的對應關系是通過指定相同的BrokerName、不同的BrokerId 來確定的。BrokerId為0表示Master,非0表示Slave。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。
5. 工作流程
1)啟動NameServer,NameServer啟動后開始監聽端口,等待Broker、Producer、Consumer連接。
2)啟動Broker時,Broker會與所有的NameServer建立并保持長連接,然后每30秒向NameServer定時發送心跳包。
3)發送消息前,可以先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,當然,在創建Topic時也會將Topic與Broker的關系寫入到NameServer中。不過,這步是可選的,也可以在發送消息時自動創建Topic。
4)Producer發送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取路由信息,即當前發送的Topic消息的Queue與Broker的地址(IP+Port)的映射關系。然后根據算法策略從隊選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發消息。當然,在獲取到路由信息后,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。
5)Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取其所訂閱Topic的路由信息,然后根據算法策略從路由信息中獲取到其所要消費的Queue,然后直接跟Broker建立長連接,開始消費其中的消息。Consumer在獲取到路由信息后,同樣也會每30秒從NameServer更新一次路由信息。不過不同于Producer的是,Consumer還會向Broker發送心跳,以確保Broker的存活狀態。
另外,Topic的創建模式、讀/寫隊列——這兩個內容詳見講義。
三、單機安裝與啟動
克隆一臺虛擬機,修改主機名和IP:
修改主機名命令:
[root@centos102 ~]# vim /etc/hostname?
修改IP命令:
[root@centos102 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33?
老師還在Windows系統的hosts文件中進行了主機名和IP的映射配置(?)。
我可以直接使用centos101哦。