Flink流批一體計算(14):PyFlink Tabel API之SQL查詢

舉個例子

查詢 source 表,同時執行計算

# 通過 Table API 創建一張表:
source_table = table_env.from_path("datagen")
# 或者通過 SQL 查詢語句創建一張表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)

Table API 查詢

Table 對象有許多方法,可以用于進行關系操作。

這些方法返回新的 Table 對象,表示對輸入 Table 應用關系操作之后的結果。

這些關系操作可以由多個方法調用組成,例如 table.group_by(...).select(...)。

Table API 文檔描述了流和批處理上所有支持的 Table API 操作。

以下示例展示了一個簡單的 Table API 聚合查詢:

from pyflink.table import Environmentsettings, TableEnvironment
# 通過 batch table environment 來執行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 計算所有來自法國客戶的收入
revenue = orders \.select(orders.name, orders.country, orders.revenue) \.where(orders.country == 'FRANCE') \.group_by(orders.name) \.select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()

Table API 也支持行操作的 API, 這些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.

以下示例展示了一個簡單的 Table API 基于行操作的查詢

from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd# 通過 batch table environment 來執行查詢
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),result_type=DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.BIGINT())]),func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()

SQL 查詢

Flink 的 SQL 基于 Apache Calcite,它實現了標準的 SQL。SQL 查詢語句使用字符串來表達。SQL 支持Flink 對流和批處理。

下面示例展示了一個簡單的 SQL 聚合查詢:

from pyflink.table import Environmentsettings, TableEnvironment# 通過 stream table environment 來執行查詢env_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='8','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='11')""")table_env.execute_sql("""CREATE TABLE print_sink (id BIGINT,data_sum TINYINT) WITH ('connector' = 'print')""")table_env.execute_sql("""INSERT INTO print_sinkSELECT id, sum(data) as data_sum FROM(SELECT id / 2 as id, data FROM random_source)WHERE id > 1GROUP BY id""").wait()

Table API 和 SQL 的混合使用

Table API 中的 Table 對象和 SQL 中的 Table 可以自由地相互轉換。

下面例子展示了如何在 SQL 中使用 Table 對象:

create_temporary_view(view_path, table)? 將一個 `Table` 對象注冊為一張臨時表,類似于 SQL 的臨時表。

# 創建一張 sink 表來接收結果數據
table_env.execute_sql("""CREATE TABLE table_sink (id BIGINT,data VARCHAR) WITH ('connector' = 'print')
""")
# 將 Table API 表轉換成 SQL 中的視圖
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 將 Table API 表的數據寫入結果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

下面例子展示了如何在 Table API 中使用 SQL 表:

sql_query(query)?? 執行一條 SQL 查詢,并將查詢的結果作為一個 `Table` 對象。

# 創建一張 SQL source 表
table_env.execute_sql("""CREATE TABLE sql_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='4','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='7')
""")# 將 SQL 表轉換成 Table API 表
table = table_env.from_path("sql_source")
# 或者通過 SQL 查詢語句創建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 將表中的數據寫出
table.to_pandas()

優化

數據傾斜

當數據發生傾斜(某一部分數據量特別大),雖然沒有GCGabage Collection,垃圾回收),但是task執行時間嚴重不一致。

  • 需要重新設計key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 調用rebalance操作,使數據分區均勻。

緩沖區超時設置

由于task在執行過程中存在數據通過網絡進行交換,數據在不同服務器之間傳遞的緩沖區超時時間可以通過setBufferTimeout進行設置。

當設置“setBufferTimeout(-1)”,會等待緩沖區滿之后才會刷新,使其達到最大吞吐量;當設置“setBufferTimeout(0)”時,可以最小化延遲,數據一旦接收到就會刷新;當設置“setBufferTimeout”大于0時,緩沖區會在該時間之后超時,然后進行緩沖區的刷新。

示例可以參考如下:

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/42915.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/42915.shtml
英文地址,請注明出處:http://en.pswp.cn/news/42915.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

QT實現天氣預報

