kafka SASL/PLAIN 認證及 ACL 權限控制

一、Zookeeper 配置 SASL/PLAIN 認證(每個zookeeper節點都要做)

1.1 在 zookeeper 的 conf 目錄下,創建 zk_server_jaas.conf 文件,內容如下

Server {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_kafka="kafka";
};

username="admin" 是 zookeeper 實例之間通信的用戶;user_kafka="kafka" 是 kafka broker 與 zookeeper 連接的時候的認證用戶,密碼為=后面的值;

1.2 修改 zoo.cfg ,添加以下內容

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.3 因為認證的時候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 這個是 kafka-client.jar 里面,所有需要將相應的 jar 包拷貝到 zookeeper 安裝根目錄的 lib 目錄下, 大概要 copy 以下這些 jar 包

mv /opt/module/kafka_2.12-2.4.1/libs/kafka-clients-2.4.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/lz4-java-1.6.0.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/osgi-resource-locator-1.0.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/slf4j-api-1.7.28.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/snappy-java-1.1.7.3.jar /opt/module/zookeeper/lib/

1.4 修改 zookeeper 啟動命令參數,在文件末尾追加以下內容

SERVER_JVMFLAGS="-Djava.security.auth.login.config=/opt/module/zookeeper/conf/zk_server_jaas.conf"

1.5 重啟 zookeeper 服務。

二、Kafka Server 配置 SASL/PLAIN 認證(每個kafka節點都要做)

2.1 修改 kafka server.properties 文件,添加以下內容(各個node修改為自己的hostname)

listeners=SASL_PLAINTEXT://node2:9092
advertised.listeners=SASL_PLAINTEXT://node2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer  
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true 
# 設置本例中admin為超級用戶
super.users=User:admin

2.2 在kafka config 目錄下創建 kafka_server_jass.conf 文件,內容如下

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_consumer="consumer"
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafka";
};

KafkaServer 段里面配置了 broker 之間的認證配置以及 client 和 broker 之間的認證配置

KafkaServer.username, KafkaServer.password 用于broker之間的相互認證

KafkaServer.user_admin和KafkaServer.user_consumer 用于 client 和broker 之間的認證, 下面我們 client 里面都用用戶 consumer 進行認證

Client 段里面定義 username 和 password 用于 broker 與 zookeeper 連接的認證;

2.3 修改 kafka 啟動命令腳本工具 kafka-server-start.sh,添加 java.security.auth.login.config 環境變量

將最后一句

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

修改為

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

2.4 修改完所有節點后,重啟 kafka 服務,查看啟動日志。

三、 Kafka client 的認證配置

3.1 在 Kafka 的 config 目錄下創建 kafka_client_jaas.conf 文件,內容如下

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="consumer"password="consumer";
};

3.2 修改 kafka 的 consumer.properties 和 producer.properties,添加以下內容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

3.3 修改 producer 啟動腳本參數,修改 kafka-console-producer.sh 文件

將最后一行

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

修改為

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"

3.4 修改 consumer 啟動腳本參數,修改 kafka-console-consumer.sh 文件

將最后一行

exec $(dirname $0)/kafka-run-class.sh  kafka.tools.ConsoleConsumer "$@"

修改為

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"

四、ACL 配置

4.1 因為我們有個超級用戶 admin,所以可以用 admin 做生產者,admin不需要做 ACL 配置

4.2 為 test topic 添加消費者用戶 consumer

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:consumer --consumer --topic test --group test_group

4.3 測試

生產者測試

kafka-console-producer.sh --broker-list 
node2 --topic test --producer.config /opt/module/kafka_2.12-2.4.1/config/producer.properties

消費者測試

kafka-console-consumer.sh --bootstrap-server 
node2:9092 --topic test --from-beginning --consumer.config 
/opt/module/kafka_2.12-2.4.1/config/consumer.properties  

4.4 Flink 代碼消費測試

object TestKafkaSource {// Kafka Broker 地址private val BOOTSTRAP_SERVERS = "node2:9092"// SASL/PLAIN 認證配置private val SASL_USERNAME = "consumer" private val SASL_PASSWORD = "consumer"// Topic 名稱private val ALLOWED_TOPIC = "test"private val DENIED_TOPIC = "test1"def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval props = new Propertiesprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group")props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")// SASL/PLAIN 配置props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")props.put("sasl.mechanism", "PLAIN")props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD))val dataSource: DataStreamSource[String] = env.addSource(new FlinkKafkaConsumer[String](ALLOWED_TOPIC , new SimpleStringSchema(), props))dataSource.print()env.execute()}
}

五、kafka-exporter 配置(使用了 kafka-exporter 監控 kafka 集群才做)

