SpringBoot項目連接,有Kerberos認證的Kafka

在連接Kerberos認證kafka之前,需要了解Kerberos協議
二、什么是Kerberos協議

Kerberos是一種計算機網絡認證協議 ,其設計目標是通過密鑰系統為網絡中通信的客戶機(Client)/服務器(Server)應用程序提供嚴格的身份驗證服務,確保通信雙方身份的真實性和安全性。不同于其他網絡服務,Kerberos協議中不是所有的客戶端向想要訪問的網絡服務發起請求,他就能建立連接然后進行加密通信,而是在發起服務請求后必須先進行一系列的身份認證,包括客戶端和服務端兩方的雙向認證,只有當通信雙方都認證通過對方身份之后,才可以互相建立起連接,進行網絡通信。即Kerberos協議的側重在于認證通信雙方的身份,客戶端需要確認即將訪問的網絡服務就是自己所想要訪問的服務而不是一個偽造的服務器,而服務端需要確認這個客戶端是一個身份真實,安全可靠的客戶端,而不是一個想要進行惡意網絡攻擊的用戶。

三、Kerberos協議角色組成
Kerberos協議中存在三個角色,分別是:

客戶端(Client):發送請求的一方
服務端(Server):接收請求的一方
密鑰分發中心(Key distribution KDC)

一,首先需要準備三個文件
(user.keytab,krb5.conf,jass.conf)

其中user.keytab和krb5.conf是兩個認證文件,需要廠商提供,就是你連接誰的kafka,讓誰提供

jass.conf文件需要自己在本地創建

jass.conf文件內容如下,具體路徑和域名需要換成自己的:

debug: truefusioninsight:kafka:bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007security:protocol: SASL_PLAINTEXTkerberos:domain:name: hadoop.798687_97_4a2b_9510_00359f31c5ec.comsasl:kerberos:service:name: kafka


其中kerberos.domain.name:hadoop.798687_97_4a2b_9510_00359f31c5ec.com

hadoop.798687_97_4a2b_9510_00359f31c5ec.com需要根據現場提供給你的域名

二、文件準備好后可以將三個配置文件,放在自己項目中,也可以放在服務器的某個目錄下,只要確保項目啟動后能讀取到即可
我的目錄結構如下:


pom依賴:
我用的是華為云的Kafka依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-sample-01</artifactId><version>2.3.1.RELEASE</version><packaging>jar</packaging><name>kafka-sample-01</name><description>Kafka Sample 1</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0-hw-ei-302002</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 華為 組件 kafka ?start -->
<!--?? ??? ?<dependency>-->
<!--?? ??? ??? ?<groupId>com.huawei</groupId>-->
<!--?? ??? ??? ?<artifactId>kafka-clients</artifactId>-->
<!--?? ??? ??? ?<version>2.4.0</version>-->
<!--?? ??? ??? ?<scope>system</scope>-->
<!--?? ??? ??? ?<systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>-->
<!--?? ??? ?</dependency>--></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build><repositories><repository><id>huaweicloudsdk</id><url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><id>central</id><name>Mavn Centreal</name><url>https://repo1.maven.org/maven2/</url></repository></repositories>
</project>

然后再SpringBoot項目啟動類如下:

