目錄
?一、前期準備
1、查看網卡:
2、配置靜態IP
3、設置主機名
4、配置IP與主機名映射
5、關閉防火墻
6、配置免密登錄
二、JDK的安裝
三、Zookeeper的安裝
四、Kafka的安裝
1、Kafka的下載安裝?
2、修改配置文件
4、分發文件
5、修改其他節點broker.id及advertised.listeners
6、修改zookeeper中的?myid
7、啟動ZK集群--啟動停止腳本
8、啟動Kafka集群--啟動停止腳本
四、Kafka命令行操作
1、主題命令行操作
1.1.查看操作主題命令參數
1.2.查看當前服務器中的所有topic
1.3.創建topic
1.4.查看first主題的詳情
1.5.修改分區數(注意:分區數只能增加,不能減少)
1.6.再次查看first主題的詳情
1.7.刪除topic(學生自己演示)
2、生產者命令行操作
2.1.查看操作生產者命令參數
2.2.發送消息
3、消費者命令行操作
3.1.查看操作消費者命令參數
3.2.消費消息
? ? ? ?Kafka 是基于 Java 的,必須先安裝 JDK。 ?此次我們以?kafka_2.12-3.3.1.tgz? 即 Kafka3.3.1 為案例,安裝jdk1.8。
?? 注意:Kafka 3.9.0 要求本地必須安裝 JDK 17 或以上版本。JDK 8 和 11 已不再被官方支持。
? ? ?
?一、前期準備
1、查看網卡:
2、配置靜態IP
vi /etc/sysconfig/network-scripts/ifcfg-ens32? ----? 根據自己網卡設置。
3、設置主機名
hostnamectl --static set-hostname ?主機名
例如:
hostnamectl --static set-hostname ?hadoop001
4、配置IP與主機名映射
vi /etc/hosts
5、關閉防火墻
systemctl stop firewalld
systemctl disable?firewalld
6、配置免密登錄
傳送門
二、JDK的安裝
傳送門
三、Zookeeper的安裝
傳送門
四、Kafka的安裝
1、Kafka的下載安裝?
1.1. 下載
?https://archive.apache.org/dist/kafka/3.3.1/
?下載 ?kafka_2.12-3.3.1.tgz 安裝包1.2 上傳
使用xshell上傳到指定安裝路徑此處是安裝路徑是 /opt/module
??
1.3 解壓重命名
tar -zxvf kafka_2.12-3.3.1.tgz
mv kafka_2.12-3.3.1?kafka
??
?
1.4 配置環境變量
vi ?/etc/profile
export JAVA_HOME=/opt/module/java
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
1.5 加載環境變量
source ?/etc/profile
驗證環境變量是否生效:
env?| grep HOME
env?| grep PATH
2、修改配置文件
vi /opt/module/kafka/config/server.properties
#broker的全局唯一編號,不能重復,只能是數字。
broker.id=0
#broker對外暴露的IP和端口 (每個節點單獨配置)
advertised.listeners=PLAINTEXT://hadoop001:9092
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka運行日志(數據)存放的路徑,路徑不需要提前創建,kafka自動幫你創建,可以配置多個磁盤路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
# 每個topic創建時的副本數,默認時1個副本
offsets.topic.replication.factor=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#每個segment文件的大小,默認最大1G
log.segment.bytes=1073741824
#?檢查過期數據的時間,默認5分鐘檢查一次是否數據過期
log.retention.check.interval.ms=300000
#配置連接Zookeeper集群地址(在zk根目錄下創建/kafka,方便管理)
zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka
4、分發文件
scp -r? /etc/profile? root@hadoop002:/etc/profile
scp -r? /etc/profile? root@hadoop003:/etc/profile
scp -r? /opt/module/java?root@hadoop002:/opt/module/java?
scp -r??/opt/module/java?root@hadoop003:/opt/module/java?
scp -r? /opt/module/zookeeper? root@hadoop002:/opt/module/zookeeper
scp -r? /opt/module/zookeeper? root@hadoop003:/opt/module/zookeeper
scp -r? /opt/module/kafka root@hadoop002:/opt/module/kafka
scp -r? /opt/module/kafka root@hadoop003:/opt/module/kafka
讓三臺機器文件生效
ssh hadoop001 "source /etc/profile"
ssh hadoop002?"source /etc/profile"
ssh hadoop003 "source /etc/profile"
5、修改其他節點broker.id及advertised.listeners
修改?hadoop002 的
vi /opt/module/kafka/config/server.properties
broker.id=1
#broker對外暴露的IP和端口 (每個節點單獨配置)
advertised.listeners=PLAINTEXT://hadoop002:9092
修改?hadoop003 的
vi /opt/module/kafka/config/server.properties
broker.id=2
#broker對外暴露的IP和端口 (每個節點單獨配置)
advertised.listeners=PLAINTEXT://hadoop003:9092
或者使用下面腳本
ssh hadoop002?"sed -i 's/^broker.id=0$/broker.id=1/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^broker.id=0$/broker.id=2/' /opt/module/kafka/config/server.properties"ssh hadoop002 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop002:9092/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop003:9092/' /opt/module/kafka/config/server.properties"查看效果
ssh hadoop001 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"
ssh hadoop002 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"
ssh hadoop003 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"
6、修改zookeeper中的?myid
修改 hadoop002 的 /opt/module/zookeeper/zkData/myid 內容為 2
vi /opt/module/zookeeper/zkData/myid
2
修改 hadoop003 的 /opt/module/zookeeper/zkData/myid 內容為 3
vi /opt/module/zookeeper/zkData/myid
3
或者使用下面腳本
ssh hadoop002 "echo '2' > /opt/module/zookeeper/zkData/myid"
ssh hadoop003 "echo '3' > /opt/module/zookeeper/zkData/myid"查看效果
ssh hadoop001 "cat /opt/module/zookeeper/zkData/myid"
ssh hadoop002 "cat /opt/module/zookeeper/zkData/myid"
ssh hadoop003 "cat /opt/module/zookeeper/zkData/myid"
7、啟動ZK集群--啟動停止腳本
touch /usr/bin/zkall.sh
chmod 777 /usr/bin/zkall.sh
vi?/usr/bin/zkall.sh
#!/bin/bashcase $1 in "start"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 啟動 ------------ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh start"done };; "stop"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 停止 ------------ ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh stop"done };; "status"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 狀態 ------------ ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh status"done };; esac
啟動:/usr/bin/zkall.sh start
停止:/usr/bin/zkall.sh stop
狀態:/usr/bin/zkall.sh status
8、啟動Kafka集群--啟動停止腳本
touch /usr/bin/kfall.sh
chmod 777 /usr/bin/kfall.sh
vi?/usr/bin/kfall.sh
#! /bin/bashcase $1 in "start"){for i in hadoop001 hadoop002 hadoop003doecho " --------啟動 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done };; "stop"){for i in hadoop001 hadoop002 hadoop003doecho " --------停止 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-stop.sh "done };; esac
啟動:/usr/bin/kfall.sh start
停止:/usr/bin/kfall.sh stop
四、Kafka命令行操作
1、主題命令行操作
1.1.查看操作主題命令參數
? ? ? ??bin/kafka-topics.sh
參數 | 描述 |
--bootstrap-server <String: server toconnect to> | 連接的Kafka?Broker主機名稱和端口號。 |
--topic <String: topic> | 操作的topic名稱。 |
--create | 創建主題。 |
--delete | 刪除主題。 |
--alter | 修改主題。 |
--list | 查看所有主題。 |
--describe | 查看主題詳細描述。 |
--partitions <Integer: # of partitions> | 設置分區數。 |
--replication-factor<Integer: replication factor> | 設置分區副本。 |
--config <String: name=value> | 更新系統默認的配置。 |
1.2.查看當前服務器中的所有topic
在任意一臺安裝zk的機器上都可以啟動客戶端
cd /opt/module/kafka/
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --list
1.3.創建topic
cd /opt/module/kafka/
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create?--partitions 1 --replication-factor 3 --topic first
選項說明:
--topic 定義topic名
--replication-factor ?定義副本數
--partitions ?定義分區數
1.4.查看first主題的詳情
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe?--topic first
1.5.修改分區數(注意:分區數只能增加,不能減少)
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --alter?--topic first --partitions 3
1.6.再次查看first主題的詳情
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe?--topic first
1.7.刪除topic(學生自己演示)
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --delete?--topic first
2、生產者命令行操作
2.1.查看操作生產者命令參數
bin/kafka-console-producer.sh
參數 | 描述 |
--bootstrap-server <String: server toconnect to> | 連接的Kafka?Broker主機名稱和端口號。 |
--topic <String: topic> | 操作的topic名稱。 |
2.2.發送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop001:9092 --topic first
>hello world
>hello beijing
>hello china
>welecome to china
3、消費者命令行操作
3.1.查看操作消費者命令參數
bin/kafka-console-consumer.sh
參數 | 描述 |
--bootstrap-server <String: server toconnect to> | 連接的Kafka?Broker主機名稱和端口號。 |
--topic <String: topic> | 操作的topic名稱。 |
--from-beginning | 從頭開始消費。 |
--group <String: consumer group id> | 指定消費者組名稱。 |
3.2.消費消息
(1)消費first主題中的數據
bin/kafka-console-consumer.sh?--bootstrap-server hadoop001:9092 --topic first
(2)把主題中所有的數據都讀取出來(包括歷史數據)
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning?--topic first