Apache Paimon Flink引擎解析

Paimon 支持 Flink 1.17, 1.16, 1.15 和 1.14,當前 Paimon 提供了兩類 Jar 包,一類支持數據讀寫,另一類支持其它操作(compaction)

Version	      Type	        Jar
Flink 1.18	  Bundled Jar	  paimon-flink-1.18-0.7.0-incubating.jar
Flink 1.17	  Bundled Jar	  paimon-flink-1.17-0.7.0-incubating.jar
Flink 1.16	  Bundled Jar	  paimon-flink-1.16-0.7.0-incubating.jar
Flink 1.15	  Bundled Jar	  paimon-flink-1.15-0.7.0-incubating.jar
Flink 1.14	  Bundled Jar	  paimon-flink-1.14-0.7.0-incubating.jar
Flink Action	Action Jar	  paimon-flink-action-0.7.0-incubating.jar
1.環境準備

下載 Flink 后解壓

tar -xzf flink-*.tgz

拷貝 Paimon bundled jar 包到 Flink 的 lib 目錄下

cp paimon-flink-*.jar <FLINK_HOME>/lib/

拷貝 Hadoop Bundled Jar 包到 Flink 的 lib 目錄下

cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/

為同時運行多個Flink作業,修改/conf/flink-conf.yaml中的集群配置

taskmanager.numberOfTaskSlots: 2

本地啟動 Flink 集群

<FLINK_HOME>/bin/start-cluster.sh

驗證 Web UI 查看集群是否已啟動并運行

localhost:8081

啟動Flink SQL客戶端來執行SQL腳本

<FLINK_HOME>/bin/sql-client.sh
2.創建 Paimon Catalog 和 Table

創建 Catalog 和 Table

-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH ('type'='paimon','warehouse'='file:/tmp/paimon'
);USE CATALOG my_catalog;-- create a word count table
CREATE TABLE word_count (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
);
3.使用Flink通用的Catalog創建Table

使用Flink通用的Catalog,需要使用Hive metastore,然后可以使用Paimon、Hive和Flink通用表(Kafka和其他表)中的所有表。

在此模式下,應該使用"connector"選項來創建tables。

Paimon將在hive-site.xml中使用hive.metastore.warehouse.dir,需要使用帶有scheme的path,例如,hdfs://....否則,Paimon將使用本地路徑。

CREATE CATALOG my_catalog WITH ('type'='paimon-generic','hive-conf-dir'='...','hadoop-conf-dir'='...'
);USE CATALOG my_catalog;-- create a word count table
CREATE TABLE word_count (word STRING PRIMARY KEY NOT ENFORCED,cnt BIGINT
) WITH ('connector'='paimon'
);
4.創建 Source 數據表
-- create a word data generator table
CREATE TEMPORARY TABLE word_table (word STRING
) WITH ('connector' = 'datagen','fields.word.length' = '1'
);-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
5.數據查詢案例

OLAP查詢

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';-- olap query the table
SELECT * FROM word_count;

