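MSK encryption options
MSK provides two kinds of encryption:
- Encryption at rest
- Encryption in transit
The encryption settings can be specified when the cluster is created, using the following parameters: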
aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3
// encryptioninfo.json
{
  "EncryptionAtRest": {
    "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"
  },
  "EncryptionInTransit": {
    "InCluster": true,
    "ClientBroker": "TLS"
  }
}
Locate the certificate store
$ pwd
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.amzn2.0.1.x86_64/jre/lib/security
$ ls -al ../../../../../../../etc/pki/java/cacerts
lrwxrwxrwx 1 root root 40 May 8 09:44 ../../../../../../../etc/pki/java/cacerts -> /etc/pki/ca-trust/extracted/java/cacerts
$ cp /etc/pki/ca-trust/extracted/java/cacerts /tmp/kafka.client.truststore.jks
To test TLS encryption, create client.properties:
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
List the endpoints
$ aws kafka get-bootstrap-brokers --cluster-arn arn:aws-cn:kafka:cn-north-1:037047667284:cluster/mytest/93d5cf51-9e82-4049-a4bc-cefb6bd61716-3
{
  "BootstrapBrokerString": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092",
  "BootstrapBrokerStringTls": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094",
  "BootstrapBrokerStringSaslScram": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096",
  "BootstrapBrokerStringSaslIam": "b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098"
}
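The response above pairs each listener type with its own port. As a quick reference, here is a minimal Java sketch of that pairing, assuming only the four keys and ports visible in this output. The class and method names are illustrative and not part of any AWS SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps the keys returned by get-bootstrap-brokers
// to the ports observed in the output above.
class MskBootstrapPorts {
    static final Map<String, Integer> PORTS = new LinkedHashMap<>();
    static {
        PORTS.put("BootstrapBrokerString", 9092);          // PLAINTEXT
        PORTS.put("BootstrapBrokerStringTls", 9094);       // TLS
        PORTS.put("BootstrapBrokerStringSaslScram", 9096); // SASL/SCRAM
        PORTS.put("BootstrapBrokerStringSaslIam", 9098);   // SASL/IAM
    }

    // Returns the port for a response key, or throws if the key is unknown.
    static int portFor(String key) {
        Integer port = PORTS.get(key);
        if (port == null) {
            throw new IllegalArgumentException("Unknown bootstrap key: " + key);
        }
        return port;
    }

    public static void main(String[] args) {
        PORTS.forEach((key, port) -> System.out.println(key + " -> " + port));
    }
}
```

Picking the wrong key for a given client configuration produces the kind of authentication errors shown later in these notes.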
Test the TLS connection
$ ./kafka-topics.sh --bootstrap-server b-2.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094,b-1.test320t.ivec50.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
__amazon_msk_canary
__consumer_offsets
first
# Connecting to the plaintext endpoint (BootstrapBrokerString, port 9092) with this SSL configuration fails
$ ./kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --command-config client.properties --list
[2023-07-20 12:40:04,944] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)
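With an IAM client configuration, connecting to the TLS port fails
- The TLS broker endpoint (port 9094) has no SASL mechanism enabled (the log below reports "enabled mechanisms are []"), so it serves Unauthenticated clients, and a client configured for IAM authentication fails on it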
./kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9094 --command-config client.properties --list
[2023-07-20 12:26:31,401] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.14.174:9094) failed authentication due to: Unexpected handshake request with client mechanism AWS_MSK_IAM, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
[2023-07-20 12:26:31,403] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
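MSK access control
https://docs.amazonaws.cn/msk/latest/developerguide/kafka_apis_iam.html
Kafka client configuration: https://kafka.apache.org/documentation/#security_configclients
The access control and encryption combinations available in MSK are listed below. The access control method determines which encryption options can be chosen.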
Authentication | Client-broker encryption options | Broker-broker encryption |
---|---|---|
Unauthenticated | TLS, PLAINTEXT, TLS_PLAINTEXT | Can be on or off |
mTLS | TLS, TLS_PLAINTEXT | Must be on |
SASL/SCRAM | TLS | Must be on |
SASL/IAM | TLS | Must be on |
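The table can also be written as a small lookup, which is handy for sanity-checking a planned cluster configuration. The sketch below is a minimal Java encoding of exactly the rows above. The class and method names are illustrative, and it does not call any AWS API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical lookup built from the table above.
class MskAuthMatrix {
    static final Map<String, List<String>> ALLOWED = new HashMap<>();
    static {
        ALLOWED.put("Unauthenticated", Arrays.asList("TLS", "PLAINTEXT", "TLS_PLAINTEXT"));
        ALLOWED.put("mTLS", Arrays.asList("TLS", "TLS_PLAINTEXT"));
        ALLOWED.put("SASL/SCRAM", Arrays.asList("TLS"));
        ALLOWED.put("SASL/IAM", Arrays.asList("TLS"));
    }

    // True if the client-broker encryption option is listed for the authentication method.
    static boolean isAllowed(String auth, String clientBroker) {
        List<String> options = ALLOWED.get(auth);
        return options != null && options.contains(clientBroker);
    }

    // Per the table, only Unauthenticated may turn broker-broker encryption off.
    static boolean inClusterEncryptionRequired(String auth) {
        return !"Unauthenticated".equals(auth);
    }

    public static void main(String[] args) {
        System.out.println("SASL/IAM + PLAINTEXT allowed: " + isAllowed("SASL/IAM", "PLAINTEXT"));
        System.out.println("Unauthenticated + PLAINTEXT allowed: " + isAllowed("Unauthenticated", "PLAINTEXT"));
    }
}
```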
Once the cluster has been created, it exposes several connection endpoints
Port information: https://docs.amazonaws.cn/en_us/msk/latest/developerguide/port-info.html
plaintext
With the Unauthenticated method, the client uses PLAINTEXT
Look up the bootstrap-server endpoints
(Optional) Add the client configuration to bin/client.properties:
security.protocol=PLAINTEXT
Test the connection; no TLS configuration is needed
./bin/kafka-topics.sh --bootstrap-server b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092,b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9092 --list
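Enabling SSL in a Java client
// Enable the TLS connection
properties.put("security.protocol", "SSL");
// Note: sasl.mechanism only takes effect when security.protocol is SASL_SSL or SASL_PLAINTEXT
properties.put("sasl.mechanism", "SCRAM-SHA-512");
// Create the Kafka producer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);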
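For reference, here is a minimal, self-contained sketch of the TLS-only client settings, mirroring the client.properties file used for the shell test earlier. The helper name and its parameters are illustrative. The kafka-clients dependency is deliberately left out so that the snippet runs with the JDK alone; in a real client the returned Properties would be passed to the KafkaProducer constructor.

```java
import java.util.Properties;

// Hypothetical helper that assembles TLS-only client settings.
class TlsClientConfig {
    static Properties tlsProperties(String bootstrapServers, String truststorePath) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);      // should be the TLS (9094) endpoints
        p.put("security.protocol", "SSL");                 // TLS without SASL
        p.put("ssl.truststore.location", truststorePath);  // e.g. the copied cacerts file
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    public static void main(String[] args) {
        // The host name below is a placeholder, not a real broker.
        Properties p = tlsProperties("broker.example:9094", "/tmp/kafka.client.truststore.jks");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```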
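IAM authentication
MSK modified the Kafka source code to allow authentication through IAM, and access events are sent to CloudTrail. Notes:
- Does not apply to ZooKeeper nodes
- Once IAM authentication is enabled, the allow.everyone.if.no.acl.found setting has no effect
- Kafka ACLs (stored in ZooKeeper) created while IAM authentication is in use have no effect on IAM authentication
- TLS encryption must be enabled between client and broker
- Permissions related to connecting to Kafka use the kafka-cluster prefix, see https://docs.amazonaws.cn/en_us/msk/latest/developerguide/iam-access-control.html
- Ports 9098 and 9198 are used (9098 from within AWS, 9198 for public access)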
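Connecting from the shell
The client needs the following parameters:
# config/client.properties
# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE> # If ssl.truststore.location is not set, the Java process uses the default certificate store.
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# Kafka reports awsProfileName as an unknown config when it is a separate key. The aws-msk-iam-auth README sets it inside the JAAS line instead:
# sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="admin";
awsProfileName="admin";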
Download the client dependency JAR into the /libs directory
https://github.com/aws/aws-msk-iam-auth/releases
aws s3 cp s3://zhaojiew/software/aws-msk-iam-auth-1.1.7-all.jar .
Test the connection
./bin/kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list
Without --command-config, the following error may appear:
./bin/kafka-topics.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --list
Error while executing topic command : Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[2023-07-20 10:57:39,293] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1689850718887, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics(kafka.admin.TopicCommand$)
[2023-07-20 10:57:39,316] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
    at java.lang.Thread.run(Thread.java:750)
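A likely cause, stated as an assumption rather than something the log confirms: the tool is speaking PLAINTEXT to a TLS listener, reads the first four bytes of the TLS reply as a message length, and then tries to allocate a buffer of that size (the stack trace does end in ByteBuffer.allocate called from NetworkReceive.readFrom). The sketch below shows how large that length becomes if the reply starts with a TLS 1.2 alert record header. The byte values and the class name are illustrative.

```java
import java.nio.ByteBuffer;

// Illustration only: how a length-prefixed reader would interpret TLS bytes.
class FrameSizeDemo {
    // Reads the first four bytes as a big-endian int, as a length-prefixed protocol would.
    static int frameSize(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumed reply prefix: TLS alert (0x15), version 3.3 (0x03 0x03), then the first length byte (0x00).
        byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00};
        int size = frameSize(tlsAlertHeader);
        System.out.println(size + " bytes requested"); // 352518912 bytes, roughly 336 MiB
    }
}
```

A request of that size would exceed a small tool heap, which is consistent with the OutOfMemoryError above and with the error disappearing once the client configuration is supplied.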
With the client configuration specified, the connection succeeds
- The profile in .aws/config must be written as [profile prod]
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --command-config client.properties --list
[2023-07-20 11:21:06,517] WARN The configuration 'awsProfileName' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
__amazon_msk_canary
__consumer_offsets
Create a topic
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --command-config client.properties --topic first --create --partitions 2 --replication-factor 2
Send messages
./kafka-console-producer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --producer.config client.properties --topic first
Consume messages
./kafka-console-consumer.sh --bootstrap-server b-1.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 --consumer.config client.properties --from-beginning --topic first
Connecting from Java code
Add the dependency
<dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>1.0.0</version>
</dependency>
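// Full configuration
// Requires: import software.amazon.msk.auth.iam.IAMClientCallbackHandler;
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "AWS_MSK_IAM");
properties.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
properties.put("sasl.client.callback.handler.class", IAMClientCallbackHandler.class.getName());
// As a separate key, awsProfileName is reported by Kafka as an unknown config (see the WARN in the shell test)
properties.put("awsProfileName", "admin");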
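The shell test earlier logged a WARN that 'awsProfileName' isn't a known config when it is supplied as a separate key. The aws-msk-iam-auth README instead passes the profile as an option inside sasl.jaas.config. Below is a minimal sketch of building the settings that way. The helper name and its parameters are illustrative, and the class names are written as strings so that the snippet runs without the library on the classpath.

```java
import java.util.Properties;

// Hypothetical helper that assembles IAM client settings.
class IamClientConfig {
    static Properties iamProperties(String bootstrapServers, String profileName) {
        // Build the JAAS line, adding the profile option only when one is given.
        StringBuilder jaas = new StringBuilder("software.amazon.msk.auth.iam.IAMLoginModule required");
        if (profileName != null && !profileName.isEmpty()) {
            jaas.append(" awsProfileName=\"").append(profileName).append("\"");
        }
        jaas.append(";");

        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers); // should be the IAM (9098) endpoints
        p.put("security.protocol", "SASL_SSL");
        p.put("sasl.mechanism", "AWS_MSK_IAM");
        p.put("sasl.jaas.config", jaas.toString());
        p.put("sasl.client.callback.handler.class",
              "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return p;
    }

    public static void main(String[] args) {
        // The host name below is a placeholder, not a real broker.
        System.out.println(iamProperties("broker.example:9098", "admin").get("sasl.jaas.config"));
    }
}
```

With a null or empty profile name the JAAS line falls back to the plain `required;` form, which leaves credential lookup to the default provider chain.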
Related errors
// Error when the dependency above is not on the classpath:
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for software.amazon.msk.auth.iam.IAMLoginModule
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
    ... 4 more
// Error when no credentials are found:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9098 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with b-4.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.28.80 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [com.amazonaws.auth.DefaultAWSCredentialsProviderChain@7e8d5309: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: To use assume role profiles the aws-java-sdk-sts module must be on the class path.,
mTLS
Currently unavailable in the China regions; it depends on Private CA
SASL/SCRAM
https://docs.amazonaws.cn/en_us/msk/latest/developerguide/msk-password.html
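Use Secrets Manager to store the username and password
Create the secret
- The name must start with AmazonMSK_
- The secret cannot be encrypted with the default KMS key
- The secret content must have the following format
{"username": "alice", "password": "alice-secret"}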
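The naming rule and the payload format above are easy to get wrong by hand. The following is a minimal sketch of a local pre-check and payload builder, assuming only the two rules listed here. The class and method names are illustrative, and it does not call Secrets Manager.

```java
// Hypothetical helper for preparing an MSK SASL/SCRAM secret.
class MskScramSecret {
    static final String REQUIRED_PREFIX = "AmazonMSK_";

    // True if the name starts with the required prefix and has something after it.
    static boolean isValidName(String name) {
        return name != null
                && name.startsWith(REQUIRED_PREFIX)
                && name.length() > REQUIRED_PREFIX.length();
    }

    // Escapes backslashes and double quotes so the value is safe inside a JSON string.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the secret body in the username/password format shown above.
    static String secretJson(String username, String password) {
        return String.format("{\"username\": \"%s\", \"password\": \"%s\"}",
                escape(username), escape(password));
    }

    public static void main(String[] args) {
        System.out.println(isValidName("AmazonMSK_alice")); // true
        System.out.println(secretJson("alice", "alice-secret"));
    }
}
```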
Connecting from the shell
Create the configuration file users_jaas.conf and export its path through an environment variable
# Note: KafkaClient must start with a capital letter
cat > /tmp/users_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="alice"
    password="alice-secret";
};
EOF
export KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/users_jaas.conf
Create the client configuration file in the bin directory
cat > client_sasl.properties << EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
EOF
Connect to the cluster
./kafka-topics.sh --bootstrap-server b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn:9096 --command-config client_sasl.properties --list
Related errors
# Wrong password
[2023-07-21 09:40:44,467] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (b-2.mytest.30734t.c3.kafka.cn-north-1.amazonaws.com.cn/172.31.23.61:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
Connecting from Java code
Connection configuration in Java code:
System.setProperty("java.security.auth.login.config", "/tmp/users_jaas.conf");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-512"); // only SCRAM-SHA-512 is supported
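As an alternative to the JAAS file and system property, Kafka clients also accept the login module inline through the sasl.jaas.config setting. Here is a minimal sketch under that approach. The helper name and its parameters are illustrative, the credentials are the sample ones from these notes, and kafka-clients is left out so that the snippet runs with the JDK alone.

```java
import java.util.Properties;

// Hypothetical helper that assembles SASL/SCRAM client settings.
class ScramClientConfig {
    static Properties scramProperties(String bootstrapServers, String username,
                                      String password, String truststorePath) {
        // Inline JAAS entry, equivalent to the KafkaClient section of users_jaas.conf.
        String jaas = "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + password + "\";";

        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers); // should be the SASL/SCRAM (9096) endpoints
        p.put("security.protocol", "SASL_SSL");
        p.put("sasl.mechanism", "SCRAM-SHA-512");
        p.put("sasl.jaas.config", jaas);
        p.put("ssl.truststore.location", truststorePath);
        return p;
    }

    public static void main(String[] args) {
        // The host name below is a placeholder, not a real broker.
        Properties p = scramProperties("broker.example:9096", "alice", "alice-secret",
                "/tmp/kafka.client.truststore.jks");
        System.out.println(p.get("sasl.jaas.config"));
    }
}
```

Keeping the credentials in the Properties object avoids a process-wide system property, which can matter when one JVM talks to more than one cluster.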