1)概述
1.注意
Flink 支持對 Java API 的所有算子使用 Lambda 表達式,但是,當 Lambda 表達式使用 Java 泛型時,需要 顯式 地聲明類型信息。
2.示例和限制
示例: map()
函數使用 Lambda 表達式計算輸入值的平方。
不需要聲明 map()
函數的輸入 i
和輸出參數的數據類型,因為 Java 編譯器會對它們做出推斷。
env.fromElements(1, 2, 3)
// 返回 i 的平方
.map(i -> i*i)
.print();
由于 OUT
是 Integer
而不是泛型,所以 Flink 可以從方法簽名 OUT map(IN value)
的實現中自動提取出結果的類型信息。
但像 flatMap()
這樣的函數,它的簽名 void flatMap(IN value, Collector out)
被 Java 編譯器編譯為 void flatMap(IN value, Collector out)
。Flink 就無法自動推斷輸出的類型信息了。
Flink 很可能拋出如下異常:
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.Otherwise the type has to be specified explicitly using type information.
此時需要 顯式 指定類型信息,否則輸出將被視為 Object
類型,這會導致低效的序列化。
DataStream<Integer> input = env.fromElements(1, 2, 3);// 必須聲明 collector 類型
input.flatMap((Integer number, Collector<String> out) -> {StringBuilder builder = new StringBuilder();for(int i = 0; i < number; i++) {builder.append("a");out.collect(builder.toString());}
})
// 顯式提供類型信息
.returns(Types.STRING)
// 打印 "a", "a", "aa", "a", "aa", "aaa"
.print();
當使用 map()
函數返回泛型類型的時候也會發生類似的問題。下面示例中的方法簽名 Tuple2<Integer,Integer> map(Integer value)
被擦除為 Tuple2 map(Integer value)
。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i)) // 沒有關于 Tuple2 字段的信息.print();
解決方式如下:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;// 使用顯式的 ".returns(...)"
env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i)).returns(Types.TUPLE(Types.INT, Types.INT)).print();// 使用類來替代
env.fromElements(1, 2, 3).map(new MyTuple2Mapper()).print();public static class MyTuple2Mapper extends MapFunction<Integer, Tuple2<Integer, Integer>> {@Overridepublic Tuple2<Integer, Integer> map(Integer i) {return Tuple2.of(i, i);}
}// 使用匿名類來替代
env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>> {@Overridepublic Tuple2<Integer, Integer> map(Integer i) {return Tuple2.of(i, i);}}).print();// 也可以像這個示例中使用 Tuple 的子類來替代
env.fromElements(1, 2, 3).map(i -> new DoubleTuple(i, i)).print();public static class DoubleTuple extends Tuple2<Integer, Integer> {public DoubleTuple(int f0, int f1) {this.f0 = f0;this.f1 = f1;}
}