5.1 在kafka_server_jaas.conf 文件中加入 kafka-exporter 用戶信息,內容如下

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_consumer="consumer"user_kafka-exporter="kafka-exporter"
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafka";
};

5.2 分發到所有 kafka broker ,并重啟 kafka 服務

5.3 為 kafka-exporter 用戶配置權限

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --group '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic __consumer_offsets

5.4 配置 kafka_export 服務啟動參數,在 kafka-export 安裝目錄下創建一個文件 start_kafka_export ,內容如下

#!/bin/bash
export KAFKA_SASL_USERNAME="kafka-exporter"
export KAFKA_SASL_PASSWORD="kafka-exporter"
export KAFKA_SASL_MECHANISM="PLAIN"/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181/kafka241 \--sasl.enabled \--sasl.username=$KAFKA_SASL_USERNAME \--sasl.password=$KAFKA_SASL_PASSWORD \--sasl.mechanism=$KAFKA_SASL_MECHANISM

5.5 修改 kafka_export.service 文件,內容如下

[Unit]
Description=kafka_exporter
Wants=prometheus.service
After=network.target prometheus.service[Service]
Type=simple
User=bigdata
Group=bigdata
Restart=on-failure
WorkingDirectory=/opt/module/kafka_exporter-1.7.0
#ExecStart=/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181
ExecStart=/opt/module/kafka_exporter-1.7.0/start_kafka_exporter
[Install]
WantedBy=multi-user.target

5.6 重新加載 systemctl 管理的服務

sudo systemctl daemon-reload

5.7 啟動 kafka_exporter 服務

sudo systemctl start kafka_exporter

5.8 上 grafana 查看是否能獲取 kafka 狀態

六、 參考文檔

6.1 官網文檔 Apache Kafka

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/81584.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/81584.shtml
英文地址,請注明出處:http://en.pswp.cn/web/81584.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

20250528-C#知識:函數簡介及函數重載

C#知識:函數簡介及函數重載 本文主要介紹函數參數和函數重載相關的知識點 1、函數 函數一般寫在類中 一般函數調用 static int Add(int num, int value){num value;return num;}//一般函數調用,發生值類型參數的復制int num 1;Add(num, 1); //調用…

Vue內置指令與自定義指令

一、前言 在 Vue 開發中,指令(Directives) 是一種非常強大的特性,它允許我們以聲明式的方式操作 DOM。Vue 提供了一些常用的內置指令,如 v-if、v-show、v-bind、v-on 等,同時也支持開發者根據需求創建自己…

華為AP6050DN無線接入點瘦模式轉胖模式

引言 華為AP6050DN是一款企業級商用的無線接入點。由于產品定位原因,其默認工作在瘦模式下,即須經AC統一控制和管理,是不能直接充當普通的無線路由器來使用的。 而本文的目的,就是讓其能脫離AC的統一控制和管理,當作普通無線路由器來使用。 硬件準備 華為AP6050DN無線接…

程序員出海之英語-使用手冊

為什么現在實時翻譯工具這么牛逼了,AI轉譯這么準確了,我還在這里跟老古董一樣吭哧吭哧學英語呢? 這是因為我們始終是和人打交道,不僅僅是為了考試,看懂官方文章,聽懂官方視頻。這里為什么說官方&#xff0c…

Java 事務管理:在分布式系統中實現可靠的數據一致性

Java 事務管理:在分布式系統中實現可靠的數據一致性 在當今的軟件開發領域,分布式系統逐漸成為主流架構。然而,這也給事務管理帶來了巨大的挑戰。本文將深入探討 Java 事務管理在分布式系統中的關鍵要點,并通過詳細代碼實例展示如…

微信小程序關于截圖、錄屏攔截

1.安卓 安卓: 在需要禁止的頁面添加 onShow() {if (wx.setVisualEffectOnCapture) {wx.setVisualEffectOnCapture({visualEffect: hidden,complete: function(res) {}})}},// 頁面隱藏和銷毀時需要釋放防截屏錄屏設置onHide() {if (wx.setVisualEffectOnCapture) {w…

使用 PySpark 從 Kafka 讀取數據流并處理為表

使用 PySpark 從 Kafka 讀取數據流并處理為表 下面是一個完整的指南,展示如何通過 PySpark 從 Kafka 消費數據流,并將其處理為可以執行 SQL 查詢的表。 1. 環境準備 確保已安裝: Apache Spark (包含Spark SQL和Spark Streaming)KafkaPySpark對應的Ka…

第十天的嘗試

目錄 一、每日一言 二、練習題 三、效果展示 四、下次題目 五、總結 一、每日一言 哈哈,十天缺了兩天,我寫的文章現在質量不高,所以我可能考慮,應該一星期或者三四天出點高質量的文章,同時很開心大家能夠學到知識&a…

