大數據-玩轉數據-Flink RedisSink

一、添加Redis Connector依賴

具體版本根據實際情況確定

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version>
</dependency>

二、啟動redis

參見大數據-玩轉數據-Redis 安裝與使用

三、編寫代碼

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class SinkRedis {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer key) throws Exception {return key.intValue();}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop100").setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).setDatabase(0).setPassword("redis").build();keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}@Overridepublic String getKeyFromData(Integer integer) {return integer.toString();}@Overridepublic String getValueFromData(Integer integer) {return integer.toString();}}));env.execute();}
}

可以根據要寫入的redis的不同數據類型進行調整

四、查詢結果

在這里插入圖片描述
在這里插入圖片描述

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/41652.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/41652.shtml
英文地址,請注明出處:http://en.pswp.cn/news/41652.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何提高深度學習性能

可用于 對抗過度擬合并獲得更好泛化能力的20 個提示、技巧和技術 如何從深度學習模型中獲得更好的性能? 這是我最常被問到的問題之一。 可能會被問為: 如何提高準確率? ……或者可以反過來說: 如果我的神經網絡表現不佳該怎么辦? 我經常回答說:“我不太清楚,但我有很…

分類預測 | MATLAB實現DBN-SVM深度置信網絡結合支持向量機多輸入分類預測

分類預測 | MATLAB實現DBN-SVM深度置信網絡結合支持向量機多輸入分類預測 目錄 分類預測 | MATLAB實現DBN-SVM深度置信網絡結合支持向量機多輸入分類預測預測效果基本介紹程序設計參考資料 預測效果 基本介紹 1.分類預測 | MATLAB實現DBN-SVM深度置信網絡結合支持向量機多輸入分…

工作紀實36-ES跨集群遷移

1.es數據備份、恢復 https://blog.csdn.net/andy_only/article/details/111319175 2.reindex命令 https://codeleading.com/article/40964498185/ 添加配置、重啟ES cd bin sh elasticsearch -d3.開源工具 https://github.com/elasticsearch-dump/elasticsearch-dump 4.…

回歸預測 | MATLAB實現基于SAE堆疊自編輯器多輸入單輸出回歸預測

回歸預測 | MATLAB實現基于SAE堆疊自編輯器多輸入單輸出回歸預測 目錄 回歸預測 | MATLAB實現基于SAE堆疊自編輯器多輸入單輸出回歸預測預測效果基本介紹模型描述程序設計參考資料 預測效果 基本介紹 1.MATLAB實現基于SAE堆疊自編輯器多輸入單輸出回歸預測&#xff1b; 2.運行環…

Request+Response

文章目錄 1. 介紹2. Request對象2.1 Request繼承體系2.2 Request獲取請求數據1.獲取請求行2.獲取請求頭3.獲取請求體4. 請求參數的通用方式5. 解決中文亂碼問題 2.3 Request請求轉發請求轉發資源間共享數據: 3. Response對象3.0 Response 繼承體系3.1 Response設置響應數據的功…

使用GEWE框架進行個人微信收藏夾及標簽管理(收藏夾篇)適用于微信群管、社群管理

友情鏈接&#xff1a;geweapi.com 點擊即可訪問&#xff01; 如果個人有多個微信&#xff0c;進行收藏夾管理是非常麻煩的事情&#xff0c;這時候可以用得到GEWE框架的管理模塊&#xff01;下面來看一下使用方法吧~ 獲取收藏信息 小提示&#xff1a; 獲取收藏相關信息注意&am…

iOS手機無法安裝Charles 的ssl證書

問題描述 iOS客戶端安裝證書時一直卡在下載這一步&#xff0c;無法抓包 1、打開Charles&#xff0c;選擇help→SSL Proxying→Install Charles Root Certificate on a Mobile Device or Remote Browser 2、按照步驟1中的提示進行操作&#xff0c;手機連接電腦代理&#xff0c;…

Spring系列七:聲明式事務

&#x1f418;聲明式事務 和AOP有密切的聯系, 是AOP的一個實際的應用. &#x1f432;事務分類簡述 ●分類 1.編程式事務: 示意代碼, 傳統方式 Connection connection JdbcUtils.getConnection(); try { //1.先設置事務不要自動提交 connection.setAutoCommit(false…

ZooKeeper的應用場景(分布式鎖、分布式隊列)

7 分布式鎖 分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源&#xff0c;那么訪問這些資源的時候&#xff0c;往往需要通過一些互斥手段來防止彼此之間的干擾&#xff0c;以保證一致性&#xff0c;…

