Kafka 消息隊列

一、?消息隊列

1.?什么是消息隊列

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到MQ中而不用管誰來取,消息使用者只管從 MQ中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

2.?消息隊列的特征

(1) 存儲

與依賴于使用套接字的基本 TCP和 UDP 協議的傳統請求和響應系統不同,消息隊列通常將消息存儲在某種類型的緩沖區中,直到目標進程讀取這些消息或將其從消息隊列中顯式移除為止。

(2)?異步

與請求和響應系統不同,消息隊列通過緩沖消息可以在應用程序中實現一定程度的異步性,允許源進程發送消息并在隊列中累積消息,而目標進程則可以挑選消息進行處理。 這樣,應用程序就可以在某些故障情況下運行,例如連接斷斷續續或源進程或目標進程故障。路由:消息隊列還可以提供路由功能,其中多個進程可以在同一隊列中讀取或寫入消息,從而實現廣播或單播通信模式。

3.?為什么需要消息隊列

(1)解耦

允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

(2) 冗余

消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

(3)?擴展性

因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。

(4)?靈活性&峰值處理能力

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

(5)?可恢復性

系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

(6)?順序保證

在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition 內的消息的有序性)

(7)?緩沖

有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。

(8)?異步通信

很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

二、?Kafka基礎與入門

1.?Kafka 基本概念

Kafka 是一種高吞吐量的分布式發布/訂閱消息系統,這是官方對 kafka 的定義kafka 是 Apache 組織下的一個開源系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基于 hadoop 平臺的數據分析、低時延的實時系統、storm/spark 流式處理引擎等。kafka現在已被多家大型公司作為多種類型的數據管道和消息系統使用。

2.?Kafka 相關術語

kafka 的一些核心概念和角色

  • Broker:Kafka 集群包含一個或多個服務器,每個服務器被稱為 broker(經紀人)。Topic:每條發布到 Kafka 集群的消息都有一個分類,這個類別被稱為 Topic(主題)。
  • Producer:指消息的生產者,負責發布消息到kafka broker。
  • Consumer:指消息的消費者,從kafka broker 拉取數據,并消費這些已發布的消息。
  • Partition: Partition 是物理上的概念,每個 Topic 包含一個或多個 Partition,每個 partition 都是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)。
  • Consumer Group:消費者組,可以給每個Consumer 指定消費組,若不指定消費者組,則屬于默認的 group。
  • Message:消息,通信的基本單位,每個producer 可以向一個 topic 發布一些消息。

5.?Producer 生產機制

Producer 是消息和數據的生產者,它發送消息到broker 時,會根據Paritition 機制選擇將其存儲到哪一個 Partition。如果 Partition 機制設置的合理,所有消息都可以均勻分布到不同的 Partition 里,這樣就實現了數據的負載均衡。如果一個 Topic 對應一個文件,那這個文件所在的機器 I/0 將會成為這個 Topic 的性能瓶頸,而有了 Partition 后,不同的消息可以并行寫入不同broker 的不同 Partition 里,極大的提高了吞吐率。

6.?Consumer消費機制

Kafka 發布消息通常有兩種模式:隊列模式(queuing)和發布/訂閱模式(publish-subscribe)。在隊列模式下,只有一個消費組,而這個消費組有多個消費者,一條消息只能被這個消費組中的一個消費者所消費;而在發布/訂閱模式下,可有多個消費組,每個消費組只有一個消費者,同一條消息可被多個消費組消費。

Kafka 中的 Producer 和 consumer 采用的是 push、pull 的模式,即 producer 向broker 進行 push 消息,comsumer 從 bork 進行 pul1 消息,push 和 pu11 對于消息的生產和消費是異步進行的。pul1模式的一個好處是consumer 可自主控制消費消息的速率,同時consumer 還可以自己控制消費消息的方式是批量的從broker 拉取數據還是逐條消費數據。

三、?Zookeeper概念介紹

ZooKeeper是一種分布式協調技術,所謂分布式協調技術主要是用來解決分布式環境當中多個進程之間的同步控制,讓他們有序的去訪問某種共享資源,防止造成資源競爭(腦裂)的后果。腦裂是指在主備切換時,由于切換不徹底或其他原因,導致客戶端和 Slave 誤以為出現兩個 activemaster,最終使得整個集群處于混亂狀態