package com.example;import com.common.Foo1;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;import java.io.File;
import java.util.HashMap;
import java.util.Map;/*** @author*/
@SpringBootApplication
public class Application {private final Logger logger = LoggerFactory.getLogger(Application.class);@Value("${fusioninsight.kafka.bootstrap-servers}")public String boostrapServers;@Value("${fusioninsight.kafka.security.protocol}")public String securityProtocol;@Value("${fusioninsight.kafka.kerberos.domain.name}")public String kerberosDomainName;@Value("${fusioninsight.kafka.sasl.kerberos.service.name}")public String kerberosServiceName;public static void main(String[] args) {
// ? ? ? ?String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main"
// ? ? ? ?String filePath = "D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\";String filePath = "/home/yxxt/";System.setProperty("java.security.auth.login.config", filePath + "jaas.conf");System.setProperty("java.security.krb5.conf", filePath + "krb5.conf");SpringApplication.run(Application.class, args);}@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {System.out.println(boostrapServers);ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}// 指定消費監聽,該topic有消息時立刻消費@KafkaListener(id = "fooGroup1", topics = "topic_ypgk")public void listen(ConsumerRecord<String, String> record) {System.out.println("監聽到了消息-----");logger.info("Received:消息監聽成功! " );System.out.println("監聽到了-----");System.out.println(record);
// ? ? ? ?if (foo.getFoo().startsWith("fail")) {
// ? ? ? ? ? ?// 觸發83行的 ErrorHandler,將異常數據寫入 topic名稱+.DLT的新topic中
// ? ? ? ? ? ?throw new RuntimeException("failed");
// ? ? ? ?}}// 創建topic,指定分區數、副本數
// ? ?@Bean
// ? ?public NewTopic topic() {
// ? ? ? ?return new NewTopic("topic1", 1, (short) 1);
// ? ?}@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("kerberos.domain.name", kerberosDomainName);return new KafkaAdmin(configs);}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new DefaultKafkaConsumerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);return new KafkaTemplate<>(producerFactory);}
}

生產者:通過發送請求進行向主題里發送消息

package com.example;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import com.common.Foo1;/*** @author haosuwei**/
@RestController
public class Controller {@Autowiredprivate KafkaTemplate<String, String> template;@PostMapping(path = "/send/foo/{what}")public void sendFoo(@PathVariable String what) {Foo1 foo1 = new Foo1(what);this.template.send("topic1", foo1.toString());}}

運行成功,就可以監聽到主題消息了

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/166299.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/166299.shtml
英文地址,請注明出處:http://en.pswp.cn/news/166299.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spring Boot 升級3.x 指南

Spring Boot 升級3.x 指南 1. 升級思路 先創建一個parent項目&#xff0c;打包類型為pom&#xff0c;繼承自spring boot的parent項目 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId&…

歷時三個月,我發布了一款外賣返錢小程序

近幾年&#xff0c;推廣外賣紅包爆火&#xff0c;各種推廣外賣紅包的公眾號層出不窮。于是&#xff0c;我就在想外賣紅包究竟是怎么一回事。就這樣&#xff0c;我帶著問題開始了關于外賣紅包的研究。 在研究的過程中&#xff0c;我開始了解商品聯盟、推廣分成、cps等一系列相關…

網絡攻擊當搭配什么產品比較好

網絡攻擊無處不在&#xff0c;當要時刻謹記 2014年&#xff0c;索尼影業受到黑客攻擊&#xff0c;導致公司內部文件和電子郵件泄露。 2015年&#xff0c;美國聯邦政府的辦公人員信息遭到盜竊&#xff0c;影響了超過2100萬人的個人信息。 2016年&#xff0c;Yahoo的3億用戶賬…

java 中集合之一【map】,map循環

在Java中&#xff0c;常用的集合框架有以下幾個&#xff1a; 1、List&#xff08;列表&#xff09;&#xff1a;List是有序的集合&#xff0c;允許包含重復元素。常用的實現類有ArrayList和LinkedList。ArrayList是基于動態數組實現的&#xff0c;支持快速隨機訪問&#xff1b;…

android之圖片選擇器--pictureselector

推薦一個安卓圖片/視頻/文件選擇器。簡單好用。 不多廢話。直接上代碼&#xff1a; 首先&#xff0c;添加依賴&#xff1a; //圖片選擇器api io.github.lucksiege:pictureselector:v3.11.1//圖片壓縮api io.github.lucksiege:compress:v3.11.1//圖片裁剪api io.github.lucksie…

Springboot3+vue3從0到1開發實戰項目(一)

一. 可以在本項目里面自由發揮拓展 二. 知識整合項目使用到的技術 后端開發 &#xff1a; Validation, Mybatis,Redis, Junit,SpringBoot3 &#xff0c;mysql&#xff0c;Swagger, JDK17 &#xff0c;項目部署 前端開發&#xff1a; Vue3&#xff0c;Vite&#xff0c;Router…

Java數組和集合

在Java中&#xff0c;數組和集合是兩個重要的概念&#xff0c;它們用于存儲和操作數據。本文將詳細介紹Java中的數組和集合&#xff0c;包括它們的定義、初始化、訪問和常見操作 一、數組&#xff08;Array&#xff09; 數組是一種用于存儲相同類型數據的容器&#xff0c;它可…

DNS的各種進階新玩法

你們好&#xff0c;我的網工朋友&#xff0c;今天和你聊聊DNS。 01 什么是DNS&#xff1f; mac地址誕生&#xff0c;可是太不容易記憶了&#xff0c;出現了簡化了IP形式&#xff0c;它被直接暴露給外網不說&#xff0c;還讓人類還是覺得比較麻煩&#xff0c;干脆用幾個字母算了…

【Git】一文教你學會 submodule 的增、刪、改、查

添加子模塊 $ git submodule add <url> <path>url 為想要添加的子模塊路徑path 為子模塊存放的本地路徑 示例&#xff0c;添加 r-tinymaix 為子模塊到主倉庫 ./sdk/packages/online-packages/r-tinymaix 路徑下&#xff0c;命令如下所示&#xff1a; $ git subm…

用自己熱愛的事賺錢,是多么的幸福

挖掘天賦可能有些困難&#xff0c;但挖掘愛好就簡單多啦&#xff01;最幸福的事情就是能用自己喜歡的事情賺錢。 我們要說的是一個博主&#xff0c;他非常喜歡騎自行車&#xff0c;雖然他的工作是在外貿公司做銷售&#xff0c;但每當有空時&#xff0c;他都會騎自行車。而且他…

Nginx同時支持Http和Https的配置詳解

當配置Nginx同時支持HTTP和HTTPS時&#xff0c;需要進行以下步驟&#xff1a; 安裝和配置SSL證書&#xff1a; 獲得SSL證書&#xff1a;從可信任的證書頒發機構&#xff08;CA&#xff09;或使用自簽名證書創建SSL證書。 將證書和私鑰保存到服務器&#xff1a;將SSL證書和私鑰…

spring 的事務隔離;Spring框架的事務管理的優點

文章目錄 說一下 spring 的事務隔離&#xff1f;Spring框架的事務管理有哪些優點&#xff1f;你更傾向用哪種事務管理類型&#xff1f; 聊一聊spring事務的隔離&#xff0c;事務的隔離對于一個系統來說也是非常重要的&#xff0c;直接上干貨&#xff01;&#xff01;&#xff0…

Python與設計模式--享元模式

10-Python與設計模式–享元模式 一、網上咖啡選購平臺 假設有一個網上咖啡選購平臺&#xff0c;客戶可以在該平臺上下訂單訂購咖啡&#xff0c;平臺會根據用戶位置進行 線下配送。假設其咖啡對象構造如下&#xff1a; class Coffee:name price 0def __init__(self,name):se…

Go iota簡介

當聲明枚舉類型或定義一組相關常量時&#xff0c;Go語言中的iota關鍵字可以幫助我們簡化代碼并自動生成遞增的值。本文檔將詳細介紹iota的用法和行為。 iota關鍵字 iota是Go語言中的一個預定義標識符&#xff0c;它用于創建自增的無類型整數常量。iota的行為類似于一個計數器…

數據庫基礎入門 — SQL排序與分頁

我是南城余&#xff01;阿里云開發者平臺專家博士證書獲得者&#xff01; 歡迎關注我的博客&#xff01;一同成長&#xff01; 一名從事運維開發的worker&#xff0c;記錄分享學習。 專注于AI&#xff0c;運維開發&#xff0c;windows Linux 系統領域的分享&#xff01; 本…

[深度理解] 重啟 Splunk Search Head Cluster

1: 背景: 關于釋放Splunk search head 的job 運行壓力:splunk search head cluster 要重啟的話,怎么辦? 答案是:splunk rolling-restart shcluster-members Initiate a rolling restart from the command line Invoke the splunk rolling-restart command from any me…

3款免費次數多且功能又強大的國產AI繪畫工具

hi&#xff0c;同學們&#xff0c;本期是我們第55 期 AI工具教程 最近兩個月&#xff0c;國內很多AI繪畫軟件被關停&#xff0c;國外絕大部分AI繪畫工具費用不低&#xff0c;因此 這兩天我 重新整理 國產 AI繪畫 工具 &#xff0c; 最終 篩選了 3款功能強大&#xf…

LeeCode前端算法基礎100題(3)- N皇后

一、問題詳情&#xff1a; 按照國際象棋的規則&#xff0c;皇后可以攻擊與之處在同一行或同一列或同一斜線上的棋子。 n 皇后問題 研究的是如何將 n 個皇后放置在 nn 的棋盤上&#xff0c;并且使皇后彼此之間不能相互攻擊。 給你一個整數 n &#xff0c;返回所有不同的 n 皇后…

虛擬機系列:vmware和Oracle VM VirtualBox虛擬機的區別,簡述哪一個更適合我?以及相互轉換

一. VMware和Oracle VM VirtualBox虛擬機的區別主要體現在以下幾個方面: 首先兩種軟件的安裝使用教程如下: VMware ESXI 安裝使用教程 Oracle VM VirtualBox安裝使用教程 商業模式:VMware是一家商業公司,而Oracle VM VirtualBox是開源軟件; 功能:VMware擁有更多的功能和…

Leetcode200. 島嶼數量

Every day a Leetcode 題目來源&#xff1a;200. 島嶼數量 解法1&#xff1a;深度優先搜索 設目前指針指向一個島嶼中的某一點 (i, j)&#xff0c;尋找包括此點的島嶼邊界。 從 (i, j) 向此點的上下左右 (i1,j)&#xff0c;(i-1,j)&#xff0c;(i,j1)&#xff0c;(i,j-1) …