聊聊flink Table的OrderBy及Limit

本文主要研究一下flink Table的OrderBy及Limit

實例

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");Table in = tableEnv.fromDataSet(ds, "a, b, c");// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); // skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • orderBy方法類似sql的order by;limit則由offset及fetch兩個方法構成,類似sql的offset及fetch

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(private[flink] val tableEnv: TableEnvironment,private[flink] val logicalPlan: LogicalNode) {//......def orderBy(fields: String): Table = {val parsedFields = ExpressionParser.parseExpressionList(fields)orderBy(parsedFields: _*)}def orderBy(fields: Expression*): Table = {val order: Seq[Ordering] = fields.map {case o: Ordering => ocase e => Asc(e)}new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))}def offset(offset: Int): Table = {new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))}def fetch(fetch: Int): Table = {if (fetch < 0) {throw new ValidationException("FETCH count must be equal or larger than 0.")}this.logicalPlan match {case Limit(o, -1, c) =>// replace LIMIT without FETCH by LIMIT with FETCHnew Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))case Limit(_, _, _) =>throw new ValidationException("FETCH is already defined.")case _ =>new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))}}//......
}
  • Table的orderBy方法,支持String或Expression類型的參數,其中String類型最終是轉為Expression類型;orderBy方法最后使用Sort重新創建了Table;offset及fetch方法,使用Limit重新創建了Table(offset方法創建的Limit其fetch為-1;fetch方法如果之前沒有指定offset則創建的Limit的offset為0)

Sort

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Sort on stream tables is currently not supported.")}super.validate(tableEnv)}
}
  • Sort繼承了UnaryNode,它的構造器接收Set類型的Ordering,其construct方法使用relBuilder.sort來構建sort條件

Ordering

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala

abstract class Ordering extends UnaryExpression {override private[flink] def validateInput(): ValidationResult = {if (!child.isInstanceOf[NamedExpression]) {ValidationFailure(s"Sort should only based on field reference")} else {ValidationSuccess}}
}case class Asc(child: Expression) extends Ordering {override def toString: String = s"($child).asc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {child.toRexNode}override private[flink] def resultType: TypeInformation[_] = child.resultType
}case class Desc(child: Expression) extends Ordering {override def toString: String = s"($child).desc"override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {relBuilder.desc(child.toRexNode)}override private[flink] def resultType: TypeInformation[_] = child.resultType
}
  • Ordering是一個抽象類,它有Asc及Desc兩個子類

Limit

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {override def output: Seq[Attribute] = child.outputoverride protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {child.construct(relBuilder)relBuilder.limit(offset, fetch)}override def validate(tableEnv: TableEnvironment): LogicalNode = {if (tableEnv.isInstanceOf[StreamTableEnvironment]) {failValidation(s"Limit on stream tables is currently not supported.")}if (!child.isInstanceOf[Sort]) {failValidation(s"Limit operator must be preceded by an OrderBy operator.")}if (offset < 0) {failValidation(s"Offset should be greater than or equal to zero.")}super.validate(tableEnv)}
}
  • Limit繼承了UnaryNode,它的構造器接收offset及fetch參數,它的construct方法通過relBuilder.limit來設置offset及fetch

小結

  • Table的orderBy方法類似sql的order by;limit則由offset及fetch兩個方法構成,類似sql的offset及fetch
  • Table的orderBy方法,支持String或Expression類型的參數,其中String類型最終是轉為Expression類型;orderBy方法最后使用Sort重新創建了Table;offset及fetch方法,使用Limit重新創建了Table(offset方法創建的Limit其fetch為-1;fetch方法如果之前沒有指定offset則創建的Limit的offset為0)
  • Sort繼承了UnaryNode,它的構造器接收Set類型的Ordering,其construct方法使用relBuilder.sort來構建sort條件;Ordering是一個抽象類,它有Asc及Desc兩個子類;Limit繼承了UnaryNode,它的構造器接收offset及fetch參數,它的construct方法通過relBuilder.limit來設置offset及fetch

doc

  • OrderBy, Offset & Fetch

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/388353.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/388353.shtml
英文地址,請注明出處:http://en.pswp.cn/news/388353.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

binary masks_Python中的Masks概念

binary masksAll men are sculptors, constantly chipping away the unwanted parts of their lives, trying to create their idea of a masterpiece … Eddie Murphy所有的人都是雕塑家&#xff0c;不斷地消除生活中不必要的部分&#xff0c;試圖建立自己的杰作理念……埃迪墨…

css+沿正方形旋轉,CSS3+SVG+JS 正方形沿著正方本中軸移動翻轉的動畫

CSS語言&#xff1a;CSSSCSS確定* {margin: 0;padding: 0;fill: currentColor;vector-effect: non-scaling-stroke;}html {overflow: hidden;}body {display: flex;flex-direction: column;margin: 0;min-width: 10em;height: 100vh;}body > svg,body > input,body > …

Iphone在ScrollView下點擊TextField使文本筐不被鍵盤遮住

1.拖一個Scroll View視圖填充View窗口&#xff0c;將Scroll View視圖拖大一些&#xff0c;使其超出屏幕。 2.向Scroll View拖&#xff08;添加&#xff09;多個Label視圖和Text View視圖。 3.在.h頭文件中添加如下代碼&#xff1a; [cpp] view plaincopyprint?#import <U…

兩高發布司法解釋 依法嚴懲涉地下錢莊犯罪

新華社北京1月31日電&#xff08;記者丁小溪&#xff09;最高人民法院、最高人民檢察院31日聯合發布關于辦理非法從事資金支付結算業務、非法買賣外匯刑事案件適用法律若干問題的解釋&#xff0c;旨在依法懲治非法從事資金支付結算業務、非法買賣外匯犯罪活動&#xff0c;維護金…

