【實戰-08】 flink自定義Map中的變量的行為

場景

自定義Map或者別的算子的時候,有時候需要定義一些類變量,在flink內部高并發的情況下需要正確理解這些變量的行為

代碼

package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//對于自定義函數中的變量,只有內置的狀態是完全按照flink內置的 keyBy行為來的//如果是自定義的緩存比如ArrayList 則可能不會按照預期的行為public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStream<String> dataStream = env.fromElements( "b","b","b","c","c","c","d","d","d");dataStream.keyBy(x->{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunction<String, String> {public ArrayList<String> list= new ArrayList<>();
//     public ValueState<Integer> counter;//存儲數據條數
//     public ValueState<String> element;//存儲臨時數據
//     @Override
//     public void open(Configuration parameters) throws Exception {
//         counter = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("counter", Types.INT));
//         element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.STRING));
//     }@Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()==2){String re = list.toString();list.clear();return re;}else {return "null";}
//        if (counter.value() == null) {
//            counter.update(1);//遇見第一條數據的時候,計數器為1
//        } else {
//            counter.update(counter.value() + 1);
//        }
//        if (element.value() == null) {
//            element.update(s);//element只存儲上一次到來的數據
//        }else {
//            element.update(element.value()+s);
//        }
//        if (counter.value() == 2) {
//            String re = element.value();
//            //發出結果之后清楚狀態
//            counter.clear();
//            element.clear();
//            return re;
//        }else {
//            return "null";
//        }}
}

分析

keyBy之后,理論上相同key的會在map中用同樣的處理邏輯,我們的預期行為是輸出:bb,cc,dd
但是用ArrayList實現的邏輯最終輸出卻是:bb,bc,cc,dd
用ValueState的輸出是:bb,cc,dd
這說明了,keBy后的邏輯,ArrayList不會按照預期的行為執行。這是因為在flink中,當多個并發的時候,多個key如果落入同一個線程
則當前線程的valueState是和某一個key綁定的,符合flink預期行為,但是ArrayList以及其它你定義的變量則不做保證, 它是線程級別的局部變量, 這點要注意。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/711447.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/711447.shtml
英文地址,請注明出處:http://en.pswp.cn/news/711447.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

哇去,有了這篇文章,項目中引入了再多的字體包,我都不怕了!!!

通常情況下&#xff0c;我們在開發一個新項目的時候&#xff0c;項目那邊通常都會提供一些項目所需的字體包&#xff0c;來滿足項目對字體展示的特殊需求。 這部分大家都比較熟悉&#xff0c;就不詳細講了&#xff0c;直接上代碼&#xff1a; /* 引入字體包 */ font-face {fo…

異常處理(黑馬學習筆記)

當前問題 登錄功能和登錄校驗功能我們都實現了&#xff0c;下面我們學習下今天最后一塊技術點&#xff1a;異常處理。首先我們先來看一下系統出現異常之后會發生什么現象&#xff0c;再來介紹異常處理的方案。 我們打開瀏覽器&#xff0c;訪問系統中的新增部門操作&#xff0…

GEE高階應用python wxee——MODIS氣象數據可視化處理(2022年3-9月葡萄牙為例)以及可視化地圖加載

MODIS wxee 是專為處理氣象數據而設計的,但它在遙感數據方面也很有用。在本示例中,我們將了解 wxee 如何處理 MODIS 傳感器的數據,以及如何利用 xarray 對象創建彩色復合圖。 安裝和設定 #!pip install wxeeimport ee import wxeeee.Authenticate() wxee.Initialize(proje…

前端筆記01---html 的加載

文章目錄 HTML<meta><script>MIME CSSHTML 與 DOM 有什么不同MDNMozilla 臟檢查依賴注入虛擬 DOM虛擬DOM性能開銷 性能性能開銷包括哪些方面性能瓶頸性能&#xff1f; 事件事件委托事件冒泡passive: true 合成器線程 HTML html head <meta> <meta> 元素…

貪心算法介紹

貪心算法是一種在求解問題時總是做出在當前看來是最好的選擇的算法。它不從整體最優上加以考慮&#xff0c;所做出的選擇只是在某種意義上的局部最優解。貪心算法不是對所有問題都能得到整體最優解&#xff0c;關鍵是貪心策略的選擇&#xff0c;選擇的貪心策略必須具備無后效性…

K8S相關小技巧《五》

需求&#xff1a; 作為Kubernetes管理員&#xff0c;前一段時間有收到一個需求&#xff0c;需要創建一個可用的storage class&#xff0c;用于提供給給隔離的用戶使用共享磁盤。共享磁盤為NFS磁盤&#xff0c;本例以NFS為例&#xff0c;其他類型的storage class創建也是類似&a…

模型優化_如何提高網絡/模型的泛化能力?(全面)

目錄 1. 以數據為中心的泛化方法 1.1 使用更多數據 1.2 做好數據預處理 特征工程 1.3 數據增強 1.4 調整數據分布 2. 以模型為中心的泛化方法 2.1 使用更大批次 超參數調優 2.2 調整目標函數 2.3 調整網絡結構 2.4 屏蔽網絡節點 2.5 權值正則化 2.6 偏差-方差權衡…

防考試作弊切屏

