Flink Table API 編程入門實踐


Flink Table API 編程入門實踐

前言

Apache Flink 是目前大數據實時計算領域的明星產品,Flink Table API 則為開發者提供了聲明式、類似 SQL 的數據處理能力,兼具 SQL 的易用性與編程 API 的靈活性。本文將帶你快速了解 Flink Table API 的基本用法,并通過代碼示例幫助你快速上手。


一、環境準備

在 Flink 中,所有 Table API 操作都需要基于 TableEnvironment。對于流處理場景,我們一般這樣創建環境:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

二、數據源定義

Table API 支持多種數據源。最常見的兩種方式為:

1. 從 DataStream 創建 Table

DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("Alice", 12),new MyPojo("Bob", 10)
);
Table table = tableEnv.fromDataStream(dataStream);

2. 從外部系統注冊 Table

比如從 Kafka 注冊一張表:

tableEnv.executeSql("CREATE TABLE user_orders (" +" user_id STRING, " +" order_amount DOUBLE " +") WITH (" +" 'connector' = 'kafka', " +" 'topic' = 'orders', " +" 'properties.bootstrap.servers' = 'localhost:9092', " +" 'format' = 'json'" +")"
);

三、Table API 常見操作

Table API 提供了豐富的數據處理能力,如篩選、聚合、分組、連接等。例如:

import static org.apache.flink.table.api.Expressions.$;// 篩選和選擇字段
Table result = table.filter($("age").isGreater(10)).select($("name"), $("age"));// 分組聚合
Table agg = table.groupBy($("name")).select($("name"), $("age").avg().as("avg_age"));

四、結果輸出

將 Table 轉換為 DataStream,方便后續處理或輸出:

DataStream<Row> resultStream = tableEnv.toDataStream(result);
resultStream.print();

五、與 SQL API 結合

Table API 與 SQL API 可以無縫結合。例如:

Table sqlResult = tableEnv.sqlQuery("SELECT name, AVG(age) as avg_age FROM my_table GROUP BY name"
);

六、完整示例

下面是一個完整的 Flink Table API 示例,演示數據流到 Table 的轉換、聚合與結果輸出:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class TableApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 創建數據流DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("Alice", 12),new MyPojo("Bob", 10),new MyPojo("Alice", 15));// 轉換為 TableTable table = tableEnv.fromDataStream(dataStream);// Table API 查詢Table result = table.groupBy($("name")).select($("name"), $("age").avg().as("avg_age"));// 輸出結果DataStream<Row> resultStream = tableEnv.toDataStream(result);resultStream.print();env.execute();}public static class MyPojo {public String name;public Integer age;public MyPojo() {}public MyPojo(String name, Integer age) {this.name = name;this.age = age;}}
}

七、常見問題與建議

  • 字段名區分大小寫,需與數據結構一致。
  • Table API 與 SQL API 可混用,靈活應對不同場景。
  • 生產環境推薦結合 Catalog 管理元數據。
  • Flink 1.14 以后批流統一,建議優先采用流模式開發。

結語

Flink Table API 極大地提升了大數據實時處理的開發效率,結合 SQL 的易用性和 API 的靈活性,非常適合復雜業務場景的數據處理。希望本文能幫你快速入門 Flink Table API,后續還可以深入了解窗口聚合、UDF、自定義 Connector 等高級特性。

如果你在學習和實踐中遇到問題,歡迎留言交流!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/85247.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/85247.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/85247.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Android之ListView

1&#xff1a;簡單列表(ArrayAdapter) 1&#xff1a;運行的結果&#xff1a; 2&#xff1a;首先在MyListView里面創建一個按鈕&#xff0c;點擊的時候進行跳轉。 這里讓我吃驚的是&#xff0c;Button里面可以直接設置onClick .java里面的方法。 也即是點擊這個按鈕之后就會去…

Python(十四)

