Zookeeper與Kafka

Zookeeper與Kafka

    • 一、Zookeeper 概述
      • 1.Zookeeper 定義
      • 2.Zookeeper 工作機制
      • 3.Zookeeper 特點
      • 4.Zookeeper 數據結構
      • 5.Zookeeper 應用場景
      • 6.Zookeeper 選舉機制
    • 二、部署 Zookeeper 集群
      • 1.準備 3 臺服務器做 Zookeeper 集群
      • 2.安裝 Zookeeper
      • 3.拷貝配置好的 Zookeeper 配置文件到其他機器上
      • 4.配置 Zookeeper 啟動腳本
      • 5.設置開機自啟
    • 三、Kafka概述
      • 1.為什么需要消息隊列(MQ):
      • 2.使用消息隊列的好處
      • 3.消息隊列的兩種模式
      • 4.Kafka 定義
      • 5.Kafka 簡介
      • 6.Kafka 的特性
      • 7.Kafka 系統架構
    • 四、Kafka架構
      • 1.Partation 數據路由規則:
      • 2.分區介紹
      • 3.分區的原因
      • 4.架構概述
    • 五、部署 kafka 集群
      • 1.下載安裝包
      • 2.安裝 Kafka
      • 3.Kafka 命令行操作
    • 六、Kafka 架構深入
      • 1.Kafka 工作流程及文件存儲機制
      • 2.數據可靠性保證
      • 3.數據一致性問題
      • 4.ack 應答機制
    • 七、部署
      • 1.部署 Zookeeper+Kafka 集群
      • 2.部署 Filebeat
      • 3.部署 ELK,在 Logstash 組件所在節點上新建一個 Logstash 配置文件

)

一、Zookeeper 概述

1.Zookeeper 定義

Zookeeper是一個開源的分布式的,為分布式框架提供協調服務的Apache項目。

2.Zookeeper 工作機制

Zookeeper從設計模式角度來理解:是一個基于觀察者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper就將負責通知已經在Zookeeper上注冊的那些觀察者做出相應的反應。也就是說 Zookeeper = 文件系統 + 通知機制。
在這里插入圖片描述

3.Zookeeper 特點

(1)Zookeeper:一個領導者(Leader),多個跟隨者(Follower)組成的集群。
(2)Zookeepe集群中只要有半數以上節點存活,Zookeeper集群就能正常服務。所以Zookeeper適合安裝奇數臺服務器。
(3)全局數據一致:每個Server保存一份相同的數據副本,Client無論連接到哪個Server,數據都是一致的。
(4)更新請求順序執行,來自同一個Client的更新請求按其發送順序依次執行,即先進先出。
(5)數據更新原子性,一次數據更新要么成功,要么失敗。
(6)實時性,在一定時間范圍內,Client能讀到最新數據。

4.Zookeeper 數據結構

ZooKeeper數據模型的結構與Linux文件系統很類似,整體上可以看作是一棵樹,每個節點稱做一個ZNode。每一個ZNode默認能夠存儲1MB的數據,每個ZNode都可以通過其路徑唯一標識。

在這里插入圖片描述

5.Zookeeper 應用場景

提供的服務包括:統一命名服務、統一配置管理、統一集群管理、服務器節點動態上下線、軟負載均衡等。
(1)統一命名服務
在分布式環境下,經常需要對應用/服務進行統一命名,便于識別。例如:IP不容易記住,而域名容易記住。

(2)統一配置管理
① 分布式環境下,配置文件同步非常常見。一般要求一個集群中,所有節點的配置信息是一致的,比如Kafka集群。對配置文件修改后,希望能夠快速同步到各個節點上。
② 配置管理可交由ZooKeeper實現。可將配置信息寫入ZooKeeper上的一個Znode。各個客戶端服務器監聽這個Znode。一旦 Znode中的數據被修改,ZooKeeper將通知各個客戶端服務器。

(3)統一集群管理
① 分布式環境中,實時掌握每個節點的狀態是必要的。可根據節點實時狀態做出一些調整。
② ZooKeeper可以實現實時監控節點狀態變化。可將節點信息寫入ZooKeeper上的一個ZNode。監聽這個ZNode可獲取它的實時狀態變化。

