目錄
一、kafka簡介
1.1、概述
1.2、消息系統介紹
1.3、點對點消息傳遞模式
1.4、發布-訂閱消息傳遞模式
二、kafka術語解釋
2.1、結構概述
2.2、broker
2.3、topic
2.4、producer
2.5、consumer
2.6、consumer group
2.7、leader
2.8、follower
2.9、partition
2.10、offset
2.11、replica
2.12、message
2.13、zookeeper
三、kafka架構
四、kafka的部署
4.1、軟件下載
4.1.1、jdk的安裝
4.1.2、zookeeper安裝
4.1.3、kafka的安裝
4.2、單機模式
4.3、集群部署
4.3.1、針對每一個節點的hosts文件添加節點的ip映射信息
4.3.2、時間同步
4.3.3、zookeeper配置?
4.3.4、創建對應的服務id
4.3.5、zoo.cfg參數解析
4.3.6、集群kafka配置
一、kafka簡介
1.1、概述
1.2、消息系統介紹
1.3、點對點消息傳遞模式

1.4、發布-訂閱消息傳遞模式

二、kafka術語解釋
2.1、結構概述
2.2、broker
2.3、topic
2.4、producer
2.5、consumer
2.6、consumer group
2.7、leader
2.8、follower
2.9、partition
2.10、offset

2.11、replica
2.12、message
2.13、zookeeper
三、kafka架構


