Our project uses the open-source librdkafka library (version 1.9.0) to produce and send data to a Kafka server.
As the client program, after discussing and confirming with the customer during development, we added support for SASL_PLAINTEXT authentication. The following concept explanation is quoted from Tongyi Qianwen (Qwen) AI:
SASL (Simple Authentication and Security Layer) is a framework that allows services to add authentication support.
Kafka supports several SASL mechanisms, one of which is PLAIN (used with the SASL_PLAINTEXT listener protocol).
Despite the word "PLAINTEXT" in the protocol name, it refers to the authentication mechanism used (i.e., the user name and password are transmitted in plain text), not to the security of the data transport.
For safety, it is therefore usually combined with SSL/TLS to encrypt the communication. Main uses:
User authentication: confirming that a client attempting to connect to the Kafka cluster really is who it claims to be.
Security: although the PLAIN mechanism itself provides no encryption, it can be combined with SSL/TLS to secure data in transit.
How to configure the Kafka server side (installed from the Kafka source downloaded from GitHub):
Enable SASL and specify the mechanism in the server.properties file:
listeners=SASL_PLAINTEXT://your.host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
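Note that with the PLAIN mechanism the broker additionally needs a JAAS configuration defining the credentials it will accept. A minimal sketch follows; the file path, user names, and passwords are all placeholders, not values from our deployment:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_writer="writer-secret";
};

Here the username/password pair is the broker's own identity for inter-broker connections, and each user_<name>="<password>" entry defines a client account. The file is passed to the broker JVM before startup, e.g.:
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"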
The pseudocode for the corresponding settings in our C++ client built on librdkafka is as follows:
bool KafkaProducer::init(std::string &brokers, const string &username, const string &passwd, const bool &async, const int &size)
{
    ...
    if (!username.empty() && !passwd.empty())
    {
        HLog(HGET_INFO << L"sasl authentication set, username:" << username << L" ,password:" << passwd);
        // security.protocol: security protocol type, here SASL_PLAINTEXT
        conf_->set("security.protocol", "sasl_plaintext", errstr);
        // sasl.mechanisms: SASL mechanism, here PLAIN (plain text)
        conf_->set("sasl.mechanisms", "PLAIN", errstr);
        // sasl.username: authentication user name
        conf_->set("sasl.username", username, errstr);
        // sasl.password: authentication password
        conf_->set("sasl.password", passwd, errstr);
    }
    else
    {
        conf_->set("producer.type", "sync", errstr);
    }
    ...
}
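For completeness, here is a minimal self-contained sketch of a SASL_PLAINTEXT producer built on the librdkafka C++ API, assuming librdkafka 1.9.0. The broker address, credentials, and topic name (your.host.name:9092, writer, writer-secret, test) are placeholders, not values from our project; compile with g++ producer.cpp -lrdkafka++.

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

int main()
{
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // Broker list and SASL_PLAINTEXT credentials (placeholders).
    // Return codes of set() are ignored here for brevity; real code
    // should check for RdKafka::Conf::CONF_OK.
    conf->set("bootstrap.servers", "your.host.name:9092", errstr);
    conf->set("security.protocol", "sasl_plaintext", errstr);
    conf->set("sasl.mechanisms", "PLAIN", errstr);
    conf->set("sasl.username", "writer", errstr);
    conf->set("sasl.password", "writer-secret", errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer)
    {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return 1;
    }
    delete conf; // the producer keeps its own copy of the configuration

    std::string payload = "hello kafka";
    RdKafka::ErrorCode err = producer->produce(
        "test",                              // topic name (placeholder)
        RdKafka::Topic::PARTITION_UA,        // let librdkafka pick the partition
        RdKafka::Producer::RK_MSG_COPY,      // copy the payload into librdkafka
        const_cast<char *>(payload.c_str()), payload.size(),
        NULL, 0,                             // no key
        0,                                   // timestamp 0 = current time
        NULL);                               // no per-message opaque
    if (err != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(err) << std::endl;

    producer->flush(10 * 1000);              // wait up to 10 s for delivery
    delete producer;
    return 0;
}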
The following concept explanation also comes from Tongyi Qianwen AI:
ACLs (Access Control Lists) are the mechanism Kafka provides for controlling which users or clients may perform which operations on specific resources. An ACL defines who (principal) may perform what operation (operation) on which resource (resource); resources include topics, consumer groups, and so on. An ACL consists of the following parts:
Principal: the user identity, usually in the form User:<username>.
Resource Type: the type of resource to which access is controlled, such as Topic, Group, Cluster, or TransactionalId.
Operation: the permitted operation, including but not limited to Read, Write, Create, Delete, Describe, Alter, and All.
Pattern Type: the resource matching mode, supporting Literal (exact match), Prefixed (prefix match), etc.
Host: the hosts from which requests may originate; the default * means no restriction.
To enable ACL support, set the following parameter in the Kafka broker's server.properties file: authorizer.class.name. Kafka's classic ACL management is ZooKeeper-based, and the authorizer traditionally used was kafka.security.auth.SimpleAclAuthorizer; newer Kafka versions replace it with kafka.security.authorizer.AclAuthorizer, which is what we configure here:
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
Below are example settings after ACLs are enabled, allowing particular users to read and write a particular topic.
(1) View: pass the --list parameter to the kafka-acls.sh script to list the ACL grants
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181
(2) Configure an ACL so that the writer user has permission to write to the test topic
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test
(3) Grant the reader user read permission on the test topic
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test
(4) Grant permission to access the group
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
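Note that the commands above use the legacy ZooKeeper-based interface. On newer Kafka versions the tool is usually pointed at a broker instead; a sketch, where localhost:9092 and admin.properties (an admin client config file carrying the SASL credentials) are placeholders:
./kafka-acls.sh --bootstrap-server localhost:9092 --command-config admin.properties --add --allow-principal User:writer --operation Write --topic test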
Problem 1:
One day the customer reported that the Kafka server was no longer receiving data. We retrieved the logs and found the following error:
[1][2025-06-24][11:57:03][T2709509888][KafkaProducer.cpp ][0111][I]Kafka produce failed:Broker: Topic authorization failed .Message:{"db_name":"UniMonDB","dtagentpackagecusttime":"2023-11-03 13:29:23.0000","dtdevdowntime":"2025-06-17 15:11:49.0000","dtdevfirstfoundtime":"2024-02-20 15:47:47.0000","dtdevlastofftime":0,"dtdevstarttime":"2025-06-12 18:06:00.0000","dtdevuptime":"2025-06-24 12:41:28.0000","dtfoundlongtermnotrunningtime":"0-00-00 00:00:00.0000","dtlasttimeofuaagentdown":"2025-06-23 16:14:26.0000","dtpwdlastset":"0-00-00 00:00:00.0000","dttimeout":"0-00-00 00:00:00.0000","iagentpackagetype":64,"iagenttype":2,"iconnifno":39,"idatacollectby":0,"idevrecordid":71,"idevscrappedstatus":0,"idevtype":10,"idevtypeusedbytopo":0,"idot1xver":1,"iencryptstatus":0,"iextenstatus":0,"igetstatus":1,"ihastag":0,"iisaddedbyclient":0,"iisattrsetbyclient":0,"iisbelongtounit":1,"iisbind":0,"iisdevtypesetbyclient":0,"iisfoundbytopo":0,"iishidden":0,"iisinad":0,"iisiot":0,"iislongtermnotrunning":0,"imanuldept":0,"imultios":1,"inetdisposalstatus":0,"iregisterresult":0,"iroamingdevice":0,"isafescore":0,"iselfcheckscore":100,"isnmpagentstatus":0,"isolation":0,"istatus":1,"isysservices":0,"iuniaccessagentoldstatus":2,"iuniaccessagentstatus":1,"ivalnformat":0,"recycledstatus":0,"source_ip":"99.96.0.81","stradaccount":"","stragentpackagename":"V10820-20231103(辦公網版)","strassetid":"","strbaiduloc":"","strbelong":"","strbelongdeptid":"","strclientappid":"","strconnifname":"GigabitEthernet0/0/34","strconnswitchname":"SYD-OA-S5720-02","strdevalias":"","strdevdesc":"","strdevgip":"99.96.16.187","strdevidentiy":"DC8531FB-22E0-468B-ABD5-AD6B1E53AB9F","strdevip":"99.96.16.187","strdevname":"ASDO-CBMF.sydney.cmbchina.cn","strdevoid":"","strdomain":"sydney.cmbchina.cn","stremailaccount":"","strextend":"","strgpsx":"","strgpsy":"","strip1":"099096016187","strlocation":"","strmac":"7C:57:58:10:5F:14","strnatip":"99.96.16.187","strnatip1":"","strnet":"99.96.16.0/24","strreportlink":"/notifymsg/devreport/7cda64088ed3d9ed33cb37f06953c22d.html","strres1":"","strres2":"","strsafeclass":"","strservices":"","strverofuaagent":"3.5.10820.3","strxloc":"","stryloc":"","table_name":"tbl_devbaseinfo","uidconnswitchid":"68EFEC46-C6F9-435A-8AAA-566E42E78000","uiddeptid":"8b4e251a-0560-44cb-98b5-49bbb5add077","uiddevrecordid":"DC8531FB-22E0-468B-ABD5-AD6B1E53AB9F","uiddomainid":"SL832322282903524504","uiduserid":"50381d22-ecaa-4ed5-b3e2-4120b77673d8"}
根據這個日志,找到對應的代碼,可以知道第三方庫報錯的其實是”Broker: Topic authorization failed “
RdKafka::ErrorCode resp = producer_->produce(tpk, partition,
                                             RdKafka::Producer::RK_MSG_COPY,
                                             const_cast<char *>(data), size,
                                             key.c_str(), key.size(), NULL);
if (resp != RdKafka::ERR_NO_ERROR)
{
    HString strLog;
    strLog << L"Kafka produce failed:" << HString(RdKafka::err2str(resp)) << L" .Message:" << HString(data);
    HASGlobal::pins()->mpFail->log(HASGlobal::pins()->mpFail->get(LEL_TIPS, __WFILE__, __LINE__) << strLog);
    ...
}
Given an authorization failure, two causes came to mind: either the client's and server's SASL configurations differ, or ACLs are enabled but this user has not been granted write permission on the topic.
We asked the customer to provide the Kafka server's SASL configuration and verified that it matched the client's. We then asked the customer's operations team to check with their developers whether this was an ACL permission problem. The operations team eventually confirmed that the developers had not granted this user write permission on the topic, which is why producing data to that topic on the customer's Kafka server failed with "Broker: Topic authorization failed". After the permission was added (with a grant along the lines sketched below), the producer's data was written normally.
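For illustration only, the missing grant would have the same shape as example (2) above; <username> and <topic> are placeholders for the customer's actual user and topic, which do not appear in the log:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:<username> --operation Write --topic <topic>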
Problem 2:
When the client program was run manually with ./, it reported an error indicating that the Kafka broker's host name could not be resolved.
The fix is to add the mapping between the Kafka server's IP address and its host name to the /etc/hosts file; the client needs this because the broker advertises its host name in the metadata response, and the client must be able to resolve that name to connect. After adding the entry, the client could produce and send data to the Kafka server normally.
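A sketch of the /etc/hosts entry, with a placeholder IP and the broker host name from the listener configuration shown earlier:
192.0.2.10    your.host.name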