0. Preliminaries
Since we only have a single Linux machine, we will build a simple simulated cluster on one host: by adjusting the configuration files, the 3 Kafka brokers we start will listen on 3 different ports (9091, 9092, 9093).
1. Install Java 11
- In your working directory, run:

```shell
yum install java-11-openjdk -y
```

- Add the environment variables (append them to /etc/profile so they survive new shells; the exact JDK path comes from the package installed above and may differ on your system):

```shell
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.23.0.9-3.tl3.x86_64
export PATH=$PATH:$JAVA_HOME/bin
```

- Reload the profile so the variables take effect:

```shell
source /etc/profile
```

- Verify the installation:

```shell
java -version
```

```text
openjdk version "11.0.23" 2024-04-16 LTS
OpenJDK Runtime Environment (Red_Hat-11.0.23.0.9-2) (build 11.0.23+9-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-11.0.23.0.9-2) (build 11.0.23+9-LTS, mixed mode, sharing)
```
2. Cluster Deployment
- In your working directory, create a `cluster` directory:

```shell
mkdir cluster
```

- Enter the `cluster` directory, upload the Kafka binary release kafka_2.12-3.6.1.tgz (the binary package, not the kafka-3.6.1-src.tgz source package), and extract it:

```shell
rz -E   # upload the package from your local machine
tar -zxf kafka_2.12-3.6.1.tgz
```

- Rename it to `kafka`:

```shell
mv kafka_2.12-3.6.1 kafka
```
2.1 Install ZooKeeper
- Rename the folder to `zookeeper`. Kafka ships with a bundled ZooKeeper, so this extracted copy will serve as the ZooKeeper installation:

```shell
mv kafka/ zookeeper
```

- Edit the `config/zookeeper.properties` file:

```shell
cd zookeeper/
vim config/zookeeper.properties
```

- Set `dataDir`, the directory where ZooKeeper stores its data:
```properties
# the directory where the snapshot is stored.
dataDir=/root/cluster/zookeeper-data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
```
2.2 Install Kafka
- Extract the archive a second time and rename the result to `broker-1` (the first copy became `zookeeper` above):

```shell
tar -zxf kafka_2.12-3.6.1.tgz
mv kafka_2.12-3.6.1/ broker-1
ll
```

```text
total 8
drwxr-xr-x 7 root root 4096 Nov 24 17:43 broker-1
drwxr-xr-x 7 root root 4096 Nov 24 17:43 zookeeper
```

- Edit the `config/server.properties` file:

```shell
vim broker-1/config/server.properties
```
```properties
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# (numeric broker id; must be unique within the cluster)
broker.id=1

# ......

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name
# will be equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
# FORMAT:
#     listeners = listener_name://host_name:port
# EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# 9091 is a local port; pick a different one if it conflicts. Note that
# comments must sit on their own lines: a trailing "# ..." after a value
# would become part of the value.
listeners=PLAINTEXT://:9091

# ......

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# (data directory; created automatically if it does not exist)
log.dirs=/root/cluster/broker-data/broker-1

# ......

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# (ZooKeeper connection address; 2181 is ZooKeeper's default client port)
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
```
- Repeat the same steps to create `broker-2` and `broker-3` (extract the archive once more for each), then edit their configuration files. Only three keys differ from `broker-1`:

```properties
# broker-2/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
log.dirs=/root/cluster/broker-data/broker-2
```

```properties
# broker-3/config/server.properties
broker.id=3
listeners=PLAINTEXT://:9093
log.dirs=/root/cluster/broker-data/broker-3
```

`zookeeper.connect=localhost:2181` and `zookeeper.connection.timeout.ms=18000` stay the same in all three files.
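Editing three near-identical files by hand is error-prone. As a sketch (my addition, not part of the original steps), the per-broker differences can be stamped out with `sed` from one template; the template below lists only the keys this guide touches, and the paths mirror the guide's layout. In a real setup you would merge these keys into the full `server.properties` shipped with Kafka rather than replace it.

```shell
# Minimal template holding only the keys this guide changes per broker.
cat > server.template <<'EOF'
broker.id=0
listeners=PLAINTEXT://:9090
log.dirs=/root/cluster/broker-data/broker-0
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
EOF

# Stamp out one server.properties per broker: id, port, and data dir vary.
for i in 1 2 3; do
  mkdir -p "broker-$i/config"
  sed -e "s/^broker\.id=.*/broker.id=$i/" \
      -e "s|^listeners=.*|listeners=PLAINTEXT://:909$i|" \
      -e "s|^log\.dirs=.*|log.dirs=/root/cluster/broker-data/broker-$i|" \
      server.template > "broker-$i/config/server.properties"
done
```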
2.3 Wrap the Startup in Scripts
ZooKeeper must be running before Kafka starts, and the cluster has several nodes to bring up, so starting everything by hand is tedious. Here we wrap the startup commands in scripts.

- In the `zookeeper` folder, create a `zk.sh` script and make it executable (the `-daemon` flag backgrounds the process so a calling script can continue past it):

```shell
cd zookeeper
vim zk.sh
```

```shell
# zk.sh
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
```

```shell
chmod +x zk.sh
```
- In each of `broker-1`, `broker-2`, and `broker-3`, create a `kfk.sh` script and make it executable:

```shell
cd broker-1
vim kfk.sh
```

```shell
# kfk.sh
./bin/kafka-server-start.sh -daemon config/server.properties
```

```shell
chmod +x kfk.sh
```
- In the `cluster` folder, create a `cluster.sh` script that starts the whole Kafka cluster:

```shell
vim cluster.sh
```

```shell
# cluster.sh
cd zookeeper
./zk.sh
cd ../broker-1
./kfk.sh
cd ../broker-2
./kfk.sh
cd ../broker-3
./kfk.sh
```

```shell
chmod +x cluster.sh
```
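One fragile spot: cluster.sh fires the brokers immediately after ZooKeeper, which can race on a slow machine. A hardened variant (my addition; it assumes `nc` (netcat) is installed) waits for ZooKeeper's client port 2181 before starting the brokers:

```shell
# Write a cluster.sh that waits for ZooKeeper to accept connections.
cat > cluster.sh <<'EOF'
#!/bin/sh
# Start ZooKeeper first, then wait for its client port before the brokers.
(cd zookeeper && ./zk.sh)
for i in $(seq 1 30); do
  nc -z localhost 2181 && break
  sleep 1
done
for b in broker-1 broker-2 broker-3; do
  (cd "$b" && ./kfk.sh)
done
EOF
chmod +x cluster.sh
```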
- In the `cluster` folder, create a `cluster-clear.sh` script to wipe the ZooKeeper and Kafka data for a fresh start:

```shell
vim cluster-clear.sh
```

```shell
# cluster-clear.sh
rm -rf zookeeper-data
rm -rf broker-data
```

```shell
chmod +x cluster-clear.sh
```
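Note that cluster-clear.sh runs `rm -rf` relative to wherever it is invoked, so running it from the wrong directory deletes the wrong thing. A slightly safer sketch (my addition, using the presence of `cluster.sh` as a marker that we are in the cluster directory):

```shell
# Write a guarded cluster-clear.sh that refuses to run in the wrong place.
cat > cluster-clear.sh <<'EOF'
#!/bin/sh
# Refuse to delete anything unless we are clearly in the cluster directory.
if [ ! -f cluster.sh ]; then
  echo "cluster-clear.sh: run this from the cluster directory" >&2
  exit 1
fi
rm -rf zookeeper-data broker-data
EOF
chmod +x cluster-clear.sh
```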
- In the `cluster` directory, run `./cluster.sh` to start the cluster:

```shell
# start the cluster
./cluster.sh
```

```shell
# check whether startup succeeded: if the data directories were created, it worked
ll zookeeper-data/
```

```text
total 4
drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeper
```

```shell
ll broker-data/
```

```text
total 12
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3
```
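The directory listing proves the processes wrote data, but not that the three brokers actually formed one cluster. A stronger smoke test (a sketch I'm adding; the script name `smoke.sh` and the topic name `smoke-test` are arbitrary) is to create a topic replicated across all three brokers, which only succeeds if all of them registered:

```shell
# Write a smoke-test script that exercises all three brokers at once.
cat > smoke.sh <<'EOF'
#!/bin/sh
# Creating a topic with replication-factor 3 only succeeds if all three
# brokers are up and registered with ZooKeeper.
./broker-1/bin/kafka-topics.sh --create --topic smoke-test \
  --bootstrap-server localhost:9091 --partitions 3 --replication-factor 3
./broker-1/bin/kafka-topics.sh --describe --topic smoke-test \
  --bootstrap-server localhost:9091
EOF
chmod +x smoke.sh
```

If the `--create` step fails with a replication-factor error, at least one broker did not come up.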
- When shutting the cluster down, clearing the ZooKeeper and Kafka data files is not enough: you also have to `kill -9` the ZooKeeper process and the Kafka processes, killing each one by hand:

```shell
ps axj | grep zookeeper
kill -9 <zookeeper-PID>

ps axj | grep kafka
kill -9 <broker-1-PID> <broker-2-PID> <broker-3-PID>
```
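Rather than `kill -9`, Kafka ships stop scripts that send a normal termination signal so the processes can shut down cleanly. A cluster-stop sketch (my addition; note that `kafka-server-stop.sh` signals every Kafka broker process on the host, so one call stops all three):

```shell
# Write a cluster-stop.sh that shuts down in reverse of the startup order.
cat > cluster-stop.sh <<'EOF'
#!/bin/sh
# Stop all brokers first, then ZooKeeper.
./broker-1/bin/kafka-server-stop.sh
sleep 5
./zookeeper/bin/zookeeper-server-stop.sh
EOF
chmod +x cluster-stop.sh
```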