流式查詢

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';-- track the changes of table and calculate the count interval statistics
SELECT `interval`, COUNT(*) AS interval_cnt FROM(SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;
6.退出Flink SQL客戶端,停止Flink集群

退出Flink SQL客戶端

-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;-- exit sql-client
EXIT;

停止Flink集群

./bin/stop-cluster.sh
7.觸發 Savepoint 和 recover

由于Paimon有自己的snapshot管理,可能與Flink的checkpoint管理相沖突,在從savepoint恢復時會導致異常(不會導致存儲損壞)。

建議使用以下方法開啟savepoint

使用Stop with savepoint。
使用 Tag with savepoint,并在從savepoint恢復之前rollback-to-tag。

8.使用Action Jar

Flink本地集群啟動后,使用以下命令執行 action jar

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \<action><args>

compact 一張 table

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \compact \--path <TABLE_PATH>
9.支持的Flink數據類型

支持所有Flink數據類型,除了

  • MULTISET不受支持。
  • MAP不支持作為主鍵。
10.使用Flink Managed Memory

Paimon 任務可以創建 memory pools,基于Flink executor管理的 executor memory , 像Flink任務管理的 managed memory。

通過 executor 管理的多個任務的 writer buffers 可以提升 sinks 的穩定性和性能。

使用 Flink managed memory 的配置如下:

OptionDefaultDescription
sink.use-managed-memory-allocatorfalseIf true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator, which means each task allocates and manages its own memory pool (heap memory), if there are too many tasks in one Executor, it may cause performance issues and even OOM.
sink.managed.writer-buffer-memory256MWeight of writer buffer in managed memory, Flink will compute the memory size, for writer according to the weight, the actual memory used depends on the running environment. Now the memory size defined in this property are equals to the exact memory allocated to write buffer in runtime.

在SQL中為Flink Managed Memory設置內存權重,然后Flink sink operator將獲得memory pool大小,并為Paimon writer創建allocator。

INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;
11.Setting dynamic options

與Paimon表交互時,可以在不更改catalog options的情況下調整table options。

Paimon將獲取job-level的dynamic options,并在current session中生效,dynamic options的格式是:

paimon.${catalogName}.${dbName}.${tableName}.${config_key}catalogName/dbName/tableName 可以是 *

例如:

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;
12.Procedures

Flink 1.18及以上版本支持Call Statements,通過編寫SQL來操作Paimon表的數據和元數據。

注意:當 call 一個 procedure 時,必須按順序傳遞參數,如果不想傳遞某些參數,必須使用 ‘’ 作為占位符。

例如,用并行度為4的任務壓縮表default.t,但不想指定分區和排序策略,調用語句應該是
CALL sys.compact('default.t', '', '', '', 'sink.parallelism=4')

指定分區:使用字符串來表示partition filter,“,“表示"AND”,”;"表示“OR”。

例如,指定兩個分區date=01或date=02,需要寫’date=01;date=02’;如果指定一個帶有date=01和day=01的分區,需要寫’date=01,day=01’。

table options 語法:使用字符串來表示table options,格式是’key1=value1,key2=value2…'。

Procedure NameUsageExplainationExample
compactCALL [catalog.]sys.compact(‘identifier’) CALL [catalog.]sys.compact(‘identifier’, ‘partitions’) CALL [catalog.]sys.compact(‘identifier’, ‘partitions’, ‘order_strategy’, ‘order_columns’, ‘table_options’)TO compact a table. Arguments:identifier: the target table identifier. Cannot be empty.partitions: partition filter.order_strategy: ‘order’ or ‘zorder’ or ‘none’. Left empty for ‘none’.order_columns: the columns need to be sort. Left empty if ‘order_strategy’ is ‘none’.table_options: additional dynamic options of the table.CALL sys.compact(‘default.T’, ‘p=0’, ‘zorder’, ‘a,b’, ‘sink.parallelism=4’)
compact_databaseCALL [catalog.]sys.compact_database() CALL [catalog.]sys.compact_database(‘includingDatabases’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’) CALL [catalog.]sys.compact_database(‘includingDatabases’, ‘mode’, ‘includingTables’, ‘excludingTables’, ‘tableOptions’)To compact databases. Arguments:includingDatabases: to specify databases. You can use regular expression.mode: compact mode. “divided”: start a sink for each table, detecting the new table requires restarting the job; “combined” (default): start a single combined sink for all tables, the new table will be automatically detected.includingTables: to specify tables. You can use regular expression.excludingTables: to specify tables that are not compacted. You can use regular expression.tableOptions: additional dynamic options of the table.CALL sys.compact_database(‘db1|db2’, ‘combined’, ‘table_.*’, ‘ignore’, ‘sink.parallelism=4’)
create_tagCALL [catalog.]sys.create_tag(‘identifier’, ‘tagName’, snapshotId)To create a tag based on given snapshot. Arguments:identifier: the target table identifier. Cannot be empty.tagName: name of the new tag.snapshotId (Long): id of the snapshot which the new tag is based on.CALL sys.create_tag(‘default.T’, ‘my_tag’, 10)
delete_tagCALL [catalog.]sys.delete_tag(‘identifier’, ‘tagName’)To delete a tag. Arguments:identifier: the target table identifier. Cannot be empty.tagName: name of the tag to be deleted.CALL sys.delete_tag(‘default.T’, ‘my_tag’)
merge_into– when matched then upsert CALL [catalog.]sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’) – when matched then upsert; when not matched then insert CALL [catalog.]sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’, ‘notMatchedInsertCondition’,‘notMatchedInsertValues’) – when matched then delete CALL [catalog].sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedDeleteCondition’) – when matched then upsert + delete; – when not matched then insert CALL [catalog].sys.merge_into(‘identifier’,‘targetAlias’, ‘sourceSqls’,‘sourceTable’,‘mergeCondition’, ‘matchedUpsertCondition’,‘matchedUpsertSetting’, ‘notMatchedInsertCondition’,‘notMatchedInsertValues’, ‘matchedDeleteCondition’)To perform “MERGE INTO” syntax. See merge_into action for details of arguments.– for matched order rows, – increase the price, – and if there is no match, – insert the order from – the source table CALL sys.merge_into(‘default.T’, ‘’, ‘’, ‘default.S’, ‘T.id=S.order_id’, ‘’, ‘price=T.price+20’, ‘’, ‘*’)
remove_orphan_filesCALL [catalog.]sys.remove_orphan_files(‘identifier’) CALL [catalog.]sys.remove_orphan_files(‘identifier’, ‘olderThan’)To remove the orphan data files and metadata files. Arguments:identifier: the target table identifier. Cannot be empty.olderThan: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.CALL remove_orphan_files(‘default.T’, ‘2023-10-31 12:00:00’)
reset_consumer– reset the new next snapshot id in the consumer CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’, nextSnapshotId) – delete consumer CALL [catalog.]sys.reset_consumer(‘identifier’, ‘consumerId’)To reset or delete consumer. Arguments:identifier: the target table identifier. Cannot be empty.consumerId: consumer to be reset or deleted.nextSnapshotId (Long): the new next snapshot id of the consumer.CALL sys.reset_consumer(‘default.T’, ‘myid’, 10)
rollback_to– rollback to a snapshot CALL sys.rollback_to(‘identifier’, snapshotId) – rollback to a tag CALL sys.rollback_to(‘identifier’, ‘tagName’)To rollback to a specific version of target table. Argument:identifier: the target table identifier. Cannot be empty.snapshotId (Long): id of the snapshot that will roll back to.tagName: name of the tag that will roll back to.CALL sys.rollback_to(‘default.T’, 10)
expire_snapshots– expires snapshot CALL sys.expire_snapshots(‘identifier’, retainMax)To expire snapshots. Argument:identifier: the target table identifier. Cannot be empty.retainMax: the maximum number of completed snapshots to retain.CALL sys.expire_snapshots(‘default.T’, 2)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/717127.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/717127.shtml
英文地址,請注明出處:http://en.pswp.cn/news/717127.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SentenceTransformer簡單使用

