spring-boot2.x整合Kafka步驟

1.pom依賴添加

<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version><relativePath /> <!-- lookup parent from repository --></parent><dependencies><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId></exclusion><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context-support</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>

2. 配置文件添加配置:

server:port: 8080spring:application:name: application-kafkakafka:bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 #這個是kafka的地址,對應你server.properties中配置的producer:batch-size: 16384 #批量大小acks: 1 #應答級別:多少個分區副本備份完成時向生產者發送ack確認(可選0、1、all/-1)retries: 10 # 消息發送重試次數#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延遲#partitioner: #指定分區器#class: com.test.config.CustomerPartitionHandlerconsumer:group-id: testGroup,testg2 #默認的消費組IDenable-auto-commit: true #是否自動提交offsetauto-commit-interval: 2000 #提交offset延時# 當kafka中沒有初始offset或offset超出范圍時將自動重置offset# earliest:重置為分區中最小的offset;# latest:重置為分區中最新的offset(消費分區中新產生的數據);# none:只要有一個分區不存在已提交的offset,就拋出異常;auto-offset-reset: latestmax-poll-records: 500 #單次拉取消息的最大條數key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消費會話超時時間(超過這個時間 consumer 沒有發送心跳,就會觸發 rebalance 操作)request:timeout:ms: 18000 # 消費請求的超時時間listener:missing-topics-fatal: false # consumer listener topics 不存在時,啟動項目就會報錯
#      type: batchlogging:config: classpath:log4j2.xml

3. 日志配置

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO"><!--全局參數--><Properties><Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
<!--         <Property name="logDir">/data/logs/logViewer</Property> --><Property name="logDir">logs</Property></Properties><Appenders><!-- 定義輸出到控制臺 --><Console name="console" target="SYSTEM_OUT" follow="true"><!--控制臺只輸出level及以上級別的信息-->
<!--             <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> --><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout></Console><!-- 同一來源的Appender可以定義多個RollingFile,定義按天存儲日志 --><RollingFile name="rolling_file"fileName="${logDir}/logViewer.log"filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log"><ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/><PatternLayout><Pattern>${pattern}</Pattern></PatternLayout><Policies><TimeBasedTriggeringPolicy interval="1"/></Policies><!-- 日志保留策略,配置只保留七天 --><DefaultRolloverStrategy><Delete basePath="${logDir}/" maxDepth="1"><IfFileName glob="logViewer_*.log" /><IfLastModified age="7d" /></Delete></DefaultRolloverStrategy></RollingFile></Appenders><Loggers><Root level="INFO"><AppenderRef ref="console"/><AppenderRef ref="rolling_file"/></Root></Loggers>
</Configuration>

4. controller入口類,其它應用通過該接口直接將數據寫入kafka

@RequestMapping(value="/kafka")
@Controller
public class ProducerController {@Autowiredprivate KafkaTemplate kafkaTemplate;//    模擬發送消息@RequestMapping(value = "/send",method = RequestMethod.GET)public String sendMessage(@PathParam(value = "msg") String msg) {System.out.println("收到get請求。。。");kafkaTemplate.send("test",msg);return "成功";}

5. kafka回調方法(需要回調通知時使用該方式):

    @GetMapping("/kafka/callbackTwo/{message}")public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("發送消息失敗2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("發送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});}

6.kafka消費者注冊

@Component
public class KafkaMessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);/*** containerFactory* 消息過濾器消息過濾器可以在消息抵達consumer之前被攔截,在實際應用中,我們可以根據自己的業務邏輯,篩選出需要的信息再交由KafkaListener處理,不需要的消息則過濾掉。配置消息過濾只需要為 監聽器工廠 配置一個RecordFilterStrategy(消息過濾策略),返回true的時候消息將會被拋棄,返回false時,消息能正常抵達監聽容器。* @param record*/@KafkaListener(topics = {"test","test2"},groupId = "testGroup")public void listenTestStatus(ConsumerRecord<?, ?> record) {LOGGER.info("接收到消息:開始業務處理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空數據,跳過");}else {LOGGER.info("test-topics -->kafka監聽到的值為: {}", record.value().toString());}}@KafkaListener(topics = {"test"},groupId = "testg2")public void listenTest2(ConsumerRecord<?, ?> record) {LOGGER.info("###listenTest2接收到消息:開始業務處理。。。。。");if (null == record || null == record.value()) {LOGGER.info("接收到空數據,跳過");}else {LOGGER.info("test-topics -->kafka監聽到的值為: {}", record.value().toString());}}/*** id:消費者IDgroupId:消費組IDtopics:監聽的topic,可監聽多個topicPartitions:可配置更加詳細的監聽信息,可指定topic、parition、offset監聽,手動分區* @param records*///批量消費@KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup",errorHandler = "consumerAwareErrorHandler")public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消費一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}}

