實例演示Kafka-Stream消息流式處理流程及原理

以下結合案例:統計消息中單詞出現次數,來測試并說明kafka消息流式處理的執行流程

Maven依賴

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>

準備工作

首先編寫創建三個類,分別作為消息生產者、消息消費者、流式處理者
KafkaStreamProducer:消息生產者

public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}

該消息生產者向主題kafka-stream-topic-input發送五次hello kafka
KafkaStreamConsumer:消息消費者

public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的連接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消費者組properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手動提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//訂閱主題consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 異步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}

KafkaStreamQuickStart:流式處理類

public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式計算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并處理流數據。* 使用StreamsBuilder創建并配置KStream,對輸入的主題中的數據進行處理,然后將處理結果發送到輸出主題。* 具體處理包括:分割每個消息的值,按值分組,對每個分組在10秒的時間窗口內進行計數,然后將結果轉換為KeyValue對并發送到輸出主題。** @param streamsBuilder 用于構建KStream對象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 從"kafka-stream-topic-input"主題中讀取數據流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 將每個值按空格分割成數組,并將數組轉換為列表,以擴展單個消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值進行分組,為后續的窗口化計數操作做準備.groupBy((key, value) -> value)// 定義10秒的時間窗口,在每個窗口內對每個分組進行計數.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 將計數結果轉換為流,以便進行進一步的處理和轉換.toStream()// 顯示鍵值對的內容,并將鍵和值轉換為字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 將處理后的流數據發送到"kafka-stream-topic-output"主題.to("kafka-stream-topic-output");}}

該處理類首先從主題kafka-stream-topic-input中獲取消息數據,經處理后發送到主題kafka-stream-topic-output中,再由消息消費者KafkaStreamConsumer進行消費

執行結果

在這里插入圖片描述
在這里插入圖片描述

流式處理流程及原理說明

初始階段

當從輸入主題kafka-stream-topic-input讀取數據流時,每個消息都是一個鍵值對。假設輸入消息的鍵是null或一個特定的字符串,這取決于消息是如何被發送到輸入主題的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但這個操作不會改變消息的鍵。如果輸入消息的鍵是null,那么在這個階段消息的鍵仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})

按消息的值進行分組

在 Kafka Streams 中,當使用groupBy方法對流進行分組時,實際上是在指定一個新的鍵,這個鍵將用于后續的窗口化操作和聚合操作。在這個案例中groupBy方法被用來按消息的值進行分組:

.groupBy((key, value) -> value)

這意味著在分組操作之后,流中的每個消息的鍵被設置為消息的值。因此,當你在后續的map方法中看到key參數時,這個key實際上是消息的原始值,因為在groupBy之后,消息的值已經變成了鍵。

定義時間窗口并計數

在這個階段,消息被窗口化并計數,但是鍵保持不變。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

將計數結果轉換為流

當將計數結果轉換為流時,鍵仍然是之前分組時的鍵

.toStream()

處理和轉換結果

map方法中,你看到的key參數實際上是分組后的鍵,也就是消息的原始值:

.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是為了獲取鍵的字符串表示,而value.toString()是為了將計數值轉換為字符串。

將處理后的數據發送到輸出主題

.to("kafka-stream-topic-output");

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/43792.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/43792.shtml
英文地址,請注明出處:http://en.pswp.cn/web/43792.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java中的LinkedList(鏈表)(如果想知道Java中有關LinkedList的知識點,那么只看這一篇就足夠了!)

前言&#xff1a;在Java編程語言中&#xff0c;Java集合框架提供了一組豐富的數據結構&#xff0c;以滿足各種應用需求。其中&#xff0c;LinkedList作為一種常用的數據結構&#xff0c;具有獨特的優勢和廣泛的應用場景。 ???這里是秋刀魚不做夢的BLOG ???想要了解更多內…

linux radix-tree 基數樹實現詳解