(4)服務器動態上下線
客戶端能實時洞察到服務器上下線的變化。

(5)軟負載均衡
在Zookeeper中記錄每臺服務器的訪問數,讓訪問數最少的服務器去處理最新的客戶端請求。

6.Zookeeper 選舉機制

(1)第一次啟動選舉機制
① 服務器1啟動,發起一次選舉。服務器1投自己一票。此時服務器1票數一票,不夠半數以上(3票),選舉無法完成,服務器1狀態保持為LOOKING;
② 服務器2啟動,再發起一次選舉。服務器1和2分別投自己一票并交換選票信息:此時服務器1發現服務器2的myid比自己目前投票推舉的(服務器1)大,更改選票為推舉服務器2。此時服務器1票數0票,服務器2票數2票,沒有半數以上結果,選舉無法完成,服務器1,2狀態保持LOOKING
③ 服務器3啟動,發起一次選舉。此時服務器1和2都會更改選票為服務器3。此次投票結果:服務器1為0票,服務器2為0票,服務器3為3票。此時服務器3的票數已經超過半數,服務器3當選Leader。服務器1,2更改狀態為FOLLOWING,服務器3更改狀態為LEADING;
④ 服務器4啟動,發起一次選舉。此時服務器1,2,3已經不是LOOKING狀態,不會更改選票信息。交換選票信息結果:服務器3為3票,服務器4為1票。此時服務器4服從多數,更改選票信息為服務器3,并更改狀態為FOLLOWING;
⑤ 服務器5啟動,同4一樣當小弟。

(2)非第一次啟動選舉機制
① 當ZooKeeper 集群中的一臺服務器出現以下兩種情況之一時,就會開始進入Leader選舉:
1)服務器初始化啟動。
2)服務器運行期間無法和Leader保持連接。

② 而當一臺機器進入Leader選舉流程時,當前集群也可能會處于以下兩種狀態:
1)集群中本來就已經存在一個Leader。
對于已經存在Leader的情況,機器試圖去選舉Leader時,會被告知當前服務器的Leader信息,對于該機器來說,僅僅需要和 Leader機器建立連接,并進行狀態同步即可。

2)集群中確實不存在Leader。
假設ZooKeeper由5臺服務器組成,SID分別為1、2、3、4、5,ZXID分別為8、8、8、7、7,并且此時SID為3的服務器是Leader。某一時刻,3和5服務器出現故障,因此開始進行Leader選舉。
選舉Leader規則:
1.EPOCH大的直接勝出
2.EPOCH相同,事務id大的勝出
3.事務id相同,服務器id大的勝出


  • SID:服務器ID。用來唯一標識一臺ZooKeeper集群中的機器,每臺機器不能重復,和myid一致。

  • ZXID:事務ID。ZXID是一個事務ID,用來標識一次服務器狀態的變更。在某一時刻,集群中的每臺機器的ZXID值不一定完全一致,這和ZooKeeper服務器對于客戶端“更新請求”的處理邏輯速度有關。

  • Epoch:每個Leader任期的代號。沒有Leader時同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個數據就會增加

二、部署 Zookeeper 集群

1.準備 3 臺服務器做 Zookeeper 集群

192.168.174.13
192.168.174.15
192.168.174.17

(1)安裝前準備,關閉防火墻

systemctl stop firewalld
systemctl disable firewalld
setenforce 0

(2)安裝 JDK

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version//下載安裝包
官方下載地址:https://archive.apache.org/dist/zookeeper/cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz

2.安裝 Zookeeper

cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7//修改配置文件
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfgvim zoo.cfg
tickTime=2000   #通信心跳時間,Zookeeper服務器與客戶端心跳時間,單位毫秒
initLimit=10    #Leader和Follower初始連接時能容忍的最多心跳數(tickTime的數量),這里表示為10*2s
syncLimit=5     #Leader和Follower之間同步通信的超時時間,這里表示如果超過5*2s,Leader認為Follwer死掉,并從服務器列表中刪除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data      ●修改,指定保存Zookeeper中的數據的目錄,目錄需要單獨創建
dataLogDir=/usr/local/zookeeper-3.5.7/logs   ●添加,指定存放日志的目錄,目錄需要單獨創建
clientPort=2181   #客戶端連接端口
#添加集群信息
server.1=192.168.174.13:3188:3288
server.2=192.168.174.15:3188:3288
server.3=192.168.174.17:3188:3288

