【自定義Source、Sink】Flink自定義Source、Sink對redis進行讀寫操作

使用ParameterTool讀取配置文件

Flink讀取參數的對象

  1. Commons-cli: Apache提供的,需要引入依賴
  2. ParameterTool:Flink內置

ParameterTool 比 Commons-cli 使用上簡便;

ParameterTool能避免Jar包的依賴沖突

建議使用第二種

使用ParameterTool對象可以直接獲取配置文件中的信息,需要如下依賴

        <!-- Flink基礎依賴 【ParameterTool類 在該依賴中】 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></dependency><!-- Flink流批處理依賴 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId></dependency>

Java讀取資源的方式

  1. Class.getResourceAsStream(Path):Path 必須以 “/”,表示從ClassPath的根路徑讀取資源
  2. Class.getClassLoader().getResourceAsStream(Path):Path 無須以 “/”,默認從ClassPath的根路徑讀取資源

推薦使用第2種,以類加載器的方式獲取靜態資源文件,不要通過ClassPath的相對路徑查找

最基本的工具類

public class ParameterUtil {// 創建 ParameterTool 對象public static ParameterTool getParameters() {// 讀取 resources 文件夾下 "flink.properties" 文件InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {return ParameterTool.fromPropertiesFile(inputStream);} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}
}

image-20231209095849541

可以通過 ParameterUtil.getParameters().get("redis.port") 直接讀取key對應的value值

Flink寫入Redis方式

  1. 繼承RichSinkFunction (Flink-Stream)
  2. 使用第3方的包 (Apache-Bahir-Flink)

Apache-Bahir-Flink 的 Redis-Connector的缺點:

  1. 使用Jedis, 沒有使用Lettuce
  2. 沒有對 Flink Table/SQL Api 的支持

不少基于bahir二開的例子解決了上述問題

gitee地址:https://gitee.com/jeff-zou/flink-connector-redis?_from=gitee_search

github地址:https://github.com/apache/bahir-flink

bahir 集成了許多連接器,其中就包含Redis

image-20231209103659812

Flink官網上也可以看到bahir的影子

image-20231209104014483

方便起見,接下來就基于bahir,Flink寫入Redis集群

基于巴希爾(Bahir)-Flink寫入Redis集群

引入connector連接器依賴

        <!-- Flink-Connector-Redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_${scala.binary.version}</artifactId></dependency>

依賴版本定義在父模塊中

image-20231209100449996

實現RedisMapper接口自定義Sink

首先實現RedisMapper接口并指定泛型——處理元素的類型

/*** 基于apache bachir flink的RedisSink,作用于Redis String數據類型*/
public class RedisSinkByBahirWithString implements RedisMapper<Tuple2<String, String>> {/*** 指定Redis的命令*/@Overridepublic RedisCommandDescription getCommandDescription() {/* ************************ 如果Redis的數據類型是 hash 或 z-Set* RedisCommandDescription 的構造方法必須傳入 additionalKey* additionalKey就是Redis的鍵** *********************/return new RedisCommandDescription(RedisCommand.SET);}/*** 從數據流里獲取Key值*/@Overridepublic String getKeyFromData(Tuple2<String, String> input) {return input.f0;}/*** 從數據流里獲取Value值*/@Overridepublic String getValueFromData(Tuple2<String, String> input) {return input.f1;}
}

寫入Redis工具類

public class RedisWriteUtil {/* ************************ FlinkJedisClusterConfig:集群模式* FlinkJedisPoolConfig:單機模式* FlinkJedisSentinelConfig:哨兵模式** *********************/// Jedis配置private static final FlinkJedisClusterConfig JEDIS_CONF;static {ParameterTool parameterTool = ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* ************************ InetSocketAddress 是Java的套接字** *********************/InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));Set<InetSocketAddress> set = new HashSet<>();set.add(inetSocketAddress);JEDIS_CONF = new FlinkJedisClusterConfig.Builder().setNodes(set).build();}/*** 基于Bahir寫入Redis,Redis的數據是String類型*/public static void writeByBahirWithString(DataStream<Tuple2<String, String>> input) {input.addSink(new RedisSink<>(JEDIS_CONF, new RedisSinkByBahirWithString()));}}

測試一下

class RedisWriteUtilTest {@DisplayName("測試基于Bahir寫入Redis,Redis數據類型是String類型")@Testvoid writeByBahirWithString() throws Exception {LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();DataStreamSource<Tuple2<String, String>> dataStream = env.fromElements(Tuple2.of("k", "v"));RedisWriteUtil.writeByBahirWithString(dataStream);env.execute();}
}

非常完美!寫入成功

image-20231209105850707

Flink讀取Redis方式

