ufw disabled #關閉防火墻 或者 放開指定端口
vim /etc/hosts #配置ip host映射關系
10.3.1.96 ? ?node1
10.3.1.97 ? ?node2
#1.所有機器安裝jdk
apt install openjdk-8-jdk -y
java -version #export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_202
#2.部署zookeeper集群
cd /usr/local
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
mv apache-zookeeper-3.8.4-bin zookeeper-3.8.4cd /usr/local/zookeeper-3.8.4/conf && cp zoo_sample.cfg zoo.cfg
mkdir -p /usr/local/zookeeper-3.8.4/logs
mkdir -p /usr/local/zookeeper-3.8.4/datavim zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.8.4/data
dataLogDir=/usr/local/zookeeper-3.8.4/logs
clientPort=2181
#客戶端訪問zk的端口
server.1=node1:2888:3888
server.2=node2:2888:3888
#說明:2888為組成zookeeper服務器之間的通信端口,3888為用來選舉leader的端口,server后面的數字與后面的myid相對應
#注意前后不要有空格 否則報錯Invalid config, exiting abnormally 可以通過:set list顯示隱藏字符來處理vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.8.4
export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile
#node1節點執行
cd /usr/local/zookeeper-3.8.4/data && echo "1" > myid
#node2節點執行
cd /usr/local/zookeeper-3.8.4/data && echo "2" > myid
#所有機器啟動zk
cd /usr/local/zookeeper-3.8.4/bin
./zkServer.sh start
./zkServer.sh status
#3.部署kafka 并修改配置文件(不同節點參數需修改)
cd /usr/local
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
tar -zxvf kafka_2.12-3.7.0.tgz
mkdir -p /usr/local/kafka_2.12-3.7.0/logs
#node1進行如下操作。? node2同理:只是把id改為2 node1改為node2即可
vim /usr/local/kafka_2.12-3.7.0/config/server.properties
#id不重復 修改下面三行
broker.id=1
listeners=PLAINTEXT://node1:9092
advertised.listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#建議開啟 新增此行
delete.topic.enable=true
#修改此行
log.dirs=/usr/local/kafka_2.12-3.7.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#修改此行
zookeeper.connect=node1:2181,node2:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0vim /usr/local/kafka_2.12-3.7.0/config/consumer.properties
bootstrap.servers=node1:9092
group.id=test-consumer-groupvim /usr/local/kafka_2.12-3.7.0/config/producer.properties
bootstrap.servers=node1:9092
compression.type=none
#所有機器啟動
cd /usr/local/kafka_2.12-3.7.0/bin
./kafka-server-start.sh -daemon ../config/server.properties
#配置systemctl 開機自啟動 zookeeper和 kafka
vim /lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/zookeeper-3.8.4/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper-3.8.4/bin/zkServer.sh stop
Restart=on-abnormal[Install]
WantedBy=multi-user.targetvim /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service[Service]
Type=forking
User=root
Group=root
Environment="JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64"
ExecStart=/usr/local/kafka_2.12-3.7.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.7.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-3.7.0/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target
#which java; ?#然后ls -l 直到沒有軟鏈則為java安裝路徑
systemctl daemon-reload #刷新配置
systemctl start zookeeper.service
systemctl enable zookeeper.service
systemctl start kafka.service
systemctl enable kafka.service
注意:systemctl啟動若不成功,可以修改Type 然后再嘗試
命令行測試
#測試創建topic
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 3 --partitions 3 --topic test #老版本 --zookeeper node1:2181
./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test
./kafka-topics.sh -list --bootstrap-server node1:9092
./kafka-console-producer.sh --broker-list node1:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server node:9092 --topic test --from-beginning
#舊版Kafka,用的是zookeeper地址而非bootstrap.servers
#主要有兩個目的/動機:一是優化元數據管理,原來的zk方案,極端情況下可能會造成數據不一致;二是簡化部署和配置。
#bootstrap.servers只是用于客戶端啟動(bootstrap)的時候有一個可以熱啟動的一個連接者,一旦啟動完畢客戶端就應該可以得知當前集群的所有節點的信息,日后集群擴展的時候客戶端也能夠自動實時的得到新節點的信息,即使bootstrap.servers里面的掛掉了也應該是能正常運行的,除非節點掛掉后客戶端也重啟了
#服務器是Kafka,生產者(發送數據的)和消費者(接收數據的)是客戶端
#刪除topic
#1.kafka啟動之前,在server.properties配置delete.topic.enable=true
#2.執行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具刪除
#注意:如果沒有設置 delete.topic.enable=true,則調用kafka 的delete命令無法真正將topic刪除,而是顯示(marked for deletion)