場景
自定義Map或者別的算子的時候,有時候需要定義一些類變量,在flink內部高并發的情況下需要正確理解這些變量的行為
代碼
package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//對于自定義函數中的變量,只有內置的狀態是完全按照flink內置的 keyBy行為來的//如果是自定義的緩存比如ArrayList 則可能不會按照預期的行為public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStream<String> dataStream = env.fromElements( "b","b","b","c","c","c","d","d","d");dataStream.keyBy(x->{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunction<String, String> {public ArrayList<String> list= new ArrayList<>();
// public ValueState<Integer> counter;//存儲數據條數
// public ValueState<String> element;//存儲臨時數據
// @Override
// public void open(Configuration parameters) throws Exception {
// counter = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("counter", Types.INT));
// element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.STRING));
// }@Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()==2){String re = list.toString();list.clear();return re;}else {return "null";}
// if (counter.value() == null) {
// counter.update(1);//遇見第一條數據的時候,計數器為1
// } else {
// counter.update(counter.value() + 1);
// }
// if (element.value() == null) {
// element.update(s);//element只存儲上一次到來的數據
// }else {
// element.update(element.value()+s);
// }
// if (counter.value() == 2) {
// String re = element.value();
// //發出結果之后清楚狀態
// counter.clear();
// element.clear();
// return re;
// }else {
// return "null";
// }}
}
分析
keyBy之后,理論上相同key的會在map中用同樣的處理邏輯,我們的預期行為是輸出:bb,cc,dd
但是用ArrayList實現的邏輯最終輸出卻是:bb,bc,cc,dd
用ValueState的輸出是:bb,cc,dd
這說明了,keBy后的邏輯,ArrayList不會按照預期的行為執行。這是因為在flink中,當多個并發的時候,多個key如果落入同一個線程
則當前線程的valueState是和某一個key綁定的,符合flink預期行為,但是ArrayList以及其它你定義的變量則不做保證, 它是線程級別的局部變量, 這點要注意。