淺析Kafka-Stream消息流式處理流程及原理

以下結合案例:統計消息中單詞出現次數,來測試并說明kafka消息流式處理的執行流程

Maven依賴

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>

準備工作

首先編寫創建三個類,分別作為消息生產者、消息消費者、流式處理者
KafkaStreamProducer:消息生產者

public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}

該消息生產者向主題kafka-stream-topic-input發送五次hello kafka
KafkaStreamConsumer:消息消費者

public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的連接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消費者組properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手動提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//訂閱主題consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 異步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}

KafkaStreamQuickStart:流式處理類

public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式計算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并處理流數據。* 使用StreamsBuilder創建并配置KStream,對輸入的主題中的數據進行處理,然后將處理結果發送到輸出主題。* 具體處理包括:分割每個消息的值,按值分組,對每個分組在10秒的時間窗口內進行計數,然后將結果轉換為KeyValue對并發送到輸出主題。** @param streamsBuilder 用于構建KStream對象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 從"kafka-stream-topic-input"主題中讀取數據流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 將每個值按空格分割成數組,并將數組轉換為列表,以擴展單個消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值進行分組,為后續的窗口化計數操作做準備.groupBy((key, value) -> value)// 定義10秒的時間窗口,在每個窗口內對每個分組進行計數.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 將計數結果轉換為流,以便進行進一步的處理和轉換.toStream()// 顯示鍵值對的內容,并將鍵和值轉換為字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 將處理后的流數據發送到"kafka-stream-topic-output"主題.to("kafka-stream-topic-output");}}

該處理類首先從主題kafka-stream-topic-input中獲取消息數據,經處理后發送到主題kafka-stream-topic-output中,再由消息消費者KafkaStreamConsumer進行消費

執行結果

在這里插入圖片描述
在這里插入圖片描述

流式處理流程及原理說明

初始階段

當從輸入主題kafka-stream-topic-input讀取數據流時,每個消息都是一個鍵值對。假設輸入消息的鍵是null或一個特定的字符串,這取決于消息是如何被發送到輸入主題的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但這個操作不會改變消息的鍵。如果輸入消息的鍵是null,那么在這個階段消息的鍵仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})

按消息的值進行分組

在 Kafka Streams 中,當使用groupBy方法對流進行分組時,實際上是在指定一個新的鍵,這個鍵將用于后續的窗口化操作和聚合操作。在這個案例中groupBy方法被用來按消息的值進行分組:

.groupBy((key, value) -> value)

這意味著在分組操作之后,流中的每個消息的鍵被設置為消息的值。因此,當你在后續的map方法中看到key參數時,這個key實際上是消息的原始值,因為在groupBy之后,消息的值已經變成了鍵。

定義時間窗口并計數

在這個階段,消息被窗口化并計數,但是鍵保持不變。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

將計數結果轉換為流

當將計數結果轉換為流時,鍵仍然是之前分組時的鍵

.toStream()

處理和轉換結果

map方法中,你看到的key參數實際上是分組后的鍵,也就是消息的原始值:

.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是為了獲取鍵的字符串表示,而value.toString()是為了將計數值轉換為字符串。

將處理后的數據發送到輸出主題

.to("kafka-stream-topic-output");

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/44089.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/44089.shtml
英文地址,請注明出處:http://en.pswp.cn/web/44089.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Okhttp實現原理

OkHttp 是一個高效的 HTTP 客戶端庫&#xff0c;廣泛應用于 Android 和 Java 應用中。它提供了簡潔的 API&#xff0c;支持多種協議&#xff0c;如 HTTP/1.x 和 HTTP/2&#xff0c;并且內置了緩存和重試機制。下面是結合源碼分析的 OkHttp 的實現原理&#xff1a; 核心組件 O…

Swift 數據類型

