55、Flink 中使用 Java Lambda 表達式詳解

1)概述
1.注意

Flink 支持對 Java API 的所有算子使用 Lambda 表達式,但是,當 Lambda 表達式使用 Java 泛型時,需要 顯式 地聲明類型信息。

2.示例和限制

示例: map() 函數使用 Lambda 表達式計算輸入值的平方。

不需要聲明 map() 函數的輸入 i 和輸出參數的數據類型,因為 Java 編譯器會對它們做出推斷。

env.fromElements(1, 2, 3)
// 返回 i 的平方
.map(i -> i*i)
.print();

由于 OUTInteger 而不是泛型,所以 Flink 可以從方法簽名 OUT map(IN value) 的實現中自動提取出結果的類型信息。

但像 flatMap() 這樣的函數,它的簽名 void flatMap(IN value, Collector out) 被 Java 編譯器編譯為 void flatMap(IN value, Collector out)。Flink 就無法自動推斷輸出的類型信息了。

Flink 很可能拋出如下異常:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.Otherwise the type has to be specified explicitly using type information.

此時需要 顯式 指定類型信息,否則輸出將被視為 Object 類型,這會導致低效的序列化。

DataStream<Integer> input = env.fromElements(1, 2, 3);// 必須聲明 collector 類型
input.flatMap((Integer number, Collector<String> out) -> {StringBuilder builder = new StringBuilder();for(int i = 0; i < number; i++) {builder.append("a");out.collect(builder.toString());}
})
// 顯式提供類型信息
.returns(Types.STRING)
// 打印 "a", "a", "aa", "a", "aa", "aaa"
.print();

當使用 map() 函數返回泛型類型的時候也會發生類似的問題。下面示例中的方法簽名 Tuple2<Integer,Integer> map(Integer value) 被擦除為 Tuple2 map(Integer value)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i))    // 沒有關于 Tuple2 字段的信息.print();

