Flink-05學習 接上節,將FlinkJedisPoolConfig 從Kafka寫入Redis

上節成功實現了FlinkKafkaConsumer消費Kafka數據,并將數據寫入到控制臺,接下來將繼續將計算的結果輸入到redis中。

pom.xml

引入redis到pom包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.maven</groupId><artifactId>maven-plugin-api</artifactId><version>2.0</version></dependency><dependency><groupId>org.apache.maven.plugin-tools</groupId><artifactId>maven-plugin-annotations</artifactId><version>3.2</version></dependency><dependency><groupId>org.codehaus.plexus</groupId><artifactId>plexus-utils</artifactId><version>3.0.8</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><!--mybatis坐標--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.4.5</version></dependency><!--mysql驅動坐標--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.6</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-plugin-plugin</artifactId><version>3.2</version><executions><execution><phase>package</phase><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins>
</build>
</project>

KafkaProducer.java 生產數據存入Kafka

同上一節,具體代碼

package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author snowsong*/
public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 集群的初始連接地址props.put("bootstrap.servers", "172.16.1.173:9092");// 序列化器 將 Java 對象序列化為字節數組props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kafka生產者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 消息循環for (int i = 0; i < 50; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);producer.send(record);System.out.println("send: " + key);Thread.sleep(200);}// 關閉生產者producer.close();}
}

啟動服務類

Flink消費Kafka,并將結果存入redis。
設置FlinkRedisConfig

   // 配置 Redis 連接池,設置 Redis 服務器地址和端口并構建對象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 創建 RedisSink 對象,用于將數據寫入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 將 RedisSink 添加到數據流中,作為數據的接收端wordData.addSink(redisSink);

MyRedisMapper
它實現了 RedisMapper 接口,用于自定義 Redis 數據的映射規則。MyRedisMapper 類用于將 Flink 數據流中的 Tuple2 對象映射到 Redis 命令中。

public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 獲取當前命令的描述信息。** @return 返回Redis命令的描述信息對象,其中包含了命令的類型為LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 從給定的Tuple2數據中獲取鍵。** @param data 一個包含兩個字符串元素的Tuple2對象* @return 返回Tuple2對象的第一個元素,即鍵*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 從給定的元組中獲取第二個元素的值。** @param data 一個包含兩個字符串元素的元組* @return 元組中的第二個元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}

starApp的完整代碼如下:

