KafkaStream:基本使用

簡介:

? ? ? ? kafkaStream:提供了對存儲在kafka中的數據進行流式處理和分析的功能

特點:

????????KafkasSream提供了一個非常簡單輕量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入門案例:

? 1、新建工程kafka-demo

? ? ? ? ? ?引入kafkaStream依賴

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

? ?2、新建流式處理類

? ? ? ? ? 代碼如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式處理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*創建kafka配置中心并配置參數*/Properties prop = new Properties();//連接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//創建id名稱prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream構造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式計算streamProcessor(streamsBuilder);//創建KafkaStream對象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//開啟流式計算kafkaStreams.start();}//流式計算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//創建kafka對象,同時指定從哪個topic獲取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//處理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value進行聚合.groupBy((key,value)->value)//時間窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//統計單詞個數.count()//轉換為kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//發送消息.to("itcast-topic-out");}
}

3、啟動消費者類和流式處理類監聽消息

? ? ? ? 使用生產者類發送消息

? ? ? ?消費者和生產者類代碼參考Kafka:安裝和配置_Success___的博客-CSDN博客

4、測試

? ? ? ? 成功接收到消息

?

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39370.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39370.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39370.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

jenkins自動化部署Jenkinsfile文件配置

簡介 使用jenkins部署時會讀取項目中Jenkinsfile文件&#xff0c;文件配置不對會導致部署失敗 文件內容 pipeline {agent anyparameters {string(name: project_name, defaultValue: xxx1, description: 項目jar名稱)string(name: version, defaultValue: xxx2, description…

【Apollo】阿波羅自動駕駛:塑造自動駕駛技術的未來

前言 Apollo (阿波羅)是一個開放的、完整的、安全的平臺&#xff0c;將幫助汽車行業及自動駕駛領域的合作伙伴結合車輛和硬件系統&#xff0c;快速搭建一套屬于自己的自動駕駛系統。 開放能力、共享資源、加速創新、持續共贏是 Apollo 開放平臺的口號。百度把自己所擁有的強大、…

Java之SpringCloud Alibaba【四】【微服務 Sentinel服務熔斷】

Java之SpringCloud Alibaba【四】【微服務 Sentinel服務熔斷】 一、分布式系統遇到的問題1、服務掛掉的一些原因 二、解決方案三、Sentinel&#xff1a;分布式系統的流量防衛兵1、Sentinel是什么2、Sentinel和Hystrix對比3、Sentinel快速開發4、通過注解的方式來控流5、啟動Sen…

DoIP學習筆記系列:(五)“安全認證”的.dll從何而來?

文章目錄 1. “安全認證”的.dll從何而來?1.1 .dll文件base1.2 增加客戶需求算法傳送門 DoIP學習筆記系列:導航篇 1. “安全認證”的.dll從何而來? 無論是用CANoe還是VFlash,亦或是編輯cdd文件,都需要加載一個與$27服務相關的.dll(Windows的動態庫文件),這個文件是從哪…

Go 流程控制