1.type函數和init_subclass_ init_subclass_ 2.元類 類就是用來創建對象的模版&#xff0c;類是由type創造而來的&#xff0c;元類就是創建類的模版&#xff0c;type可以用來創造類&#xff0c;因為type本身就是一個元類&#xff0c;使用元類來創造類&#xff0c;元類之間也有…

當前用戶的Git全局配置情況:git config --global --list

通過config命令可以查詢當前用戶的全局配置情況。這些配置項定義了 Git 在全局范圍內的行為&#xff0c;包括如何處理大文件、SSL 證書驗證以及提交時的用戶信息。 git config --global --list http.sslVerifyfalse 這個配置項禁用了 SSL 證書驗證。這在與自簽名證書的 Git 服…

負載均衡群集---Haproxy

目錄 一、HAproxy 一、概念 二、核心作用 三、主要功能特性 四、應用場景 五、優勢與特點 二、 案例分析 1. 案例概述 2. 案例前置知識點 &#xff08;1&#xff09;HTTP 請求 &#xff08;2&#xff09;負載均衡常用調度算法 &#xff08;3&#xff09;常見的 web …

html5視頻播放器和微信小程序如何實現視頻的自動播放功能

在HTML5中實現視頻自動播放需設置autoplay和muted屬性&#xff08;瀏覽器策略要求靜音才能自動播放&#xff09;&#xff0c;并可添加loop循環播放、playsinline同層播放等優化屬性。微信小程序通過<video>組件的autoplay屬性實現自動播放&#xff0c;同時支持全屏按鈕、…

OpenHarmony定制系統組合按鍵(一)

一、開發環境 系統版本&#xff1a;OpenHarmony 4.0.10.13 設備平臺&#xff1a;rk3568 SDK版本&#xff1a;fullSDK 4.0.10.13 DevEco Studio版本&#xff1a;4.1.0.400 二、需求背景 定制OpenHarmony 系統組合按鍵功能&#xff0c;例如仿Android Power VOL_Up組合鍵實現截…

相機定屏問題分析四:【cameraserver 最大request buffer超標】后置視頻模式預覽定屏閃退至桌面

【關注我,后續持續新增專題博文,謝謝!!!】 上一篇我們講了:相機定屏問題分析三:【配流ConfigStream失敗】外屏打開相機視頻照片人像來回切換后,相機頁面卡死,點擊沒反應9055522 這一篇我們開始講: 相機定屏問題分析四:【cameraserver 最大request buffer超…

從 PyTorch 到 TensorFlow Lite:模型訓練與推理

一、方案介紹 研發階段&#xff1a;利用 PyTorch 的動態圖特性進行快速原型驗證&#xff0c;快速迭代模型設計。 靈活性與易用性&#xff1a;PyTorch 是一個非常靈活且易于使用的深度學習框架&#xff0c;特別適合研究和實驗。其動態計算圖特性使得模型的構建和調試變得更加直…

4.2.5 Spark SQL 分區自動推斷

在本節實戰中&#xff0c;我們學習了Spark SQL的分區自動推斷功能&#xff0c;這是一種提升查詢性能的有效手段。通過創建具有不同分區的目錄結構&#xff0c;并在這些目錄中放置JSON文件&#xff0c;我們模擬了一個分區表的環境。使用Spark SQL讀取這些數據時&#xff0c;Spar…

數據結構:導論

目錄 什么是“第一性原理”&#xff1f; 什么是“數據結構”&#xff1f; 數據結構解決的根本問題是什么&#xff1f; 數據結構的兩大分類 數據結構的基本操作 數據結構與算法的關系 學習數據結構的底層目標 什么是“第一性原理”&#xff1f; 在正式進入數據結構之前&…

汽車制造場景下Profibus轉Profinet網關核心功能與應用解析

