單機Kafka配置ssl并在springboot使用

目錄

  • SSL證書
    • 生成根證書
    • 生成服務端和客戶端證書
      • 生成keystore.jks和truststore.jks
      • 輔助腳本
      • 單獨生成truststore.jks
  • 環境配置
    • hosts文件
    • kafka server.properties配置ssl
  • 啟動kafka
  • kafka基礎操作
  • springboot集成
    • 準備工作
    • 需要配置的文件
    • 開始消費

SSL證書

證書主要包含兩大類,一個是根證書,用于簽發和認證證書。其他證書可以用同一個根證書簽發,也可以用不同的根證書簽發各自的證書,使用同一個的話比較方便管理,這樣所有節點的trust可以公用,即只需要生成一次,其他節點復制就可以。
整個證書生成過程大概如下圖:
在這里插入圖片描述
最終用于認證的是keystore.jks和truststore.jks,兩個證書的作用分別是:
keystore.jks:證明自己的身份,自己的keystore.jks是由別人truststore.jks包含的ca-cert.pem簽發就可以證明
truststore.jks:認證別人是否可信,看到別人的keystore.jks里有自己truststore.jks包含的ca-cert.pem就認為可信
這里要注意的是,如果要信任別人,就要在truststore中導入別人的根證書,這里是因為用的同一個根證書簽發,所以導入的根證書一樣,否則應該交叉導入,也就是客戶端導入服務端的,服務端導入客戶端的。

備注:后面的腳本直接復制使用有可能會出現下面的錯誤,這是由于不同系統的換行符不一致導致的,需要轉換成對應系統兼容的

$'\r': command not found
: invalid optionet: -
set: usage: set [-abefhkmnptuvxBCHP] [-o option-name] [--] [arg ...]

以在Linux中使用為例,可以使用Notepad++按照以下操作路徑修改一下保存之后覆蓋掉就可以了。

在這里插入圖片描述

生成根證書

生成根證書包含流程圖的第一步,這時會生成根證書和他的私鑰,命令腳本如下:

#!/bin/bashset -e# === 配置部分 需要根據自己的實際情況進行調整===
#根證書
CA_CERT="ca-cert.pem"
#根證書私鑰
CA_KEY="ca-key.pem"
#有效期
VALIDITY=365
#subj
SUBJ="/CN=KafkaCA"
# 檢查目錄
if [ ! -d "/usr/ca/ssl" ]; thenmkdir -p /usr/ca/sslchmod 700 /usr/ca/ssl
ficd /usr/ca/ssl
# 檢查文件是否已存在
if [ -f "$CA_CERT" ] || [ -f "$CA_KEY" ]; thenecho "錯誤: CA證書或私鑰已存在,請先刪除或備份現有文件"exit 1
fi#正式生成證書,以下內容可以不用調整
echo "=== 步驟 1: 生成自簽名 CA ==="
openssl req -new -x509 \-keyout $CA_KEY \-out $CA_CERT \-days $VALIDITY \-nodes \-subj $SUBJ# 設置文件權限
chmod 600 "$CA_KEY"

有效命令其實就是最后一句,其他的是因為放在腳本中所以進行一些通用配置,方便復用

生成服務端和客戶端證書

這個階段包含流程圖的2-7,在都用同一個CA的情況下,步驟7只需要執行一次,然后復制到所有需要用到的計算機中就可以。

生成keystore.jks和truststore.jks

#!/bin/bashset -e# === 配置部分 需要根據自己的實際情況進行調整===
ALIAS="kafka"
KEYSTORE="keystore.jks"
TRUSTSTORE="truststore.jks"
CSR="sign.csr"
SIGN="signed.crt"
STOREPASS="123456"
KEYPASS="654321"
DNAME="CN=localhost, OU=IT, O=Kafka, L=City, S=State, C=CN"
VALIDITY=365
#上一步生成的根證書
CA_CERT="/usr/ca/ssl/ca-cert.pem"
CA_KEY="/usr/ca/ssl/ca-key.pem"
# 檢查目錄
if [ ! -d "/usr/ca/ssl" ]; thenecho "錯誤: 目錄 /usr/ca/ssl 不存在"exit 1
ficd /usr/ca/ssl#正式生成各個證書,以下內容可以不用調整
echo "=== 步驟 1: 生成Keystore證書和私鑰 ==="
keytool -genkeypair \-alias $ALIAS \-keyalg RSA \-keysize 2048 \-validity $VALIDITY \-keystore $KEYSTORE \-storepass $STOREPASS \-keypass $KEYPASS \-dname "$DNAME"echo "=== 步驟 2: 生成證書簽名請求 (CSR) ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-certreq \-file $CSR \-storepass $STOREPASSecho "=== 步驟 3: 使用 CA 簽名證書 ==="
openssl x509 -req \-CA $CA_CERT \-CAkey $CA_KEY \-in $CSR \-out $SIGN \-days $VALIDITY \-CAcreateserialecho "=== 步驟 4: 將 CA 根證書導入Keystore ==="
keytool -keystore $KEYSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -nopromptecho "=== 步驟 5: 將簽名證書導入 Keystore ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-import -file $SIGN \-storepass $STOREPASS -noprompt#使用同一個CA證書,在多個計算機使用時,下面這步可以只執行一次,每次新生成也不影響
echo "=== 步驟 6: 創建Truststore(導入 CA 根證書) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