1. MainWindow類設計的成員變量和方法 public: MainWindow(QWidget* parent nullptr); ~MainWindow(); protected: 形成文本菜單來用來右鍵關閉窗口 void contextMenuEvent(QContextMenuEvent* event); 鼠標被點擊之后此事件被調用 void mousePressEvent(QMouseEv…

Leetcode每日一題:1444. 切披薩的方案數(2023.8.17 C++)

目錄 1444. 切披薩的方案數 題目描述: 實現代碼與解析: 二維后綴和 動態規劃 原理思路: 1444. 切披薩的方案數 題目描述: 給你一個 rows x cols 大小的矩形披薩和一個整數 k ,矩形包含兩種字符: A …

Spring(三):Spring中Bean的生命周期和作用域

前言 在 Spring 中,那些組成應用程序的主體及由 Spring IOC 容器所管理的對象,被稱之為 bean。簡單地講,bean 就是由 IOC 容器初始化、裝配及管理的對象,除此之外,bean 就與應用程序中的其他對象沒有什么區別了。而 b…

Oracle數據庫運維大全

以下是一些常見的Oracle數據庫運維任務和對應的語句腳本示例: 檢查數據庫實例狀態: SELECT instance_name, status, startup_time FROM v$instance; 查看數據庫版本和補丁級別: SELECT * FROM v$version; SELECT patch_id, action, status …

LeetCode 熱題 100(四):48. 旋轉圖像、240. 搜索二維矩陣 II、234. 回文鏈表

一.48. 旋轉圖像 題目要求:就是一個順時針的旋轉過程。 思路:觀察矩陣,得出翻轉前第i行的第J個元素 等于 翻轉后倒數第i列的第J個元素,舉例說明,第1行第2個元素為“2”,翻轉后到了 倒數第1列的第2個元素…

MAC環境,在IDEA執行報錯java: -source 1.5 中不支持 diamond 運算符

Error:(41, 51) java: -source 1.5 中不支持 diamond 運算符 (請使用 -source 7 或更高版本以啟用 diamond 運算符) 進入設置 修改java版本 pom文件中加入 <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin&l…

vue項目預覽pdf功能(解決動態文字無法顯示的問題)

最近&#xff0c;因為公司項目需要預覽pdf的功能&#xff0c;開始的時候找了市面上的一些pdf插件&#xff0c;都能用&#xff0c;但是&#xff0c;后面因為pdf變成了需要根據內容進行變化的&#xff0c;然后&#xff0c;就出現了需要動態生成的文字不顯示了。換了好多好多的插件…

Flink安裝與使用

1.安裝準備工作 下載flink Apache Flink: 下載 解壓 [dodahost166 bigdata]$ tar -zxvf flink-1.12.0-bin-scala_2.11.tgz 2.Flinnk的standalone模式安裝 2.1修改配置文件并啟動 修改&#xff0c;好像使用默認的就可以了 [dodahost166 conf]$ more flink-conf.yaml 啟動 …

【辦公自動化】使用Python批量生成PPT版榮譽證書

&#x1f935;?♂? 個人主頁&#xff1a;艾派森的個人主頁 ?&#x1f3fb;作者簡介&#xff1a;Python學習者 &#x1f40b; 希望大家多多支持&#xff0c;我們一起進步&#xff01;&#x1f604; 如果文章對你有幫助的話&#xff0c; 歡迎評論 &#x1f4ac;點贊&#x1f4…

RocketMQ消費者可以手動消費但無法主動消費問題,或生成者發送超時

1.大多數是配置問題 修改rocketmq文件夾broker.conf 2.配置與集群IP或本地IPV4一樣 重啟 在RocketMQ獨享實例中支持IPv4和IPv6雙棧&#xff0c;主要是通過在網絡層面上同時支持IPv4和IPv6協議棧來實現的。RocketMQ的Broker端、Namesrv端和客戶端都需要支持IPv4和IPv6協議&…

Python土力學與基礎工程計算.PDF-螺旋板載荷試驗

python 求解代碼如下&#xff1a; 1. import numpy as np 2. 3. # 已知參數 4. p_a 100 # 標準壓力&#xff0c; kPa 5. p np.array([25, 50, 100, 200) # 荷載&#xff0c; kPa 6. s np.array([2.88, 5.28, 9.50, 15.00) / 10 # 沉降量&#xff0c; cm 7. D 10 # 螺旋板直…

C語言:選擇+編程(每日一練)

目錄 選擇題&#xff1a; 題一&#xff1a; 題二&#xff1a; 題三&#xff1a; 題四&#xff1a; 題五&#xff1a; 編程題&#xff1a; 題一&#xff1a;尼科徹斯定理 示例1 題二&#xff1a;等差數列 示例2 本人實力有限可能對一些地方解釋和理解的不夠清晰&…

Redis知識(一)

目錄 Redis過期刪除和內存淘汰策略&#xff1a; 過期刪除策略&#xff1a; 內存淘汰策略&#xff08;解決內存過大問題&#xff09;&#xff1a; LRU和LFU以及他們在Redis里的實現 主從復制 哨兵模式 緩存 緩存雪崩 緩存擊穿 緩存穿透 數據庫和緩存一致性問題 Redis…

windows下redis服務啟動及.bat文件中中redis服務的啟動

windows windows下redis服務的啟動 1、不配置環境變量 找到redis服務的安裝目錄進入命令行窗口并輸入命令redis-server.exe redis.windows.conf2、配置環境變量 將redis安裝目錄配置在path環境變量中之后就可以在cmd窗口的任意位置輸入redis-server命令就可以啟動redis服務…

材料行業可以轉IC設計后端嗎?

近來有許多材料行業的小伙伴通過后臺來問我對于職業規劃的看法&#xff0c;甚至有些小伙伴直接點明了某個行業適不適合自己&#xff0c;那么我這邊僅以近年來比較熱門的數字芯片設計來展開講講&#xff0c;材料適不適合轉行做IC呢。 對于理工科的同學而言&#xff0c;選擇哪個…

Graal 編譯器

一開始,我們來講一個故事。假設有一個名為 John 的開發人員,他正在嘗試編寫一些高性能的 Java 代碼。他遇到了一些性能和速度問題,因為他的應用需要經常從大量的數據源中獲取數據,并進行計算。他嘗試了許多優化工具和技術,但是仍然無法滿足他的需求。在這個時候,他聽說了…

公告:微信小程序備案期限官方要求

備案期限要求 1、若微信小程序未上架&#xff0c;自2023年9月1日起&#xff0c;微信小程序須完成備案后才可上架&#xff0c;備案時間1-20日不等&#xff1b; 2、若微信小程序已上架&#xff0c;請于2024年3月31日前完成備案&#xff0c;逾期未完成備案&#xff0c;平臺將按照…

Android Studio實現列表展示圖片

效果&#xff1a; MainActivity 類 package com.example.tabulation;import android.content.Intent; import android.os.Bundle; import android.view.View;import androidx.appcompat.app.AppCompatActivity; import androidx.recyclerview.widget.LinearLayoutManager; im…

解決 Maven 創建 Spring Boot 項目時出現 “Cannot access alimaven“ 錯誤的方法

系列文章目錄 文章目錄 系列文章目錄前言一、確認 Maven 配置二、創建 Spring Boot 項目三、修改項目的 Maven 配置四、清除 Maven 本地倉庫五、重新構建項目總結前言 Maven 是 Java 項目的構建工具,而 Spring Boot 則是用于快速構建 Spring 應用程序的框架。但有時,在創建 …

Redis擴容與一致性Hash算法解析

推薦閱讀 AI文本 OCR識別最佳實踐 AI Gamma一鍵生成PPT工具直達鏈接 玩轉cloud Studio 在線編碼神器 玩轉 GPU AI繪畫、AI講話、翻譯,GPU點亮AI想象空間 資源分享 「java、python面試題」來自UC網盤app分享&#xff0c;打開手機app&#xff0c;額外獲得1T空間 https://dr…