在當今工業自動化的浪潮中&#xff0c;各種通訊協議層出不窮&#xff0c;而其中PROFIBUS與PROFINET作為兩種主流的工業通信標準&#xff0c;它們之間的轉換需求日益增長。特別是對于那些希望實現老舊設備與現代化網絡無縫對接的企業來說&#xff0c;一個高效、穩定的網關產品顯…

qt ubuntu 20.04 交叉編譯

一、交叉編譯環境搭建 1.下載交叉編譯工具鏈&#xff1a;https://developer.arm.com/downloads/-/gnu-a 可以根據自己需要下載對應版本&#xff0c;當前最新版本是10.3, 筆者使用10.3編譯后的glibc.so版本太高&#xff08;glibc_2.3.3, glibc_2.3.4, glibc_2.3.5&#xff09;…

在Babylon.js中創建3D文字:簡單而強大的方法

引言 在3D場景中添加文字是許多WebGL項目的常見需求。Babylon.js提供了多種創建3D文字的方法&#xff0c;其中使用TextBlock結合平面網格是一種簡單而高效的方式。本文將介紹如何使用Babylon.js的GUI系統在3D空間中創建美觀的文字效果。 方法概述 Babylon.js的GUI系統允許我…

油桃TV v20250519 一款電視端應用網站聚合TV播放器 支持安卓4.1

油桃TV v20250519 一款電視端應用網站聚合TV播放器 支持安卓4.1 應用簡介&#xff1a; 油桃TV是一款開源電視端應用網站聚合瀏覽器&#xff0c;它把大家常見需求的一些網站都整合到了這個應用上&#xff0c;并進行了電視端…

Perl單元測試實戰指南:從Test::Class入門到精通的完整方案

閱讀原文 前言:為什么Perl開發者需要重視單元測試? "這段代碼昨天還能運行,今天就出問題了!"——這可能是每位Perl開發者都經歷過的噩夢。在沒有充分測試覆蓋的情況下,即使是微小的改動也可能導致系統崩潰。單元測試正是解決這一痛點的最佳實踐,它能幫助我們在…

OpenCv高階(十三)——人臉檢測

文章目錄 前言一、人臉檢測—haar特征二、人臉檢測---級聯分類器1、級聯分類器2、如何訓練級聯分類器3、已存在的級聯分類器 三、代碼分析1、人臉檢測的簡單使用2、人臉微笑檢測&#xff08;1&#xff09; 初始化視頻源&#xff08;2&#xff09;主循環處理每一幀&#xff08;3…

無線通信模塊簡介

QuecPython 是運行在無線通信模塊上的開發框架。對于首次接觸物聯網開發的用戶而言&#xff0c;無線通信模塊可能是一個相對陌生的概念。本文主要針對無線通信和蜂窩網絡本身&#xff0c;以及模塊的概念、特性和開發方式進行簡要的介紹。 無線通信和蜂窩網絡 物聯網對無線通信…

Unity 中實現首尾無限循環的 ListView

之前已經實現過&#xff1a; Unity 中實現可復用的 ListView-CSDN博客文章瀏覽閱讀5.6k次&#xff0c;點贊2次&#xff0c;收藏27次。源碼已放入我的 github&#xff0c;地址&#xff1a;Unity-ListView前言實現一個列表組件&#xff0c;表現方面最核心的部分就是重寫布局&…

【C++】 類和對象(上)

1.類的定義 1.1類的定義格式 ? class為定義類的關鍵字&#xff0c;后跟一個類的名字&#xff0c;{}中為類的主體&#xff0c;注意類定義結束時后?分號不能省 略。類體中內容稱為類的成員&#xff1a;類中的變量稱為類的屬性或成員變量;類中的函數稱為類的?法或 者成員函數。…

Transformer架構詳解:從Attention到ChatGPT

Transformer架構詳解&#xff1a;從Attention到ChatGPT 系統化學習人工智能網站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目錄 Transformer架構詳解&#xff1a;從Attention到ChatGPT摘要引言一、Attention機制&#xff1a;Transformer的…