server.A=B:C:D

  • A是一個數字,表示這個是第幾號服務器。集群模式下需要在zoo.cfg中dataDir指定的目錄下創建一個文件myid,這個文件里面有一個數據就是A的值,Zookeeper啟動時讀取此文件,拿到里面的數據與zoo.cfg里面的配置信息比較從而判斷到底是哪個server。
  • B是這個服務器的地址。
  • C是這個服務器Follower與集群中的Leader服務器交換信息的端口。
  • D是萬一集群中的Leader服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。

3.拷貝配置好的 Zookeeper 配置文件到其他機器上

scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.174.15:/usr/local/zookeeper-3.5.7/conf/
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.174.17:/usr/local/zookeeper-3.5.7/conf///在每個節點上創建數據目錄和日志目錄
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs//在每個節點的dataDir指定的目錄下創建一個 myid 的文件
echo 1 > /usr/local/zookeeper-3.5.7/data/myid
echo 2 > /usr/local/zookeeper-3.5.7/data/myid
echo 3 > /usr/local/zookeeper-3.5.7/data/myid

4.配置 Zookeeper 啟動腳本

vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)echo "---------- zookeeper 啟動 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重啟 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 狀態 ------------"$ZK_HOME/bin/zkServer.sh status
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac

5.設置開機自啟

chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper//分別啟動 Zookeeper
systemctl start zookeeper.service //查看當前狀態
systemctl status zookeeper.service 

在這里插入圖片描述

三、Kafka概述

1.為什么需要消息隊列(MQ):

(1)由于在高并發環境下,同步請求來不及處理,請求往往會發生阻塞。比如大量的請求并發訪問數據庫,導致行鎖表鎖,最后請求線程會堆積過多,從而觸發 too many connection 錯誤,引發雪崩效應。
(2)我們使用消息隊列,通過異步處理請求,從而緩解系統的壓力。消息隊列常應用于異步處理,流量削峰,應用解耦,消息通訊等場景。

(3)當前比較常見的 MQ 中間件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

2.使用消息隊列的好處

(1)解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

(2)可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

(3)緩沖
有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。

(4)靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

(5)異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

3.消息隊列的兩種模式

(1)點對點模式(一對一,消費者主動拉取數據,消息收到后消息清除)
消息生產者生產消息發送到消息隊列中,然后消息消費者從消息隊列中取出并且消費消息。消息被消費以后,消息隊列中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。消息隊列支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。

(2)發布/訂閱模式(一對多,又叫觀察者模式,消費者消費數據之后不會清除消息)
消息生產者(發布)將消息發布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到 topic 的消息會被所有訂閱者消費。
發布/訂閱模式是定義對象間一種一對多的依賴關系,使得每當一個對象(目標對象)的狀態發生改變,則所有依賴于它的對象(觀察者對象)都會得到通知并自動更新。

4.Kafka 定義

(1)Kafka 是一個分布式的基于發布/訂閱模式的消息隊列(MQ,Message Queue),主要應用于大數據實時處理領域。

5.Kafka 簡介

(1)Kafka 是最初由 Linkedin 公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基于 Zookeeper 協調的分布式消息中間件系統。

(2)它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景,比如基于 hadoop 的批處理系統、低延遲的實時系統、Spark/Flink 流式處理引擎,nginx 訪問日志,消息服務等等,用 scala 語言編寫。

(3)Linkedin 于 2010 年貢獻給了 Apache 基金會并成為頂級開源項目。

6.Kafka 的特性

(1)高吞吐量、低延遲
Kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒。每個 topic 可以分多個 Partition,Consumer Group 對 Partition 進行消費操作,提高負載均衡能力和消費能力。

(2)可擴展性
kafka 集群支持熱擴展

(3)持久性、可靠性
消息被持久化到本地磁盤,并且支持數據備份防止數據丟失

