【.NET Core】|?總結/Edison Zhou
大家好,我是Edison。
最近有一個ASP.NET Core使用認證機制訪問Kafka的需求,加之我們又使用了CAP這個開源項目使用的Kafka,于是網上尋找了一番發現對應資料太少,于是調查了一番,做了如下的筆記,希望對你有用。
背景
在實際場景中,開發環境的Kafka服務器一般沒有要求通過認證即可發布和讀取消息,并且還可以隨意創建Topic和Consumer Group。但是,在生產環境中則一般有較強的安全需求,無法隨意創建Topic和Consumer,還做了一些認證和權限約束。而在ASP.NET Core的解決方案中,我們經常使用到CAP這個開源項目作為事件總線,在CAP.Kafka項目中,只提供了最基礎的Servers配置,文檔示例中也只給出了這種只適合開發環境的配置示例,而對于對安全要求較高的生產環境,則需要我們研究一下Kafka官方的配置文檔,在CAP.Kafka的MainConfig對象中進行主動配置Key/Value。
本文會首先介紹一下Kafka的認證機制,然后會給出基于CAP項目通過認證方式訪問Kafka的示例。
Kafka認證機制
自 0.9.0.0 版本開始,Kafka 正式引入了認證機制,用于實現基礎的安全用戶認證,這是將 Kafka 上云或進行多租戶管理的必要步驟。目前Kafka的版本,已支持基于 SSL 和 基于 SASL 的安全認證機制。
基于 SSL 的認證主要是指 Broker 和客戶端的雙路認證(2-way authentication)。通常來說,SSL 加密(Encryption)已經啟用了單向認證,即客戶端認證 Broker 的證書(Certificate)。如果要做 SSL 認證,那么我們要啟用雙路認證,也就是說 Broker 也要認證客戶端的證書。
Note:Kafka 的源碼中依然是使用 SSL 而不是 TLS 來表示這類東西的。不過,今天出現的所有 SSL 字眼,我們都可以認為它們是和 TLS 等價的。
Kafka 還支持通過 SASL 做客戶端認證。SASL 是提供認證和數據安全服務的框架。Kafka 支持的 SASL 機制有 5 種:
GSSAPI:也就是 Kerberos 使用的安全接口,是在 0.9 版本中被引入的。
PLAIN:是使用簡單的用戶名 / 密碼認證的機制,在 0.10 版本中被引入。
SCRAM:主要用于解決 PLAIN 機制安全問題的新機制,是在 0.10.2 版本中被引入的。
OAUTHBEARER:是基于 OAuth 2 認證框架的新機制,在 2.0 版本中被引進。
Delegation Token:補充現有 SASL 機制的輕量級認證機制,是在 1.1.0 版本被引入的。
在實際應用中,一般建議 使用 SSL 來做通信加密,使用 SASL 來做 Kafka 的認證實現。對于小型公司來說,SASL/PLAIN 的配置和運維成本相對較小,比較適合Kafka集群配置。
下圖將這些認證機制進行了匯總,源自極客時間胡夕《Kafka核心技術與實戰》。
通過認證機制使用Kafka
這里假設我們已經搭建好了一個Kafka集群,并且配置了SASL/PLAIN方式,并且創建了一個賬號“kafka_user”,密碼為"kakfa_user_password@2022abcdlk!",約束客戶端只能通過SSL方式帶上CA證書加密訪問。
假設我們已經有了一個ASP.NET Core應用,并且之前已經在開發環境通過CAP項目使用了Kafka,那么對于生產環境或安全要求較高的測試環境,我們應該如何修改呢?
通過查看CAP的文檔,在CAP.Kafka中其實只提供了幾個最基礎的配置項:
而其他的配置項,我們只能通過CAP.Kafka提供的MainConfig這個Dictionary類進行手動添加,如下所示:
services.AddCap(capOptions =>
{capOptions.UseKafka(kafkaOption=>{// kafka options.// kafkaOptions.MainConfig.Add("", "");});
});
那么,我們應該添加哪些配置呢?它們的key和可選的value又是哪些呢?CAP給出了一個參考鏈接:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md,它是librdkafka項目的配置參數的文檔。
通過研究配置項文檔,我們大概需要以下一些參數,將其添加到MainConfig字典中,這些參數不僅適配Producer也適配Consumer。
namespace Microsoft.Extensions.DependencyInjection
{public static class ApplicationServiceCollectionExtensions{public static IServiceCollection AddEventBus(this IServiceCollection services, IConfigurationSection configuration){services.AddCap(option =>{option.UseInMemoryStorage();option.UseKafka(kfkOption =>{kfkOption.Servers = configuration["KafkaBootstrapServers"];if (Convert.ToBoolean(configuration["EnableAuthorization"])){kfkOption.MainConfig.Add("security.protocol", "sasl_ssl");kfkOption.MainConfig.Add("sasl.mechanism", "PLAIN");kfkOption.MainConfig.Add("sasl.username", configuration["SaslUserName"]); kfkOption.MainConfig.Add("sasl.password", configuration["SaslPassword"]); kfkOption.MainConfig.Add("ssl.ca.location", configuration["SslCertificatePath"]); kfkOption.MainConfig.Add("enable.ssl.certificate.verification", configuration["EnableSslCertificateVerification"]); }});option.SucceedMessageExpiredAfter = 3600 * 24 * Convert.ToInt32(configuration["SuccessMsgExpireDays"]);});return services;}}
}
在Program.cs中調用AddEventBus方法:
builder.Services.AddEventBus(builder.Configuration.GetSection("EventBusConfigs"));
在appsettings中的配置如下:
{"EventBusConfigs": {"KafkaBootstrapServers": "prd.kafka01.com:9093, prd.kafka02.com:9093, prd.kafka03.com:9093","SuccessMsgExpireDays": 7,"EnableAuthorization": true,"SaslUserName": "kafka_user","SaslPassword": "kakfa_user_password@2022abcdlk!","SslCertificatePath": "resources/certificates/intranet_server_ca.cer","EnableSslCertificateVerification": true}
}
既然是通過證書訪問,那么我們得告訴ASP.NET Core這個證書放在什么位置,本文示例是放在這個ASP.NET Core應用目錄下的,在實際中建議由運維管理員統一放在一個中心服務器位置,掛載到容器內部可以訪問,從而保證證書的安全。如果是通過K8s部署,那么將其添加為一個Secret存放是更好的方式。
CAP中的異構系統集成
順帶說一下,在CAP這個項目中,如果你的項目都是基于它來做事件總線,那么CAP可以正常的Publish和Subscribe消息,但是如果在你使用它之前已經有了許多的Topic Messages,它需要和一些第三方系統進行消息傳輸,這就會涉及到異構系統的集成。如果我們不做一些配置,CAP是無法正常Subscribe和Consume消息的。
因此,在CAP中,我們需要主動對Message做一些改造,添加傳遞一些額外信息以便于CAP能夠在收到消息時提取到關鍵特征從而正常運作。否則,你會在啟動時收到這樣一個錯誤:The given key "cap-msg-id" is not existed........。
我們只需要在注冊CAP組件時添加自定義Headers,確保"cap-msg-id"和"cap-msg-name"兩個Header值能夠被解析到:
namespace Microsoft.Extensions.DependencyInjection
{public static class ApplicationServiceCollectionExtensions{public static IServiceCollection AddEventBus(this IServiceCollection services, IConfigurationSection configuration){services.AddCap(option =>{option.UseInMemoryStorage();option.UseKafka(kfkOption =>{kfkOption.Servers = configuration["KafkaBootstrapServers"];if (Convert.ToBoolean(configuration["EnableAuthorization"])){kfkOption.MainConfig.Add("security.protocol", "sasl_ssl");kfkOption.MainConfig.Add("sasl.mechanism", "PLAIN");kfkOption.MainConfig.Add("sasl.username", configuration["SaslUserName"]); kfkOption.MainConfig.Add("sasl.password", configuration["SaslPassword"]); kfkOption.MainConfig.Add("ssl.ca.location", configuration["SslCertificatePath"]); kfkOption.MainConfig.Add("enable.ssl.certificate.verification", configuration["EnableSslCertificateVerification"]); }// 以下為新增自定義Headers配置option.CustomHeaders = e => new List<KeyValuePair<string, string>>{new KeyValuePair<string, string>(DotNetCore.CAP.Messages.Headers.MessageId, SnowflakeId.Default().NextId().ToString()),new KeyValuePair<string, string>(DotNetCore.CAP.Messages.Headers.MessageName, e.Topic)};});option.SucceedMessageExpiredAfter = 3600 * 24 * Convert.ToInt32(configuration["SuccessMsgExpireDays"]);});return services;}}
}
小結
本文介紹了在ASP.NET Core中使用CAP項目通過認證機制安全地使用kafka消息中間件,希望能夠對你有所幫助!
參考資料
CAP文檔Kafka部分:https://cap.dotnetcore.xyz/user-guide/en/transport/kafka
librdkafka配置項文檔:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
CAP文檔Messaging部分:https://cap.dotnetcore.xyz/user-guide/en/cap/messaging
胡夕《Kafka核心技術與實戰》之Kafka認證機制用哪家:https://time.geekbang.org/column/article/118347
年終總結:Edison的2021年終總結
數字化轉型:我在傳統企業做數字化轉型
C#刷題:C#刷劍指Offer算法題系列文章目錄
.NET面試:.NET開發面試知識體系
.NET大會:2020年中國.NET開發者大會PDF資料