防考試作弊切屏 方法一&#xff1a;監聽頁面失焦聚焦事件&#xff1a;防止任何操作 監聽考試頁面失焦事件記錄切出時間頁面聚焦時累積記錄切入時間&#xff0c;累積時間大于1分鐘自動交卷并移除時間頁面銷毀移出事件***bug&#xff1a;必須把事件回調定義為方法&#xff0c;在…

全國夜間燈光指數數據、GDP密度分布、人口密度分布、土地利用數據、降雨量數據

引言 DMSP/OLS的1992-2013年全球遙感影像&#xff0c;包括三種非輻射定標的夜間燈光影像。三種全年平均影像分別是&#xff1a;無云觀測頻數影像、平均燈光影像和穩定燈光影像。目前地理遙感生態網可提供全國穩定燈光影像免費下載。穩定燈光影像是標定夜間平均燈光強度的年度柵…

【論文閱讀筆記】Explicit Visual Prompting for Low-Level Structure Segmentations

1.介紹 Explicit Visual Prompting for Low-Level Structure Segmentations 低級結構分割的顯式視覺提示 2023年發表在IEEE CVPR Paper Code 2.摘要 檢測圖像中低級結構&#xff08;低層特征&#xff09;一般包括分割操縱部分、識別失焦像素、分離陰影區域和檢測隱藏對象。雖…

c# Excel轉換成DataTable

/// <summary> /// Excel轉換成DataTable&#xff08;.xls&#xff09; /// </summary> /// <param name"filePath">Excel文件路徑</param> /// <returns></returns> public static Da…

人造太陽光熱模擬能量密度太陽模擬器

人造太陽模擬器其他名稱&#xff1a;能量密度太陽能光熱模擬能量密度太陽模擬器、能流密度太陽光模擬器、高通量太陽模擬器 高通量能留密度太陽能爐和太陽光模擬器產生高度集中的太陽能和人造光&#xff0c;用于新技術和材料的研究和測試。這使研究人員能夠進行制氫實驗、太陽…

備戰藍橋杯---線段樹基礎1

引入&#xff1a;RMQ問題&#xff1a; 什么是RMQ&#xff1f; 顯然&#xff0c;我們無法用前綴維護&#xff0c;因此&#xff0c;我們需要用到線段樹的知識&#xff1a; 什么是線段樹&#xff1f; 線段樹是用一種樹狀結構存儲一個連續區間信息的數據結構 下面我們用圖解釋用…

C++知識點總結(22):模擬算法真題 ★★★★☆《越野比賽》

二、越野比賽 1. 審題 題目描述 最近賽車手 K a l n Kaln Kaln 加入了心心念念的 F a r Far Far 車隊&#xff0c;馬上就迎來了自己的首秀&#xff0c;參加一場直線加速賽&#xff1a;已知 F a r Far Far 車隊會提供 n n n 種類型的賽車&#xff0c; K a l n Kaln Kaln 只…

【數據結構】隊列OJ題《用隊列實現棧》(題庫+解析+代碼)

1.前言 通過前面隊列的實現和詳解大家對隊列應該有一定熟悉了&#xff0c;現在上強度開始做題吧 隊列詳解&#xff1a;http://t.csdnimg.cn/dvTsW 2.OJ題目訓練225. 用隊列實現棧 題目分析 請你僅使用兩個隊列實現一個后入先出&#xff08;LIFO&#xff09;的棧&#xff0…

git 設置代理 取消代理

設置代理 git config --global http.proxy 127.0.0.1:7890 git config --global https.proxy 127.0.0.1:7890注意&#xff1a;加上 --global 是對 git 設置全局代理&#xff0c;不加 --global 對指定倉庫目錄設置代理&#xff0c;局部代理 查看已修改 git 配置信息 git conf…

【GPU驅動開發】- AST簡介

前言 不必害怕未知&#xff0c;無需恐懼犯錯&#xff0c;做一個Creator&#xff01; AST&#xff0c;抽象語法樹&#xff0c;是一種包含豐富語義信息的格式&#xff0c;其中包括類型、表達式樹和符號等。 TranslationUnitDecl&#xff1a;該類表示一個輸入源文件 ASTContext&…

Qt注冊類對象單例與單類型區別

1.實現類型SingletonTypeExample #ifndef SINGLETONTYPEEXAMPLE_H #define SINGLETONTYPEEXAMPLE_H#include <QObject>class SingletonTypeExample : public QObject {Q_OBJECT public://只能顯示構造類對象explicit SingletonTypeExample(QObject *parent nullptr);//…

【學習筆記】深度學習實戰 | LeNet

簡要聲明 學習相關網址 [雙語字幕]吳恩達深度學習deeplearning.aiPapers With CodeDatasets 深度學習網絡基于PyTorch學習架構&#xff0c;代碼測試可跑。本學習筆記單純是為了能對學到的內容有更深入的理解&#xff0c;如果有錯誤的地方&#xff0c;懇請包容和指正。 參考文獻…

KubeEdge 邊緣計算

文章目錄 1.KubeEdge2.KubeEdge 特點3.KubeEdge 組成4.KubeEdge 架構 KubeEdge # KubeEdgehttps://iothub.org.cn/docs/kubeedge/ https://iothub.org.cn/docs/kubeedge/kubeedge-summary/1.KubeEdge KubeEdge 是一個開源的系統&#xff0c;可將本機容器化應用編排和管理擴展…