mediapipe標注視頻姿態關鍵點(基礎版加進階版)

前言 手語視頻流的識別有兩種大的分類,一種是直接將視頻輸入進網絡,一種是識別了關鍵點之后再進入網絡。所以這篇文章我就要來講講如何用mediapipe對手語視頻進行關鍵點標注。 代碼 需要直接使用代碼的,我就放這里了。環境自己配置一下吧&…

Redis數據遷移方案及持久化機制詳解

#作者:任少近 文章目錄 前言Redis的持久化機制RDBAOF Redis save和bgsave的區別redis數據遷移redis單機-單機數據遷移redis 主從-主從數據遷移redis 單機-cluster數據遷移redis cluster –redis cluster數據遷移 前言 Redis數據遷移是常見需求,主要包括…

圖論回溯

圖論 200.島嶼數量DFS 給你一個由 ‘1’(陸地)和 ‘0’(水)組成的的二維網格,請你計算網格中島嶼的數量。島嶼總是被水包圍,并且每座島嶼只能由水平方向和/或豎直方向上相鄰的陸地連接形成。此外&#xff…

真實網絡項目中交換機常用的配置與解析

一、配置三層鏈路聚合增加鏈路帶寬 1.組網需求 某企業有多個部門分布在不同的地區,由于業務發展的需要,不同區域的部門與部門之間有進行帶有VLAN Tag的報文的傳輸需求。采用透明網橋的遠程橋接和QinQ功能,可以實現企業在不同區域部門之間進…

【Redis】過期鍵刪除策略,LRU和LFU在redis中的實現,緩存與數據庫雙寫一致性問題,go案例

一、Redis 中的過期鍵刪除策略有哪些? 采用了 惰性刪除 和 定期刪除 兩種策略處理過期鍵: 1. 惰性刪除(Lazy Deletion) 機制:只有在訪問 key 時才檢查是否過期,如果已過期則立刻刪除。優點:對…

為什么單張表索引數量建議控制在 6 個以內

單張表索引數量建議控制在6個以內的主要原因包括以下幾點?: ?性能影響?:索引會占用額外的磁盤空間。如果索引數量過多,會占用大量的磁盤空間,尤其是在數據量較大的情況下,索引占用的空間可能會超過數據本身。此外&…

深度學習實戰109-智能醫療隨訪與健康管理系統:基于Qwen3(32B)、LangChain框架、MCP協議和RAG技術研發

大家好,我是微學AI,今天給大家介紹一下深度學習實戰109-智能醫療隨訪與健康管理系統:基于Qwen3(32B)、LangChain框架、MCP協議和RAG技術研發。在當今醫療信息化快速發展的背景下,醫療隨訪與健康管理面臨著數據分散、信息整合困難、個性化方案生成效率低等挑戰。傳統的醫療隨…

聊一聊 .NET Dump 中的 Linux信號機制

一:背景 1. 講故事 當 .NET程序 在Linux上崩潰時,我們可以配置一些參考拿到對應程序的core文件,拿到core文件后用windbg打開,往往會看到這樣的一句信息 Signal SIGABRT code SI_USER (Sent by kill, sigsend, raise)&#xff0c…

如何在uniapp H5中實現路由守衛

目錄 Vue3 app.config.globalProperties 1. 創建 Vue 應用實例 2. 添加全局屬性或方法 3. 在組件中使用全局屬性或方法 beforeEach在uniapp的注冊 1、在H5中這兩個對象是都存在的。「router:route」但是功能并不全面,具體可參考下圖。 2、剛剛測試了一下,在微信小程序…

無人機降落傘設計要點難點及原理!

一、設計要點 1. 傘體結構與折疊方式 傘體需采用輕量化且高強度的材料(如抗撕裂尼龍或芳綸纖維),并通過多重折疊設計(如三重折疊縫合)減少展開時的阻力,同時增強局部承力區域的強度。 傘衣的幾何參數&am…

AI時代新詞-AI增強現實(AI - Enhanced Reality)

一、什么是AI增強現實(AI - Enhanced Reality)? AI增強現實(AI - Enhanced Reality)是指將人工智能(AI)技術與增強現實(Augmented Reality,簡稱AR)技術相結合…

基于Matlab實現各種光譜數據預處理

在IT領域,尤其是在數據分析和科學研究中,光譜數據的預處理是至關重要的步驟。光譜數據通常包含了豐富的信息,但往往受到噪聲、雜散光、背景信號等因素的影響,需要通過預處理來提取有效信號,提高分析的準確性和可靠性。…