SentenceTransformer簡單使用 1 SentenceTransformer介紹 SentenceTransformer主要用于對句子、文本和圖像進行嵌入。可用于文本和圖像的相似度對比查找等 # SentenceTransformer官網地址 https://www.sbert.net/# 安裝SentenceTransformer pip install -U sentence-transfo…

求數字的每一位之和

求數字的每一位之和 題目描述&#xff1a;解法思路&#xff1a;解法代碼&#xff1a;運行結果&#xff1a; 題目描述&#xff1a; 輸入一個整數m&#xff0c;求這個整數m的每?位之和&#xff0c;并打印。 測試1&#xff1a; 輸?&#xff1a;1234 輸出&#xff1a;10 測試2&…

土壤侵蝕量化評估

根據之前的文章,已經算出了R、K、LS、C、P 現在計算土壤侵蝕,將幾個前期制作好的因子的TIFF文件,用柵格計算器相乘 發現局部地區存在輕度侵蝕,大部分區域是微度侵蝕 然后對比了一下范圍 其中的幾個因子都在文獻范圍內,說明計算結果并未出錯,可能就是研究區正常范圍和結…

6020一拖二快充線:手機充電的革命性創新

在快節奏的現代生活中&#xff0c;手機已不僅僅是一個通訊工具&#xff0c;更是我們工作、學習和娛樂的得力助手。然而&#xff0c;手機的電量問題一直是困擾著我們的難題。為了解決這個問題&#xff0c;市場上出現了一種名為“一拖二快充線”的充電設備&#xff0c;它不僅具備…

etcd入門-(1)安裝篇

一、etcd安裝 https://github.com/etcd-io/etcd/releases 根據需要下載安裝etcd, 確保添加到環境變量 執行 etcd -v 查看安裝版本 二、etcd運行 本地運行集群 1.首先安裝goreman go install github.com/mattn/goremanlatest2.準備Procfile 將腳本下載到本地&#xff0c;或者復…

八. 實戰:CUDA-BEVFusion部署分析-分析BEVFusion中各個ONNX

目錄 前言0. 簡述1. camera.backbone.onnx(fp16)2. camera.backbone.onnx(int8)3. camera.vtransform.onnx(fp16)4. fuser.onnx(fp16)5. fuser.onnx(int8)6. lidar.backbone.xyz.onnx7. head.bbox.onnx(fp16)總結下載鏈接參考 前言 自動駕駛之心推出的《CUDA與TensorRT部署實戰…

每日一類:Qt中的萬能容器

在Qt框架中&#xff0c;QVariant類扮演著一個非常重要的角色。它是一個萬能容器類&#xff0c;可以存儲Qt中的任何基本類型數據&#xff0c;包括自定義類型。這種靈活性使得QVariant成為Qt編程中不可或缺的工具&#xff0c;特別是在需要處理不同類型數據或進行對象間通信時。 …

Unity UGUI之Scrollbar基本了解

Unity的Scrollbar組件是用于在UI中創建滾動條的組件之一。滾動條通常與其他可滾動的UI元素&#xff08;如滾動視圖或列表&#xff09;一起使用&#xff0c;以便用戶可以在內容超出可見區域時滾動內容。 以下是Scrollbar的基本信息和用法: 1、創建 在Unity的Hierarchy視圖中右…