輔助腳本

如果正在生成服務端證書,需要把相關證書配置到server.properties可以在上面腳本中增加一下內容:

#順便生成后續Kafka要配置的內容,直接復制到server.properties文件
cat <<EOF############ SSL 配置 - server.properties 中添加 ############listeners=SSL://:9092
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9092
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none  #############################################################EOF

如果是為客戶端生成證書,可以增加一下內容:

#如果是客戶端就增加使用以下腳本生成的文件去執行Kafka相關命令
echo "=== 創建 Kafka 客戶端配置文件 client.properties ==="
cat <<EOF > client.properties
security.protocol=SSL
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
ssl.endpoint.identification.algorithm=
group.id=test-group
#如果單向認證就不用添加下面三個配置
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
EOF

單獨生成truststore.jks

#!/bin/bashset -e
# === 配置部分 需要根據自己的實際情況進行調整===
TRUSTSTORE="truststore.jks"
STOREPASS="123456"
#信任的根證書
CA_CERT="/usr/ca/ssl/ca-cert.pem"
echo "=== 步驟 6: 創建Truststore(導入 CA 根證書) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

環境配置

hosts文件

文件位置:
Windows hosts:C:\Windows\System32\drivers\etc\hosts
Linux hosts:/etc/hosts
添加內容:公網IP kafka
如果是本機使用也可以直接用內網IP

kafka server.properties配置ssl

把生成jks那步輸出的內容增加到server.properties中就可以,如果不是第一次配置,就只增加自己需要配置的內容即可。或者沒有增加輔助腳本的話,直接把下面內容中keystore和truststore的位置手動替換一下就行:

#下面兩項原來如果已經配置過就不要重復
listeners=SSL://:9092
#下面的localhost需要改成ip,否則只有自己能連上
advertised.listeners=SSL://localhost:9092security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=這里替換成keystore的路徑
ssl.keystore.password=keystore的密碼
ssl.key.password=key的密碼
ssl.truststore.location=這里替換成truststore的路徑
ssl.truststore.password=truststore的密碼#這里配置成雙向認證
ssl.client.auth=required
# 不驗證客戶端證書
#ssl.client.auth=none  

啟動kafka

cd到kafka安裝路徑下可以直接執行下面命令,或者使用絕對路徑
/bin/zookeeper-server-start.sh -daemon /config/zookeeper.properties
/bin/kafka-server-start.sh -daemon /config/server.properties

kafka基礎操作

client.properties的路徑要換成自己的
創建topic

 bin/kafka-topics.sh   --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test-topic  --command-config /usr/local/kafka/ssl/client.properties

查看topic

 bin/kafka-topics.sh --list --bootstrap-server kafka:9092  --command-config /usr/ca/ssl-server/ssl-client/client-ssl.properties

生成消息

 bin/kafka-console-producer.sh  --bootstrap-server  kafka:9092 --topic test-topic --producer.config /usr/local/kafka/ssl/client.properties

springboot集成

準備工作

1、生成客戶端keystore.jks和truststore.jks
這時候spring程序作為客戶端,所以需要為他生成一個keystore.jks和truststore.jks,然后放到項目或別的位置,配置到項目中用來認證。如果只是暫時測試一下是否能連通,也可以討巧,直接用服務端的同一套keystore.jks和truststore.jks,但是這樣的操作不能用到正式中。

2、修改項目所在計算機的hosts文件

需要配置的文件

pom:引入kafka依賴,有說需要版本對應的,我直接沒有指定版本也是可以的

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml:增加Kafka配置項,當然也可以放到代碼里

spring:kafka:#kafka代理地址bootstrap-servers: kafka:9092ssl:protocol: SSL###服務端證書配置的時候設置的密碼#broke對client的認證,ssl.client.auth=required時需要key的配置key-store-location: classpath:/certs/keystore.jkskey-store-password: 123456key-password: 654321#client對broke的認證trust-store-password: 123456trust-store-location: classpath:/certs/truststore.jkskey-store-type: JKS#不驗證主機名  properties:ssl:endpoint:identification:algorithm: ''security:protocol: SSL#認證的配置就到這里了,下面的配置可以根據自己的習慣配置#消息發送失敗重試次數producer:retries: 0# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: consumer-dev-groupauto-offset-reset: earliestenable-auto-commit: falsemax-poll-records: 30# 指定消息key和消息體的編解碼方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manualtype: batchconcurrency: 1#kafka監聽的topic和group
report:kafka:#接收kafka消息的topic和groupproducerTopic: test-topicreportGroup: test-group