  1. 繼承RichSourceFunction (實現自定義Source)
  2. 繼承RichParallelSourceFunction (實現自定義Source)【可以指定并行度】
  3. 實現SourceFunction接口 (實現自定義Source)

RichParallelSourceFunction 和 RichSourceFunction區別

RichParallelSourceFunction 可以設置并行度

RichParallelSourceFunction 和 RichSourceFunction 代碼是可以互相套用

RichParallelSourceFunction 默認的并行度是cpu 的 核心數(core數)

RichSourceFunction 的并行度只能是1

繼承RichSourceFunction類-Flink讀取Redis集群

前置準備

定義枚舉類

Redis數據類型枚舉類

@Getter
public enum RedisDataType {STRING,HASH,LIST,SET,SORTED_SET,;RedisDataType() {}
}

定義Redis命令的枚舉類,便于Source判斷操作

@Getter
public enum RedisCommand {// get stringGET(RedisDataType.STRING);private final RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}
}

Jedis配置類

bahir依賴中自帶jedis依賴一般不用,自行引入jedis,jedis依賴版本要與巴希爾中jedis版本保持一致

image-20231209111800457

public class JedisConf {public static JedisCluster getJedisCluster() throws IOException {ParameterTool parameterTool =ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* *********************** Jedis對象** JedisPool : 用于redis單機版* JedisCluster: 用于redis集群** JedisCluster對象能夠自動發現正常的redis節點** *********************/HostAndPort hostAndPort = new HostAndPort(host,Integer.parseInt(port));Set<HostAndPort> nodes = new HashSet<>();nodes.add(hostAndPort);return new JedisCluster(nodes);}
}

封裝Jedis對象的redis方法

封裝Jedis對象的redis方法,方便統一調用和維護

public class JedisBuilder {private JedisCluster jedis = null;public JedisBuilder(JedisCluster jedisCluster) {this.jedis = jedisCluster;}public void close() {if (this.jedis != null) {this.jedis.close();}}/*** Redis的Get方法*/public String get(String key) {return jedis.get(key);}
}

自定義Source

Redis數據的映射對象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisPO implements Serializable {private String data;}

Flink 自定義Redis Source讀取Redis

/* *********************** 【富函數類】 比函數類提供了更多函數生命周期,提供了獲取上下文的方法* 富函數類通常是抽象類* *********************/
public class RedisSource extends RichSourceFunction<RedisPO> {/*** Jedis對象*/private JedisBuilder jedisBuilder;/*** Redis命令枚舉對象*/private final RedisCommand redisCommand;/*** redis key*/private final String key;public RedisSource(RedisCommand redisCommand, String key) {this.redisCommand = redisCommand;this.key = key;}/*** volatile 修飾的變量,它的更新都會通知其他線程.*/private volatile boolean isRunning = true;/*** Redis的連接初始化*/@Overridepublic void open(Configuration parameters) throws Exception {JedisCluster jedisCluster = JedisConf.getJedisCluster();jedisBuilder = new JedisBuilder(jedisCluster);}/*** Redis數據的讀取*/@Overridepublic void run(SourceContext<RedisPO> output) throws Exception {/* ************************ 一直監聽Redis數據的讀取** *********************/String data = null;// while (isRunning) {switch (redisCommand.getRedisDataType()) {case STRING:data = jedisBuilder.get(key);}output.collect(new RedisPO(data));// }}@Overridepublic void cancel() {this.isRunning = false;}}

讀取Redis工具類

public class RedisReadUtil {public static DataStream<RedisPO> read(StreamExecutionEnvironment env,RedisCommand redisCommand,String key) {return env.addSource(new RedisSource(redisCommand, key));}
}

測試一下

class RedisReadUtilTest {@DisplayName("測試自定義Source讀取Redis,Redis數據類型是String類型")@Testvoid testReadByCustomSourceWithString() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<RedisPO> dataStream = RedisReadUtil.read(env,RedisCommand.GET,"k");dataStream.print();env.execute();}
}

測試成功!

image-20231209113539037

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/210643.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/210643.shtml
英文地址,請注明出處:http://en.pswp.cn/news/210643.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

西工大網絡空間安全學院計算機網絡實驗五——ACL配置

實驗五、ACL配置 一. 實驗目的 1. 掌握ACL的基本配置方法 二. 實驗內容 1. 基于如下圖所示的拓撲圖&#xff0c;對路由器進行正確的RIP協議配置&#xff1b; ? 首先引入3臺2811 IOS15型號的路由器、3臺2950-T24型號的交換機、4臺PC-PT型號的PC機、兩臺Server-PT型號的服務…

kafka學習筆記--生產者消息發送及原理

本文內容來自尚硅谷B站公開教學視頻&#xff0c;僅做個人總結、學習、復習使用&#xff0c;任何對此文章的引用&#xff0c;應當說明源出處為尚硅谷&#xff0c;不得用于商業用途。 如有侵權、聯系速刪 視頻教程鏈接&#xff1a;【尚硅谷】Kafka3.x教程&#xff08;從入門到調優…

JavaScript 的節流與防抖

// 函數防抖&#xff1a; 在事件被觸發 n 秒后再執行回調&#xff0c;如果在這 n 秒內事件又被觸發&#xff0c;則重新計時。// 函數節流&#xff1a; 規定一個單位時間&#xff0c;在這個單位時間內&#xff0c;只能有一次觸發事件的回調函數執行&#xff0c;如果在同一個單位…

Redis各種數據結構應用場景

Redis各種數據結構應用場景 一、基本類型 Redis的基本數據類型時&#xff0c;以下是它們的實際場景示例&#xff1a; 字符串&#xff08;String&#xff09;&#xff1a; 實際場景 緩存數據&#xff1a;將頻繁訪問的數據緩存在Redis中&#xff0c;以提高讀取速度。會話管理&…

Ubuntu與Windows通訊傳輸文件(FTP服務器版)(沒用的方法,無法施行)

本文介紹再Windows主機上建立FTP服務器&#xff0c;并且在Ubuntu虛擬機上面訪問Windows上FTP服務器的方法 只要按照上圖配置就可以了 第二部&#xff1a;打開IIS管理控制臺 右擊網站&#xff0c;新建FTP站點。需要注意的一點是在填寫IP地址的時候&#xff0c;只需要填寫Window…

遙感衛星綜述(下載和預處理)(持續更新)

遙感衛星綜述&#xff08;下載和預處理&#xff09; 目錄 遙感衛星綜述&#xff08;下載和預處理&#xff09;一、國產衛星GF-1 WFV 二、國外衛星Sentinel-1Sentinel-2 一、國產衛星 GF-1 WFV 下載 分辨率波段16m4(藍、綠、紅、近紅) 預處理&#xff1a; ENVI預處理GF-1號W…

用友T3如何反結賬、反記賬、反審核及刪除憑證

在T3總賬中已經進行了總賬記賬和月末結賬&#xff0c;但是需要去修改憑證或刪除憑證&#xff0c;這個時候就需要去進行反結賬、反記賬等操作&#xff0c;以下是具體的操作流程 第一步、反結賬 1、進入用友T3件&#xff0c;打開總賬系統模塊&#xff0c;點月末結賬&#xff0c…

uc_15_TCP協議

1 TCP協議 TCP提供客戶機與服務器的鏈接。一個完整TCP通信過程需要經歷三個階段 1&#xff09;首先&#xff0c;客戶機必須建立與服務器的連接&#xff0c;所謂虛電路 2&#xff09;然后&#xff0c;憑借已建立好的連接&#xff0c;通信雙方相互交換數據 3&#xff09;最后&am…

短信驗證碼無法獲取,通過獲取cookies直接登錄

web端&#xff0c;selenium短信驗證碼無法獲取&#xff0c;通過獲取cookies直接登錄 1&#xff0c;先獲取cookies driver webdriver.Chrome() driver.get("") driver.implicitly_wait(2) # 獲取彈窗&#xff0c;并取消 driver.find_element(By.XPATH,"/html/…

智能優化算法應用:基于粒子群算法3D無線傳感器網絡(WSN)覆蓋優化 - 附代碼

智能優化算法應用&#xff1a;基于粒子群算法3D無線傳感器網絡(WSN)覆蓋優化 - 附代碼 文章目錄 智能優化算法應用&#xff1a;基于粒子群算法3D無線傳感器網絡(WSN)覆蓋優化 - 附代碼1.無線傳感網絡節點模型2.覆蓋數學模型及分析3.粒子群算法4.實驗參數設定5.算法結果6.參考文…

Python---異常的綜合案例

☆ 異常的傳遞 需求&#xff1a; ① 嘗試只讀方式打開python.txt文件&#xff0c;如果文件存在則讀取文件內容&#xff0c;文件不存在則提示用戶即可。 ② 讀取內容要求&#xff1a;嘗試循環讀取內容&#xff0c;讀取過程中如果檢測到用戶意外終止程序&#xff0c;則except捕…

個人博客網站如何實現https重定向(301)到http

對于個人網站站注冊比較少的&#xff0c;服務器配置不是很好的&#xff0c;沒必要https,https跳轉到http是要時間的&#xff0c;會影響網站打開的速度。免費的https每年都要更換。個人博客網站https有一段時間了&#xff0c;而且很多頁面都有收錄排名&#xff0c;現在已去掉htt…

基于JavaWeb+SSM+Vue實習記錄微信小程序系統的設計和實現

基于JavaWebSSMVue實習記錄微信小程序系統的設計和實現 源碼獲取入口Lun文目錄前言主要技術系統設計功能截圖訂閱經典源碼專欄Java項目精品實戰案例《500套》 源碼獲取 源碼獲取入口 Lun文目錄 目 錄 摘 要 III Abstract 1 1 系統概述 1 1.1 概述 2 1.2課題意義 3 1.3 主要內…

【Linux系統編程】進度條的編寫

目錄 一&#xff0c;進度條的必備知識 1&#xff0c;緩沖區的粗略介紹 2&#xff0c;回車與換行 二&#xff0c;進度條的初步制作 1&#xff0c;進度條的初步礦建 2&#xff0c;進度條的版本一 3&#xff0c;進度條的版本二 一&#xff0c;進度條的必備知識 1&#xff…

詳細了解STM32----GPIO

提示&#xff1a;永遠支持免費開源知識文檔&#xff0c;喜歡的點個關注吧&#xff01;謝謝&#xff01; 文章目錄 一、什么是GPIO&#xff1f;二、GPIO基本結構三、GPIO的輸入輸出模式1、推挽輸出2、開漏輸出3、復用推挽4、復用開漏1、浮空輸入2、上拉輸入&#xff13;、下拉輸…

FastAPI之嵌套模型

請求體 - 嵌套模型 使用 FastAPI&#xff0c;你可以很隨意的實現模型的嵌套、定義、校驗、記錄文檔&#xff0c;并使用任意深度嵌套的模型&#xff0c;這其實都是FastAPI的核心模塊P一單提成進行做的。。 List 字段 from fastapi import FastAPI from pydantic import BaseM…

基于JavaWeb+SSM+Vue童裝商城小程序系統的設計和實現

基于JavaWebSSMVue童裝商城小程序系統的設計和實現 源碼獲取入口Lun文目錄前言主要技術系統設計功能截圖訂閱經典源碼專欄Java項目精品實戰案例《500套》 源碼獲取 源碼獲取入口 Lun文目錄 目 錄 摘 要 III Abstract 1 1 系統概述 2 1.1 概述 3 1.2課題意義 4 1.3 主要內容 5…

BearPi Std 板從入門到放棄 - 先天篇(1)(階段 : 智慧城市 - 智慧路燈)

簡介 對前面幾篇整合, 做個小小匯總試驗, 使用BearPi E53_SC1擴展板主芯片: STM32L431RCT6串口: Usart1擴展板與主板連接: I2C : I2C1 (光照強度傳感器&#xff1a;BH1750)LED: PB9步驟 創建項目 參考 BearPi Std 板從入門到放棄 - 引氣入體篇&#xff08;1&#xff09;(由零創…

【測試人生】數據同步和遷移的變更注意事項

數據同步或者遷移操作也算是線上數據變更的一種類型。由于涉及的數據量非常大&#xff0c;一旦發生故障&#xff0c;會直接影響線上業務&#xff0c;并且較難止損。從變更風險管控的角度考慮&#xff0c;數據同步或遷移操作也需要走合理的發布窗口&#xff0c;并且在操作前也需…

淺談Google Play ASO 優化

什么是ASO ASO即APP Store Optimization&#xff0c;是用于提高APP在應用市場排名的工具&#xff0c;其實也就是移動產品的SEO工作。 ASO是為了提高該產品的搜索結果成績&#xff0c;提升APP的下載量&#xff0c;針對Google Play來說&#xff0c;ASO就是優化APP頁面。 為什么…