if語句使用 package mainimport "fmt"func main() {score : 700if score 700 {fmt.Println("清華")}//if支持一個初始化語句 初始化語句和條件判斷用;分割if a : 700; a 700 {fmt.Println("清華")}}清華 清華if_else使用 package mainimpor…

機器學習深度學習——seq2seq實現機器翻譯(數據集處理)

&#x1f468;?&#x1f393;作者簡介&#xff1a;一位即將上大四&#xff0c;正專攻機器學習的保研er &#x1f30c;上期文章&#xff1a;機器學習&&深度學習——從編碼器-解碼器架構到seq2seq&#xff08;機器翻譯&#xff09; &#x1f4da;訂閱專欄&#xff1a;機…

yolo源碼注釋1——文件結構

代碼基于yolov5 v6.0 目錄&#xff1a; yolo源碼注釋1——文件結構yolo源碼注釋2——數據集配置文件yolo源碼注釋3——模型配置文件yolo源碼注釋4——yolo-py datasets # 用于存放數據集的默認文件夾yolov5 data # 模型訓練的超參數配置文件以及數據集配置文件 hyps # 存放超參…

C語言學習錯題集(五)

1.最大公倍數的求法(gcd已知) 2.報數 3.字符串最后必須有’\0’!!! 4.例題 5.例題 6.例題 1.最大公倍數的求法(gcd已知) int lcmgcd*(a/gcd)*(b/gcd);2.報數 報數游戲是這樣的&#xff1a;有n個人圍成一圈&#xff0c;按順序從1到n編好號。從第一個人開始報數&#xff0c;報到…

工程項目管理系統源碼+功能清單+項目模塊+spring cloud +spring boot em

? 工程項目管理軟件&#xff08;工程項目管理系統&#xff09;對建設工程項目管理組織建設、項目策劃決策、規劃設計、施工建設到竣工交付、總結評估、運維運營&#xff0c;全過程、全方位的對項目進行綜合管理 工程項目各模塊及其功能點清單 一、系統管理 1、數據字典&#…

代碼保護 code protection

為什么要做代碼保護&#xff1f; 為了保護知識產權并讓攻擊者的利用更加困難&#xff0c;組織應該為其軟件的逆向工程設置障礙(例如&#xff0c;反篡改、調試保護、反盜版特性、運行時完整性)&#xff0c;增加攻擊者分析和利用你的軟件所需的投入。代碼保護對于廣泛分布的代碼…

Markdown使用筆記

Markdown使用筆記 一、段落與強調 important denotes the impossible thing to do Because your ugly appearance, you cannot have a happy ending. 使用*括起來的為斜體 使用**括起來的是粗體 使用~~括起來的是刪除線 在句子后面添加<br>即可換行 二、標題 在…

常見期權策略類型有哪些?

這幾天在做一個期權策略類型的整理分類&#xff0c;怎么解釋期權策略&#xff0c;期權策略是現代金融市場中運用非常廣泛、變化非常豐富、結構非常精妙的金融衍生產品&#xff1b;同時也是一種更為復雜也更為靈活的投資工具&#xff0c;下文介紹常見期權策略類型有哪些&#xf…

iptables安全技術和防火墻

通信五元素 源ip和目標ip 源端口和目標端口 協議 通信四元素 源ip和目標ip 源端口和目標端口 iptables表鏈結構 Netfilter Linux防火墻是由Netfilter組件提供的&#xff0c;Netfilter工作在內核空間&#xff0c;集成在linux內核中 Netfilter是Linux 2.4.x之后新一代的Li…

CI/CD流水線實戰

不知道為什么&#xff0c;現在什么技術都想學&#xff0c;因為我覺得我遇到了技術的壁壘&#xff0c;大的項目接觸不到&#xff0c;做的項目一個字辣*。所以&#xff0c;整個人心浮氣躁&#xff0c;我已經得通過每天的騎行和長跑緩解這種浮躁了。一個周末&#xff0c;我再次宅在…

k8s問題匯總

作者前言 本文章為記錄使用k8s遇到的問題和解決方法&#xff0c;文章持續更新中… 目錄 作者前言正常配置ingress&#xff0c;但是訪問錯誤添加工作節點報錯安裝k8s報錯使用kubectl命令報錯container沒有運行安裝會出現kubelet異常&#xff0c;無法識別刪除k8s集群訪問dashboa…

Docker安裝RabbitMQ單機版

Docker安裝RabbitMQ單機版 先安裝Docker服務&#xff0c;可參考安裝Docker及學習 編寫rabbitmq-composefile.yml文件 這里以rabbitmq 3.11.16 版本為例 cat << \EOF > /opt/rabbitmq-composefile.yml version: 3 services:rabbitmq:image: rabbitmq:3.10.0-managem…

【Apollo】推動創新:探索阿波羅自動駕駛的進步(含安裝 Apollo的詳細教程)

前言 Apollo (阿波羅)是一個開放的、完整的、安全的平臺&#xff0c;將幫助汽車行業及自動駕駛領域的合作伙伴結合車輛和硬件系統&#xff0c;快速搭建一套屬于自己的自動駕駛系統。 開放能力、共享資源、加速創新、持續共贏是 Apollo 開放平臺的口號。百度把自己所擁有的強大、…

【密碼學】維京密碼

維京密碼 瑞典羅特布魯納巨石上的圖案看起來毫無意義&#xff0c;但是它確實是一種維京密碼。如果我們注意到每組圖案中長筆畫和短筆畫的數量&#xff0c;將得到一組數字2、4、2、3、3、5、2、3、3、6、3、5。組合配對得到24、23、35、23、36、35。現在考慮如圖1.4所示的內容&a…

【變形金剛03】使用 Pytorch 開始構建transformer

一、說明 在本教程中&#xff0c;我們將使用 PyTorch 從頭開始構建一個基本的轉換器模型。Vaswani等人在論文“注意力是你所需要的一切”中引入的Transformer模型是一種深度學習架構&#xff0c;專為序列到序列任務而設計&#xff0c;例如機器翻譯和文本摘要。它基于自我注意機…

iOS Epub閱讀器改造記錄

六個月前在這個YHEpubDemo閱讀器的基礎上做了一些優化&#xff0c;這里做一下記錄。 1.首行縮進修復 由于分頁的存在&#xff0c;新的一頁的首行可能是新的一行&#xff0c;則應該縮進&#xff1b;也可能是前面一頁段落的延續&#xff0c;這時候不應該縮進。YHEpubDemo基于XDS…