一、?消息隊列
1.?什么是消息隊列
消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到MQ中而不用管誰來取,消息使用者只管從 MQ中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
2.?消息隊列的特征
(1) 存儲
與依賴于使用套接字的基本 TCP和 UDP 協議的傳統請求和響應系統不同,消息隊列通常將消息存儲在某種類型的緩沖區中,直到目標進程讀取這些消息或將其從消息隊列中顯式移除為止。
(2)?異步
與請求和響應系統不同,消息隊列通過緩沖消息可以在應用程序中實現一定程度的異步性,允許源進程發送消息并在隊列中累積消息,而目標進程則可以挑選消息進行處理。 這樣,應用程序就可以在某些故障情況下運行,例如連接斷斷續續或源進程或目標進程故障。路由:消息隊列還可以提供路由功能,其中多個進程可以在同一隊列中讀取或寫入消息,從而實現廣播或單播通信模式。
3.?為什么需要消息隊列
(1)解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
(2) 冗余
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
(3)?擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
(4)?靈活性&峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
(5)?可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
(6)?順序保證
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition 內的消息的有序性)
(7)?緩沖
有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
(8)?異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
二、?Kafka基礎與入門
1.?Kafka 基本概念
Kafka 是一種高吞吐量的分布式發布/訂閱消息系統,這是官方對 kafka 的定義kafka 是 Apache 組織下的一個開源系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基于 hadoop 平臺的數據分析、低時延的實時系統、storm/spark 流式處理引擎等。kafka現在已被多家大型公司作為多種類型的數據管道和消息系統使用。
2.?Kafka 相關術語
kafka 的一些核心概念和角色
- Broker:Kafka 集群包含一個或多個服務器,每個服務器被稱為 broker(經紀人)。Topic:每條發布到 Kafka 集群的消息都有一個分類,這個類別被稱為 Topic(主題)。
- Producer:指消息的生產者,負責發布消息到kafka broker。
- Consumer:指消息的消費者,從kafka broker 拉取數據,并消費這些已發布的消息。
- Partition: Partition 是物理上的概念,每個 Topic 包含一個或多個 Partition,每個 partition 都是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)。
- Consumer Group:消費者組,可以給每個Consumer 指定消費組,若不指定消費者組,則屬于默認的 group。
- Message:消息,通信的基本單位,每個producer 可以向一個 topic 發布一些消息。
5.?Producer 生產機制
Producer 是消息和數據的生產者,它發送消息到broker 時,會根據Paritition 機制選擇將其存儲到哪一個 Partition。如果 Partition 機制設置的合理,所有消息都可以均勻分布到不同的 Partition 里,這樣就實現了數據的負載均衡。如果一個 Topic 對應一個文件,那這個文件所在的機器 I/0 將會成為這個 Topic 的性能瓶頸,而有了 Partition 后,不同的消息可以并行寫入不同broker 的不同 Partition 里,極大的提高了吞吐率。
6.?Consumer消費機制
Kafka 發布消息通常有兩種模式:隊列模式(queuing)和發布/訂閱模式(publish-subscribe)。在隊列模式下,只有一個消費組,而這個消費組有多個消費者,一條消息只能被這個消費組中的一個消費者所消費;而在發布/訂閱模式下,可有多個消費組,每個消費組只有一個消費者,同一條消息可被多個消費組消費。
Kafka 中的 Producer 和 consumer 采用的是 push、pull 的模式,即 producer 向broker 進行 push 消息,comsumer 從 bork 進行 pul1 消息,push 和 pu11 對于消息的生產和消費是異步進行的。pul1模式的一個好處是consumer 可自主控制消費消息的速率,同時consumer 還可以自己控制消費消息的方式是批量的從broker 拉取數據還是逐條消費數據。
三、?Zookeeper概念介紹
ZooKeeper是一種分布式協調技術,所謂分布式協調技術主要是用來解決分布式環境當中多個進程之間的同步控制,讓他們有序的去訪問某種共享資源,防止造成資源競爭(腦裂)的后果。腦裂是指在主備切換時,由于切換不徹底或其他原因,導致客戶端和 Slave 誤以為出現兩個 activemaster,最終使得整個集群處于混亂狀態
1.?zookeeper應用舉例
(1)?什么是單點故障問題呢?
所謂單點故障,就是在一個主從的分布式系統中,主節點負責任務調度分發,從節點負責任務的處理,而當主節點發生故障時,整個應用系統也就癱瘓了,那么這種故障就稱為單點故障。那我們的解決方法就是通過對集群 master 角色的選取,來解決分布式系統單點故障的問題。
(2)?傳統的方式是怎么解決單點故障的?以及有哪些缺點呢?
傳統的方式是采用一個備用節點,這個備用節點定期向主節點發送 ping 包,主節點收到 ping 包以后向備用節點發送回復 Ack 信息,當備用節點收到回復的時候就會認為當前主節點運行正常,讓它繼續提供服務。而當主節點故障時,備用節點就無法收到回復信息了,此時,備用節點就認為主節點宕機,然后接替它成為新的主節點繼續提供服務。
這種傳統解決單點故障的方法,雖然在一定程度上解決了問題,但是有一個隱患,就是網絡問題,可能會存在這樣一種情況:主節點并沒有出現故障,只是在回復 ack 響應的時候網絡發生了故障,這樣備用節點就無法收到回復,那么它就會認為主節點出現了故障,接著,備用節點將接管主節點的服務,并成為新的主節點,此時,分布式系統中就出現了兩個主節點(雙Master 節點)的情況,雙 Master 節點的出現,會導致分布式系統的服務發生混亂。這樣的話,整個分布式系統將變得不可用。為了防止出現這種情況,就需要引入 ZooKeeper 來解決這種問題。
2.?zookeeper的工作原理是什么?
(1)?master 啟動
在分布式系統中引入 Zookeeper 以后,就可以配置多個主節點,這里以配置兩個主節點為例,假定它們是主節點A和主節點B,當兩個主節點都啟動后,它們都會向 ZooKeeper 中注冊節點信息。我們假設主節點A注冊的節點信息是master00001,主節點B注冊的節點信息是 master00002 ,注冊完以后會進行選舉,選舉有多種算法,這里以編號最小作為選舉算法為例,編號最小的節點將在選舉中獲勝并獲得鎖成為主節點,也就是主節點A將會獲得鎖成為主節點,然后主節點B將被阻塞成為一個各用節點。這樣,通過這種方式 Zookeeper 就完成了對兩個 Master 進程的調度。完成了主、備節點的分配和協作
(2)?master 故障
如果主節點A 發生了故障,這時候它在 ZooKeeper 所注冊的節點信息會被自動刪除,而 ZooKeeper 會自動感知節點的變化,發現主節點A故障后,會再次發出選舉,這時候 主節點B 將在選舉中獲勝,替代主節點A 成為新的主節點,這樣就完成了主、被節點的重新選舉。
(3)?master 恢復
如果主節點恢復了,它會再次向 ZooKeeper 注冊自身的節點信息,只不過這時候它注冊的節點信息將會變成 master00003,而不是原來的信息。ZooKeeper會感知節點的變化再次發動選舉,這時候,主節點B在選舉中會再次獲勝繼續擔任主節點,主節點A 會擔任備用節點。
zookeeper 就是通過這樣的協調、調度機制如此反復的對集群進行管理和狀態同步的。
3.?zookeeper 集群架構
zookeeper 一般是通過集群架構來提供服務的,下圖是 zookeeper 的基本架構圖。
zookeeper 集群主要角色有 server 和 client,其中 server 又分為 leader、follower 和 observer 三個角色,每個角色的含義如下:
- Leader:領導者角色,主要負責投票的發起和決議,以及更新系統狀態。follower:跟隨著角色,用于接收客戶端的請求并返回結果給客戶端,在選舉過程中參與投票。
- observer:觀察者角色,用戶接收客戶端的請求,并將寫請求轉發給leader,同時同步 1eader 狀態,但是不參與投票。0bserver 目的是擴展系統,提高伸縮性。
- client:客戶端角色,用于向zookeeper 發起請求。
4.?zookeeper的工作流程
Zookeeper 修改數據的流程: Zookeeper 集群中每個 Server 在內存中存儲了一份數據,在 Zookeeper 啟動時,將從實例中選舉一個 Server 作為 leader,Leader 負責處理數據更新等操作,當且僅當大多數 Server 在內存中成功修改數據,才認為數據修改成功。
Zookeeper 寫的流程為:客戶端 Client 首先和一個 Server 或者 0bserve 通信,發起寫請求,然后 Server 將寫請求轉發給Leader,Leader 再將寫請求轉發給其它 Server,其它 Server 在接收到寫請求后寫入數據并響應 Leader,Leader在接收到大多數寫成功回應后,認為數據寫成功,最后響應C1ient,完成一次寫操作過程。
五、?單節點部署Kafka
1.?安裝 Zookeeper
先安裝java
[root@localhost ~]# dnf -y install java
[root@localhost ~]# ls anaconda-ks.cfg apache-zookeeper-3.6.0-bin.tar.gz kafka_2.13-2.4.1.tgz[root@localhost ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz [root@localhost ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper[root@localhost ~]# cd /etc/zookeeper/conf [root@localhost conf]# mv zoo_sample.cfg zoo.cfg [root@localhost conf]# ls configuration.xsl log4j.properties zoo.cfg[root@localhost conf]# vim zoo.cfg dataDir=/etc/zookeeper/zookeeper-data在/etc/zookeeper/目錄下創建zookeeper-data目錄 [root@localhost zookeeper]# mkdir zookeeper-data##切換到指定目錄,啟動zookeeper服務 cd /etc/zookeeper/bin [root@localhost bin]# ./zkServer.sh start
2.?安裝Kafka
[root@localhost ~]# tar zxvf kafka_2.13-2.4.1.tgz [root@localhost ~]# mv kafka_2.13-2.4.1 /etc/kafka [root@localhost ~]# cd /etc/kafka[root@localhost kafka]# vim config/server.properties log.dirs=/etc/kafka/kafka-logs ##60行修改[root@localhost kafka]# mkdir kafka-logs##啟動kafka服務 [root@localhost ~]# cd /etc/kafka/bin [root@localhost bin]# ./kafka-server-start.sh ../config/server.properties &
3.?測試
[root@localhost bin]# netstat -anpt |grep java tcp6 0 0 :::45561 :::* LISTEN 5055/java tcp6 0 0 :::2181 :::* LISTEN 5055/java tcp6 0 0 :::9092 :::* LISTEN 5098/java tcp6 0 0 :::43721 :::* LISTEN 5098/java tcp6 0 0 :::8080 :::* LISTEN 5055/java tcp6 0 0 127.0.0.1:2181 127.0.0.1:56962 ESTABLISHED 5055/java tcp6 0 0 127.0.0.1:9092 127.0.0.1:53582 ESTABLISHED 5098/java tcp6 0 0 127.0.0.1:53582 127.0.0.1:9092 ESTABLISHED 5098/java tcp6 0 0 127.0.0.1:56962 127.0.0.1:2181 ESTABLISHED 5098/java ##生產消息 [root@localhost bin]./kafka-console-producer.sh --broker-list 127.0.0.1:9092 -topic testaaa >123 >456 >789##打開一個新的終端,查看消息 [root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 -topic testaaa 123 456 789
六、?集群部署Kafka
1。 基礎環境設置
關閉防火墻、安裝java
systemctl stop firewalld setenforce 0 dnf -y install java
##三臺服務器修改名字 hostnamectl set-hostname kafka1 hostnamectl set-hostname kafka2 hostnamectl set-hostname kafka3cat /etc/hosts ##在該文件中添加 192.168.10.101 kafka1 192.168.10.105 kafka2 192.168.10.106 kafka3
2.?安裝 Zookeeper
[root@kafka1 ~]# cd /etc/zookeeper/conf [root@kafka1 conf]# ls configuration.xsl zoo.cfg zoo_sample.cfg log4j.properties zoo.cfg.dynamic.next[root@kafka1 conf]# vim zoo.cfg dataDir=/etc/zookeeper/zookeeper-data ##修改并添加幾行 clientPort=2181 server.1=192.168.10.101:2888:3888 server.2=192.168.10.105:2888:3888 server.3=192.168.10.106:2888:3888[root@kafka1 conf]# mkdir /etc/zookeeper/zookeeper-data/echo '1'>//etc/zookeeper/zookeeper-data/myid echo '2'>//etc/zookeeper/zookeeper-data/myid echo '3'>//etc/zookeeper/zookeeper-data/myid[root@kafka1 ~]# cd /etc/zookeeper/bin [root@kafka1 bin]# ./zkServer.sh setart ##一定要啟動zookeeper
3.?安裝 kafka
[root@kafka1 ~]# cd /etc/kafka/config[root@kafka1 config]# vim server.properties broker.id=1 ##id值不能一樣,其他兩個id為2和三 listeners=PLAINTEXT://192.168.10.101:9092 ##三臺填寫自己的ip log.dirs=/etc/kafka/kafka-logs zookeeper.connect=192.168.10.101:2181,192.168.10.105:2181,192.168.10.106:2181##啟動kafka [root@kafka1 ~]# cd /etc/kafka/bin [root@kafka1 bin]# ./kafka-server-start.sh ../config/server.properties &
4. 測試
任意一臺服務器創建topic ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test1111111./kafka-console-producer.sh --broker-list kafka1:9092 -topic test1111111 生產消息./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test1111111 另一臺消費消息