KafkaStream:Springboot中集成

1、在kafka-demo中創建配置類

? ? ? ? 配置kafka參數

package com.heima.kafkademo.config;import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通過重新注冊KafkaStreamsConfiguration對象,設置自定配置參數*/@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

2、在application.yml中配置上面配置類需要的參數

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

3、新增配置類,創建KStream對象,進行聚合

package com.heima.kafkademo.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//創建kstream對象,同時指定從那個topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根據value進行聚合分組.groupBy((key,value)->value)//聚合計算時間間隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求單詞的個數.count().toStream()//處理后的結果轉換為string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//發送消息.to("itcast-topic-out");return stream;}
}

4、啟動kafka-demo服務測試

? ? ? ? 使用生產者發送消息可以看到控制臺接收成功

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39623.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39623.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39623.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

8月11日上課內容 nginx的多實例和動靜分離

多實例部署 在一臺服務器上有多個tomcat的服務。 配置多實例之前&#xff0c;看單個實例是否訪問正常。 1.安裝好 jdk 2.安裝 tomcat cd /opt tar zxvf apache-tomcat-9.0.16.tar.gz mkdir /usr/local/tomcat mv apache-tomcat-9.0.16 /usr/local/tomcat/tomcat1 cp -a /u…

Linux系統管理:虛擬機ESXi安裝

目錄 一、理論 1.VMware Workstation 2.VMware vSphere Client 3.ESXi 二、實驗 1.ESXi 7安裝 一、理論 1.VMware Workstation 它是一款專業的虛擬機軟件&#xff0c;可以在一臺物理機上運行多個操作系統&#xff0c;支持Windows、Linux等操作系統&#xff0c;可以模擬…

使用selenium如何實現自動登錄

回顧使用requests如何實現自動登錄一文中&#xff0c;提到好多網站在我們登錄過后&#xff0c;在之后的某段時間內訪問該網頁時&#xff0c;不會給出請登錄的提示&#xff0c;時間到期后就會提示請登錄&#xff01;這樣在使用爬蟲訪問網頁時還要登錄&#xff0c;打亂我們的節奏…

item_get_sales-獲取商品銷量詳情

一、接口參數說明&#xff1a; item_get_sales-獲取商品銷量詳情&#xff0c;點擊更多API調試&#xff0c;請移步注冊API賬號點擊獲取測試key和secret 公共參數 請求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名稱類型必須描述keyString是調用key&#xff08…

ACM模式刷Leetcode題目

139題單詞拆分 鏈接: link #include<iostream> #include<sstream> #include<string> #include<vector> #include<algorithm> #include<unordered_set> using namespace std;int main() {// 實現輸入第一行為s字符串。// 第二行為wordDic…

【代碼隨想錄day22】爬樓梯

題目 假設你正在爬樓梯。需要 n 階你才能到達樓頂。 每次你可以爬 1 或 2 個臺階。你有多少種不同的方法可以爬到樓頂呢&#xff1f; 示例 1&#xff1a; 輸入&#xff1a;n 2 輸出&#xff1a;2 解釋&#xff1a;有兩種方法可以爬到樓頂。 1. 1 階 1 階 2. 2 階 示例 2…

Spring的三種異常處理方式

1.SpringMVC 異常的處理流程 異常分為編譯時異常和運行時異常&#xff0c;編譯時異常我們 try-cache 進行捕獲&#xff0c;捕獲后自行處理&#xff0c;而運行時異常是不 可預期的&#xff0c;就需要規范編碼來避免&#xff0c;在SpringMVC 中&#xff0c;不管是編譯異常還是運行…

java:JDBC

文章目錄 什么是JDBCJDBC使用步驟詳解各個對象DriverManagerConnectionStatementResultSetPreparedStatement JDBC控制事務操作步驟示例 什么是JDBC 我們知道&#xff0c;數據庫有很多種&#xff0c;比如 mysql&#xff0c;Oracle&#xff0c;DB2等等&#xff0c;如果每一種數…

C# WPF 中 外部圖標引入iconfont,無法正常顯示問題 【小白記錄】

wpf iconfont 外部圖標引入&#xff0c;無法正常顯示問題。 1. 檢查資源路徑和引入格式是否正確2. 檢查資源是否包含在程序集中 1. 檢查資源路徑和引入格式是否正確 正確的格式&#xff0c;注意字體文件 “xxxx.ttf” 應寫為 “#xxxx” <TextBlock Text"&#xe7ae;…

不重啟Docker能添加自簽SSL證書鏡像倉庫嗎?

