Flink流批一體計算(13):PyFlink Tabel API之SQL DDL

1. TableEnvironment

創建 TableEnvironment

from pyflink.table import Environmentsettings, TableEnvironment# create a streaming TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)# or create a batch TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)

TableEnvironment 是 Table API 和 SQL 集成的核心概念。

TableEnvironment 可以用來:

  • ·創建 Table
  • ·將 Table 注冊成臨時表
  • ·執行 SQL 查詢
  • ·注冊用戶自定義的 (標量,表值,或者聚合) 函數
  • ·配置作業
  • ·管理 Python 依賴
  • ·提交作業執行

創建 source 表

table_env.execute_sql("""CREATE TABLE datagen (id INT,data STRING) WITH ('connector' = 'datagen','fields.id.kind' = 'sequence','fields.id.start' = '1','fields.id.end' = '10')""")

創建 sink 表

table_env.execute_sql("""CREATE TABLE print (id INT,data STRING) WITH ('connector' = 'print')""")

2. Table

Table 是 Python Table API 的核心組件。Table 是 Table API 作業中間結果的邏輯表示。

一個 Table 實例總是與一個特定的 TableEnvironment 相綁定。

不支持在同一個查詢中合并來自不同 TableEnvironments 的表,例如 join 或者 union 它們。

通過列表類型的對象創建

你可以使用一個列表對象創建一張表:

from pyflink.table import Environmentsettings, TableEnvironment# 創建 批 TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])table.to_pandas()==>print(table.to_pandas())table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])print(table.to_pandas())

通過 DDL 創建

你可以通過 DDL 創建一張表,execute_sql(stmt) 執行指定的語句并返回執行結果。

執行語句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。

注意,對于 "INSERT INTO" 語句,這是一個異步操作,通常在向遠程集群提交作業時才需要使用。

但是,如果在本地集群或者 IDE 中執行作業時,你需要等待作業執行完成。

from pyflink.table import Environmentsettings, TableEnvironment# 創建流 TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='3','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='6')""")table = table_env.from_path("random_source")table.to_pandas()

通過 Catalog 創建

Catalog

Catalog提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函數和信息。

數據處理最關鍵的方面之一是管理元數據。

元數據可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。

元數據也可以是持久化的,例如 Hive Metastore 中的元數據。

Catalog 提供了一個統一的API,用于管理元數據,并使其可以從 Table API 和 SQL 查詢語句中來訪問。

Catalog類型

GenericInMemoryCatalog

基于內存實現的 Catalog,所有元數據只在 session 的生命周期內可用。

JdbcCatalog

JdbcCatalog使得用戶可以將 Flink 通過 JDBC 協議連接到關系數據庫。

PostgresCatalog 是當前實現的唯一一種 JDBC Catalog。

HiveCatalog

HiveCatalog 有兩個用途:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的接口。

警告 Hive Metastore 以小寫形式存儲所有元數據對象名稱,GenericInMemoryCatalog 區分大小寫。

用戶自定義 Catalog

Catalog 是可擴展的,用戶可以通過實現 Catalog 接口來開發自定義 Catalog。

想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現自定義的 Catalog 之外,還需要為這個 Catalog 實現對應的 CatalogFactory 接口。

CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。

這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 并初始化相應的 Catalog 實例。

創建 Flink 表并將其注冊到 Catalog

使用 SQL DDL

用戶可以使用 DDL 通過 Table API 或者 SQL Client 在 Catalog 中創建表。

from pyflink.table.catalog import HiveCatalog# Create a HiveCatalogcatalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")# Register the catalogt_env.register_catalog("myhive", catalog)# Create a catalog databaset_env.execute_sql("CREATE DATABASE mydb WITH (...)")# Create a catalog tablet_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")# should return the tables in current catalog and database.t_env.list_tables()

通過 SQL DDL 創建的表和視圖, 例如 “create table …” 和 “create view …",都存儲在 catalog 中。

你可以通過 SQL 直接訪問 catalog 中的表。

使用 Java/Scala

用戶可以用編程的方式使用Java 或者 Scala 來創建 Catalog 表。