(4)容錯性
允許集群中節點失敗(多副本情況下,若副本數量為 n,則允許 n-1 個節點失敗)

(5)高并發
支持數千個客戶端同時讀寫

7.Kafka 系統架構

(1)Broker
一臺 kafka 服務器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。

(2)Topic 主題
可以理解為一個隊列,生產者和消費者面向的都是一個 topic。
類似于數據庫的表名或者 ES 的 index
物理上不同 topic 的消息分開存儲

(3)Partition 分區
為了實現擴展性,一個非常大的 topic 可以分布到多個 broker(即服務器)上,一個 topic 可以分割為一個或多個 partition,每個 partition 是一個有序的隊列。Kafka 只保證 partition 內的記錄是有序的,而不保證 topic 中不同 partition 的順序。

每個 topic 至少有一個 partition,當生產者產生數據的時候,會根據分配策略選擇分區,然后將消息追加到指定的分區的隊列末尾。

四、Kafka架構

1.Partation 數據路由規則:

(1)指定了 patition,則直接使用;
(2)未指定 patition 但指定 key(相當于消息中某個屬性),通過對 key 的 value 進行 hash 取模,選出一個 patition;
(3)patition 和 key 都未指定,使用輪詢選出一個 patition。

  • 每條消息都會有一個自增的編號,用于標識消息的偏移量,標識順序從 0 開始。

  • 每個 partition 中的數據使用多個 segment 文件存儲。

  • 如果 topic 有多個 partition,消費數據時就不能保證數據的順序。嚴格保證消息的消費順序的場景下(例如商品秒殺、 搶紅包),需要將 partition 數目設為 1。

2.分區介紹

(1)broker 存儲 topic 的數據。如果某 topic 有 N 個 partition,集群有 N 個 broker,那么每個 broker 存儲該 topic 的一個 partition。
(2)如果某 topic 有 N 個 partition,集群有 (N+M) 個 broker,那么其中有 N 個 broker 存儲 topic 的一個 partition, 剩下的 M 個 broker 不存儲該 topic 的 partition 數據。
(3)如果某 topic 有 N 個 partition,集群中 broker 數目少于 N 個,那么一個 broker 存儲該 topic 的一個或多個 partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致 Kafka 集群數據不均衡。

3.分區的原因

(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;
(2)可以提高并發,因為可以以Partition為單位讀寫了。

4.架構概述

(1)Replica
副本,為保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失,且 kafka 仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分區都有若干個副本,一個 leader 和若干個 follower。

(2)Leader
每個 partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當前負責數據的讀寫的 partition。

(3)Follower
Follower 跟隨 Leader,所有寫請求都通過 Leader 路由,數據變更會廣播給所有 Follower,Follower 與 Leader 保持數據同步。Follower 只負責備份,不負責數據的讀寫。
如果 Leader 故障,則從 Follower 中選舉出一個新的 Leader。
當 Follower 掛掉、卡住或者同步太慢,Leader 會把這個 Follower 從 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合) 列表中刪除,重新創建一個 Follower。

(4) Producer
生產者即數據的發布者,該角色將消息 push 發布到 Kafka 的 topic 中。
broker 接收到生產者發送的消息后,broker 將該消息追加到當前用于追加數據的 segment 文件中。
生產者發送的消息,存儲到一個 partition 中,生產者也可以指定數據存儲的 partition。

(5)Consumer
消費者可以從 broker 中 pull 拉取數據。消費者可以消費多個 topic 中的數據。

(6)Consumer Group(CG)
消費者組,由多個 consumer 組成。
所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。可為每個消費者指定組名,若不指定組名則屬于默認的組。
將多個消費者集中到一起去處理某一個 Topic 的數據,可以更快的提高數據的消費能力。
消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費,防止數據被重復讀取。
消費者組之間互不影響。

(7)offset 偏移量
可以唯一的標識一條消息。
偏移量決定讀取數據的位置,不會有線程安全的問題,消費者通過偏移量來決定下次讀取的消息(即消費位置)。
消息被消費之后,并不被馬上刪除,這樣多個業務就可以重復使用 Kafka 的消息。
某一個業務也可以通過修改偏移量達到重新讀取消息的目的,偏移量由用戶控制。
消息最終還是會被刪除的,默認生命周期為 1 周(7*24小時)。