應用背景 在企業應用Docker規劃初期配置非安全鏡像倉庫時&#xff0c;有時會遺漏一些倉庫沒配置&#xff0c;但此時應用程序已經在Docker平臺上部署起來了&#xff0c;體量越大就越不會讓人去直接重啟Docker。 那么&#xff0c;不重啟Docker能添加自簽SSL證書鏡像倉庫嗎&…

經典人體模型SMPL介紹(一)

SMPL是馬普所提出的經典人體模型&#xff0c;目前已成為姿態估計、人體重建等領域必不可少的基礎先驗。SMPL基于蒙皮和BlendShape實現&#xff0c;從數千個三維人體掃描結果得來&#xff0c;后通過PCA統計學習得來。 論文&#xff1a;SMPL: A Skinned Multi-Person Linear Mode…

Python讀取svn版本

本文將詳細介紹如何使用Python讀取svn版本。 一、安裝svn庫 首先&#xff0c;我們需要使用Python來連接svn服務器&#xff0c;并獲取版本號。這里我們使用pysvn庫來完成這個工作。 pip install pysvn需要注意的是&#xff0c;如果你需要安裝特定版本的pysvn&#xff0c;你可…

2023連鎖收銀系統該如何選?值得推薦的5款連鎖收銀系統

現在不管是連鎖店還是零售店&#xff0c;只要是開店做生意賺錢的&#xff0c;都少不了要和錢打交道&#xff0c;尤其是對連鎖店來說&#xff0c;收銀工作更是重中之重。 連鎖店涉及的門店較多&#xff0c;必須要有一套足夠優秀的連鎖收銀系統&#xff0c;才能做好每個門店的收銀…

【ARM 嵌入式 編譯系列 5 -- GCC 內建函數 __builtin 詳細介紹】

文章目錄 什么是GCC內建函數?GCC 常見內建函數GCC內建函數使用示例上篇文章:ARM 嵌入式 編譯系列 4.2 – GCC 鏈接規范 extern “C“ 介紹 下篇文章:ARM 嵌入式 編譯系列 6 – GCC objcopy, objdump, readelf, nm 介紹 什么是GCC內建函數? GCC提供了一些專門的功能,用于…

使用 `tailwindcss-patch@2` 來提取你的類名吧

使用 tailwindcss-patch2 來提取你的類名吧 使用 tailwindcss-patch2 來提取你的類名吧 安裝使用方式 命令行 Cli 開始提取吧 Nodejs API 的方式來使用 配置 初始化 What’s next? tailwindcss-patch 是一個 tailwindcss 生態的擴展項目。也是 tailwindcss-mangle 項目重要…

redis的Key的過期策略是如何實現的?

Key的過期策略 一個redis中可能同時存在很多很多key&#xff0c;這些key可能有很大一部分都有過期時間&#xff0c;此時&#xff0c;redis服務器咋知道哪些key已經過期要被刪除&#xff0c;哪些key還沒有過期&#xff1f; 如果直接遍歷所有的key&#xff0c;顯然是行不通的&am…

Abandon_Ubuntu Declaration

鑒于以下幾個原因&#xff0c;持續到明年考研結束&#xff0c;我將不再搗鼓ubuntu和任何linux系統&#xff0c; 原因如下&#xff1a; ubuntu23.04不支持wps編輯pdf這個核心功能&#xff0c;且開機向canonial公司發送遠程遙測&#xff0c;暫時不會用iptables禁用&#xff0c;故…

第幾天(day)

廬陽區2021年信息學競賽試題 題目描述 Description 給定一個日期&#xff0c;求這一天是當年的第幾天。每年的元旦&#xff0c;1月1日&#xff0c;都是每年的第一天&#xff0c;但是每年的最后一天&#xff0c;12月31日&#xff0c;有可能是第365天&#xff0c;也有可能是第3…

2023年上半年網絡工程師上午真題及答案解析

1.固態硬盤的存儲介質是( )。 A.光盤 B.閃存 C.軟盤 D.磁盤 2.虛擬存儲技術把( )有機地結合起來使用&#xff0c;從而得到一個更大容量的“內存”。 A.內存與外存 B.Cache與內存 C.寄存器與Cache D.Cache與外存 3.下列接口協議中&…

關于安卓高版本gradle(7.0+)引入aar包報錯問題

背景 項目開發過程中&#xff0c;接入三方sdk&#xff0c;引入了本地aar包依賴&#xff0c;as rebuild項目的過程中&#xff0c;報錯&#xff0c;提示依賴找不到問題。 報錯&#xff1a;“bundleDebugAar FAILED”等 開發環境 win10 jdk11 gradle 7.5 原因 由于gradle的版…