Swift 數據類型 Swift 是一種強類型語言,這意味著在 Swift 中聲明的每個變量和常量都必須具有明確的類型。Swift 的類型系統旨在幫助開發者編寫清晰、安全的代碼。本文將詳細介紹 Swift 中的基本數據類型,包括整數、浮點數、布爾值、字符和字符串。 整數類型 Swift 提供了…

音頻語言學習領域數據集現狀、分類及評估

Audio Language Learning (Audio-Text Learning) 是一個新興的研究領域&#xff0c;專注于處理、理解和描述聲音。它的發展動力是機器學習技術的進步以及越來越多地將聲音與其相應的文本描述相結合的數據集的可用性。 Audio Language Models (ALMs) 是這個領域的關鍵技術&#…

MATLAB中的SDPT3、LMILab、SeDuMi工具箱

MATLAB中的SDPT3、LMILab、SeDuMi工具箱都是用于解決特定數學優化問題的工具箱&#xff0c;它們在控制系統設計、機器學習、信號處理等領域有廣泛的應用。以下是對這三個工具箱的詳細介紹&#xff1a; 1. SDPT3工具箱 簡介&#xff1a; SDPT3&#xff08;Semidefinite Progra…

基于QT開發的反射內存小工具

前言 最近項目需要需要開發一個反射內存小工具&#xff0c;經過2天的修修改終于完成了。界面如下&#xff1a; 功能簡介 反射內存指定地址數據讀取反射內存指定地址數據寫入反射內存指定地址數據清理十進制、十六進制、二進制數據相互轉換 部分代碼 void RfmMain::setWOthe…

SqlSugar-使用SqlSugar進行多數據庫操作

使用SqlSugar進行多數據庫操作主要涉及以下幾個步驟&#xff1a; 1. 配置數據庫連接 首先&#xff0c;你需要在項目的配置文件中&#xff08;如appsettings.json、web.config或app.config&#xff09;配置多個數據庫的連接字符串。每個連接字符串都對應一個不同的數據庫。 例…

攻防世界(PHP過濾器過濾)file_include

轉換過濾器官方文檔&#xff1a;https://www.php.net/manual/zh/filters.convert.php#filters.convert.iconv 這道題因為convert.base64-encode被過濾掉了&#xff0c;所以使用convert.iconv.*過濾器 在激活 iconv 的前提下可以使用 convert.iconv.* 壓縮過濾器&#xff0c; 等…

Win10安裝MongoDB(詳細版)

文章目錄 1、安裝MongoDB Server1.1. 下載1.2. 安裝 2、手動安裝MongoDB Compass(GUI可視工具)2.1. 下載2.2.安裝 3、測試連接3.1.MongoDB Compass 連接3.2.使用Navicat連接 1、安裝MongoDB Server 1.1. 下載 官網下載地址 https://www.mongodb.com/try/download/community …

【第28章】MyBatis-Plus之插件主體

文章目錄 前言一、MybatisPlusInterceptor 概覽1. 屬性2. InnerInterceptor 接口 二、使用示例1.Spring 配置2.Spring Boot 配置3 .mybatis-config.xml 配置 三、攔截忽略注解 InterceptorIgnore四、手動設置攔截器忽略執行策略五、本地緩存 SQL 解析總結 前言 MyBatis-Plus 提…

android 固定圖片大小

在Android中&#xff0c;固定圖片大小可以通過多種方法實現&#xff0c;這些方法主要涉及到ImageView控件的使用、Bitmap類的操作&#xff0c;以及第三方庫&#xff08;如Glide&#xff09;的輔助。以下是幾種常見的方法&#xff1a; 1. 使用ImageView控件 在Android的布局文…

利用docker容器安裝node,使用vue的開發環境

目錄 vue-app ├── docker-data │ ├── site │ ├── app ├── docker-compose.yaml └── deploy.sh docker-compose.yaml yaml文件執行 version: 3.8services:node:image: node:latestcontainer_name: vue-appports:- "8080:8080" # 宿主8080映射容器8…