from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka
settings = Environmentsettings.in_batch_mode()
t_env = TableEnvironment.create(settings)
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)
# Create a catalog table
schema = Schema.new_builder() \.column("name", DataTypes.STRING()) \.column("age", DataTypes.INT()) \.build()??
catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka").schema(schema)// ….build())
# tables should contain "mytable"
tables = catalog.list_tables("mydb")

TableEnvironment 維護了一個使用標識符創建的表的 catalogs 映射。

Catalog 中的表既可以是臨時的,并與單個 Flink 會話生命周期相關聯,也可以是永久的,跨多個 Flink 會話可見。

如果你要用 Table API 來使用 catalog 中的表,可以使用 “from_path” 方法來創建 Table API 對象:

from_path(path)?? 通過指定路徑下已注冊的表來創建一個表,例如通過 create_temporary_view 注冊表。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/42166.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/42166.shtml
英文地址,請注明出處:http://en.pswp.cn/news/42166.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

嵌入式Linux開發實操(九):CAN接口開發

前言: CAN網絡在汽車中的使用可以說相當廣泛。而CAN網絡需要的收發器最常用的就是NXP 的TJA1042: CAN網絡:

605. 種花問題

鏈接 假設有一個很長的花壇&#xff0c;一部分地塊種植了花&#xff0c;另一部分卻沒有。可是&#xff0c;花不能種植在相鄰的地塊上&#xff0c;它們會爭奪水源&#xff0c;兩者都會死去。給你一個整數數組 flowerbed 表示花壇&#xff0c;由若干 0 和 1 組成&#xff0c;其中…

8/16總結

WebSocket是雙向通信協議&#xff0c;模擬Socket協議&#xff0c;可以雙向發送或者接收信息 而Http是單向的 WebSocket是需要瀏覽器和服務器握手進行建立連接的 而http是瀏覽器發起向服務器的連接&#xff0c;服務器預先并不知道這個連接 WebSocket在建立握手時&#xff0c;數…

Python3內置函數大全

吐血整理 Python3內置函數大全 1.abs()函數2.all()函數3.any()函數4.ascii()函數5.bin()函數6.bool()函數7.bytes()函數8.challable()函數9.chr()函數10.classmethod()函數11.complex()函數12.complie()函數13.delattr()函數14.dict()函數15.dir()函數16.divmod()函數17.enumer…

注解@JsonInclude

注解JsonInclude 1. 注解由來 JsonInclude是一個用于Java類中字段或方法的注解&#xff0c;它來自于Jackson庫。Jackson庫是一個用于處理JSON數據的流行開源庫&#xff0c;在Java對象和JSON之間進行序列化和反序列化時經常被使用。 2. 注解示例 下面是JsonInclude注解的一個…

【kubernetes】Pod控制器

目錄 Pod控制器及其功用 pod控制器有多種類型 1、ReplicaSet ReplicaSet主要三個組件組成 2、Deployment 3、DaemonSet 4、StatefulSet 5、Job 6、Cronjob Pod與控制器之間的關系 1、Deployment 查看控制器配置 查看歷史版本 2、SatefulSet 為什么要有headless&…

2023-08-18力扣每日一題

鏈接&#xff1a; 1388. 3n 塊披薩 題意&#xff1a; 一個長度3n的環&#xff0c;選n次數字&#xff0c;每次選完以后相鄰的數字會消失&#xff0c;求選取結果最大值 解&#xff1a; 這波是~~&#xff08;ctrl&#xff09;CV工程師了~~ 核心思想是選取n個不相鄰的元素一定…

無涯教程-Perl - splice函數

描述 此函數從LENGTH元素的OFFSET元素中刪除ARRAY元素,如果指定,則用LIST替換刪除的元素。如果省略LENGTH,則從OFFSET開始刪除所有內容。 語法 以下是此函數的簡單語法- splice ARRAY, OFFSET, LENGTH, LISTsplice ARRAY, OFFSET, LENGTHsplice ARRAY, OFFSET返回值 該函數…

Vue 項目運行 npm install 時,卡在 sill idealTree buildDeps 沒有反應

