FlinkSQL的常用語言

FlinkSQL 常用語言指南

FlinkSQL 是 Apache Flink 提供的 SQL 接口,允許用戶使用標準 SQL 或擴展的 SQL 語法來處理流式和批式數據。以下是 FlinkSQL 的常用語言元素和操作:

  1. 基本查詢
-- 選擇查詢
SELECT * FROM table_name;-- 帶條件的查詢
SELECT column1, column2 FROM table_name WHERE condition;-- 分組聚合
SELECT user_id, COUNT(*) as cnt 
FROM orders 
GROUP BY user_id;
  1. 時間屬性定義
-- 定義處理時間
CREATE TABLE orders (order_id STRING,product STRING,amount DOUBLE,order_time TIMESTAMP(3),-- 聲明處理時間屬性proc_time AS PROCTIME()
) WITH (...);-- 定義事件時間和水位線
CREATE TABLE orders (order_id STRING,product STRING,amount DOUBLE,order_time TIMESTAMP(3),-- 聲明事件時間屬性WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
  1. 窗口操作
-- 滾動窗口
SELECT window_start, window_end, SUM(amount) as total_amount
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;-- 滑動窗口
SELECT window_start, window_end, user_id,SUM(amount) as total_amount
FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, user_id;-- 會話窗口
SELECT window_start, window_end, user_id,COUNT(*) as event_count
FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end, user_id;
  1. 連接操作
-- 常規連接
SELECT o.order_id, o.product, u.user_name
FROM orders AS o
JOIN users AS u ON o.user_id = u.user_id;-- 時間區間連接
SELECT o.order_id, p.promotion_name,o.order_time,o.amount
FROM orders o
JOIN promotions p 
ON o.product_id = p.product_id
AND o.order_time BETWEEN p.start_time AND p.end_time;-- 窗口連接
SELECT o.order_id,s.shipment_id,o.order_time,s.ship_time,TIMESTAMPDIFF(HOUR, o.order_time, s.ship_time) as hours_to_ship
FROM orders o
JOIN shipments s
ON o.order_id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '1' HOUR AND s.ship_time;
  1. 常用函數

標量函數

-- 字符串函數
SELECT LOWER(name), SUBSTRING(email, 1, 5) FROM users;-- 數學函數
SELECT ABS(amount), ROUND(price, 2) FROM products;-- 時間函數
SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'),TIMESTAMPDIFF(DAY, order_time, CURRENT_TIMESTAMP)
FROM orders;

聚合函數

SELECT COUNT(*) as total_orders,SUM(amount) as total_amount,AVG(amount) as avg_amount,MAX(amount) as max_amount,MIN(amount) as min_amount
FROM orders;

窗口函數

SELECT product_id,order_time,amount,ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY order_time) as row_num,SUM(amount) OVER (PARTITION BY product_id ORDER BY order_time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as moving_sum
FROM orders;
  1. DDL 語句
-- 創建表
CREATE TABLE orders (order_id STRING,product_id STRING,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'kafka:9092','format' = 'json'
);-- 創建視圖
CREATE VIEW large_orders AS
SELECT * FROM orders WHERE amount > 1000;-- 創建函數
CREATE FUNCTION my_udf AS 'com.example.MyUDF';
  1. DML 語句
-- 插入數據
INSERT INTO target_table
SELECT * FROM source_table WHERE amount > 100;-- 更新數據 (Flink 1.12+ 支持有限)
UPDATE orders SET amount = 200 WHERE order_id = '123';-- 刪除數據 (Flink 1.12+ 支持有限)
DELETE FROM orders WHERE order_id = '456';
  1. 模式匹配 (MATCH_RECOGNIZE)
SELECT *
FROM orders
MATCH_RECOGNIZE (PARTITION BY user_idORDER BY order_timeMEASURESSTART_ROW.order_id AS start_order,LAST(PRICE_DOWN.order_id) AS bottom_order,LAST(PRICE_UP.order_id) AS end_orderONE ROW PER MATCHAFTER MATCH SKIP TO LAST PRICE_UPPATTERN (START_ROW PRICE_DOWN+ PRICE_UP+)DEFINEPRICE_DOWN AS (LAST(PRICE_DOWN.amount, 1) IS NULL AND PRICE_DOWN.amount < START_ROW.amount) OR PRICE_DOWN.amount < LAST(PRICE_DOWN.amount, 1),PRICE_UP AS PRICE_UP.amount > LAST(PRICE_DOWN.amount, 1)
) MR;
  1. 配置參數
-- 設置參數
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '1000';
  1. 常用連接器配置
-- Kafka 源表
CREATE TABLE kafka_source (id INT,name STRING,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'input_topic','properties.bootstrap.servers' = 'kafka:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'latest-offset','format' = 'json'
);-- JDBC 結果表
CREATE TABLE jdbc_sink (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://mysql:3306/mydb','table-name' = 'sink_table','username' = 'user','password' = 'password'
);

FlinkSQL 不斷演進,這里只是舉例一些常用的語句,參考官方文檔可以獲取最新語法和功能。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/77432.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/77432.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/77432.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

spring mvc異步請求 sse 大文件下載 斷點續傳下載Range

學習連接 異步Servlet3.0 Spring Boot 處理異步請求&#xff08;DeferredResult 基礎案例、DeferredResult 超時案例、DeferredResult 擴展案例、DeferredResult 方法匯總&#xff09; spring.io mvc Asynchronous Requests 官網文檔 spring.io webflux&webclient官網文…

一問看懂——支持向量機SVM(Support Vector Machine)

目錄 蕪湖~~~支持向量機&#xff08;SVM&#xff09; 1. 引言 2. 基本思想 3. 數學模型 3.1 超平面定義 3.2 分類間隔與目標函數 3.3 軟間隔與松弛變量 4. 核函數方法&#xff08;Kernel Trick&#xff09; 4.1 核函數定義 4.2 常用核函數 5. SVM 的幾種類型 6. SV…

藍橋杯 拼數(字符串大小比較)

題目描述 設有 n 個正整數 a1?…an?&#xff0c;將它們聯接成一排&#xff0c;相鄰數字首尾相接&#xff0c;組成一個最大的整數。 輸入格式 第一行有一個整數&#xff0c;表示數字個數 n。 第二行有 n 個整數&#xff0c;表示給出的 n 個整數 ai?。 輸出格式 一個正整…

Elasticsearch 系列專題 - 第三篇:搜索與查詢

搜索是 Elasticsearch 的核心功能之一。本篇將介紹如何構建高效的查詢、優化搜索結果,以及調整相關性評分,幫助你充分發揮 Elasticsearch 的搜索能力。 1. 基礎查詢 1.1 Match Query 與 Term Query 的區別 Match Query:用于全文搜索,會對查詢詞進行分詞。 GET /my_index/_…

本地電腦使用sshuttle命令將網絡流量代理到ssh連接的電腦去實現訪問受限網絡

本地電腦使用sshuttle命令將網絡流量代理到ssh連接的電腦去實現訪問受限網絡 安裝使用 工作過程中, 經常會遇到, 需要訪問客戶內網環境的問題, 一般都需要安轉各式各樣的VPN客戶端到本地電腦上, 軟件多了也會造成困擾, 所有, 找了一款還不錯的命令工具去解決這個痛點 安裝 官方…

雙相機結合halcon的條碼檢測

以下是針對提供的C#代碼的詳細注釋和解釋&#xff0c;結合Halcon庫的功能和代碼結構進行說明&#xff1a; --- ### **代碼整體結構** 該代碼是一個基于Halcon庫的條碼掃描類GeneralBarcodeScan&#xff0c;支持單臺或雙臺相機的條碼檢測&#xff0c;并通過回調接口返回結果。…

python基礎語法12-迭代器與生成器

Python 生成器與迭代器詳解 在 Python 中&#xff0c;生成器和迭代器是處理大量數據時的強大工具。它們能夠幫助我們節省內存&#xff0c;避免一次性加載過多數據。生成器通過 yield 關鍵字實現&#xff0c;允許我們逐步產生數據&#xff0c;而迭代器通過實現特定的接口&#…

公司內部建立pypi源

有一篇建立apt源的文章在這里&#xff0c;需要的可以查看&#xff1a;公司內部建立apt源-CSDN博客 server: pip install pypiserver mkdir -d pypi/packages cp test.whl pypi/packages pypi-server run --port 8080 /home/xu/pypi/packages & 網頁訪問&#xff1a;http:…

VMware Workstation/Player 的詳細安裝使用指南

以下是 VMware Workstation/Player 的完整下載、安裝指南&#xff0c;包含詳細步驟、常見問題及解決方法&#xff0c;以及進階使用技巧&#xff0c;適用于 Windows 和 macOS 用戶。 VMware Workstation/Player 的詳細安裝使用指南—目錄 一、下載與安裝詳細指南1. 系統要求2. 下…

藍橋杯python組考前準備

1.保留k位小數 round(10/3, 2) # 第二個參數表示保留幾位小數 2.輸入代替方案&#xff08;加速讀取&#xff09; import sys n int(sys.stdin.readline()) # 讀取整數&#xff08;不加int就是字符串&#xff09; a, b map(int, sys.stdin.readline().split()) # 一行讀取多個…

【JSON2WEB】16 login.html 登錄密碼加密傳輸

【JSON2WEB】系列目錄 【JSON2WEB】01 WEB管理信息系統架構設計 【JSON2WEB】02 JSON2WEB初步UI設計 【JSON2WEB】03 go的模板包html/template的使用 【JSON2WEB】04 amis低代碼前端框架介紹 【JSON2WEB】05 前端開發三件套 HTML CSS JavaScript 速成 【JSON2WEB】06 JSO…

計算機網絡起源

互聯網的起源和發展是一個充滿創新、突破和變革的歷程&#xff0c;從20世紀60年代到1989年&#xff0c;這段時期為互聯網的誕生和普及奠定了堅實的基礎。讓我們詳細回顧這一段激動人心的歷史。 計算機的發展與ARPANET的建立&#xff08;20世紀60年代&#xff09; 互聯網的誕生…

洛谷P1824進擊的奶牛簡單二分

題目如下 代碼如下 謝謝觀看

如何建立高效的會議機制

建立高效的會議機制需做到&#xff1a;明確會議目標、制定并提前分發議程、控制會議時長、確保有效溝通與反饋、及時跟進執行情況。其中&#xff0c;明確會議目標是核心關鍵&#xff0c;它直接決定了會議的方向與效率。只有明確目標&#xff0c;會議才不會偏離初衷&#xff0c;…

開源AI大模型AI智能名片S2B2C商城小程序:科技浪潮下的商業新引擎

摘要&#xff1a; 本文聚焦于科技迅猛發展背景下&#xff0c;開源AI大模型、AI智能名片與S2B2C商城小程序的融合應用。通過分析元宇宙、人工智能、區塊鏈、5G等前沿科技帶來的商業變革&#xff0c;闡述開源AI大模型AI智能名片S2B2C商城小程序在整合資源、優化服務、提升用戶體驗…

基于大模型構建金融客服的技術調研

OpenAI-SB api接口 https://openai-sb.com/ ChatGPT與Knowledge Graph (知識圖譜)分享交流 https://www.bilibili.com/video/BV1bo4y1w72m/?spm_id_from333.337.search-card.all.click&vd_source569ef4f891360f2119ace98abae09f3f 《要研究的方向和準備》 https://ww…

WSA(Windows Subsystem for Android)安裝LSPosed和應用教程

windows安卓子系統WSA的Lsposed和shamiko的安裝教程 WSA(Windows Subsystem for Android)安裝LSPosed和應用教程 一、環境準備 在開始之前,請確保: 已經安裝好WSA(Windows Subsystem for Android)已經安裝好ADB工具下載好LSPosed和Shamiko框架安裝包 二、連接WSA 首先需要…

辛格迪客戶案例 | 河南宏途食品實施電子合約系統(eSign)

01 河南宏途食品有限公司&#xff1a;食品行業的數字化踐行者 河南宏途食品有限公司&#xff08;以下簡稱“宏途食品”&#xff09;作為國內食品行業的創新企業&#xff0c;專注于各類食品的研發、生產和銷售。公司秉承“質量為先、創新驅動、服務至上”的核心價值觀&#xff…

手機靜態ip地址怎么獲取?方法與解析?

而在某些特定情境下&#xff0c;我們可能需要為手機設置一個靜態IP地址。本文將詳細介紹手機靜態IP地址詳解及獲取方法 一、什么是靜態IP地址&#xff1f; 靜態IP&#xff1a;由用戶手動設置的固定IP地址&#xff0c;不會因網絡重啟或設備重連而改變。 動態IP&#xff1a;由路…

天下飛飛【老飛飛服務端】+客戶端+數據庫測試帶視頻教程

天下飛飛服務器搭建測試視頻 天下飛飛【老飛飛服務端】客戶端數據庫測試帶視頻教程 完整安裝教程。 測試環境 系統server2019 sql2022數據庫 sql的安裝 odbc搭建 sql加載數據庫 此測試端能用于服務器搭建測試。 下載地址為&#xff1a;https://download.csdn.net/d…