文章目錄
- 啟用集群
- 資源規劃
- 準備證書
- 創建相關文件夾
- 配置文件
- 啟動各Kafka節點
- 故障轉移測試
- spring boot集成
啟用集群
配置集群時關鍵就是提前梳理好需要的網絡資源,完成對應server.properties文件的配置。在執行前先把這些梳理好,可以方便后面的配置,也不容易出錯,其他操作跟單機Kafka版都是一樣的。
一般需要提前規劃各節點所在服務器、端口、Kafka安裝路徑、數據存放位置、配置文件位置、ssl證書位置(需要配置的ssl的話才需要)。對于測試,如果所有節點都在同一臺服務器上,直接使用同一個Kafka搭配多個不同配置文件啟動多個實例就行。
資源規劃
下面是通過一臺服務器的多個端口仿照多臺服務器,配置3個節點的集群,大概需要以下資源:
服務器地址 | broker端口 | controller端口 | 安裝位置 | 配置文件 | 數據 | 證書位置 |
---|---|---|---|---|---|---|
192.168.1.1 | 9092 | 9095 | /home/kafka-cluster/broker1 | /home/kafka-cluster/broker1/config | /data/kafka/broker1 | /usr/ca/ssl |
192.168.1.1 | 9093 | 9096 | /home/kafka-cluster/broker2 | /home/kafka-cluster/broker2/config | /data/kafka/broker2 | /usr/ca/ssl2 |
192.168.1.1 | 9094 | 9097 | /home/kafka-cluster/broker3 | /home/kafka-cluster/broker3/config | /data/kafka/broker3 | /usr/ca/ssl3 |
如果是在多臺服務器上配置,可以直接配置好一臺,復制server.properties或者整個kafka到其他服務器,這樣可以減少很多重復性工作。只需要保證把證書生成到相同的路徑下,就不用做重復其他工作了。
準備證書
證書生成可以參考上一篇單機Kafka配置ssl并在springboot使用
生成的各節點證書如下:
創建相關文件夾
Kafka及配置文件位置:
各節點數據位置:
配置文件
broker1配置文件:
#SSL配置部分
listeners=SSL://:9092
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9092
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=/usr/ca/ssl/keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/usr/ca/ssl/truststore.jks
ssl.truststore.password=123456
#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none# 在集群中的編號,只要保持唯一就行,
broker.id=1#Kraft模式的集群關鍵配置就是這一句,每個節點多配置了一個controller端口
controller.quorum.voters=1@kafka1:9095,2@kafka1:9096,3@kafka1:9097log.dirs=/data/kafka/broker1
#后面的配置都是默認的,可以不用管
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#雖然Kraft模式的集群不需要zookeeper,但是這個配置是必須要有的,否則會報錯
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
broker2配置文件:
listeners=SSL://:9093
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=/usr/ca/ssl2/keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/usr/ca/ssl/truststore.jks
ssl.truststore.password=123456
#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none broker.id=2controller.quorum.voters=1@kafka1:9095,2@kafka1:9096,3@kafka1:9097
log.dirs=/data/kafka/broker2#后面的配置都是默認的,可以不用管
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
broker3配置文件:
listeners=SSL://:9094
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9094
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=/usr/ca/ssl3/keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/usr/ca/ssl/truststore.jks
ssl.truststore.password=123456
#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none broker.id=3controller.quorum.voters=1@kafka1:9095,2@kafka1:9096,3@kafka1:9097
log.dirs=/data/kafka/broker3#后面的配置都是默認的,可以不用管
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
集群關鍵配置:
broker.id=3
controller.quorum.voters=1@kafka1:9095,2@kafka1:9096,3@kafka1:9097
其他的配置項單機也要配置,集群只是保證broker id的唯一性且增加controller配置,controller配置中的kafka1為對應服務器hosts的主機名。
啟動各Kafka節點
cd到對應Kafka路徑下(比如broker1就cd /home/kafka-cluster/broker1)執行以下命令或者使用絕對路徑執行,
./bin/kafka-server-start.sh -daemon config/server.properties
故障轉移測試
故障前的集群狀態:
原leader故障后從broker1轉到broker2
恢復原來的leader節點之后,會自動切回去
spring boot集成
其他配置跟單機Kafka配置ssl并在springboot使用是一樣的,只需要把bootstrap-servers改成多個節點的情況就行,其中kafka是項目服務器hosts配置的主機名
bootstrap-servers: kafka:9092,kafka:9093,kafka:9094