flink的集成測試

背景

日常測試中我們使用flink的TestHarness只能測試單個算子,很多情況下我們需要集成測試來測試真正的問題,所以在flink中進行集成測試還是非常有必要的,本文就來記錄下如何在flink中進行集成測試

flink中進行集成測試

flink中進行集成測試的關鍵類MiniClusterWithClientResource,這是一個啟動本地flink集群的關鍵類,先看一下集成測試的關鍵代碼:

/*** FLINK集成測試* https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/**/
public class FlinkIntegrationTest {public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {{put("heartbeat.timeout", "300000");}});@ClassRulepublic static MiniClusterWithClientResource flinkCluster =new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));}@Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));}// create a testing sinkprivate static class CollectSink implements SinkFunction<String> {// must be staticpublic static final List<String> values = Collections.synchronizedList(new ArrayList<>());@Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}

由于我們是集成測試,我們一般輸入source和輸出sink是自己構造的,比如這里的CollectSink,這里就可以正常測試包括狀態在內的pineline集成測試了

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/167482.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/167482.shtml
英文地址,請注明出處:http://en.pswp.cn/news/167482.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

css給盒子寫四個角

如圖&#xff1a;之前一直用定位 現在發現可以用css寫 background: linear-gradient(to top, #306eef, #306eef) left top no-repeat, /*上左*/ linear-gradient(to right, #306eef, #386eef) left top no-repeat, /*左上*/ linear-gradient(to left, #386eef, #306eef) righ…

查找學習筆記

1、靜態查找表 以下查找的索引均從1開始 &#xff08;1&#xff09;順序查找&#xff08;帶哨兵&#xff09; #include<iostream> #include<vector>using namespace std;int search(vector<int> arr, int key) {arr[0] key;int i;for (i arr.size() - 1…

代碼隨想錄 860. 檸檬水找零

題目 在檸檬水攤上&#xff0c;每一杯檸檬水的售價為 5 美元。顧客排隊購買你的產品&#xff0c;&#xff08;按賬單 bills 支付的順序&#xff09;一次購買一杯。 每位顧客只買一杯檸檬水&#xff0c;然后向你付 5 美元、10 美元或 20 美元。你必須給每個顧客正確找零&#xf…

python opencv -模板匹配

python opencv -模板匹配 模板匹配就是&#xff0c;我們現有一個模板和一個圖片&#xff0c;然后&#xff0c;在這個圖片中尋找和模板近似的部分。 在opencv 中主要通過cv2.matchTemplate這個函數去實現。 下面我們先看一下&#xff0c;模板圖片和需要匹配的圖片&#xff1a…

(Matalb時序預測)GA-BP遺傳算法優化BP神經網絡的多維時序回歸預測

目錄 一、程序及算法內容介紹&#xff1a; 基本內容&#xff1a; 亮點與優勢&#xff1a; 二、實際運行效果&#xff1a; 三、部分代碼 四、本文代碼數據說明手冊分享&#xff1a; 一、程序及算法內容介紹&#xff1a; 基本內容&#xff1a; 本代碼基于Matalb平臺編譯&am…

Spring IOC 和 AOP

Spring IOC 什么是 IoC ? IoC &#xff08;Inversion of Control 控制反轉&#xff09;是一種設計思想&#xff0c;而不是一個具體的技術實現。IoC 的思想就是將原本在程序中手動創建對象的控制權&#xff0c;交由給 Spring 框架來管理。 為什么叫控制反轉&#xff1f; 控制…

unsigned詳講(干貨滿滿)

前言&#xff1a;過年偷懶了(●ˇ?ˇ●)&#xff0c;但是年后開學了一定要恢復學習狀態&#xff0c;在復習加繼續學習的途中&#xff0c;我發現對于unsigned關鍵字的掌握并不是很熟練&#xff0c;于是翻閱了各個大佬的博客以及書籍&#xff0c;總結了對于unsigned的一些知識點…

數據結構與算法編程題18

循環隊列相關代碼。 #include <iostream> using namespace std;#define Maxsize 100 #define ERROR 0 #define OK 1 typedef int Elemtype; typedef struct Queue {Elemtype data[Maxsize];int front;int rear; }Queue;void Init_Queue(Queue &Q) {Q.front Q.rear …

P9 C++類

目錄 01 類是什么 02 如何創建類 03 方法 后話 本期我們要講的是 C 中的類。 我們終于講到了面向對象編程&#xff0c;這是一種非常流行的編程方式&#xff0c;面向對象編程實際上只是一種你可以采用的編寫代碼的方式&#xff0c;其他語言例如 C#、Java 這些主要是面向對象…

白嫖CTG4.0