(8)Zookeeper
Kafka 通過 Zookeeper 來存儲集群的 meta 信息。

由于 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。
Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中;從 0.9 版本開始,consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為 __consumer_offsets。

總結:zookeeper的作用就是,生產者push數據到kafka集群,就必須要找到kafka集群的節點在哪里,這些都是通過zookeeper去尋找的。消費者消費哪一條數據,也需要zookeeper的支持,從zookeeper獲得offset,offset記錄上一次消費的數據消費到哪里,這樣就可以接著下一條數據進行消費。

五、部署 kafka 集群

1.下載安裝包

官方下載地址:http://kafka.apache.org/downloads.html
上傳到opt目錄
cd /opt

2.安裝 Kafka

cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

(1)修改配置文件

cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties
broker.id=0    ●21行,broker的全局唯一編號,每個broker不能重復,因此要在其他機器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.174.13:9092    ●31行,指定監聽的IP和端口,如果修改每個broker的IP需區分開來,也可保持默認配置不用修改
num.network.threads=3    #42行,broker 處理網絡請求的線程數量,一般情況下不需要去修改
num.io.threads=8         #45行,用來處理磁盤IO的線程數量,數值應該大于硬盤數
socket.send.buffer.bytes=102400       #48行,發送套接字的緩沖區大小
socket.receive.buffer.bytes=102400    #51行,接收套接字的緩沖區大小
socket.request.max.bytes=104857600    #54行,請求套接字的緩沖區大小
log.dirs=/usr/local/kafka/logs        #60行,kafka運行日志存放的路徑,也是數據存放的路徑
num.partitions=1    #65行,topic在當前broker上的默認分區個數,會被topic創建時的指定參數覆蓋
num.recovery.threads.per.data.dir=1    #69行,用來恢復和清理data下數據的線程數量
log.retention.hours=168    #103行,segment文件(數據文件)保留的最長時間,單位為小時,默認為7天,超時將被刪除
log.segment.bytes=1073741824    #110行,一個segment文件最大的大小,默認為 1G,超出將新建一個新的segment文件
zookeeper.connect=192.168.174.13:2181,192.168.174.15:2181,192.168.174.17:2181    ●123行,配置連接Zookeeper集群地址

(2)修改環境變量

vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile

(3)配置 Zookeeper 啟動腳本

vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 啟動 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 狀態 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac

(4)設置開機自啟

chmod +x /etc/init.d/kafka
chkconfig --add kafka

(5)分別啟動 Kafka

systemctl start kafka

3.Kafka 命令行操作

(1)創建topic

kafka-topics.sh --create --zookeeper 192.168.174.13:2181,192.168.174.15:2181,192.168.174.17:2181 --replication-factor 2 --partitions 3 --topic testkafka-topics.sh --create --zookeeper 192.168.10.17:2181,192.168.10.20:2181,192.168.10.21:2181 --replication-factor 2 --partitions 3 --topic test--zookeeper:定義 zookeeper 集群服務器地址,如果有多個 IP 地址使用逗號分割,一般使用一個 IP 即可
--replication-factor:定義分區副本數,1 代表單副本,建議為 2 
--partitions:定義分區數

在這里插入圖片描述

(2)topic:定義 topic 名稱

//查看當前服務器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.174.13:2181,192.168.174.15:2181,192.168.174.17:2181//查看某個 topic 的詳情
kafka-topics.sh  --describe --zookeeper 192.168.174.13:2181,192.168.174.15:2181,192.168.174.17:2181//發布消息
kafka-console-producer.sh --broker-list 192.168.174.13:9092,192.168.174.15:9092,192.168.174.17:9092  --topic test//消費消息
kafka-console-consumer.sh --bootstrap-server 192.168.174.13:9092,192.168.174.15:9092,192.168.174.17:9092 --topic test --from-beginning

(3)from-beginning:會把主題中以往所有的數據都讀取出來

