在連接Kerberos認證kafka之前,需要了解Kerberos協議
二、什么是Kerberos協議
Kerberos是一種計算機網絡認證協議 ,其設計目標是通過密鑰系統為網絡中通信的客戶機(Client)/服務器(Server)應用程序提供嚴格的身份驗證服務,確保通信雙方身份的真實性和安全性。不同于其他網絡服務,Kerberos協議中不是所有的客戶端向想要訪問的網絡服務發起請求,他就能建立連接然后進行加密通信,而是在發起服務請求后必須先進行一系列的身份認證,包括客戶端和服務端兩方的雙向認證,只有當通信雙方都認證通過對方身份之后,才可以互相建立起連接,進行網絡通信。即Kerberos協議的側重在于認證通信雙方的身份,客戶端需要確認即將訪問的網絡服務就是自己所想要訪問的服務而不是一個偽造的服務器,而服務端需要確認這個客戶端是一個身份真實,安全可靠的客戶端,而不是一個想要進行惡意網絡攻擊的用戶。
三、Kerberos協議角色組成
Kerberos協議中存在三個角色,分別是:
客戶端(Client):發送請求的一方
服務端(Server):接收請求的一方
密鑰分發中心(Key distribution KDC)
一,首先需要準備三個文件
(user.keytab,krb5.conf,jass.conf)
其中user.keytab和krb5.conf是兩個認證文件,需要廠商提供,就是你連接誰的kafka,讓誰提供
jass.conf文件需要自己在本地創建
jass.conf文件內容如下,具體路徑和域名需要換成自己的:
debug: truefusioninsight:kafka:bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007security:protocol: SASL_PLAINTEXTkerberos:domain:name: hadoop.798687_97_4a2b_9510_00359f31c5ec.comsasl:kerberos:service:name: kafka
其中kerberos.domain.name:hadoop.798687_97_4a2b_9510_00359f31c5ec.com
hadoop.798687_97_4a2b_9510_00359f31c5ec.com需要根據現場提供給你的域名
二、文件準備好后可以將三個配置文件,放在自己項目中,也可以放在服務器的某個目錄下,只要確保項目啟動后能讀取到即可
我的目錄結構如下:
pom依賴:
我用的是華為云的Kafka依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-sample-01</artifactId><version>2.3.1.RELEASE</version><packaging>jar</packaging><name>kafka-sample-01</name><description>Kafka Sample 1</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0-hw-ei-302002</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 華為 組件 kafka ?start -->
<!--?? ??? ?<dependency>-->
<!--?? ??? ??? ?<groupId>com.huawei</groupId>-->
<!--?? ??? ??? ?<artifactId>kafka-clients</artifactId>-->
<!--?? ??? ??? ?<version>2.4.0</version>-->
<!--?? ??? ??? ?<scope>system</scope>-->
<!--?? ??? ??? ?<systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>-->
<!--?? ??? ?</dependency>--></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build><repositories><repository><id>huaweicloudsdk</id><url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><id>central</id><name>Mavn Centreal</name><url>https://repo1.maven.org/maven2/</url></repository></repositories>
</project>
然后再SpringBoot項目啟動類如下:
package com.example;import com.common.Foo1;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;import java.io.File;
import java.util.HashMap;
import java.util.Map;/*** @author*/
@SpringBootApplication
public class Application {private final Logger logger = LoggerFactory.getLogger(Application.class);@Value("${fusioninsight.kafka.bootstrap-servers}")public String boostrapServers;@Value("${fusioninsight.kafka.security.protocol}")public String securityProtocol;@Value("${fusioninsight.kafka.kerberos.domain.name}")public String kerberosDomainName;@Value("${fusioninsight.kafka.sasl.kerberos.service.name}")public String kerberosServiceName;public static void main(String[] args) {
// ? ? ? ?String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main"
// ? ? ? ?String filePath = "D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\";String filePath = "/home/yxxt/";System.setProperty("java.security.auth.login.config", filePath + "jaas.conf");System.setProperty("java.security.krb5.conf", filePath + "krb5.conf");SpringApplication.run(Application.class, args);}@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {System.out.println(boostrapServers);ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}// 指定消費監聽,該topic有消息時立刻消費@KafkaListener(id = "fooGroup1", topics = "topic_ypgk")public void listen(ConsumerRecord<String, String> record) {System.out.println("監聽到了消息-----");logger.info("Received:消息監聽成功! " );System.out.println("監聽到了-----");System.out.println(record);
// ? ? ? ?if (foo.getFoo().startsWith("fail")) {
// ? ? ? ? ? ?// 觸發83行的 ErrorHandler,將異常數據寫入 topic名稱+.DLT的新topic中
// ? ? ? ? ? ?throw new RuntimeException("failed");
// ? ? ? ?}}// 創建topic,指定分區數、副本數
// ? ?@Bean
// ? ?public NewTopic topic() {
// ? ? ? ?return new NewTopic("topic1", 1, (short) 1);
// ? ?}@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("kerberos.domain.name", kerberosDomainName);return new KafkaAdmin(configs);}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new DefaultKafkaConsumerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);return new KafkaTemplate<>(producerFactory);}
}
生產者:通過發送請求進行向主題里發送消息
package com.example;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import com.common.Foo1;/*** @author haosuwei**/
@RestController
public class Controller {@Autowiredprivate KafkaTemplate<String, String> template;@PostMapping(path = "/send/foo/{what}")public void sendFoo(@PathVariable String what) {Foo1 foo1 = new Foo1(what);this.template.send("topic1", foo1.toString());}}
運行成功,就可以監聽到主題消息了