大家好&#xff0c;到點了我來給各位大佬獻策CTG&#xff0c;不是花錢買不起&#xff0c;而是免費更有性價比&#xff0c;哈哈哈不調侃了我們自此開始正文&#xff0c;咱們主打的就是一個分享是一種態度 當然我更希望大家支持國產對國產有自己的信心&#xff08;文心一言&…

Git常用命令詳細總結,更適合中國寶寶體質

文章目錄 代碼倉庫創建倉庫1.進入需要創建代碼庫的文件夾2.創建/切始化倉庫3.關聯遠程倉庫拉取遠程倉庫到本地 添加文件到倉庫1.查看工作區狀態2.添加文件到暫存區3.提交到本地倉庫4.對比工作區文件變化 倉庫配置1.配置全局用戶名和郵箱2.配當前倉庫用戶名和郵箱3.查看Git全局配…

Selenium中常用的JS操作總結

? 目錄 前言&#xff1a; JS相關操作 JS Xpath定位 獲取單個元素 獲取元素集合 文本輸入 獲取坐標 獲取瀏覽器窗口的內部高度 獲取瀏覽器窗口的內部寬度&#xff1b; 坐標計算 設置樣式 設置窗口大小 類數組對象arguments JQuery選擇器 jQuery 選擇器 jQuery …

多模態——使用stable-video-diffusion將圖片生成視頻

多模態——使用stable-video-diffusion將圖片生成視頻 0. 內容簡介1. 運行環境2. 模型下載3. 代碼梳理3.1 修改yaml文件中的svd路徑3.2 修改DeepFloyDataFiltering的vit路徑3.3 修改open_clip的clip路徑3.4 代碼總體結構 4. 資源消耗5. 效果預覽 0. 內容簡介 近期&#xff0c;…

Linux上安裝Redis

案例中Linux版本為CentOS7.9&#xff0c;安裝目錄為 /root/software/ 1、使用 wget 命令從官網下載安裝包 wget https://github.com/redis/redis/archive/7.2.3.tar.gz2、解壓縮 tar -xzf 7.2.3.tar.gz3、進入解壓后的目錄 cd redis-7.2.34、 編譯和安裝Redis make make i…

npm中,你不了解的.npmrc文件

原文鏈接&#xff1a;npm中&#xff0c;你不了解的.npmrc文件 寫在前面 對于寫JS的程序員來說&#xff0c;可能沒有人不知道npm&#xff0c;但是有些同學對他的配置文件(即.npmrc文件)并不了解。結合我的學習心得&#xff0c;寫一篇博客跟大家分享一些該配置文件的知識。 .np…

理解CLIP模型

1.簡介 學習深度學習必看CLIP&#xff01;論文鏈接arxiv.org/pdf/2103.00020v1.pdf。 簡單來說就是傳統的分類任務被用來預測指定的類別&#xff0c;有監督訓練限制了模型的通用性和可用性&#xff0c;并且需要帶有標簽的數據來訓練&#xff0c;該篇論文就想直接從原始文本中…

Navicat 技術指引 | 適用于 GaussDB 的用戶權限設置

Navicat Premium&#xff08;16.2.8 Windows版或以上&#xff09; 已支持對 GaussDB 主備版的管理和開發功能。它不僅具備輕松、便捷的可視化數據查看和編輯功能&#xff0c;還提供強大的高階功能&#xff08;如模型、結構同步、協同合作、數據遷移等&#xff09;&#xff0c;這…

Spring 七大組件

文章目錄 Spring 七大組件 Spring 七大組件 核心容器(Spring core) 核心容器提供Spring框架的基本功能。Spring以bean的方式組織和管理Java應用中的各個組件及其關系。Spring使用BeanFactory來產生和管理Bean&#xff0c;它是工廠模式的實現。BeanFactory使用控制反轉(IOC)模式…

(Matalb分類預測)GA-BP遺傳算法優化BP神經網絡的多維分類預測

目錄 一、程序及算法內容介紹&#xff1a; 基本內容&#xff1a; 亮點與優勢&#xff1a; 二、實際運行效果&#xff1a; 三、部分代碼&#xff1a; 四、本文代碼數據說明手冊分享 一、程序及算法內容介紹&#xff1a; 基本內容&#xff1a; 本代碼基于Matalb平臺編譯&am…

Flink Flink中的分流

一、什么是分流 所謂“分流”&#xff0c;就是將一條數據流拆分成完全獨立的兩條、甚至多條流。也就是基于一個DataStream&#xff0c;定義一些篩選條件&#xff0c;將符合條件的數據揀選出來放到對應的流里。 二、基于filter算子的簡單實現分流 其實根據條件篩選數據的需求…