目錄
- SSL證書
- 生成根證書
- 生成服務端和客戶端證書
- 生成keystore.jks和truststore.jks
- 輔助腳本
- 單獨生成truststore.jks
- 環境配置
- hosts文件
- kafka server.properties配置ssl
- 啟動kafka
- kafka基礎操作
- springboot集成
- 準備工作
- 需要配置的文件
- 開始消費
SSL證書
證書主要包含兩大類,一個是根證書,用于簽發和認證證書。其他證書可以用同一個根證書簽發,也可以用不同的根證書簽發各自的證書,使用同一個的話比較方便管理,這樣所有節點的trust可以公用,即只需要生成一次,其他節點復制就可以。
整個證書生成過程大概如下圖:
最終用于認證的是keystore.jks和truststore.jks,兩個證書的作用分別是:
keystore.jks:證明自己的身份,自己的keystore.jks是由別人truststore.jks包含的ca-cert.pem簽發就可以證明
truststore.jks:認證別人是否可信,看到別人的keystore.jks里有自己truststore.jks包含的ca-cert.pem就認為可信
這里要注意的是,如果要信任別人,就要在truststore中導入別人的根證書,這里是因為用的同一個根證書簽發,所以導入的根證書一樣,否則應該交叉導入,也就是客戶端導入服務端的,服務端導入客戶端的。
備注:后面的腳本直接復制使用有可能會出現下面的錯誤,這是由于不同系統的換行符不一致導致的,需要轉換成對應系統兼容的
$'\r': command not found
: invalid optionet: -
set: usage: set [-abefhkmnptuvxBCHP] [-o option-name] [--] [arg ...]
以在Linux中使用為例,可以使用Notepad++按照以下操作路徑修改一下保存之后覆蓋掉就可以了。
生成根證書
生成根證書包含流程圖的第一步,這時會生成根證書和他的私鑰,命令腳本如下:
#!/bin/bashset -e# === 配置部分 需要根據自己的實際情況進行調整===
#根證書
CA_CERT="ca-cert.pem"
#根證書私鑰
CA_KEY="ca-key.pem"
#有效期
VALIDITY=365
#subj
SUBJ="/CN=KafkaCA"
# 檢查目錄
if [ ! -d "/usr/ca/ssl" ]; thenmkdir -p /usr/ca/sslchmod 700 /usr/ca/ssl
ficd /usr/ca/ssl
# 檢查文件是否已存在
if [ -f "$CA_CERT" ] || [ -f "$CA_KEY" ]; thenecho "錯誤: CA證書或私鑰已存在,請先刪除或備份現有文件"exit 1
fi#正式生成證書,以下內容可以不用調整
echo "=== 步驟 1: 生成自簽名 CA ==="
openssl req -new -x509 \-keyout $CA_KEY \-out $CA_CERT \-days $VALIDITY \-nodes \-subj $SUBJ# 設置文件權限
chmod 600 "$CA_KEY"
有效命令其實就是最后一句,其他的是因為放在腳本中所以進行一些通用配置,方便復用
生成服務端和客戶端證書
這個階段包含流程圖的2-7,在都用同一個CA的情況下,步驟7只需要執行一次,然后復制到所有需要用到的計算機中就可以。
生成keystore.jks和truststore.jks
#!/bin/bashset -e# === 配置部分 需要根據自己的實際情況進行調整===
ALIAS="kafka"
KEYSTORE="keystore.jks"
TRUSTSTORE="truststore.jks"
CSR="sign.csr"
SIGN="signed.crt"
STOREPASS="123456"
KEYPASS="654321"
DNAME="CN=localhost, OU=IT, O=Kafka, L=City, S=State, C=CN"
VALIDITY=365
#上一步生成的根證書
CA_CERT="/usr/ca/ssl/ca-cert.pem"
CA_KEY="/usr/ca/ssl/ca-key.pem"
# 檢查目錄
if [ ! -d "/usr/ca/ssl" ]; thenecho "錯誤: 目錄 /usr/ca/ssl 不存在"exit 1
ficd /usr/ca/ssl#正式生成各個證書,以下內容可以不用調整
echo "=== 步驟 1: 生成Keystore證書和私鑰 ==="
keytool -genkeypair \-alias $ALIAS \-keyalg RSA \-keysize 2048 \-validity $VALIDITY \-keystore $KEYSTORE \-storepass $STOREPASS \-keypass $KEYPASS \-dname "$DNAME"echo "=== 步驟 2: 生成證書簽名請求 (CSR) ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-certreq \-file $CSR \-storepass $STOREPASSecho "=== 步驟 3: 使用 CA 簽名證書 ==="
openssl x509 -req \-CA $CA_CERT \-CAkey $CA_KEY \-in $CSR \-out $SIGN \-days $VALIDITY \-CAcreateserialecho "=== 步驟 4: 將 CA 根證書導入Keystore ==="
keytool -keystore $KEYSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -nopromptecho "=== 步驟 5: 將簽名證書導入 Keystore ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-import -file $SIGN \-storepass $STOREPASS -noprompt#使用同一個CA證書,在多個計算機使用時,下面這步可以只執行一次,每次新生成也不影響
echo "=== 步驟 6: 創建Truststore(導入 CA 根證書) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt
輔助腳本
如果正在生成服務端證書,需要把相關證書配置到server.properties可以在上面腳本中增加一下內容:
#順便生成后續Kafka要配置的內容,直接復制到server.properties文件
cat <<EOF############ SSL 配置 - server.properties 中添加 ############listeners=SSL://:9092
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9092
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none #############################################################EOF
如果是為客戶端生成證書,可以增加一下內容:
#如果是客戶端就增加使用以下腳本生成的文件去執行Kafka相關命令
echo "=== 創建 Kafka 客戶端配置文件 client.properties ==="
cat <<EOF > client.properties
security.protocol=SSL
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
ssl.endpoint.identification.algorithm=
group.id=test-group
#如果單向認證就不用添加下面三個配置
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
EOF
單獨生成truststore.jks
#!/bin/bashset -e
# === 配置部分 需要根據自己的實際情況進行調整===
TRUSTSTORE="truststore.jks"
STOREPASS="123456"
#信任的根證書
CA_CERT="/usr/ca/ssl/ca-cert.pem"
echo "=== 步驟 6: 創建Truststore(導入 CA 根證書) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt
環境配置
hosts文件
文件位置:
Windows hosts:C:\Windows\System32\drivers\etc\hosts
Linux hosts:/etc/hosts
添加內容:公網IP kafka
如果是本機使用也可以直接用內網IP
kafka server.properties配置ssl
把生成jks那步輸出的內容增加到server.properties中就可以,如果不是第一次配置,就只增加自己需要配置的內容即可。或者沒有增加輔助腳本的話,直接把下面內容中keystore和truststore的位置手動替換一下就行:
#下面兩項原來如果已經配置過就不要重復
listeners=SSL://:9092
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9092security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=這里替換成keystore的路徑
ssl.keystore.password=keystore的密碼
ssl.key.password=key的密碼
ssl.truststore.location=這里替換成truststore的路徑
ssl.truststore.password=truststore的密碼#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none
啟動kafka
cd到kafka安裝路徑下可以直接執行下面命令,或者使用絕對路徑
/bin/zookeeper-server-start.sh -daemon /config/zookeeper.properties
/bin/kafka-server-start.sh -daemon /config/server.properties
kafka基礎操作
client.properties的路徑要換成自己的
創建topic
bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test-topic --command-config /usr/local/kafka/ssl/client.properties
查看topic
bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --command-config /usr/ca/ssl-server/ssl-client/client-ssl.properties
生成消息
bin/kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test-topic --producer.config /usr/local/kafka/ssl/client.properties
springboot集成
準備工作
1、生成客戶端keystore.jks和truststore.jks
這時候spring程序作為客戶端,所以需要為他生成一個keystore.jks和truststore.jks,然后放到項目或別的位置,配置到項目中用來認證。如果只是暫時測試一下是否能連通,也可以討巧,直接用服務端的同一套keystore.jks和truststore.jks,但是這樣的操作不能用到正式中。
2、修改項目所在計算機的hosts文件
需要配置的文件
pom:引入kafka依賴,有說需要版本對應的,我直接沒有指定版本也是可以的
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
yml:增加Kafka配置項,當然也可以放到代碼里
spring:kafka:#kafka代理地址bootstrap-servers: kafka:9092ssl:protocol: SSL###服務端證書配置的時候設置的密碼#broke對client的認證,ssl.client.auth=required時需要key的配置key-store-location: classpath:/certs/keystore.jkskey-store-password: 123456key-password: 654321#client對broke的認證trust-store-password: 123456trust-store-location: classpath:/certs/truststore.jkskey-store-type: JKS#不驗證主機名 properties:ssl:endpoint:identification:algorithm: ''security:protocol: SSL#認證的配置就到這里了,下面的配置可以根據自己的習慣配置#消息發送失敗重試次數producer:retries: 0# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: consumer-dev-groupauto-offset-reset: earliestenable-auto-commit: falsemax-poll-records: 30# 指定消息key和消息體的編解碼方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manualtype: batchconcurrency: 1#kafka監聽的topic和group
report:kafka:#接收kafka消息的topic和groupproducerTopic: test-topicreportGroup: test-group
開始消費
如果需要生成可以找別的教程,我直接通過命令進行生產的,只是寫了一個消費(后面有時間可以補上)
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class Consumer {@KafkaListener(topics = "${report.kafka.producerTopic}", groupId = "${report.kafka.reportGroup}")public void reportConsumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {log.info("---------- 從Kafka上接收消息 -----");for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("offset是" + consumerRecord.offset() + "," + consumerRecord.partition());String value = consumerRecord.value();System.out.println("接到的內容:"+value);//具體的業務處理邏輯可以寫在后面}// 手動批量ackack.acknowledge();log.info("kafka提交成功");}}