radix tree&#xff0c;又稱做基數樹&#xff0c;是一種適合于構建key(index)與value(item)相關聯的數據結構。內核中使用非常廣泛。本文主要聚焦linux內核基數樹的代碼實現,大量注釋過的代碼。 radix-tree組織結構如下: 1、數據結構 /** The bottom two bits of the slot de…

如何通過JSON-RPC向以太坊鏈發送簽名交易數據?

概述 在以太坊開發當中,通過web3.js、ethers.js等提供的API方法,都可以完成與以太坊的轉賬交易。那么如何通過以太坊JSON-RPC與以太坊進行交易呢? 在以太坊的JSON-RPC當中,有eth_sendRawTransaction這個方法,可以向以太坊網絡提交預簽名的交易廣播。 curl https://main…

IDEA阿里云OSS實現文件上傳·解決蒼穹外賣圖片回顯

簡單交代配置阿里云OSS的思路 1. 首先去阿里云開通一個OSS服務&#xff0c;配置好一個自己的Bucket 2. 在IDEA配置Bucket 3. 拷貝官網的OSS工具類代碼 package com.sky.utils;import com.aliyun.oss.ClientException; import com.aliyun.oss.OSS; import com.aliyun.oss.OSS…

同三維T80001編碼器(帶屏)系列視頻使用操作說明書:高清HDMI編碼器,高清SDI編碼器,4K超清HDMI編碼器,雙路4K超高清編碼器

同三維T80001編碼器&#xff08;帶屏&#xff09;系列視頻使用操作說明書&#xff1a;高清HDMI編碼器&#xff0c;高清SDI編碼器&#xff0c;4K超清HDMI編碼器&#xff0c;雙路4K超高清編碼器 同三維T80001編碼器&#xff08;帶屏&#xff09;系列視頻使用操作說明書&#xff1…

【C語言】printf、fprintf、sprintf,scanf、fscanf、sscanf的區別

目錄 前言 printf、fprintf、sprintf printf fprintf sprintf scanf、fscanf、sscanf scanf fscanf sscanf 前言 這幾個函數曾出現在面試中&#xff0c;因為函數名都差不多&#xff0c;所以很讓人迷惑啊~ 下面我們逐個分析。 printf、fprintf、sprintf 這三個函數的主…

子任務:IT運維的精細化管理之道

在當今的企業運營中&#xff0c;信息技術已成為支撐業務發展的核心力量。根據Gartner的報告&#xff0c;IT服務管理&#xff08;ITSM&#xff09;的有效實施可以顯著提升企業的運營效率&#xff0c;降低成本高達15%&#xff0c;同時提高服務交付速度和質量。隨著業務的復雜性和…

電腦工具箱神器——uTools

AI視頻生成&#xff1a;小說文案智能分鏡智能識別角色和場景批量Ai繪圖自動配音添加音樂一鍵合成視頻https://aitools.jurilu.com/ 很多人腦子里都有一些一個月只用兩三次的軟件&#xff0c;這些軟件就這樣積滿了灰塵&#xff0c;需要的時候又不知道去哪里找。uTools 完美地解決…

筆記:在Entity Framework Core 中,常用Attribute有哪些

一、目的&#xff1a; Entity Framework Core (EF Core) 支持使用屬性&#xff08;Attributes&#xff09;來配置模型和映射數據庫。這些屬性提供了一種聲明性的方式來指定如何將類和屬性映射到數據庫表和列。以下是一些EF Core中常用的屬性&#xff1a; 二、實現 1. [Table] …

力扣題解(環繞字符串中唯一的子字符串)

467. 環繞字符串中唯一的子字符串 定義字符串 base 為一個 "abcdefghijklmnopqrstuvwxyz" 無限環繞的字符串&#xff0c;所以 base 看起來是這樣的&#xff1a; "...zabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcd....". 給你一個字符串 s …

深入理解 MyBatis 的 SqlSession:MyBatis 核心接口全解析

MyBatis 是一個非常流行的 Java 持久層框架&#xff0c;它簡化了數據庫操作&#xff0c;并且提供了強大的映射特性。在 MyBatis 中&#xff0c;SqlSession 是與數據庫交互的核心接口。本文將詳細介紹 SqlSession 的功能和使用方法。 什么是 SqlSession&#xff1f; SqlSessio…