四、kafka的部署
4.1、軟件下載
4.1.1、jdk的安裝
自帶JDK,這種JDK可以使用java -version檢查,如果使用javac就不行了,所以進行安裝sudo yum install java-1.8.0-openjdk-devel -y
4.1.2、zookeeper安裝
Apache ZooKeeper
選擇3.5.7版本
上傳服務器,安裝
解壓
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin zookeeper3.5.7
mv zookeeper3.5.7/ /opt創建軟鏈接
ln -s /opt/zookeeper3.5.7/ /opt/zookeeper配置環境變量
vim /etc/profile添加
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/binsource /etc/profile將Zookeeper提供的配置文件復制一份,復制成Zookeeper默認尋找的文件
cd /opt/zookeeper/conf
ls
cp zoo_sample.cfg zoo.cfg
cd ..創建數據存放目錄
mkdir data
chmod 755 /opt/zookeeper/data修改數據存放位置
cd conf/
vim zoo.cfg##修改以下配置
dataDir=/opt/zookeeper/data啟動 Zookeeper,Zookeeper的bin目錄下
cd ..
./bin/zkServer.sh start zoo.cfg
檢測zookeeper是否正常
jps # 看到控制臺成功輸出 QuorumPeerMain,表示啟動成功./bin/zkServer.sh status zoo.cfg ## Mode: standalone表示ok
4.1.3、kafka的安裝
?https://kafka.apache.org/downloads
選擇?kafka_2.12-3.8.0.tgz
?進行下載,Scala 2.12 和 Scala 2.13 主要是使用Scala編譯的版本不同,兩者皆可
上傳服務器,安裝
解壓
tar -zxvf kafka_2.12-2.7.0.tgz
mv kafka_2.12-2.7.0 /opt
cd /opt創建軟鏈接
ln -s /opt/kafka_2.12-2.7.0/ /opt/kafka
ls配置環境變量
vim /etc/profile添加
export KAFKA_HOME=/opt/kafka
export PATH=:$PATH:${KAFKA_HOME}source /etc/profile
4.2、單機模式
在Kafka的config目錄下存在相關的配置信息——本次我們只想讓Kafka快速啟動起來只關注server.properties文件即可cd ${KAFKA_HOME}/config
ls
#connect-console-sink.properties connect-file-source.properties consumer.properties server.properties
#connect-console-source.properties connect-log4j.properties kraft tools-log4j.properties
#connect-distributed.properties connect-mirror-maker.properties log4j.properties trogdor.conf
#connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties打開配置文件,并主要注意以下幾個配置
vim server.propertiesbroker.id=0 #kafka服務節點的唯一標識,這里是單機不用修改
# listeners = PLAINTEXT://host1:9092 別忘了設置成自己的主機名
listeners=PLAINTEXT://host1:9092 #kafka底層監聽的服務地址,注意是使用主機名,不是ip。
# log.dirs 指定的目錄 kafka啟動時可以自動創建,因此不要忘了讓kafka可以有讀寫這個目錄的權限。
log.dirs=/opt/kafka/data ##kafka的分區以日志的形式存儲在集群中(其實就是broker數據存儲的目錄)log.retention.hours=168 #日志的留存策略,默認168小時也就是一周
# zookeeper 的連接地址 ,別忘了設置成自己的主機名,單機情況下可以使用 localhost
zookeeper.connect=host1:2181
啟動kafka
./bin/kafka-server-start.sh -daemon config/server.properties #后臺啟動kafka使用 jps 查看是否成功啟動kafka
jps
34843 QuorumPeerMain
21756 Jps
116076 Kafka
4.3、集群部署
4.3.1、針對每一個節點的hosts文件添加節點的ip映射信息
vim /etc/hosts
192.168.157.80 host1
192.168.157.81 host2
192.168.157.82 host3
4.3.2、時間同步
yum install ntp -y
ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com #兩個時鐘同步地址選擇一個就行
4.3.3、zookeeper配置?
vim /opt/zookeeper/conf/zoo.cfg
##額外添加以下配置
server.1=host1:2888:3888 #數據同步端口:領導選舉時服務器監聽的端口
server.2=host2:2888:3888
server.3=host3:2888:3888
4.3.4、創建對應的服務id
# host1
echo 1 > /opt/zookeeper/data/myid #在這個文件中寫入自己服務的id號
# host2
echo 2 > /opt/zookeeper/data/myid
# host3
echo 3 > /opt/zookeeper/data/myid
4.3.5、zoo.cfg參數解析
tickTime=2000: 通信心跳數,用于設置Zookeeper服務器與客戶端之間的心跳時間間隔,單位是毫秒。這個時間間隔是Zookeeper使用的基本時間單位,用于服務器之間或客戶端與服務器之間維持心跳的時間間隔。initLimit=10: LF初始通信時限,用于設置集群中的Follower跟隨者服務器與Leader領導者服務器之間啟動時能容忍的最多心跳數。如果在這個時限內(10個心跳時間)領導和根隨者沒有發出心跳通信,就視為失效的連接,領導和根隨者徹底斷開。syncLimit=5: LF同步通信時限,用于設置集群啟動后,Leader與Follower之間的最大響應時間單位。假如響應超過這個時間(syncLimit * tick Time -> 10秒),Leader就認為Follower已經死掉,會將Follower從服務器列表中刪除。dataDir: 數據文件目錄+數據持久化路徑,主要用于保存Zookeeper中的數據。dataLogDir: 日志文件目錄,用于存儲Zookeeper的日志文件。clientPort=2181: 客戶端連接端口,用于監聽客戶端連接的端口
4.3.6、集群kafka配置
?server.properties
配置文件
cd ${KAFKA_HOME}/config
vim server.propertiesbroker.id=0 #kafka服務節點的唯一標識
# listeners = PLAINTEXT://your.host.name:9092 別忘了設置成自己的主機名
listeners=PLAINTEXT://host1:9092 #集群中需要設置成每個節點自己的
# log.dirs 指定的目錄 kafka啟動時可以自動創建,因此不要忘了讓kafka可以有讀寫這個目錄的權限。
log.dirs=/opt/kafka/data ##kafka的分區以日志的形式存儲在集群中(其實就是broker數據存儲的目錄)
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 #日志的留存策略,默認168小時也就是一周
# zookeeper 集群的連接地址
zookeeper.connect=host1:2181,host2:2181,host3:2181
其余配置:
##修改差異配置
cd ${KAFKA_HOME}/config
vim server.properties# host2節點
broker.id=1
listeners=PLAINTEXT://host2:9092
# host3節點
broker.id=2
listeners=PLAINTEXT://host3:9092
?kafka集群即可正常啟動
kafka其余命令./bin/kafka-server-stop.sh #關閉kafka
kafka-console-consumer.sh #消費命令
kafka-console-producer.sh #生產命令
kafka-consumer-groups.sh #查看消費者組,重置消費位點等
kafka-topics.sh #查詢topic狀態,新建,刪除,擴容
kafka-acls.sh #配置,查看kafka集群鑒權信息
kafka-configs.sh #查看,修改kafka配置
kafka-mirror-maker.sh #kafka集群間同步命令
kafka-preferred-replica-election.sh #重新選舉topic分區leader
kafka-producer-perf-test.sh #kafka自帶生產性能測試命令
kafka-reassign-partitions.sh #kafka數據重平衡命令
kafka-run-class.sh #kafka執行腳本