7.非spring-boot環境下使用java原生API手寫kafka生產消息:

 public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:設置發送者相關屬性Properties props = new Properties();// 此處配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");// 配置key的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化類props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);for(int i = 0; i < 5; i++) {//Part2:構建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:發送消息//單向發送:不關心服務端的應答。producer.send(record);System.out.println("message "+i+" sended");}//消息處理完才停止發送者。producer.close();}

8.非spring-boot環境下使用java原生API手寫java手寫kafka消費者:

 public static void main(String[] args) {//PART1:設置發送者相關屬性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);//每個消費者要指定一個groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化類props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化類props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {//PART2:拉取消息// 100毫秒超時時間ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//PART3:處理消息for (ConsumerRecord<String, String> record : records) {System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不會重復推送。consumer.commitSync(); //同步提交,表示必須等到offset提交完畢,再去消費下一批數據。
//            consumer.commitAsync(); //異步提交,表示發送完提交offset請求后,就開始消費下一批數據了。不用等到Broker的確認。}

9.非spring-boot環境下使用java原生API手寫異步發送kafka:

 public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:設置發送者相關屬性Properties props = new Properties();// 此處配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);// 配置key的序列化類props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化類props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:構建消息ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);//Part3:發送消息//異步發送:消息發送后不阻塞,服務端有應答后會觸發回調函數producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息發送失敗,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息處理完才停止發送者。latch.await();//消息處理完才停止發送者。producer.close();}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/45909.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/45909.shtml
英文地址,請注明出處:http://en.pswp.cn/web/45909.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

自學鴻蒙HarmonyOS的ArkTS語言<十二>wrapBuilder:組件工廠類封裝

// FactoryComponent.ets Builder function Radio1() {Column() {Text(單選組件&#xff1a;)Row() {Radio({ value: 1, group: radioGroup })Text(選項1)}Row() {Radio({ value: 2, group: radioGroup })Text(選項2)}}.margin(10) }Builder function Checkbox1() {Column() {T…

DP(5) | 完全背包 | Java | 卡碼52, LeetCode 518, 377, 70 做題總結

完全背包 感覺越寫越糊涂了&#xff0c;初始化怎么做的&#xff1f;遞推公式怎么來的&#xff1f; 卡碼52. 攜帶研究材料 https://kamacoder.com/problempage.php?pid1052 import java.util.*;public class Main {public static void main(String[] args) {Scanner sc new …

Java面試八股之Redis集群是怎么選擇數據庫的

在Redis集群中&#xff0c;數據被水平分割&#xff08;sharding&#xff09;到各個節點上&#xff0c;這意味著所有的鍵空間被分成16384個哈希槽&#xff08;hash slots&#xff09;&#xff0c;這些槽均勻地分布在集群中的各個節點上。Redis集群并不支持傳統的數據庫切換&…

xiuno兔兔超級SEO插件(精簡版)

xiuno論壇是一個一款輕論壇產品的論壇&#xff0c;但是對于這個論壇基本上都是用插件實現&#xff0c;一個論壇怎么能離開網站seo&#xff0c;本篇分享一個超級seo插件&#xff0c;自動sitemap、主動提交、自動Ping提交。 插件下載:tt_seo.zip

實驗11 數據庫日志及數據庫恢復

一、 實驗目的 了解Mysql數據庫系統中數據恢復機制和主要方法。 二、 實驗環境 操作系統&#xff1a;Microsoft Windows 7旗艦版&#xff08;32&64位&#xff09;/Linux。 硬件&#xff1a;容量足以滿足MySQL 5.7&#xff08;8.0&#xff09;安裝及后續實驗的使用。 軟件…

Python | Leetcode Python題解之第232題用棧實現隊列

題目&#xff1a; 題解&#xff1a; class MyQueue:def __init__(self):self.A, self.B [], []def push(self, x: int) -> None:self.A.append(x)def pop(self) -> int:peek self.peek()self.B.pop()return peekdef peek(self) -> int:if self.B: return self.B[-1…

什么叫圖像的中值濾波,并附利用OpenCV和MATLB實現均值濾波的代碼

圖像的中值濾波&#xff08;Median Filtering&#xff09;是一種非線性數字濾波技術&#xff0c;常用于圖像處理以減少噪聲&#xff0c;同時保留圖像邊緣細節。其基本思想是用圖像中某個窗口內像素的中值替代該窗口中心像素的值。具體步驟如下&#xff1a; 選擇窗口&#xff1a…

C++樹(二)【直徑,中心】

目錄&#xff1a; 樹的直徑&#xff1a; 樹的直徑的性質&#xff1a; 性質1&#xff1a;直徑的端點一定是葉子節點 性質2&#xff1a;任意點的最長鏈端點一定是直徑端點。 性質3&#xff1a;如果一棵樹有多條直徑,那么它們必然相交&#xff0c;且有極長連…

STM32中PC13引腳可以當做普通引腳使用嗎?如何配置STM32的TAMPER?

1.STM32中PC13引腳可以當做普通引腳使用嗎&#xff1f; 在STM32單片機中&#xff0c;PC13引腳可以作為普通IO使用&#xff0c;但需要進行一定的配置。PC13通常與RTC侵入檢測功能&#xff08;TAMPER&#xff09;復用&#xff0c;因此需要關閉TAMPER功能才能將其作為普通IO使用。…

服務端渲染框架:Nuxt.js 與 Next.js 的區別和對比

&#x1f49d;&#x1f49d;&#x1f49d;歡迎蒞臨我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦:「stormsha的主頁」…

2024國家護網面試小結

24年國護馬上就要開始&#xff0c;基本上大部分藍隊紅隊都已經準備入場了 今年護網第一年變成常態化護網&#xff0c;由十五天突然變成了兩個月常態化&#xff0c;導致今年護網有很多項目整的七零八落 博主今年參加了三家廠商藍隊護網面試&#xff0c;在這邊分享一下護網面試…

掌握這些技巧,讓你成為畫冊制作高手

在數字化的時代背景下&#xff0c;電子畫冊以其便捷的傳播方式、豐富的視覺表現形式&#xff0c;贏得了大眾的喜愛。它不僅能夠在個人電腦上展現&#xff0c;還能通過智能手機、平板電腦等多種移動設備隨時隨地被訪問和瀏覽。這種跨平臺的支持&#xff0c;使得無論你身處何地&a…

Html_Css問答集(12)

99、將上例的0%改為30%&#xff0c;會如何變化&#xff1f; none:延遲2秒間無色&#xff0c;3.8秒&#xff08;0%-30%占1.8秒&#xff09;前無色&#xff0c;之后變紅到5秒綠最后藍&#xff0c;動畫結束時恢復初始&#xff08;無色&#xff09;。 forward:延遲2秒間無色&am…

leetcode刷題總結——字符串匹配

KMP&#xff08;字符串匹配算法&#xff09; 主串或目標串&#xff1a;比較長的&#xff0c;我們就是在它里面尋找子串是否存在&#xff1b; 子串或模式串&#xff1a;比較短的。 前綴&#xff1a;字符串A和B&#xff0c;A BS&#xff0c;S非空&#xff0c;則B為A的前綴。 …

婚禮成本與籌備策略:一場夢幻婚禮的理性規劃

婚禮成本與籌備策略&#xff1a;一場夢幻婚禮的理性規劃 摘要 婚禮&#xff0c;作為人生中的重要儀式&#xff0c;承載著新人的愛情與夢想&#xff0c;同時也伴隨著不菲的經濟投入。本文旨在探討婚禮所需的大致成本、影響成本的主要因素以及婚禮籌備過程中的關鍵注意事項&…

【Java--數據結構】二叉樹

歡迎關注個人主頁&#xff1a;逸狼 創造不易&#xff0c;可以點點贊嗎~ 如有錯誤&#xff0c;歡迎指出~ 樹結構 樹是一種非線性的數據結構&#xff0c;它是由n&#xff08;n>0&#xff09;個有限結點組成一個具有層次關系的集合 注意&#xff1a;樹形結構中&#xff0c;子…

Transformer模型在多任務學習中的革新應用

在深度學習領域&#xff0c;多任務學習&#xff08;Multi-task Learning, MTL&#xff09;是一種訓練模型以同時執行多個任務的方法。這種方法可以提高模型的泛化能力&#xff0c;因為它允許模型在不同任務之間共享知識。近年來&#xff0c;Transformer模型因其在自然語言處理&…

【linux高級IO(三)】初識epoll

&#x1f493;博主CSDN主頁:杭電碼農-NEO&#x1f493; ? ?專欄分類:Linux從入門到精通? ? &#x1f69a;代碼倉庫:NEO的學習日記&#x1f69a; ? &#x1f339;關注我&#x1faf5;帶你學更多操作系統知識 ? &#x1f51d;&#x1f51d; Linux高級IO 1. 前言2. 初識e…

STM32 HRTIM生成PWM時遇到無法輸出PWM脈沖波形問題

在使用HRTIM生成PWM時&#xff0c;當把周期寄存器更新的設置放到while循環中時&#xff0c;無法輸出PWM脈沖波形&#xff0c;即使增加計數延時也無法輸出&#xff0c;最終只能放到中斷函數中執行后期寄存器值更新才能夠生成PWM脈沖波形。

主流大數據調度工具DolphinScheduler之數據ETL流程

今天給大家分享主流大數據調度工具DolphinScheduler&#xff0c;以及數據的ETL流程。 一&#xff1a;調度工具DS 主流大數據調度工具DolphinScheduler&#xff0c; 其定位&#xff1a;解決數據處理流程中錯綜復雜的依賴關系 任務支持類型&#xff1a;支持傳統的shell任務&a…