1.?zookeeper應用舉例

(1)?什么是單點故障問題呢?

所謂單點故障,就是在一個主從的分布式系統中,主節點負責任務調度分發,從節點負責任務的處理,而當主節點發生故障時,整個應用系統也就癱瘓了,那么這種故障就稱為單點故障。那我們的解決方法就是通過對集群 master 角色的選取,來解決分布式系統單點故障的問題。

(2)?傳統的方式是怎么解決單點故障的?以及有哪些缺點呢?

傳統的方式是采用一個備用節點,這個備用節點定期向主節點發送 ping 包,主節點收到 ping 包以后向備用節點發送回復 Ack 信息,當備用節點收到回復的時候就會認為當前主節點運行正常,讓它繼續提供服務。而當主節點故障時,備用節點就無法收到回復信息了,此時,備用節點就認為主節點宕機,然后接替它成為新的主節點繼續提供服務。
這種傳統解決單點故障的方法,雖然在一定程度上解決了問題,但是有一個隱患,就是網絡問題,可能會存在這樣一種情況:主節點并沒有出現故障,只是在回復 ack 響應的時候網絡發生了故障,這樣備用節點就無法收到回復,那么它就會認為主節點出現了故障,接著,備用節點將接管主節點的服務,并成為新的主節點,此時,分布式系統中就出現了兩個主節點(雙Master 節點)的情況,雙 Master 節點的出現,會導致分布式系統的服務發生混亂。這樣的話,整個分布式系統將變得不可用。為了防止出現這種情況,就需要引入 ZooKeeper 來解決這種問題。

2.?zookeeper的工作原理是什么?

(1)?master 啟動

在分布式系統中引入 Zookeeper 以后,就可以配置多個主節點,這里以配置兩個主節點為例,假定它們是主節點A和主節點B,當兩個主節點都啟動后,它們都會向 ZooKeeper 中注冊節點信息。我們假設主節點A注冊的節點信息是master00001,主節點B注冊的節點信息是 master00002 ,注冊完以后會進行選舉,選舉有多種算法,這里以編號最小作為選舉算法為例,編號最小的節點將在選舉中獲勝并獲得鎖成為主節點,也就是主節點A將會獲得鎖成為主節點,然后主節點B將被阻塞成為一個各用節點。這樣,通過這種方式 Zookeeper 就完成了對兩個 Master 進程的調度。完成了主、備節點的分配和協作

(2)?master 故障

如果主節點A 發生了故障,這時候它在 ZooKeeper 所注冊的節點信息會被自動刪除,而 ZooKeeper 會自動感知節點的變化,發現主節點A故障后,會再次發出選舉,這時候 主節點B 將在選舉中獲勝,替代主節點A 成為新的主節點,這樣就完成了主、被節點的重新選舉。

(3)?master 恢復

如果主節點恢復了,它會再次向 ZooKeeper 注冊自身的節點信息,只不過這時候它注冊的節點信息將會變成 master00003,而不是原來的信息。ZooKeeper會感知節點的變化再次發動選舉,這時候,主節點B在選舉中會再次獲勝繼續擔任主節點,主節點A 會擔任備用節點。
zookeeper 就是通過這樣的協調、調度機制如此反復的對集群進行管理和狀態同步的。

3.?zookeeper 集群架構

zookeeper 一般是通過集群架構來提供服務的,下圖是 zookeeper 的基本架構圖。

zookeeper 集群主要角色有 server 和 client,其中 server 又分為 leader、follower 和 observer 三個角色,每個角色的含義如下:

  • Leader:領導者角色,主要負責投票的發起和決議,以及更新系統狀態。follower:跟隨著角色,用于接收客戶端的請求并返回結果給客戶端,在選舉過程中參與投票。
  • observer:觀察者角色,用戶接收客戶端的請求,并將寫請求轉發給leader,同時同步 1eader 狀態,但是不參與投票。0bserver 目的是擴展系統,提高伸縮性。
  • client:客戶端角色,用于向zookeeper 發起請求。

4.?zookeeper的工作流程