島嶼的最大面積(力扣)遞歸 JAVA

給你一個大小為 m x n 的二進制矩陣 grid 。 島嶼 是由一些相鄰的 1 (代表土地) 構成的組合&#xff0c;這里的「相鄰」要求兩個 1 必須在 水平或者豎直的四個方向上 相鄰。你可以假設 grid 的四個邊緣都被 0&#xff08;代表水&#xff09;包圍著。 島嶼的面積是島上值為 1 的…

error_Network Error

此頁面為訂單列表&#xff0c;是混合開發(頁面嵌入在客戶端中) 此頁面為訂單列表&#xff0c;此需求在開發時后端先將代碼發布在測試環境&#xff0c;我在本地調試時調用的后端接口進行聯調沒有任何問題。 此后我將代碼發布在測試環境&#xff0c;在app中打開頁面&#xff0c…

vue echarts中按鈕點擊后修改值 watch數據變化后刷新圖表

1 點擊按鈕 {feature: {myBtn1: {show: true,title: 反轉Y軸,showTitle: true,icon: path://M512 0A512 512 0 1 0 512 1024A512 512 0 0 0 512 0M320 320V192h384v128zM128 416V288h256v128zM320 704V576h384v128zM128 800V672h256v128z,onclick: () > {dataSetting.rever…

nginx服務器報錯502 Bad Gateway的原因以及解決辦法

服務器報錯nginx 502 Bad Gateway的原因以及解決辦法_502 bad gateway nginx_主題模板站的博客-CSDN博客

C++學習筆記總結練習:effective 學習日志

準則 1.少使用define define所定義的常量會在預處理的時候被替代&#xff0c;出錯編譯器不容易找到錯誤。而且還沒有作用范圍限制&#xff0c;推薦使用constdefine宏定義的函數&#xff0c;容易出錯&#xff0c;而且參數需要加上小括號&#xff0c;推薦使用inline有的類中例如…

已經開源的中文大模型對比,支持更新

大模型下載&#xff1a;互鏈高科 ClueAI/PromptCLUE-base-v1-5 at main (huggingface.co) 支持多任務生成&#xff0c;支持中文&#xff0c;不支持多輪對話&#xff0c;體驗&#xff1a;ClueAI (cluebenchmarks.com) 基于promptclue-base進一步訓練的模型&#xff1a;ClueAI/Ch…

【C與C++的相互調用方法】

C與C的相互調用方法 C與C為什么相互調用的方式不同C中調用CC中調用C致謝 C與C為什么相互調用的方式不同 C 和 C 之間的相互調用方式存在區別&#xff0c;主要是由于 C 和 C 語言本身的設計和特性不同。 函數調用和參數傳遞方式不同&#xff1a;C 和 C 在函數調用和參數傳遞方面…

docker oracle linux命令執行sql

docker 安裝參照 https://blog.csdn.net/arcsin_/article/details/123707618 docker container ls -a命令查看容器名 打開容器 docker exec -it orcl19c_03 /bin/bashsys 用戶登錄容器 sqlplus / as sysdbashow pdbs;什么是pdb數據庫&#xff1f;什么是CDB&#xff1f; 參…

游戲如何防御DDOS流量攻擊呢,用游戲盾真的有用么?

針對在線游戲行業來說&#xff0c;DDoS&#xff08;分布式拒絕服務&#xff09;攻擊是一種極具破壞性的威脅。DDoS攻擊可能導致游戲服務器不可用&#xff0c;嚴重影響游戲體驗和運營。為了解決這一問題&#xff0c;游戲盾作為一種專門為游戲行業設計的安全解決方案&#xff0c;…

微信小程序 藍牙設備連接,控制開關燈

1.前言 微信小程序中連接藍牙設備&#xff0c;信息寫入流程 1、檢測當前使用設備&#xff08;如自己的手機&#xff09;是否支持藍牙/藍牙開啟狀態 wx:openBluetoothAdapter({}) 2、如藍牙已開啟狀態&#xff0c;檢查藍牙適配器的狀態 wx.getBluetoothAdapterState({}) 3、添加…

第十三章 SpringBoot項目(總)

1.創建SpringBoot項目 1.1.設置編碼 1.4.導入已有的spring boot項目 2.快速搭建Restfull風格的項目 2.1.返回字符串 RestController public class IndexController {RequestMapping("/demo1")public Object demo1() {System.out.println("demo1 ran...."…