Kafka介紹
ChatGPT對于Apache Kafka的介紹:
Apache Kafka是一個分布式流處理平臺,最初由LinkedIn開發并于2011年開源。它主要用于解決大規模數據的實時流式處理和數據管道問題。
Kafka是一個分布式的發布-訂閱消息系統,可以快速地處理高吞吐量的數據流,并將數據實時地分發到多個消費者中。Kafka消息系統由多個broker(服務器)組成,這些broker可以在多個數據中心之間分布式部署,以提供高可用性和容錯性。
Kafka的基本架構由生產者、消費者和主題(topic)組成。生產者可以將數據發布到指定的主題,而消費者可以訂閱這些主題并消費其中的數據。同時,Kafka還支持數據流的處理和轉換,可以在管道中通過Kafka Streams API進行流式計算,例如過濾、轉換、聚合等。
Kafka使用高效的數據存儲和管理技術,能夠輕松地處理TB級別的數據量。其優點包括高吞吐量、低延遲、可擴展性、持久性和容錯性等。
Kafka在企業級應用中被廣泛應用,包括實時流處理、日志聚合、監控和數據分析等方面。同時,Kafka還可以與其他大數據工具集成,如Hadoop、Spark和Storm等,構建一個完整的數據處理生態系統。
MQ的作用
MQ:MessageQueue,消息隊列。 隊列,是一種FIFO 先進先出的數據結構。消息則是跨進程傳遞的數據。一個典型的MQ系統,會將消息消息由生產者發送到MQ進行排隊,然后根據一定的順序交由消息的消費者進行處理。
主要作用:
-
異步
異步能提高系統的響應速度、吞吐量。
-
解耦
1、服務之間進行解耦,才可以減少服務之間的影響。提高系統整體的穩定性以及可擴展性。
2、另外,解耦后可以實現數據分發。生產者發送一個消息后,可以由一個或者多個消費者進行消費,并且消費者的增加或者減少對生產者沒有影響。
-
削峰
作用:以穩定的系統資源應對突發的流量沖擊。
為什么要用Kafka
典型日志聚合的應用場景:
業務場景決定了產品的特點。
1、數據吞吐量很大: 需要能夠快速收集各個渠道的海量日志
2、集群容錯性高:允許集群中少量節點崩潰
3、功能不需要太復雜:Kafka的設計目標是高吞吐、低延遲和可擴展,主要關注消息傳遞而不是消息處理。所以,Kafka并沒有支持死信隊列、順序消息等高級功能。
4、允許少量數據丟失:Kafka本身也在不斷優化數據安全問題,目前基本上可以認為Kafka可以做到不會丟數據。
Kafka快速上手
實驗環境
準備三臺CentOS7的虛擬機,預備搭建三臺機器的集群。分別配置機器名 worker1,worker2,worker3。
vi /etc/hosts192.168.146.128 worker1
192.168.146.129 worker2
192.168.146.130 worker3
關閉防火墻(實驗環境建議關閉)
firewall-cmd --state 查看防火墻狀態
systemctl stop firewalld.service 關閉防火墻
補充:虛擬機centos7遇到問題,bash: jps: 未找到命令... 的解決方案
yum list *openjdk-devel*
#安裝適合自己的版本
yum install java-1.8.0-openjdk-devel.x86_64
#安裝過程有幾個同意步驟,輸入y,安裝完成測試jps ok!
下載kafka地址:Apache Kafka ,選擇kafka_2.13-3.4.0.tgz進行下載。
下載Zookeeper地址 Apache ZooKeeper ,這里選擇比較新的3.6.1版本。
下載完成后,將這兩個工具包上傳到服務器上,解壓后,分別放到/app/kafka和/app/zk目錄下。并將部署目錄下的bin目錄路徑配置到path環境變量中。
環境變量/etc/profile
最終配置:
export ZK_HOME=/app/zk/apache-zookeeper-3.6.4-bin
export KAFKA_HOME=/app/kafka/kafka_2.13-3.4.0
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$KAFKA_HOME/bin:$ZK_HOME/bin:$JAVA_HOME/bin:$PATH
export CLASSPATH=:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
單機服務體驗
1、啟動Kafka之前需要先啟動Zookeeper。
#解壓命令
tar zxvf apache-zookeeper-3.6.4-bin.tar.gz
tar zxvf kafka_2.13-3.4.0.tgz#這里用Kafka自帶的Zookeeper啟動腳本
cd kafka_2.13-3.4.0/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
通過jps指令看到一個QuorumPeerMain進程,確定服務啟動成功。zk默認啟動在2181端口
啟動遇到問題可以查看nohup.out日志文件,注意腳本的執行權限
2、啟動Kafka
nohup bin/kafka-server-start.sh config/server.properties &
啟動完成后,使用jps指令,看到一個kafka進程,確定服務啟動成功。服務默認9092端口
3、簡單收發消息
Kafka的基礎工作機制:消息發送者將消息發送到kafka上指定的topic,消息消費者從指定的topic上消費消息。
簡單收到命令:
#創建Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
#查看Topic
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
#啟動一個消息發送者端,往一個名為test的Topic發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#啟動一個消息接收者端,接收名為test的Topic消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
生產者端示例:
消費者端示例:
注意:消費者啟動命令執行后有幾秒鐘延遲(啟動中接收不到消息),默認處理啟動成功后接收到的消息
4、其他消費模式
指定消費進度
#通過--from-beginning消費之前發的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
#指定從哪一條消息開始消費,offset表示索引/偏移量,索引4也就是第五條消息開始,0號partition
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 4 --topic test
分組消費
kafka中的同一條消息,只能被同一個消費者組下的某一個消費者消費。而不屬于同一個消費者組的其他消費者,也可以消費到這一條消息。通過--consumer-property group.id=testGroup
指定消費者組
#兩個消費者實例屬于同一個消費者組
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
#這個消費者實例屬于不同的消費者組
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test
查看消費者組的偏移量
#查看消費者組的情況,包括消費進度。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup···命令輸出結果示例
Consumer group 'testGroup' has no active members. #沒有活躍消費者GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup test 0 20 20 0 ... ... ...
···描述:
PARTITION 分區
CURRENT-OFFSET 當前消費進度
LOG-END-OFFSET 日志種最大消息進度
LAG 未消費消息數
雖然業務上是通過Topic來分發消息的,但是實際上,消息是保存在Partition這樣一個數據結構上
理解Kakfa的消息傳遞機制
Kafka的消息發送者和消息消費者通過Topic這樣一個邏輯概念來進行業務溝通。但是實際上,所有的消息是存在服務端的Partition這樣一個數據結構當中的。
-
客戶端Client: 包括消息生產者和消息消費者。
-
消費者組:每個消費者可以指定一個所屬的消費者組,相同消費者組的消費者共同構成一個邏輯消費者組。每一個消息會被多個感興趣的消費者組消費,但是在每一個消費者組內部,一個消息只會被消費一次。
-
服務端Broker:一個Kafka服務器就是一個Broker。
-
話題Topic:這是一個邏輯概念,一個Topic被認為是業務含義相同的一組消息。客戶端都通過綁定Topic來生產或者消費自己感興趣的話題。
-
分區Partition:Topic只是一個邏輯概念,而Partition就是實際存儲消息的組件。每個Partiton就是一個queue隊列結構。所有消息以FIFO先進先出的順序保存在這些Partition分區中。
Kafka集群服務
為什么要用集群?
單機服務下,Kafka已經具備了非常高的性能。TPS能夠達到百萬級別。但是,在實際工作中使用時,單機搭建的Kafka會有很大的局限性。
一方面:消息太多,需要分開保存。Kafka是面向海量消息設計的,一個Topic下的消息會非常多,單機服務很難存得下來。這些消息就需要分成不同的Partition,分布到多個不同的Broker上。這樣每個Broker就只需要保存一部分數據。這些分區的個數就稱為分區數。
另一方面:服務不穩定,數據容易丟失。單機服務下,如果服務崩潰,數據就丟失了。為了保證數據安全,就需要給每個Partition配置一個或多個備份,保證數據不丟失。Kafka的集群模式下,每個Partition都有一個或多個備份。Kafka會通過一個統一的Zookeeper集群作為選舉中心,給每個Partition選舉出一個主節點Leader,其他節點就是從節點Follower。主節點負責響應客戶端的具體業務請求,并保存消息。而從節點則負責同步主節點的數據。當主節點發生故障時,Kafka會選舉出一個從節點成為新的主節點。
最后:Kafka集群中的這些Broker信息,包括Partition的選舉信息,都會保存在額外部署的Zookeeper集群當中,這樣,kafka集群就不會因為某一些Broker服務崩潰而中斷。
Kafka集群架構:
1、部署Zookeeper集群
Zookeeper是一種多數同意的選舉機制,允許集群中少半數節點出現故障。因此,在搭建集群時,通常采用奇數節點,這樣可以最大化集群的高可用特性。
先將下載下來的Zookeeper解壓到/app/zk目錄。
然后進入conf目錄,修改配置文件。在conf目錄中,提供了一個zoo_sample.cfg示例文件。只需要將這個文件復制一份zoo.cfg,并修改其中的關鍵配置:
#Zookeeper的本地數據目錄,默認是/tmp/zookeeper。這是Linux的臨時目錄,隨時會被刪掉。
dataDir=/app/zk/data
#Zookeeper的服務端口
clientPort=2181
#集群節點配置
server.1=192.168.146.128:2888:3888
server.2=192.168.146.129:2888:3888
server.3=192.168.146.130:2888:3888
clientPort 2181是對客戶端開放的服務端口。
集群配置部分, server.x這個x就是節點在集群中的myid。后面的2888端口是集群內部數據傳輸使用的端口。3888是集群內部進行選舉使用的端口。
zookeeper啟動時data目錄會自動創建,但是需要手動在data目錄下面添加一個myid文件
#啟動服務
bin/zkServer.sh --config conf start
#查看服務狀態
bin/zkServer.sh status
2、部署Kafka集群
kafka服務并不需要進行選舉,因此也沒有奇數臺服務的建議。
首先將Kafka解壓到/app/kafka目錄下,然后進入config目錄,修改server.properties。重點關注的配置:
#broker 的全局唯一編號,不能重復,只能是數字。
broker.id=0
#數據文件地址。同樣默認是給的/tmp目錄。
log.dirs=/app/kafka/logs
#默認的每個Topic的分區數
num.partitions=1
#zookeeper的服務地址
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
多個Kafka服務注冊到同一個zookeeper集群上的節點,會自動組成集群。
server.properties文件中比較重要的核心配置:
Property | Default | Description |
---|---|---|
broker.id | 0 | broker的“名字”,你可以選擇任意你喜歡的數字作為id,只要id是唯每個broker都可以用一個唯一的非負整數id進行標識;這個id可以作為一的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放數據的路徑。這個路徑并不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當創建新partition時,都會選擇在包含最少partitions的路徑下進行。 |
listeners | PLAINTEXT://127.0.0.1:9092 | server接受客戶端連接的端口,ip配置kafka本機ip即可 |
zookeeper.connect | localhost:2181 | zookeeper連接地址。hostname:port。如果是Zookeeper集群,用逗號連接。 |
log.retention.hours | 168 | 每個日志文件刪除之前保存的時間。 |
num.partitions | 1 | 創建topic的默認分區數 |
default.replication.factor | 1 | 自動創建topic的默認副本數量 |
min.insync.replicas | 1 | 當producer設置acks為-1時,min.insync.replicas指定replicas的最小數目(必須確認每一個repica的寫數據都是成功的),如果這個數目沒有達到,producer發送消息會產生異常 |
delete.topic.enable | false | 是否允許刪除主題 |
啟動服務:
bin/kafka-server-start.sh -daemon config/server.properties
理解服務端的Topic、Partition和Broker
# 創建一個分布式的Topic
bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic disTopic
# 列出所有的Topic
bin/kafka-topics.sh --bootstrap-server worker1:9092 --list
# 查看列表情況
bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic
這里硬件資源有限,只啟動了兩臺(上面截圖)
1、--create創建集群,可以指定一些補充的參數。大部分的參數都可以在配置文件中指定默認值。
-
partitons參數表示分區數,這個Topic下的消息會分別存入這些不同的分區中。
-
replication-factor表示每個分區有幾個備份。
2、--describe查看Topic信息。
-
partiton參數列出了四個partition,后面帶有分區編號,用來標識這些分區。
-
Leader表示這一組partiton中的Leader節點是哪一個。這個Leader節點就是負責響應客戶端請求的主節點。從這里可以看到,Kafka中的每一個Partition都會分配Leader,也就是說每個Partition都有不同的節點來負責響應客戶端的請求。這樣就可以將客戶端的請求做到盡量的分散。
-
Replicas參數表示這個partition的多個備份是分配在哪些Broker上的。也稱為AR。這里的0,1就對應配置集群時指定的broker.id。但是,Replicas列出的只是一個邏輯上的分配情況,并不關心數據實際是不是按照這個分配。甚至有些節點服務掛了之后,Replicas中也依然會列出節點的ID。
-
ISR參數表示partition的實際分配情況。他是AR的一個子集,只列出那些當前還存活,能夠正常同步數據的那些Broker節點。
之前在配置Kafka集群時,指定了一個log.dirs屬性,指向了一個服務器上的日志目錄。進入這個目錄,就能看到每個Broker的實際數據承載情況。
Kafka當中,Topic是一個數據集合的邏輯單元。同一個Topic下的數據,實際上是存儲在Partition分區中的,Partition就是數據存儲的物理單元。而Broker是Partition的物理載體,這些Partition分區會盡量均勻的分配到不同的Broker機器上。offset,就是每個消息在partition上的偏移量。
Kafka為何要這樣來設計Topic、Partition和Broker的關系呢?
1、Kafka設計需要支持海量的數據,而這樣龐大的數據量,一個Broker是存不下的。那就拆分成多個Partition,每個Broker只存一部分數據。這樣極大的擴展了集群的吞吐量。
2、每個Partition保留了一部分的消息副本,如果放到一個Broker上,就容易出現單點故障。所以就給每個Partition設計Follower節點,進行數據備份,從而保證數據安全。另外,多備份的Partition設計也提高了讀取消息時的并發度。
3、在同一個Topic的多個Partition中,會產生一個Partition作為Leader。這個Leader Partition會負責響應客戶端的請求,并將數據往其他Partition分發。
Kafka集群的整體結構
1、Topic是一個邏輯概念,Producer和Consumer通過Topic進行業務溝通。
2、Topic并不存儲數據,Topic下的數據分為多組Partition,盡量平均的分散到各個Broker上。每組Partition包含Topic下一部分的消息。每組Partition包含一個Leader Partition以及若干個Follower Partition進行備份,每組Partition的個數稱為備份因子 replica factor。
3、Producer將消息發送到對應的Partition上,然后Consumer通過Partition上的Offset偏移量,記錄自己所屬消費者組Group在當前Partition上消費消息的進度。
4、Producer發送給一個Topic的消息,會由Kafka推送給所有訂閱了這個Topic的消費者組進行處理。但是在每個消費者組內部,只會有一個消費者實例處理這一條消息。
5、最后,Kafka的Broker通過Zookeeper組成集群。然后在這些Broker中,需要選舉產生一個擔任Controller角色的Broker。這個Controller的主要任務就是負責Topic的分配以及后續管理工作。在我們實驗的集群中,這個Controller實際上是通過ZooKeeper產生的。
Kraft集群--了解
Kraft集群簡介
Kraft是Kafka從2.8.0版本開始支持的一種新的集群架構方式。其目的主要是為了擺脫Kafka對Zookeeper的依賴。因為以往基于Zookeeper搭建的集群,增加了Kafka演進與運維的難度,逐漸開始成為Kakfa擁抱云原生的一種障礙。使用Kraft集群后,Kafka集群就不再需要依賴Zookeeper,將之前基于Zookeeper管理的集群數據,轉為由Kafka集群自己管理。
雖然官方規劃會在未來完全使用Kraft模式代替現有的Zookeeper模式,但是目前來看,Kraft集群還是沒有Zookeeper集群穩定,所以現在大部分企業還是在使用Zookeeper集群。
2022年10月3日發布的3.3.1版本才開始將KRaft標注為準備用于生產。KIP-833: Mark KRaft as Production Ready。 這離大規模使用還有比較長的距離。
實際上,Kafka擺脫Zookeeper是一個很長的過程。在之前的版本迭代過程中,Kafka就已經在逐步減少Zookeeper中的數據。在Kafka的bin目錄下的大量腳本,早期都是要指定zookeeper地址,后續長期版本更迭過程中,逐步改為通過--bootstrap-server參數指定Kafka服務地址。到目前版本,基本所有腳本都已經拋棄了--zookeeper參數了。
傳統的Kafka集群,會將每個節點的狀態信息統一保存在Zookeeper中,并通過Zookeeper動態選舉產生一個Controller節點,通過Controller節點來管理Kafka集群,比如觸發Partition的選舉。而在Kraft集群中,會固定配置幾臺Broker節點來共同擔任Controller的角色,各組Partition的Leader節點就會由這些Controller選舉產生。原本保存在Zookeeper中的元數據也轉而保存到Controller節點中。
Raft協議是目前進行去中心化集群管理的一種常見算法,類似于之前的Paxos協議,是一種基于多數同意,從而產生集群共識的分布式算法。Kraft則是Kafka基于Raft協議進行的定制算法。
新的Kraft集群相比傳統基于Zookeeper的集群,有一些很明顯的好處:
-
Kafka可以不依賴于外部框架獨立運行。這樣減少Zookeeper性能抖動對Kafka集群性能的影響,同時Kafka產品的版本迭代也更自由。
-
Controller不再由Zookeeper動態選舉產生,而是由配置文件進行固定。這樣比較適合配合一些高可用工具來保持集群的穩定性。
-
Zookeeper的產品特性決定了他不適合存儲大量的數據,這對Kafka的集群規模(確切的說應該是Partition規模)是極大的限制。擺脫Zookeeper后,集群擴展時元數據的讀寫能力得到增強。
不過,由于分布式算法的復雜性。Kraft集群和同樣基于Raft協議定制的RocketMQ的Dledger集群一樣,都還不太穩定,在真實企業開發中,用得相對還是比較少。
配置Kraft集群
在Kafka的config目錄下,提供了一個kraft的文件夾,在這里面就是Kraft協議的參考配置文件。在這個文件夾中有三個配置文件,broker.properties,controller.properties,server.properties,分別給出了Kraft中三種不同角色的示例配置。
-
broker.properties:數據節點
-
controller.properties:Controller控制節點
-
server.properties:即可以是數據節點,又可以是Controller控制節點。
這里同樣列出幾個比較關鍵的配置項,按照自己的環境定制。
#配置當前節點的角色。Controller相當于Zookeeper的功能,負責集群管理。Broker提供具體的消息轉發服務。
process.roles=broker,controller
#配置當前節點的id。與普通集群一樣,要求集群內每個節點的ID不能重復。
node.id=1
#配置集群的投票節點。其中@前面的是節點的id,后面是節點的地址和端口,這個端口跟客戶端訪問的端口是不一樣的。通常將集群內的所有Controllor節點都配置進去。
controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093
#Broker對客戶端暴露的服務地址。基于PLAINTEXT協議。
advertised.listeners=PLAINTEXT://worker1:9092
#Controller服務協議的別名。默認就是CONTROLLER
controller.listener.names=CONTROLLER
#配置監聽服務。不同的服務可以綁定不同的接口。這種配置方式在端口前面是省略了一個主機IP的,主機IP默認是使用的java.net.InetAddress.getCanonicalHostName()
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#數據文件地址。默認配置在/tmp目錄下。
log.dirs=/app/kafka/kraft-log
#topic默認的partition分區數。
num.partitions=2
將配置文件分發,并修改每個服務器上的node.id屬性和advertised.listeners屬性。
由于Kafka的Kraft集群對數據格式有另外的要求,所以在啟動Kraft集群前,還需要對日志目錄進行格式化。
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-storage.sh random-uuid
vRqZXTz0QT6FJKmeyEU7Yw
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-storage.sh format -t vRqZXTz0QT6FJKmeyEU7Yw -c config/kraft/server.properties
Formatting /tmp/kraft-combined-logs with metadata.version 3.4-IV0.
-t 表示集群ID,三個服務器上可以使用同一個集群ID。
接下來就可以指定配置文件,啟動Kafka的服務了。 例如,在Worker1上,啟動Broker和Controller服務。
bin/kafka-server-start.sh -daemon config/kraft/server.properties
等三個服務都啟動完成后,就可以像普通集群一樣去創建Topic,并維護Topic的信息了。