Zookeeper 修改數據的流程: Zookeeper 集群中每個 Server 在內存中存儲了一份數據,在 Zookeeper 啟動時,將從實例中選舉一個 Server 作為 leader,Leader 負責處理數據更新等操作,當且僅當大多數 Server 在內存中成功修改數據,才認為數據修改成功。
Zookeeper 寫的流程為:客戶端 Client 首先和一個 Server 或者 0bserve 通信,發起寫請求,然后 Server 將寫請求轉發給Leader,Leader 再將寫請求轉發給其它 Server,其它 Server 在接收到寫請求后寫入數據并響應 Leader,Leader在接收到大多數寫成功回應后,認為數據寫成功,最后響應C1ient,完成一次寫操作過程。

五、?單節點部署Kafka

1.?安裝 Zookeeper

先安裝java

[root@localhost ~]# dnf -y install java
[root@localhost ~]# ls
anaconda-ks.cfg  apache-zookeeper-3.6.0-bin.tar.gz  kafka_2.13-2.4.1.tgz[root@localhost ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@localhost ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper[root@localhost ~]# cd /etc/zookeeper/conf
[root@localhost conf]# mv zoo_sample.cfg zoo.cfg
[root@localhost conf]# ls
configuration.xsl  log4j.properties  zoo.cfg[root@localhost conf]# vim zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data在/etc/zookeeper/目錄下創建zookeeper-data目錄
[root@localhost zookeeper]# mkdir zookeeper-data##切換到指定目錄,啟動zookeeper服務
cd /etc/zookeeper/bin    
[root@localhost bin]# ./zkServer.sh start

2.?安裝Kafka

[root@localhost ~]# tar zxvf kafka_2.13-2.4.1.tgz
[root@localhost ~]# mv kafka_2.13-2.4.1 /etc/kafka
[root@localhost ~]# cd /etc/kafka[root@localhost kafka]# vim config/server.properties 
log.dirs=/etc/kafka/kafka-logs           ##60行修改[root@localhost kafka]# mkdir kafka-logs##啟動kafka服務
[root@localhost ~]# cd /etc/kafka/bin
[root@localhost bin]# ./kafka-server-start.sh ../config/server.properties &

3.?測試

[root@localhost bin]# netstat -anpt |grep java
tcp6       0      0 :::45561                :::*                    LISTEN      5055/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      5055/java           
tcp6       0      0 :::9092                 :::*                    LISTEN      5098/java           
tcp6       0      0 :::43721                :::*                    LISTEN      5098/java           
tcp6       0      0 :::8080                 :::*                    LISTEN      5055/java           
tcp6       0      0 127.0.0.1:2181          127.0.0.1:56962         ESTABLISHED 5055/java           
tcp6       0      0 127.0.0.1:9092          127.0.0.1:53582         ESTABLISHED 5098/java           
tcp6       0      0 127.0.0.1:53582         127.0.0.1:9092          ESTABLISHED 5098/java           
tcp6       0      0 127.0.0.1:56962         127.0.0.1:2181          ESTABLISHED 5098/java ##生產消息
[root@localhost bin]./kafka-console-producer.sh --broker-list 127.0.0.1:9092 -topic testaaa
>123
>456
>789##打開一個新的終端,查看消息
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 -topic testaaa
123
456
789

六、?集群部署Kafka

1。 基礎環境設置

關閉防火墻、安裝java

systemctl stop firewalld
setenforce 0
dnf -y install java
##三臺服務器修改名字
hostnamectl set-hostname kafka1
hostnamectl set-hostname kafka2
hostnamectl set-hostname kafka3cat /etc/hosts        ##在該文件中添加
192.168.10.101 kafka1
192.168.10.105 kafka2
192.168.10.106 kafka3

2.?安裝 Zookeeper

[root@kafka1 ~]# cd /etc/zookeeper/conf
[root@kafka1 conf]# ls
configuration.xsl  zoo.cfg               zoo_sample.cfg
log4j.properties   zoo.cfg.dynamic.next[root@kafka1 conf]# vim zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data          ##修改并添加幾行
clientPort=2181
server.1=192.168.10.101:2888:3888          
server.2=192.168.10.105:2888:3888
server.3=192.168.10.106:2888:3888[root@kafka1 conf]# mkdir /etc/zookeeper/zookeeper-data/echo '1'>//etc/zookeeper/zookeeper-data/myid
echo '2'>//etc/zookeeper/zookeeper-data/myid
echo '3'>//etc/zookeeper/zookeeper-data/myid[root@kafka1 ~]# cd /etc/zookeeper/bin 
[root@kafka1 bin]# ./zkServer.sh setart          ##一定要啟動zookeeper

3.?安裝 kafka

[root@kafka1 ~]# cd /etc/kafka/config[root@kafka1 config]# vim server.properties 
broker.id=1           ##id值不能一樣,其他兩個id為2和三
listeners=PLAINTEXT://192.168.10.101:9092      ##三臺填寫自己的ip
log.dirs=/etc/kafka/kafka-logs
zookeeper.connect=192.168.10.101:2181,192.168.10.105:2181,192.168.10.106:2181##啟動kafka
[root@kafka1 ~]# cd /etc/kafka/bin
[root@kafka1 bin]# ./kafka-server-start.sh ../config/server.properties &

4. 測試

任意一臺服務器創建topic
./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test1111111./kafka-console-producer.sh --broker-list kafka1:9092 -topic test1111111       生產消息./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test1111111      另一臺消費消息

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/83562.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/83562.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/83562.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

NodeJS全棧WEB3面試題——P3Web3.js / Ethers.js 使用

3.1 Ethers.js 和 Web3.js 的主要區別是什么? 比較點Ethers.jsWeb3.js體積更輕量,適合前端較大,加載慢,適合 Node文檔文檔簡潔、現代化,支持 TypeScript文檔豐富,但不夠現代化模塊化設計高度模塊化&#x…

Ubuntu 桌面版忘記賬戶密碼的重置方法

如果你忘記了 Ubuntu 桌面版的用戶密碼,可以通過進入恢復模式(Recovery Mode)來重置密碼。以下是詳細步驟: 一、進入 GRUB 引導菜單 重啟計算機:點擊關機按鈕,選擇重啟。在啟動時按住 Shift 鍵&#xff1…

全志A40i android7.1 調試信息打印串口由uart0改為uart3

一,概述 1. 目的 將調試信息打印串口由uart0改為uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改動 使能uart3(TX:PH00 RX:PH01),并讓boo…

【五子棋在線對戰】二.項目結構設計 實用工具類模板的實現

項目結構設計 1.項目模塊劃分2.業務處理模塊子模塊的劃分3.實用工具類模板的實現3.1 日志宏的實現3.2 mysql工具3.3 JsonCpp工具3.4 string-Split工具 && file_util工具 1.項目模塊劃分 ● 數據管理模塊:依托 MySQL 數據庫,負責用戶數據的存儲與…

53 python akshare(獲取金融數據)

在金融數據獲取與分析領域,AkShare是一個強大且靈活的開源庫,它提供了豐富的金融數據接口,覆蓋股票、期貨、期權、基金、債券、外匯等多個金融市場。AkShare更專注于中國金融市場數據,并且支持從多個數據源獲取數據,具有更高的穩定性和更全面的數據覆蓋。 一、安裝akshar…

藍橋杯17114 殘缺的數字

問題描述 七段碼顯示器是一種常見的顯示數字的電子元件,它由七個發光管組成: 圖依次展示了數字 0~9 用七段碼來顯示的狀態,其中燈管為黃色表示點亮,灰色表示熄滅。根據燈管的亮暗狀態,我們可以用一個狀態碼(狀態碼是一個 7 位的…

Java觀察者模式深度解析:構建松耦合事件驅動系統的藝術

目錄 觀察者模式基礎解析核心結構與實現原理Java內置觀察者實現Spring框架中的高級應用典型應用場景與實戰案例觀察者模式變體與優化常見問題與最佳實踐總結與未來展望1. 觀察者模式基礎解析 1.1 模式定義與核心思想 觀察者模式(Observer Pattern)是一種行為型設計模式,它…

NocoBase v1.7.0 正式版發布

原文鏈接:https://www.nocobase.com/cn/blog/nocobase-1-7-0。 新特性 用戶角色并集 角色并集是一種權限管理模式,根據系統設置,系統開發者可以選擇使用獨立角色、允許角色并集,或者僅使用角色并集,以滿足不同的權限…

破解通信難題,modbus轉profibus網關在高爐水沖渣系統中穩定好用

基于在高爐水沖渣傳動監控系統的工藝背景下,穩聯技術Profibus-Modbus網關在控制系統中使支持Profibus協議的設備與支持Modbus RTU協議的設備之間進行通訊協議轉換的作用,使得支持不同通訊協議的設備之間能夠進行數據傳遞,并且給出了設計方法.應用Profibus-Modbus總線橋WL-ABD30…

開源是什么?我們為什么要開源?

本片為故事類文章推薦聽音頻哦 軟件自由運動的背景 夢開始的地方 20世紀70年代,軟件行業處于早期發展階段,軟件通常與硬件捆綁銷售,用戶對軟件的使用、修改和分發權利非常有限。隨著計算機技術的發展和互聯網的普及,越來越多的開…

Educational Codeforces Round 179 (Rated for Div. 2)(A-E)

題目鏈接:Dashboard - Educational Codeforces Round 179 (Rated for Div. 2) - Codeforces A. Energy Crystals 思路 貪心地模擬一下過程很容易就看出來了,每次變成盡可能大的數 1 1 0 -> 1 1 3 -> 3 3 5 -> 5 5 11....我們只需要關注最大…

React Native開發鴻蒙運動健康類應用的項目實踐記錄

??項目名稱??:HarmonyFitness - 基于React Native的鴻蒙運動健康應用 ??技術棧??:React Native 0.72.5 TypeScript HarmonyOS API ArkTS原生模塊 一、環境搭建與項目初始化 ??雙環境配置?? ??React Native環境??: npx re…

Linux --UDP套接字實現簡單的網絡聊天室

一、Server端的實現 1.1、服務端的初始化 ①、創建套接字&#xff1a; 創建套接字接口&#xff1a; #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> int socket(int domain, int type, int protocol); //1. 這是一個創建套接字的接…

Eureka 高可用集群搭建實戰:服務注冊與發現的底層原理與避坑指南

引言&#xff1a;為什么 Eureka 依然是存量系統的核心&#xff1f; 盡管 Nacos 等新注冊中心崛起&#xff0c;但金融、電力等保守行業仍有大量系統運行在 Eureka 上。理解其高可用設計與自我保護機制&#xff0c;是保障分布式系統穩定的必修課。本文將手把手帶你搭建生產級 Eur…

Spring Boot應用開發實戰

Spring Boot應用開發實戰&#xff1a;從零到生產級項目的深度指南 在當今Java生態中&#xff0c;Spring Boot已占據絕對主導地位——據統計&#xff0c;超過75%的新Java項目選擇Spring Boot作為開發框架。本文將帶您從零開始&#xff0c;深入探索Spring Boot的核心精髓&#xf…

yum更換阿里云的鏡像源

步驟 1&#xff1a;備份原有源配置&#xff08;重要&#xff01;&#xff09; sudo mkdir /etc/yum.repos.d/backup sudo mv /etc/yum.repos.d/CentOS-* /etc/yum.repos.d/backup/步驟 2&#xff1a;下載阿里云源配置 sudo curl -o /etc/yum.repos.d/CentOS-Base.repo https:…

【算法訓練營Day06】哈希表part2

文章目錄 四數相加贖金信三數之和四數之和 四數相加 題目鏈接&#xff1a;454. 四數相加 II 這個題注意它只需要給出次數&#xff0c;而不是元組。所以我們可以分治。將前兩個數組的加和情況使用map存儲起來&#xff0c;再將后兩個數組的加和情況使用map存儲起來&#xff0c;ke…

JS手寫代碼篇---手寫apply方法

11、手寫apply方法 apply方法的作用&#xff1a; apply 是一個函數的方法&#xff0c;它允許你調用一個函數&#xff0c;同時將函數的 this 值設置為指定的值&#xff0c;并將函數的參數作為數組&#xff08;或類數組對象&#xff09;傳遞給該函數。 與call的區別&#xff1…

冪等性:保障系統穩定的關鍵設計

冪等性&#xff08;Idempotence&#xff09; 是計算機科學和分布式系統中的核心概念&#xff0c;指同一操作重復執行多次所產生的效果與執行一次的效果相同。這一特性對系統容錯性、數據一致性至關重要&#xff0c;尤其在網絡通信&#xff08;如HTTP&#xff09;和數據庫設計中…

electron定時任務,打印內存占用情況

// 監聽更新 function winUpdate(){// 每次執行完后重新設置定時器try {// 獲取當前時間并格式化為易讀的字符串const now new Date();const timeString now.toLocaleString();console.log(當前時間: ${timeString});// 記錄內存使用情況&#xff08;可選&#xff09;const m…