package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** @author snowsong*/
public class StartApp {private static final String REDIS_SERVER = "0.0.0.0";private static final Integer REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客戶端的連接參數Properties properties = new Properties();properties.setProperty("bootstrap.servers", "172.16.1.173:9092");FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",new SimpleStringSchema(), properties);DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer);// 將接收的數據映射為二元組SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {/*** 將輸入的字符串映射為 Tuple2 對象。** @param value 輸入的字符串* @return 一個包含兩個元素的 Tuple2 對象,第一個元素為 "l_words",第二個元素為輸入的字符串* @throws Exception 如果發生異常,則拋出該異常*/@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});// 配置 Redis 連接池,設置 Redis 服務器地址和端口并構建對象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 創建 RedisSink 對象,用于將數據寫入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 將 RedisSink 添加到數據流中,作為數據的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 類用于將 Flink 數據流中的 Tuple2 對象映射到 Redis 命令中。* 它實現了 RedisMapper 接口,用于自定義 Redis 數據的映射規則。*/public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 獲取當前命令的描述信息。** @return 返回Redis命令的描述信息對象,其中包含了命令的類型為LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 從給定的Tuple2數據中獲取鍵。** @param data 一個包含兩個字符串元素的Tuple2對象* @return 返回Tuple2對象的第一個元素,即鍵*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 從給定的元組中獲取第二個元素的值。** @param data 一個包含兩個字符串元素的元組* @return 元組中的第二個元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}}}

運行結果

請添加圖片描述

存入redis結果
請添加圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913051.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913051.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913051.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

git教程-pycharm使用tag打標簽

一.生成tag標簽 前言 當我們的代碼完成了第一階段的需求&#xff0c;版本穩定后&#xff0c;希望能出個穩定版本。于是在 commit 后需要打個 tag 標簽&#xff0c;也就是我們平常說的版本號&#xff0c;如v1.0版本 本篇講解如何使用 pycharm 打 tag 標簽&#xff0c;并推送到…

PHP Error: 深入解析與處理技巧

PHP Error: 深入解析與處理技巧 引言 PHP作為一種廣泛使用的服務器端腳本語言,在Web開發領域占據著重要地位。然而,任何編程語言都難以避免錯誤的發生。本文將深入探討PHP錯誤處理的相關知識,包括錯誤類型、錯誤顯示、錯誤日志以及錯誤處理技巧,幫助開發者更好地應對和解…

21、企業行政辦公(OA)數字化轉型:系統如何重塑企業高效運營新范式

企業行政辦公是營造高效工作環境、提升員工幸福感和歸屬感的重要基石&#xff0c;更是傳遞組織溫度與價值關懷的第一窗口。在數字化轉型浪潮席卷各行各業的今天&#xff0c;企業行政辦公領域正經歷一場靜默但深刻的變革。據統計&#xff0c;采用智能化OA系統的企業&#xff0c;…

基于開源AI智能名片鏈動2+1模式S2B2C商城小程序的抖音渠道力拓展與多渠道利潤增長研究

摘要&#xff1a;在數字化商業競爭日益激烈的背景下&#xff0c;抖音平臺憑借其龐大的流量基礎和興趣電商生態&#xff0c;成為品牌增長的關鍵陣地。渠道力作為品牌增長的核心驅動力&#xff0c;以抖音勢能為內核&#xff0c;通過流量與銷量的外溢效應&#xff0c;可顯著提升品…

基于二維碼的視頻合集高效管理與分發技術

一、 視頻資源聚合的技術挑戰與解決方案 在企業培訓、在線教育和產品展示等場景中&#xff0c;視頻資源的結構化組織與高效分發始終是技術實現的核心挑戰。傳統方案往往面臨三大痛點&#xff1a;資源碎片化導致的管理混亂、多視頻序列播放的用戶體驗不佳、以及跨平臺兼容性問題…

GPT-2論文閱讀:Language Models are Unsupervised Multitask Learners

本文解析 OpenAI 2019 年發布的里程碑式論文&#xff0c;該論文首次提出了 GPT-2 模型&#xff0c;揭示了語言模型作為無監督多任務學習器的革命性潛力。文章的核心觀點是&#xff1a;語言模型在無監督訓練過程中&#xff0c;可以隱式地學習多種任務&#xff0c;無需特定任務微…

R 語言安裝使用教程

一、R 語言簡介 R 是一種用于統計分析、數據挖掘和可視化的編程語言和環境。它在學術界和數據分析領域中廣泛使用&#xff0c;擁有豐富的統計函數庫和繪圖功能。 二、安裝 R 語言 2.1 下載 R 安裝包 前往 CRAN 官網下載適合你操作系統的安裝程序&#xff1a; 官網地址&…

智能Agent場景實戰指南 Day 1:智能Agent概述與架構設計

【智能Agent場景實戰指南 Day 1】智能Agent概述與架構設計 引言 歡迎來到"智能Agent場景實戰指南"系列的第一天&#xff01;今天我們將深入探討智能Agent的基本概念和架構設計。在這個大模型時代&#xff0c;智能Agent已成為連接AI技術與實際業務場景的關鍵橋梁&am…

Plan-Grounded Large Language Models forDual Goal Conversational Settings

Plan-Grounded Large Language Models for Dual Goal Conversational Settings - ACL Anthologyhttps://aclanthology.org/2024.eacl-long.77/ 1. 概述 引導用戶完成諸如烹飪或 DIY 之類的手動任務(Choi 等,2022),對于當前的大型語言模型(LLMs)來說是一個新穎且具有挑戰…

python打卡day57@浙大疏錦行

知識點回顧 序列數據的處理&#xff1a; 處理非平穩性&#xff1a;n階差分處理季節性&#xff1a;季節性差分自回歸性無需處理 模型的選擇 AR(p) 自回歸模型&#xff1a;當前值受到過去p個值的影響MA(q) 移動平均模型&#xff1a;當前值收到短期沖擊的影響&#xff0c;且沖擊影…

YOLOv11性能評估全解析:從理論到實戰的指標指南

深入剖析目標檢測核心指標,掌握模型優化的關鍵密碼 為什么需要性能評估指標? 在目標檢測領域,YOLO系列模型以其卓越的速度-精度平衡成為行業標桿。當我們訓練或使用YOLOv11模型時,一個核心問題始終存在:如何量化模型的性能? 性能評估指標正是回答這個問題的關鍵工具,它…

【Linux內核及內核編程】Linux2.6 后的內核特點

2003 年發布的 Linux 2.6 內核是一個里程碑&#xff0c;它標志著 Linux 從 “極客玩具” 向全場景操作系統的蛻變。如果說 2.4 內核是 Linux 進入企業級市場的起點&#xff0c;那么 2.6 及后續版本則是一場從內到外的 “現代化革命”&#xff0c;不僅讓 Linux 在服務器、桌面、…

GO 語言學習 之 結構體

在 Go 語言中&#xff0c;結構體&#xff08;struct&#xff09;是一種用戶自定義的數據類型&#xff0c;它可以包含多種不同類型的數據組合在一起。結構體為組織和管理相關數據提供了一種有效的方式&#xff0c;常用于表示現實世界中的對象或概念。如果你懂C/C&#xff0c;那么…

ubuntu 啟動SSH 服務

在Ubuntu系統中&#xff0c;啟動SSH服務需要確保SSH服務已經安裝&#xff0c;并且正確配置。以下是詳細步驟&#xff1a; 一、檢查SSH服務是否已安裝 檢查SSH服務是否安裝 打開終端&#xff08;Terminal&#xff09;。 輸入以下命令來檢查SSH服務是否已安裝&#xff1a; bash…

【3.4 漫畫分布式共識算法】

3.4 漫畫分布式共識算法 ?? 人物介紹 小明:對分布式共識算法好奇的開發者架構師老王:分布式系統專家,精通各種共識算法?? 共識算法概述 小明:“老王,分布式系統中為什么需要共識算法?” 架構師老王:“想象一下,你有多個服務器需要就某個決定達成一致,比如選出一…

程序計數器(PC)是什么?

程序計數器&#xff08;PC&#xff09;是什么&#xff1f; 程序計數器&#xff08;PC&#xff09;詳解 程序計數器&#xff08;Program Counter, PC&#xff09; 是CPU中的一個關鍵寄存器&#xff0c;用于存儲下一條待執行指令的內存地址。它控制程序的執行流程&#xff0c;是…

影樓精修-智能修圖Agent

今天給大家介紹一篇令人驚喜的論文《JarvisArt: Liberating Human Artistic Creativity via an Intelligent Photo Retouching Agent》 論文地址&#xff1a;https://arxiv.org/pdf/2506.17612 Code&#xff08;暫無代碼&#xff09;&#xff1a;https://github.com/LYL1015/…

帕金森與健康人相關數據和處理方法(一些文獻的記錄)

主要的帕金森腦電數據進行一些分類分析的文章。 帕金森病 2004 年至 2023 年腦電圖研究的文獻計量分析對于研究的分析以及關鍵研究和趨勢從腦電圖信號中檢測帕金森病&#xff0c;采用離散小波變換、不同熵度量和機器學習技術使用機器學習和深度學習方法分析不同模態的數據以診…

優象光流模塊,基于python的數據讀取demo

優象光流模塊&#xff0c;型號UP-FLOW-LC-302-3C&#xff0c;準備將其應用于設備的運行速度測量&#xff0c;物美價廉。 廠家提供的數據格式表&#xff1a; 實測用python的serial包readline()函數讀取到的幀數據&#xff1a; 與官方的給定略有出入&#xff0c;不過主要字節的順…

模型部署與推理--利用libtorch模型部署與推理

文章目錄 1從pytorch導出pt文件2下載并配置libtorch3推理4結果&#xff1a;時間對比&#xff1a;推理結果&#xff1a; 參考 以deeplabv3plus為例講解怎么利用libtorch部署在c上模型。關于libtorch和pt文件請參考我之前的博客。 1從pytorch導出pt文件 if __name__ __main__: …