//修改分區數
kafka-topics.sh --zookeeper 192.168.174.13:2181,192.168.174.15:2181,192.168.174.17:2181 --alter --topic test --partitions 6//刪除 topic
kafka-topics.sh --delete --zookeeper 192.168.174.13:2181,192.168.174.15:2181,192.168.174.17:2181 --topic test

六、Kafka 架構深入

1.Kafka 工作流程及文件存儲機制

(1)Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,都是面向 topic 的。

(2)topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應于一個 log 文件,該 log 文件中存儲的就是 producer 生產的數據。Producer 生產的數據會被不斷追加到該 log 文件末端,且每條數據都有自己的 offset。 消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,從上次的位置繼續消費。

(3)由于生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應兩個文件:“.index” 文件和 “.log” 文件。這些文件位于一個文件夾下,該文件夾的命名規則為:topic名稱+分區序號。例如,test 這個 topic 有三個分區, 則其對應的文件夾為 test-0、test-1、test-2。

index 和 log 文件以當前 segment 的第一條消息的 offset 命名。

“.index” 文件存儲大量的索引信息,“.log” 文件存儲大量的數據,索引文件中的元數據指向對應數據文件中 message 的物理偏移地址。

2.數據可靠性保證

(1)為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic 的每個 partition 收到 producer 發送的數據后, 都需要向 producer 發送 ack(acknowledgement 確認收到),如果 producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。

3.數據一致性問題

LEO:指的是每個副本最大的 offset;
HW:指的是消費者能見到的最大的 offset,所有副本中最小的 LEO。

(1)follower 故障
follower 發生故障后會被臨時踢出 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合),待該 follower 恢復后,follower 會讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步。等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障
leader 發生故障之后,會從 ISR 中選出一個新的 leader, 之后,為保證多個副本之間的數據一致性,其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數據。

注:這只能保證副本之間的數據一致性,并不能保證數據不丟失或者不重復。

4.ack 應答機制

對于某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的 follower 全部接收成功。所以 Kafka 為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡選擇。

當 producer 向 leader 發送數據時,可以通過 request.required.acks 參數來設置數據可靠性的級別:
0:這意味著producer無需等待來自broker的確認而繼續發送下一批消息。這種情況下數據傳輸效率最高,但是數據可靠性確是最低的。當broker故障時有可能丟失數據。

1(默認配置):這意味著producer在ISR中的leader已成功收到的數據并得到確認后發送下一條message。如果在follower同步成功之前leader故障,那么將會丟失數據。

-1(或者是all):producer需要等待ISR中的所有follower都確認接收到數據后才算一次發送完成,可靠性最高。但是如果在 follower 同步完成后,broker 發送ack 之前,leader 發生故障,那么會造成數據重復。

三種機制性能依次遞減,數據可靠性依次遞增。

注:在 0.11 版本以前的Kafka,對此是無能為力的,只能保證數據不丟失,再在下游消費者對數據做全局去重。在 0.11 及以后版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復數據, Server 端都只會持久化一條。

七、部署

1.部署 Zookeeper+Kafka 集群

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ZiraG7ns-1691406753329)(C:\Users\yuyong\Desktop\筆記\圖\68.png)]

2.部署 Filebeat

cd /usr/local/filebeatvim filebeat.yml
filebeat.prospectors:- type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]......
#添加輸出到 Kafka 的配置
output.kafka:enabled: truehosts: ["192.168.174.13:9092","192.168.174.15:9092","192.168.174.17:9092"]    #指定 Kafka 集群配置topic: "httpd"    #指定 Kafka 的 topic#啟動 filebeat
./filebeat -e -c filebeat.yml

3.部署 ELK,在 Logstash 組件所在節點上新建一個 Logstash 配置文件