python 儀表盤_如何使用Python刮除儀表板

python 儀表盤Dashboard scraping is a useful skill to have when the only way to interact with the data you need is through a dashboard. We’re going to learn how to scrape data from a dashboard using the Selenium and Beautiful Soup packages in Python. The S…

VS2015 定時服務及控制端

一. 服務端 如下圖—新建項目—經典桌面—Windows服務—起名svrr2. 打到server1 改名為svrExecSqlInsert 右擊對應的設計界面&#xff0c;添加安裝服務目錄結構如圖 3. svrExecSqlInsert里有打到OnStart()方法開始寫代碼如下 /// <summary>/// 服務開啟操作/// </su…

css文件如何設置scss,Webpack - 如何將scss編譯成單獨的css文件?

2 個答案:答案 0 :(得分&#xff1a;3)這是我在嘗試將css編譯成單獨文件時使用的webpack.config.js文件|-- App|-- dist|-- src|-- css|-- header.css|-- sass|-- img|-- partials|-- _variables.scss|-- main.scss|--ts|-- tsconfig.json|-- user.ts|-- main.js|-- app.js|-- …

Iphone表視圖的簡單操作

1.創建一個Navigation—based—Application項目&#xff0c;這樣Interface Builder中會自動生成一個Table View&#xff0c;然后將Search Bar拖放到表示圖上&#xff0c;以我們要給表示圖添加搜索功能&#xff0c;不要忘記將Search Bar的delegate連接到File‘s Owner項&#xf…

PhantomJS的使用

PhantomJS安裝下載地址 配置環境變量 成功&#xff01; 轉載于:https://www.cnblogs.com/hankleo/p/9736323.html

aws emr 大數據分析_DataOps —使用AWS Lambda和Amazon EMR的全自動,低成本數據管道

aws emr 大數據分析Progression is continuous. Taking a flashback journey through my 25 years career in information technology, I have experienced several phases of progression and adaptation.進步是連續的。 在我25年的信息技術職業生涯中經歷了一次閃回之旅&…

21eval 函數

eval() 函數十分強大 ---- 將字符串 當成 有效的表達式 來求職 并 返回計算結果 1 # 基本的數學計算2 print(eval("1 1")) # 23 4 # 字符串重復5 print(eval("* * 5")) # *****6 7 # 將字符串轉換成列表8 print(eval("[1, 2, 3, 4]")) # [1,…

聯想r630服務器開啟虛擬化,整合虛擬化 聯想萬全R630服務器上市

虛擬化技術的突飛猛進&#xff0c;對運行虛擬化應用的服務器平臺的運算性能提出了更高的要求。近日&#xff0c;聯想萬全R630G7正式對外發布。這款計算性能強勁&#xff0c;IO吞吐能力強大的四路四核服務器&#xff0c;主要面向高端企業級應用而開發。不僅能夠完美承載大規模數…

Iphone屏幕旋轉

該示例是想在手機屏幕方向發生改變時重新定位視圖&#xff08;這里是一個button&#xff09; 1.創建一個View—based Application項目,并在View窗口中添加一個Round Rect Button視圖&#xff0c;通過尺寸檢查器設置其位置&#xff0c;然后單擊View窗口右上角的箭頭圖標來旋轉窗…

先進的NumPy數據科學

We will be covering some of the advanced concepts of NumPy specifically functions and methods required to work on a realtime dataset. Concepts covered here are more than enough to start your journey with data.我們將介紹NumPy的一些高級概念&#xff0c;特別是…

lsof命令詳解

基礎命令學習目錄首頁 原文鏈接&#xff1a;https://www.cnblogs.com/ggjucheng/archive/2012/01/08/2316599.html 簡介 lsof(list open files)是一個列出當前系統打開文件的工具。在linux環境下&#xff0c;任何事物都以文件的形式存在&#xff0c;通過文件不僅僅可以訪問常規…

Xcode中捕獲iphone/ipad/ipod手機攝像頭的實時視頻數據

目的&#xff1a;打開、關閉前置攝像頭&#xff0c;繪制圖像&#xff0c;并獲取攝像頭的二進制數據。 需要的庫 AVFoundation.framework 、CoreVideo.framework 、CoreMedia.framework 、QuartzCore.framework 該攝像頭捕抓必須編譯真機的版本&#xff0c;模擬器下編譯不了。 函…

統計和冰淇淋

Photo by Irene Kredenets on UnsplashIrene Kredenets在Unsplash上拍攝的照片 摘要 (Summary) In this article, you will learn a little bit about probability calculations in R Studio. As it is a Statistical language, R comes with many tests already built in it, …

信息流服務器哪種好,選購存儲服務器需要注意六大關鍵因素,你知道幾個?

原標題&#xff1a;選購存儲服務器需要注意六大關鍵因素&#xff0c;你知道幾個&#xff1f;信息技術的飛速發展帶動了整個信息產業的發展。越來越多的電子商務平臺和虛擬化環境出現在企業的日常應用中。存儲服務器作為企業建設環境的核心設備&#xff0c;在整個信息流中承擔著…

t3 深入Tornado

3.1 Application settings 前面的學習中&#xff0c;在創建tornado.web.Application的對象時&#xff0c;傳入了第一個參數——路由映射列表。實際上Application類的構造函數還接收很多關于tornado web應用的配置參數。 參數&#xff1a; debug&#xff0c;設置tornado是否工作…

vml編輯器

<HTML xmlns:v> <HEAD> <META http-equiv"Content-Type" content"text/html; Charsetgb2312"> <META name"GENERATOR" content"網絡程序員伴侶(Lshdic)2004"> <META name"GENERATORDOWNLOADADDRESS&q…