開始消費

如果需要生成可以找別的教程,我直接通過命令進行生產的,只是寫了一個消費(后面有時間可以補上)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class Consumer {@KafkaListener(topics = "${report.kafka.producerTopic}", groupId = "${report.kafka.reportGroup}")public void reportConsumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {log.info("---------- 從Kafka上接收消息 -----");for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("offset是" + consumerRecord.offset() + "," + consumerRecord.partition());String value = consumerRecord.value();System.out.println("接到的內容:"+value);//具體的業務處理邏輯可以寫在后面}// 手動批量ackack.acknowledge();log.info("kafka提交成功");}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/82505.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/82505.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/82505.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PCB設計教程【入門篇】——電路分析基礎-元件數據手冊

前言 本教程基于B站Expert電子實驗室的PCB設計教學的整理&#xff0c;為個人學習記錄&#xff0c;旨在幫助PCB設計新手入門。所有內容僅作學習交流使用&#xff0c;無任何商業目的。若涉及侵權&#xff0c;請隨時聯系&#xff0c;將會立即處理 目錄 前言 一、數據手冊的重要…

Vue2實現Office文檔(docx、xlsx、pdf)在線預覽

&#x1f31f; 前言 歡迎來到我的技術小宇宙&#xff01;&#x1f30c; 這里不僅是我記錄技術點滴的后花園&#xff0c;也是我分享學習心得和項目經驗的樂園。&#x1f4da; 無論你是技術小白還是資深大牛&#xff0c;這里總有一些內容能觸動你的好奇心。&#x1f50d; &#x…

【辰輝創聚生物】JAK-STAT信號通路相關蛋白:細胞信號傳導的核心樞紐

在細胞間復雜的信號傳遞網絡中&#xff0c;Janus 激酶 - 信號轉導和轉錄激活因子&#xff08;JAK-STAT&#xff09;信號通路猶如一條高速信息公路&#xff0c;承擔著傳遞細胞外信號、調控基因表達的重要使命。JAK-STAT 信號通路相關蛋白作為這條信息公路上的 “關鍵節點” 和 “…

OceanBase數據庫從入門到精通(運維監控篇)

文章目錄 一、OceanBase 運維監控體系概述二、OceanBase 系統表與元數據查詢2.1 元數據查詢基礎2.2 核心系統表詳解2.3 分區元數據查詢實戰三、OceanBase 性能監控SQL詳解3.1 關鍵性能指標監控3.2 SQL性能分析實戰四、OceanBase 空間使用監控4.1 表空間監控體系4.2 空間使用趨勢…

linux 進程間通信_共享內存

目錄 一、什么是共享內存&#xff1f; 二、共享內存的特點 優點 缺點 三、使用共享內存的基本函數 1、創建共享內存shmget() 2、掛接共享內存shmat 3、脫離掛接shmdt 4、共享內存控制shmctl 5.查看和刪除共享內存 comm.hpp server.cc Client.cc Makefile 一、什么…

Spring Boot 登錄實現:JWT 與 Session 全面對比與實戰講解

Spring Boot 登錄實現&#xff1a;JWT 與 Session 全面對比與實戰講解 2025.5.21-23:11今天在學習黑馬點評時突然發現用的是與蒼穹外賣jwt不一樣的登錄方式-Session&#xff0c;于是就想記錄一下這兩種方式有什么不同 在實際開發中&#xff0c;登錄認證是后端最基礎也是最重要…

Vue中的 VueComponent

VueComponent 組件的本質 Vue 組件是一個可復用的 Vue 實例。每個組件本質上就是通過 Vue.extend() 創建的構造函數&#xff0c;或者在 Vue 3 中是由函數式 API&#xff08;Composition API&#xff09;創建的。 // Vue 2 const MyComponent Vue.extend({template: <div…

使用 FFmpeg 將視頻轉換為高質量 GIF(保留原始尺寸和幀率)

在制作教程動圖、產品展示、前端 UI 演示等場景中,我們經常需要將視頻轉換為體積合適且清晰的 GIF 動圖。本文將詳細介紹如何使用 FFmpeg 工具將視頻轉為高質量 GIF,包括: ? 保留原視頻尺寸或自定義縮放? 保留原始幀率或自定義幀率? 使用調色板優化色彩質量? 降低體積同…

【自然語言處理與大模型】大模型Agent四大的組件

大模型Agent是基于大型語言模型構建的智能體&#xff0c;它們能夠模擬獨立思考過程&#xff0c;靈活調用各類工具&#xff0c;逐步達成預設目標。這類智能體的設計旨在通過感知、思考與行動三者的緊密結合來完成復雜任務。下面將從大模型大腦&#xff08;LLM&#xff09;、規劃…

《軟件工程》第 11 章 - 結構化軟件開發

結構化軟件開發是一種傳統且經典的軟件開發方法&#xff0c;它強調將軟件系統分解為多個獨立的模塊&#xff0c;通過數據流和控制流來描述系統的行為。本章將結合 Java 代碼示例、可視化圖表&#xff0c;深入講解面向數據流的分析與設計方法以及實時系統設計的相關內容。 11.1 …

初步嘗試AI應用開發平臺——Dify的本地部署和應用開發

隨著大語言模型LLM和相關應用的流行&#xff0c;在本地部署并構建知識庫&#xff0c;結合企業的行業經驗或個人的知識積累進行定制化開發&#xff0c;是LLM的一個重點發展方向&#xff0c;在此方向上也涌現出了眾多軟件框架和工具集&#xff0c;Dify就是其中廣受關注的一款&…

高階數據結構——哈希表的實現

目錄 1.概念引入 2.哈希的概念&#xff1a; 2.1 什么叫映射&#xff1f; 2.2 直接定址法 2.3 哈希沖突&#xff08;哈希碰撞&#xff09; 2.4 負載因子 2.5 哈希函數 2.5.1 除法散列法&#xff08;除留余數法&#xff09; 2.5.2 乘法散列法&#xff08;了解&#xff09…

7.安卓逆向2-frida hook技術-介紹

免責聲明&#xff1a;內容僅供學習參考&#xff0c;請合法利用知識&#xff0c;禁止進行違法犯罪活動&#xff01; 內容參考于&#xff1a;圖靈Python學院 工具下載&#xff1a; 鏈接&#xff1a;https://pan.baidu.com/s/1bb8NhJc9eTuLzQr39lF55Q?pwdzy89 提取碼&#xff1…

DB-GPT擴展自定義Agent配置說明

簡介 文章主要介紹了如何擴展一個自定義Agent&#xff0c;這里是用官方提供的總結摘要的Agent做了個示例&#xff0c;先給大家看下顯示效果 代碼目錄 博主將代碼放在core目錄了&#xff0c;后續經過對源碼的解讀感覺放在dbgpt_serve.agent.agents.expand目錄下可能更合適&…

Android 架構演進之路:從 MVC 到 MVI,擁抱單向數據流的革命

在移動應用開發的世界里&#xff0c;架構模式的演進從未停歇。從早期的 MVC 到后來的 MVP、MVVM&#xff0c;每一次變革都在嘗試解決前一代架構的痛點。而今天&#xff0c;我們將探討一種全新的架構模式 ——MVI&#xff08;Model-View-Intent&#xff09;&#xff0c;它借鑒了…

【YOLOv8-pose部署至RK3588】模型訓練→轉換RKNN→開發板部署

已在GitHub開源與本博客同步的YOLOv8_RK3588_object_pose 項目&#xff0c;地址&#xff1a;https://github.com/A7bert777/YOLOv8_RK3588_object_pose 詳細使用教程&#xff0c;可參考README.md或參考本博客第六章 模型部署 文章目錄 一、項目回顧二、文件梳理三、YOLOv8-pose…

集成30+辦公功能的實用工具

軟件介紹 本文介紹的軟件是千峰辦公助手。 軟件功能概述與開發目的 千峰辦公助手集成了自動任務、系統工具、文件工具、PDF工具、OCR圖文識別、文字處理、電子表格七個模塊&#xff0c;擁有30余項實用功能。作者開發該軟件的目的是解決常見辦公痛點&#xff0c;把機械操作交…

IDEA啟動報錯:Cannot invoke “org.flowable.common.engine.impl.persistence.ent

1.問題 項目啟動報錯信息 java.lang.NullPointerException: Cannot invoke "org.flowable.common.engine.impl.persistence.ent 2.問題解析 出現這個問題是在項目中集成了Flowable或Activiti工作流&#xff0c;開啟自動創建工作流創建的表&#xff0c;因為不同環境的數據…

網絡安全--PHP第三天

今天學習文件上傳的相關知識 上傳的前端頁面如下 upload.html <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"&g…

【愚公系列】《生產線數字化設計與仿真》004-顏色分類站仿真(基礎概念)

??【技術大咖愚公搬代碼:全棧專家的成長之路,你關注的寶藏博主在這里!】?? ??開發者圈持續輸出高質量干貨的"愚公精神"踐行者——全網百萬開發者都在追更的頂級技術博主! ?? 江湖人稱"愚公搬代碼",用七年如一日的精神深耕技術領域,以"…