系統服務綜合項目

要求&#xff1a; 現有主機 node01 和 node02&#xff0c;完成如下需求&#xff1a; 1、在 node01 主機上提供 DNS 和 WEB 服務 2、dns 服務提供本實驗所有主機名解析 3、web服務提供 www.rhce.com 虛擬主機 4、該虛擬主機的documentroot目錄在 /nfs/rhce 目錄 5、該目錄由 no…

如何保證語音芯片的穩定性能和延長使用壽命

要讓語音芯片保持穩定性能&#xff0c;首先需要深入理解其工作原理和內部構造。語音芯片&#xff0c;作為現代電子設備中的核心組件之一&#xff0c;承載著聲音信號的處理與輸出功能。為了確保其穩定運行&#xff0c;我們需要從多個方面進行細致的考慮和操作。? 1、避免長期高…

Windows系統MySQL的安裝,客戶端工具Navicat的安裝

下載mysql安裝包&#xff0c;可以去官網下載&#xff1a;www.mysql.com。點擊downloads 什么&#xff1f;后面還有福利&#xff1f; 下載MySQL 下載企業版&#xff1a; 下載Windows版 5點多的版本有點低&#xff0c;下載8.0.38版本的。Window系統。下載下面的企業版。不下載…

鄉鎮集裝箱生活污水處理設備處理效率高

鄉鎮集裝箱生活污水處理設備處理效率高 鄉鎮集裝箱生活污水處理設備優勢 結構緊湊&#xff1a;集裝箱式設計減少了占地面積&#xff0c;便于在土地資源緊張的鄉鎮地區部署。 安裝方便&#xff1a;設備出廠前已完成組裝和調試&#xff0c;現場只需進行簡單的連接和調試即可投入使…

[數字圖像處理]基礎知識整理(部分,持續更新)

程序中描述一副圖像&#xff0c;已知其橫向縱向的像素個數即可&#xff08;&#xff09; 灰度直方圖能反映一副圖像各個灰度級像素占圖像的面積比&#xff08;√&#xff09; 從程序編寫的角度看&#xff0c;描述一副圖像的基本屬性通常包括其分辨率&#xff0c;即圖像的寬度…

Docker鏡像和容器的管理

1 Docker鏡像管理操作 開啟鏡像加速 根據關鍵字查詢鏡像 下載查看鏡像 詳細鏡像信息 查看latest版本 上傳鏡像到阿里云倉庫 2 Docker容器操作 關于容器根據第一個pid進程是否能正常在前臺運行

19. 地址轉換

地址轉換 題目描述 Excel 是最常用的辦公軟件。每個單元格都有唯一的地址表示。比如&#xff1a;第 12 行第 4 列表示為&#xff1a;"D12"&#xff0c;第 5 行第 255 列表示為"IU5"。 事實上&#xff0c;Excel 提供了兩種地址表示方法&#xff0c;還有一…

算法訓練營第30天|122.買賣股票的最佳時機II|55. 跳躍游戲|45.跳躍游戲II|1005.K次取反后最大化的數組和

122.買賣股票的最佳時機II 思路&#xff1a;只有前一天與后一天的利潤為正時&#xff0c;才將其加入總利潤。 55. 跳躍游戲 思路&#xff1a;找最大覆蓋范圍 出錯點&#xff1a;數組的遍歷&#xff0c;遍歷范圍應該是覆蓋范圍內 45.跳躍游戲II 思路&#xff1a; 局部最優&am…

批量爬取B站網絡視頻信息

使用XPath爬取B站視頻鏈接等相關信息 分析B站html框架獲取內容完整代碼 對于B站&#xff0c;目前網上的爬蟲大多都是使用通過解析服務器的響應來爬取想要的內容&#xff0c;下面我們通過使用XPath來爬取B站上一些想要的信息 此次任務我們需要對B站搜索到的關鍵字&#xff0c;并…