解決方法&#xff1a;切換到淘寶鏡像。 以下是之前安裝的 xmzs 包&#xff0c;用于控制切換淘寶鏡像。 該截圖是之前其他項目切換淘寶鏡像的截圖。 切換鏡像后&#xff0c;順利執行 npm install 。

生成國密密鑰對

在線生成國密密鑰對 生成的密鑰對要妥善保管&#xff0c;丟失是無法找回的。

selinux

一、selinux的說明 二、selinux的工作原理 三、selinux的啟動、關閉與查看 Enforcing和permissive都是臨時的&#xff0c;重啟還是依據配置文件中&#xff0c;禁用selinux&#xff0c;修改配置文件&#xff1a; 之后重啟生效 四、selinux對linux服務的影響

SpringBoot 接口調用出現亂碼解決 中文亂碼

SpringBoot 接口調用出現亂碼解決 package com.cxjg.mvc.util;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.converter.HttpMessageConverter; import org.springfra…

相同數字的積木游戲

題目描述 題目描述 小華和小薇一起通過玩積木游戲學習數學。 他們有很多積木&#xff0c;每個積木塊上都有一個數字&#xff0c;積木塊上的數字可能相同。 小華隨機拿一些積木挨著排成一排&#xff0c;請小薇找到這排積木中數字相同目所處位置最遠的2塊積木塊&#xff0c;計算…

【JAVA】我們該如何規避代碼中可能出現的錯誤?(一)

個人主頁&#xff1a;【&#x1f60a;個人主頁】 系列專欄&#xff1a;【??初識JAVA】 文章目錄 前言三種類型的異常異常處理JAVA內置異常類Exception 類的層次 前言 異常是程序中的一些錯誤&#xff0c;但并不是所有的錯誤都是異常&#xff0c;并且錯誤有時候是可以避免的&…

【BASH】回顧與知識點梳理(三十三)

【BASH】回顧與知識點梳理 三十三 三十三. 認識系統服務 (daemons)33.1 什么是 daemon 與服務 (service)早期 System V 的 init 管理行為中 daemon 的主要分類 (Optional)systemd 使用的 unit 分類systemd 的配置文件放置目錄systemd 的 unit 類型分類說明 33.2 透過 systemctl…

Grounding dino + segment anything + stable diffusion 實現圖片編輯

目錄 總體介紹總體流程 模塊介紹目標檢測&#xff1a; grounding dino目標分割&#xff1a;Segment Anything Model (SAM)整體思路模型結構&#xff1a;數據引擎 圖片繪制 集成樣例 其他問題附錄 總體介紹 總體流程 本方案用到了三個步驟&#xff0c;按順序依次為&#xff1a…

Tomcat 部署優化

Tomcat Tomcat 開放源代碼web應用服務器&#xff0c;是由java代碼開發的 tomcat就是處理動態請求和基于java代碼的頁面開發 可以在html當中寫入java代碼&#xff0c;tomcat可以解析html頁面當中的iava&#xff0c;執行動態請求 動態頁面機制有問題&#xff1a;不對tomcat進行優…

vue 使用indexDB 簡單完整邏輯

1 npm npm install idb 2 代碼 <template><div><p>Data: {{ data }}</p><button click"fetchData">Fetch Data</button></div> </template><script> import { openDB } from idb;export default {data() {…

eqtl-GWAS和GWAS-GWAS

目前教程中有eqtl-GWAS和GWAS-GWAS兩種模式&#xff0c;其他模式比較少見&#xff0c;還未進行開發 數據類型cc為分類變量即case/control&#xff0c;quant為連續變量&#xff0c;eqtl數據默認quant coloc.abf有兩個比較需要注意的點&#xff0c;就是數據集中N是代表樣本量&am…

解決Windows系統遠程登陸后vscdoe無法輸入字符,鍵盤沒有反應,鼠標可以點擊,沒有反應

文章目錄 前言操作過程 前言 使用vscode編譯器時&#xff0c;通過遠程登錄或者屏幕鎖屏解鎖后&#xff0c;vscode出現無法輸入字符內容&#xff0c;但vscode沒有死機&#xff0c;切換到其他軟件的窗口再切換回來后&#xff0c;可以使用鼠標點擊&#xff0c;但是只要使用鍵盤輸…