MYSQL審批流程判斷同一層級審批人是否全部通過審批

在做流程審批的時候&#xff0c;通常會出現某一層有多個審批人的情況&#xff0c;這個時候需要所有人都通過才會進入到下一步 數據結構如下圖表格所示 每一個審批申請對應一個apply_id serial_no相同的代表是同一層級審批人 approval_status是審核狀態 下面我們可以用一個SQL來…

Day50:單調棧 LeedCode 739. 每日溫度 496.下一個更大元素 I 503. 下一個更大元素 II

739. 每日溫度 給定一個整數數組 temperatures &#xff0c;表示每天的溫度&#xff0c;返回一個數組 answer &#xff0c;其中 answer[i] 是指對于第 i 天&#xff0c;下一個更高溫度出現在幾天后。如果氣溫在這之后都不會升高&#xff0c;請在該位置用 0 來代替。 示例 1: 輸…

【蓄勢·致遠】 同為科技(TOWE)2024年年中會議

2024年7月2日-8日&#xff0c;同為科技&#xff08;TOWE&#xff09;召開2024年年中工作會議。會議回顧上半年總體工作情況&#xff0c;分析研判發展形勢&#xff0c;規劃部署下半年工作。 為期一周的工作會議&#xff0c;由同為科技&#xff08;TOWE&#xff09;創始人、董事長…

futures.toArray(new CompletableFuture[0])

futures.toArray(new CompletableFuture[0]) 是一種常見的將 List 轉換為數組的方式&#xff0c;特別是在需要將 List 傳遞給接受數組參數的方法時。讓我們詳細解釋一下這段代碼的具體含義和工作原理。 代碼解釋 假設 futures 是一個 List<CompletableFuture<Map<St…

【人臉識別、Python實現】PyQt5人臉識別管理系統

PyQt5人臉識別管理系統 項目描述主要功能效果展示獲取源碼 項目描述 接的一個基于宿舍管理系統與人臉識別的小單子。然后我把它優化了一些&#xff0c;現在開源一下。有需要的小伙伴自取&#xff0c;點個免費的關注就行 主要功能 1、錄入學生基本信息、錄入人臉 2、主頁面展…

【Django】Django 使用連接串配置數據庫

Django 使用連接串配置數據庫 Django 配置數據庫 修改 settings.py 中 DATABASES&#xff0c;這里以 mysql 數據庫為例。 DATABASES {default: {ENGINE: django.db.backends.mysql,NAME: your_database_name,USER: your_database_user,PASSWORD: your_database_password,HO…

深度|不同數據系統中的“一致性”(Consistency)含義的區別

“你們的系統能實現強一致性嗎&#xff1f;”作為過去幾年一直在開發流處理系統的從業者&#xff0c;我經常被問到這個問題。我時常想自信地推銷我們的產品&#xff0c;但現實情況是&#xff0c;回答這個問題并不簡單。其中的挑戰并不在于問題本身&#xff0c;而在于 “一致性”…

字節8年經驗之談!好用移動APP自動化測試框架有哪些?

移動App自動化測試框架是為了提高測試效率、降低測試成本而開發的一套工具和方法。好用的移動App自動化測試框架有很多&#xff0c;下面將介紹一些常用的框架&#xff0c;并提供一篇超詳細和規范的文章&#xff0c;從零開始幫助你搭建一個移動App自動化測試框架。 1. Appium&a…

筆記:在Entity Framework Core中使用DeleteBehavior配置外鍵級聯刪除

一、目的&#xff1a; 在Entity Framework Core中&#xff0c;DeleteBehavior枚舉定義了在刪除主實體時如何處理與之關聯的外鍵約束。DeleteBehavior.Cascade是DeleteBehavior枚舉的一個選項&#xff0c;它指定當刪除主實體時&#xff0c;所有具有外鍵引用的相關實體也將被自動…