解決方式如下

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;// 使用顯式的 ".returns(...)"
env.fromElements(1, 2, 3).map(i -> Tuple2.of(i, i)).returns(Types.TUPLE(Types.INT, Types.INT)).print();// 使用類來替代
env.fromElements(1, 2, 3).map(new MyTuple2Mapper()).print();public static class MyTuple2Mapper extends MapFunction<Integer, Tuple2<Integer, Integer>> {@Overridepublic Tuple2<Integer, Integer> map(Integer i) {return Tuple2.of(i, i);}
}// 使用匿名類來替代
env.fromElements(1, 2, 3).map(new MapFunction<Integer, Tuple2<Integer, Integer>> {@Overridepublic Tuple2<Integer, Integer> map(Integer i) {return Tuple2.of(i, i);}}).print();// 也可以像這個示例中使用 Tuple 的子類來替代
env.fromElements(1, 2, 3).map(i -> new DoubleTuple(i, i)).print();public static class DoubleTuple extends Tuple2<Integer, Integer> {public DoubleTuple(int f0, int f1) {this.f0 = f0;this.f1 = f1;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/39672.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/39672.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/39672.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

大學新生人工智能學習路線規劃

1. 引言 七月來臨&#xff0c;各省高考分數已揭榜完成。而高考的完結并不意味著學習的結束&#xff0c;而是新旅程的開始。對于有志于踏入IT領域的高考少年們&#xff0c;這個假期是開啟探索IT世界的絕佳時機。作為該領域的前行者和經驗前輩&#xff0c;我愿意為準新生們提供一…

基于Hadoop平臺的電信客服數據的處理與分析③項目開發:搭建基于Hadoop的全分布式集群---任務10:Hive安裝部署

任務描述 任務內容為安裝并配置在Hadoop集群中使用Hive。 任務指導 Hive是一個基于Hadoop的數據倉庫框架&#xff0c;在實際使用時需要將元數據存儲在數據庫中 具體安裝步驟如下&#xff1a; 1. 安裝MySQL數據庫&#xff08;已安裝&#xff09; 2. 解壓縮Hive的壓縮包 3…

剪映 v5.5 Pro Vip解鎖版:使用指南與注意事項

摘要&#xff1a;本文介紹了剪映Pro VIP解鎖版的使用方法&#xff0c;包括安裝、測試和使用VIP素材的步驟&#xff0c;以及如何避免誤報和保持解鎖狀態的建議。 正文&#xff1a; 剪映Pro是一款廣受歡迎的視頻編輯軟件&#xff0c;提供了豐富的視頻編輯功能和大量高質量的素材…

發送微信消息和文件

參考&#xff1a;https://www.bilibili.com/video/BV1S84y1m7xd 安裝&#xff1a; pip install PyOfficeRobotimport PyOfficeRobotPyOfficeRobot.chat.send_message(who"文件傳輸助手", message"你好&#xff0c;我是PyOfficeRobot&#xff0c;有什么可以幫助…

RabbitMQ中java實現隊列和交換機的聲明

java實現隊列和交換機的聲明 在之前我們都是基于RabbitMQ控制臺來創建隊列、交換機。但是在實際開發時&#xff0c;隊列和交換機是程序員定義的&#xff0c;將來項目上線&#xff0c;又要交給運維去創建。那么程序員就需要把程序中運行的所有隊列和交換機都寫下來&#xff0c;…

【PYG】 PyTorch中size方法和屬性

在 PyTorch 中&#xff0c;size 方法和屬性用于獲取張量的維度信息。下面是它們的用法和區別&#xff1a; node_features.size&#xff1a; 這是一個屬性&#xff08;attribute &#xff09;&#xff0c;返回一個 torch.Size 對象&#xff0c;表示張量的維度。這是不可調用的&a…

用MySQL+node+vue做一個學生信息管理系統(一):配置項目

先用npm init -y生成配置文件 在項目下新建src文件夾&#xff0c;app.js文件。src目錄用來放靜態資源文件&#xff0c;app.js是服務器文件&#xff0c;index.js是vue的入口文件 使用npm install express下載express框架 在app.js文件夾開啟node服務&#xff0c;監聽的端口為…

C++ //練習 14.29 為什么不定義const版本的遞增和遞減運算符?

C Primer&#xff08;第5版&#xff09; 練習 14.29 練習 14.29 為什么不定義const版本的遞增和遞減運算符&#xff1f; 環境&#xff1a;Linux Ubuntu&#xff08;云服務器&#xff09; 工具&#xff1a;vim 解釋&#xff1a; 遞增和遞減要改變對象本身&#xff0c;const類…

Go語言--運算符

算術運算符 關系運算符 不能寫0<a<10&#xff0c;要判斷必須0<a&&a<10。因為int和bool不兼容 邏輯運算符 位運算符 賦值運算符 其他 運算符的優先級

AcWing 1254:找樹根和孩子

【題目來源】https://www.acwing.com/problem/content/1256/【題目描述】 給定一棵樹&#xff0c;輸出樹的根root&#xff0c;孩子最多的結點max以及他的孩子。【輸入格式】 第一行&#xff1a;n&#xff0c;m&#xff0c;表示樹的節點數和邊數。 以下m行&#xff1a;每行兩個結…

浮點數在內存中的存儲結構

浮點數在內存中的存儲可以參考《IEEE754標準》https://people.eecs.berkeley.edu/~wkahan/ieee754status/IEEE754.PDF 參考博文&#xff1a;IEEE754詳解&#xff08;最詳細簡單有趣味的介紹&#xff09;-CSDN博客 單精度float占內存4字節&#xff0c;最高位bit31表示符號位&…

國家海岸線變化評估:新英格蘭和中大西洋沿岸海岸線的歷史變化

National Assessment of Shoreline Change: Historical Shoreline Change along the New England and Mid-Atlantic Coasts 國家海岸線變化評估&#xff1a;新英格蘭和中大西洋沿岸海岸線的歷史變化 摘要 海灘侵蝕是美國許多公海沿岸的一個長期問題。隨著沿岸人口的不斷增加…

永輝超市購物卡有什么用?

感覺現在在超市買東西&#xff0c;還不如網購 這不&#xff0c;端午的時候&#xff0c;朋友送的永輝卡&#xff0c;一直沒時間去用&#xff0c;我總擔心過期 但是去了超市后&#xff0c;又不知道買什么&#xff0c;最后空手而歸 還好收卡云可以回收永輝卡&#xff0c;兩張三…

《C++20設計模式》適配器模式經驗分享

文章目錄 一、前言二、對于接口的討論三、實現1、對象適配器1.1 UML類圖1.2 實現 2、類適配器 四、最后 一、前言 從適配器模式開始就是類的組合聚合&#xff0c;類與類之間結構性的問題了。 適配器模式解決的問題&#xff1a; 適配器模式能夠在不破壞現有系統結構的情況下&a…

mapreduce實現bean的序列化與反序列化

目錄 序列化&#xff08;Serialization&#xff09; 反序列化&#xff08;Deserialization&#xff09; 事例操作 UserSale 重寫序列化方法 重寫反序列化 重寫toString方法 SaleMapper SaleReducer SaleDriver 序列化&#xff08;Serialization&#xff09; 序列化是將…

【后端面試題】【中間件】【NoSQL】MongoDB的配置服務器、復制機制、寫入語義和面試準備

MongoDB的配置服務器 引入了分片機制之后&#xff0c;MongoDB啟用了配置服務器(config server) 來存儲元數據&#xff0c;這些元數據包括分片信息、權限控制信息&#xff0c;用來控制分布式鎖。其中分片信息還會被負責執行查詢mongos使用。 MongoDB的配置服務器有一個很大的優…

WPF----自定義滾動條ScrollViewer

滾動條是項目當中經常用到的一個控件&#xff0c;大部分對外項目都有外觀的需求&#xff0c;因此需要自定義&#xff0c;文中主要是針對一段動態的狀態數據進行展示&#xff0c;并保證數據始終在最新一條&#xff0c;就是需要滾動條滾動到底部。 1&#xff0c;xaml中引入 <…

zxing-cpp+OpenCV根據字符串生成條形碼

編譯構建 需要使用到 CMake、Git、GCC 或 MSVC。 github 鏈接&#xff1a;https://github.com/zxing-cpp/zxing-cpp 編譯之前請確保&#xff1a; 確保安裝了 CMake 版本 3.15 或更高版本。 確保安裝了與 C17 兼容的編譯器(最低VS 2019 16.8 / gcc 7 / clang 5)。 編譯構建…

Python面試寶典第4題:環形鏈表

題目 給你一個鏈表的頭節點 head &#xff0c;判斷鏈表中是否有環。如果存在環 &#xff0c;則返回 true 。 否則&#xff0c;返回 false 。 如果鏈表中有某個節點&#xff0c;可以通過連續跟蹤 next 指針再次到達&#xff0c;則鏈表中存在環。 為了表示給定鏈表中的環&#xf…

重寫父類方法、創建單例對象 題目

題目 JAVA27 重寫父類方法分析&#xff1a;代碼&#xff1a; JAVA28 創建單例對象分析&#xff1a;代碼&#xff1a; JAVA27 重寫父類方法 描述 父類Base中定義了若干get方法&#xff0c;以及一個sum方法&#xff0c;sum方法是對一組數字的求和。請在子類 Sub 中重寫 getX() 方…