柯西矩陣介紹

經典定義 柯西矩陣&#xff08;Cauchy Matrix&#xff09;&#xff0c;是一種特殊類型的矩陣&#xff0c;它在數學中的多個領域&#xff0c;包括線性代數、數值分析和插值理論中都有重要應用。柯西矩陣以19世紀法國數學家奧古斯丁-路易柯西的名字命名。 柯西矩陣是一個方陣&am…

Krylov matrix

Krylov矩陣是一種在數值線性代數中使用的矩陣&#xff0c;尤其是在迭代解法中用于求解線性方程組、特征值問題和其他線性代數問題。它是由俄國數學家阿列克謝尼古拉耶維奇克雷洛夫&#xff08;Alexei Nikolaevich Krylov&#xff09;的名字命名的。 Krylov子空間由以下形式的矩…

jetson nano——編譯安裝opencv==4.4

目錄 1.下載源碼&#xff0c;我提供的鏈接如下&#xff1a;1.1文件上傳的路徑位置&#xff0c;注意ymck是我自己的用戶名&#xff08;你們自己換成你們自己相對應的就行&#xff09; 2.解壓文件3.安裝依賴4.增加swap交換內存4.1臨時增加交換內存swap4.2永久增加swap 5.安裝open…

2024-03-03 作業

作業要求&#xff1a; 1.使用fwrite、fread將一張隨意的bmp圖片&#xff0c;修改成德國的國旗 2.使用提供的getch函數&#xff0c;編寫一個專門用來輸入密碼的函數&#xff0c;要求輸入密碼的時候&#xff0c;顯示 * 號&#xff0c;輸入回車的時候&#xff0c;密碼輸入結束 作業…

學習Android的第十九天

目錄 Android ExpandableListView 分組列表 ExpandableListView 屬性 ExpandableListView 事件 ExpandableListView 的 Adapter 范例 參考文檔 Android ViewFlipper 翻轉視圖 ViewFlipper 屬性 ViewFlipper 方法 為 ViewFlipper 加入 View 例子&#xff1a;全屏幕可…

【MySQL】索引(重點)-- 詳解

一、索引 沒有索引&#xff0c;可能會有什么問題&#xff1f; 索引 &#xff1a;提高數據庫的性能&#xff0c;索引是物美價廉的東西了。不用加內存&#xff0c;不用改程序&#xff0c;不用調 sql &#xff0c;只要執行正確的 create index &#xff0c;查詢速度就可能提高成…

加密與安全_探索數字證書

文章目錄 Pre概述使用keytool生成證書使用Openssl生成證書 &#xff08;推薦&#xff09;證書的吊銷小結 Pre PKI - 借助Nginx 實現Https 服務端單向認證、服務端客戶端雙向認證 PKI - 04 證書授權頒發機構&#xff08;CA&#xff09; & 數字證書 PKI - 數字簽名與數字證…

java面試題(spring框架篇)(黑馬 )

樹形圖&#xff1a; 一、Spring框架種的單例bean是線程安全嗎&#xff1f; Service Scope("singleton") public class UserServiceImpl implements UserService{ } singleton:bean在每個Spring IOC容器中只有一個實例 protype&#xff1a;一個bean的定義可以有多個…

CPU iowait是什么意思

在linux系統&#xff0c;使用top命令時&#xff0c;可以看到cpu使用統計情況&#xff0c;有時我們會注意到iowait這一項非常高。我們直到&#xff0c;在cpu運行進程、線程時&#xff0c;遇到IO操作&#xff0c;因為IO讀寫通常比較慢&#xff0c;CPU通常可以阻塞線程&#xff0c…

【Web安全靶場】xss-labs-master 1-20

xss-labs-master 其他靶場見專欄 文章目錄 xss-labs-masterlevel-1level-2level-3level-4level-5level-6level-7level-8level-9level-10level-11level-12level-13level-14level-15level-16level-17level-18level-19level-20 level-1 第一關沒有進行任何限制&#xff0c;get請求…

pytorch_神經網絡構建6

文章目錄 強化學習概念實現qLearning基于這個思路,那么解決這個問題的代碼如下 強化學習概念 強化學習有一個非常直觀的表現&#xff0c;就是從出發點到目標之間存在著一個連續的狀態轉換&#xff0c;比如說從狀態一到狀態456&#xff0c;而每一個狀態都有多種的行為&#xff…

全國青少年軟件編程(Python)等級考試試卷(一級) 測試卷2021年12月

第 1 題 【 單選題 】 下面程序的運行結果是什么&#xff1f;&#xff08; &#xff09; a10 b5 ca*b print(c) A :10 B :15 C :50 D :5 正確答案:C 試題解析: 第 2 題 【 單選題 】 與a>b and b>c等價的是&#xff1f;&#xff08; &#xff09; A…