cd /etc/logstash/conf.d/vim kafka.conf
input {kafka {bootstrap_servers => "192.168.174.13:9092,192.168.174.15:9092,192.168.174.17:9092"  #kafka集群地址topics  => "httpd"     #拉取的kafka的指定topictype => "httpd_kafka"  #指定 type 字段codec => "json"        #解析json格式的日志數據auto_offset_reset => "latest"  #拉取最近數據,earliest為從頭開始拉取decorate_events => true   #傳遞給elasticsearch的數據額外增加kafka的屬性數據}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.174.13:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.174.13:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}#啟動 logstash
logstash -f kafka.conf

注:生產黑屏操作es時查看所有的索引:curl -X GET “localhost:9200/_cat/indices?v”

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/34850.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/34850.shtml
英文地址,請注明出處:http://en.pswp.cn/news/34850.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Java】 java | git | win系統重裝會給開發環境帶來哪些問題

一、概述 1、近期發現電腦用起來不絲滑了,文件夾操作卡頓,一陣操作還會藍屏 2、不能忍,整理排查 二、電腦情況 1、CPU: I5-9400F 2.9GHz 6核 2、內存: 32G 3、固態:256G 4、機械:1T 5、盤符使用…

二叉樹的講解

💓博主個人主頁:不是笨小孩👀 ?專欄分類:數據結構與算法👀 刷題專欄👀 C語言👀 🚚代碼倉庫:笨小孩的代碼庫👀 ?社區:不是笨小孩👀 🌹歡迎大家三連關注&…

詳解C語言中的int8_t、uint8_t、int16_t、uint16_t、int32_t、uint32_t、int64_t、uint64_t

2023年8月8日,周二上午 目錄 為什么會產生int8_t、uint8_t等這類數據類型int8_t、uint8_t等這類數據類型有什么用頭文件int8_t、uint8_t等這類數據類型是怎么實現的 為什么會產生int8_t、uint8_t等這類數據類型 根本原因在于,C 語言標準只是規定了各個…

SQL | 匯總數據

9-匯總數據 9.1-聚集函數 在實際開發過程中,可能會遇到下面這些情況: 確定大于某個值的有多少行數據,比如游戲排行榜,查詢玩家排行多少名。 獲取表中某些行的和,比如雙十一當天,某個用戶總訂單價格是多少…

學習篇之React Fiber概念及原理

什么是React Fibber? React Fiber 是 React 框架的一種底層架構,為了改進 React 的渲染引擎,使其更加高效、靈活和可擴展。 傳統上,React 使用一種稱為堆棧調和遞歸算法來處理虛擬 DOM 的更新,這種方法在大型應用或者…

最強自動化測試框架Playwright(7)- 使用cookie避免重復登錄

playwright在稱為瀏覽器上下文的隔離環境中執行測試。這種隔離模型提高了可重復性,并防止了級聯測試失敗。測試可以加載現有的經過身份驗證的狀態。這消除了在每次測試中進行身份驗證的需要,并加快了測試執行速度。 每次測試前登錄 以下示例登錄到 Git…

談談什么是云計算?以及它的應用

作者:Insist-- 個人主頁:insist--個人主頁 作者會持續更新網絡知識和python基礎知識,期待你的關注 目錄 ?編輯 一、什么是云計算 二、云計算的優勢與劣勢? 1、云計算的優勢 ①提高資源利用率 ②提升效率 ③降低成本 2、云…

python編程基礎與案例集錦,python編程入門經典

大家好,本文將圍繞python編程基礎與案例集錦展開說明,python編程入門與案例詳解是一個很多人都想弄明白的事情,想搞清楚python入門程序例子需要先了解以下幾個事情。 【程序1】 題目:輸入一行字符,分別統計出其中英文字…

『CV學習筆記』Opencv和PIL Image以及base64編碼互相轉化

Opencv和PIL Image以及base64編碼互相轉化 文章目錄 一. opencv&PIL.Image&Skimage1.1. opencv-python讀取透明圖片(帶alpha通道)1.2. opencv、PIL.Image、Skimage讀取的彩色圖片維度區別1.3. opencv、PIL.Image轉換二. base64和cv2 imge互相轉換三. base64和PIL imge互…

射頻入門知識-混頻器-1

5.4混頻電路-視頻_嗶哩嗶哩_bilibili ???????

【算法題】螺旋矩陣II (求解n階Z形矩陣)

一、問題的提出 n階Z形矩陣的特點是按照之(Z)字形的方式排列元素。n階Z形矩陣是指矩陣的大小為nn,其中n為正整數。 題目描述 一個 n 行 n 列的螺旋(Z形)矩陣如圖1所示,觀察并找出填數規律。 圖1 7行7列和8行8列的螺旋(Z形)矩陣 現在給出矩陣大小 n&…

數據結構入門:棧

目錄 前言 1. 棧 1.1棧的概念及結構 1.2 棧的實現 1.2.1 棧的定義 1.2.2 棧的初始化 1.2.3 入棧 1.2.4 出棧 1.2.5 棧的元素個數 1.2.6 棧頂數據 1.2.7 棧的判空 2.棧的應用 2.1 題目一:括號匹配 2.1.1 思路 2.1.2 分析 2.1.3 題解 總結 前言 無論你是計算機科學專…

CVE漏洞復現-CVE-2021-22555 Linux Netfilter 權限提升漏洞

CVE-2021-22555 Linux Netfilter 權限提升漏洞 漏洞描述 近日,互聯網公開了Linux Netfilter權限提升漏洞的POC及EXP,相關CVE編號:CVE-2021-22555。該漏洞在kCTF中被用于攻擊kubernetes pod容器實現虛擬化逃逸,該漏洞已在Linux內…

用chatGPT從左右眼圖片生成點云數據

左右眼圖片 需求 需要將左右眼圖像利用視差生成三維點云數據 先問問chatGPT相關知識 進一步問有沒有現成的軟件 chatGPT提到了OpenCV,我們讓chatGPT用OpenCV寫一個程序來做這個事情 當然,代碼里面會有一些錯誤,chatGPT寫的代碼并不會做模…

Arduino驅動MQ2模擬煙霧傳感器(氣體傳感器篇)

目錄 1、傳感器特性 2、硬件原理圖 3、控制器和傳感器連線圖 4、驅動程序 MQ2氣體傳感器,可以很靈敏的檢測到空氣中的煙霧、液化氣、丁烷、丙烷、甲烷、酒精、氫氣等氣體,與Arduino結合使用,可以制作火災煙霧報警、液化氣、丁烷、丙烷、甲烷、酒精、氫氣氣體泄露報警等相…

面試題. 字符串壓縮

字符串壓縮。利用字符重復出現的次數,編寫一種方法,實現基本的字符串壓縮功能。比如,字符串aabcccccaaa會變為a2b1c5a3。若“壓縮”后的字符串沒有變短,則返回原先的字符串。你可以假設字符串中只包含大小寫英文字母(a…

【JavaEE進階】Spring 更簡單的讀取和存儲對象

文章目錄 一. 存儲Bean對象1. 配置掃描路徑2. 添加注解存儲 Bean 對象2.1 使用五大類注解存儲Bean2.2 為什么要有五大類注解?2.3 有關獲取Bean參數的命名規則 3. 使用方法注解儲存 Bean 對象3.1 方法注解儲存對象的用法3.2 Bean的重命名3.3 同?類型多個 Bean 報錯 …

Spring Boot單元測試與Mybatis單表增刪改查

目錄 1. Spring Boot單元測試 1.1 什么是單元測試? 1.2 單元測試有哪些好處? 1.3 Spring Boot 單元測試使用 單元測試的實現步驟 1. 生成單元測試類 2. 添加單元測試代碼 簡單的斷言說明 2. Mybatis 單表增刪改查 2.1 單表查詢 2.2 參數占位符 ${} 和 #{} ${} 和 …

學點Selenium玩點新鮮~,讓分布式測試有更多玩法

前 言 我們都知道 Selenium 是一款在 Web 應用測試領域使用的自動化測試工具,而 Selenium Grid 是 Selenium 中的一大組件,通過它能夠實現分布式測試,能夠幫助團隊簡單快速在不同的環境中測試他們的 Web 應用。 分布式執行測試其實并不是一…

opencv,opengl,osg,vulkan,webgL,opencL,cuda

OpenCV OpenCV是一個基于BSD許可(開源)發行的跨平臺計算機視覺和機器學習軟件庫,可以運行在Linux、Windows、Android和Mac OS操作系統上。 它輕量級而且高效——由一系列 C 函數和少量 C 類構成,同時提供了Python、Ruby、MATLAB等…