iceberg系列之 hadoop catalog 小文件合并實戰

  1. 背景
    flink1.15 hadoop3.0
  2. pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.iceberg</groupId><artifactId>flink-iceberg</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--idea運行時也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>compile</scope></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.15</artifactId><version>1.3.0</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>1.3.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><!-- 指定主類 --><mainClass>com.iceberg.flink.UnionDelData</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
  1. 資源配置文件
    hadoop三個常用配置文件core-site.xml hdfs-site.xml yarn-site.xml 放到資源目錄下
  2. java代碼
package com.iceberg.flink;import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;import java.io.File;
import java.net.MalformedURLException;public class UnionDelData {public static void main(String[] args) throws MalformedURLException {      String tableNames = args[1];long targetsSize = parseSizeToBytes(args[2]);int parallelism = Integer.parseInt(args[3]);long retainTime = parseTimeToMillis(args[4]);int retainLastNum = Integer.parseInt(args[5]);Configuration conf = new Configuration();conf.addResource(new File("/home/hadoop/hadoopconf/core-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/hdfs-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/yarn-site.xml").toURI().toURL());HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/user/hadoop/path/");for (String tableName : tableNames.split(",")) {Table table = hadoopCatalog.loadTable(TableIdentifier.of("prod", tableName));UnionDataFile(table,parallelism,targetsSize);deleteSnap(table,retainTime,retainLastNum);}}public static void UnionDataFile(Table table,int parallelism,long targetsSize) {Actions.forTable(table).rewriteDataFiles().maxParallelism(parallelism).caseSensitive(false).targetSizeInBytes(targetsSize).execute();}public static void deleteSnap(Table table,long retainTime,int retainLastNum){Snapshot snapshot = table.currentSnapshot();long oldSnapshot = snapshot.timestampMillis() - retainTime;if (snapshot != null) {            table.expireSnapshots().expireOlderThan(oldSnapshot).cleanExpiredFiles(true).retainLast(retainLastNum).commit();}}public static long parseSizeToBytes(String sizeWithUnit) {long size = Long.parseLong(sizeWithUnit.substring(0, sizeWithUnit.length() - 1));char unit = sizeWithUnit.charAt(sizeWithUnit.length() - 1); switch (unit) {case 'B':return size;case 'K':case 'k': return size * 1024;case 'M':case 'm': return size * 1024 * 1024;case 'G':case 'g': return size * 1024 * 1024 * 1024;default:throw new IllegalArgumentException("Invalid size unit: " + unit);}}public static long parseTimeToMillis(String timeWithUnit) {long time = Long.parseLong(timeWithUnit.substring(0, timeWithUnit.length() - 1));char unit = timeWithUnit.charAt(timeWithUnit.length() - 1);switch (unit) {case 's':case 'S':return time * 1000;case 'm':case 'M':return time * 60 * 1000;case 'h':case 'H':return time * 60 * 60 * 1000;case 'd':case 'D':return time * 24 * 60 * 60 * 1000;default:throw new IllegalArgumentException("Invalid time unit: " + unit);}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/41423.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/41423.shtml
英文地址,請注明出處:http://en.pswp.cn/news/41423.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

UBuntu18.04 Qt之雙HDMI屏切換

UBuntu18.04 Qt之雙HDMI接2個4K屏并分別設置分辨率、主屏、副屏 一、設置HDMI-2為主屏 在main函數里面添加&#xff1a; #include "mainwindow.h" #include <QApplication>int main(int argc, char *argv[]) {QApplication a(argc, argv);{long nTotal 0;c…

spring cloud gateway中配置uri

gateway中配置uri配置有三種方式: websocket方式&#xff1a;uri: ws://localhost:9000http方式: uri: http://localhost:8130/lb注冊中心配置方式&#xff08;注冊的服務名稱&#xff09;: uri: lb://monitor-ms gateway的lb方式識別的服務名稱命名規則&#xff1a; "[…

設計模式——適配器模式

引入實例 說起適配器其實在我們的生活中是非常常見的&#xff0c;比如&#xff1a;學校的宿舍的電壓都比較低&#xff0c;而有的學生想使用大功率電器&#xff0c;宿舍的就會跳閘&#xff0c;然而如果你使用一個適配器&#xff08;變壓器&#xff09;就可以使用了&#xff08;…

Jtti:windows虛擬內存最小值太低如何解決

當Windows虛擬內存的最小值設置過低時&#xff0c;可能會導致系統性能下降、應用程序崩潰甚至系統不穩定。解決方法包括&#xff1a; 調整虛擬內存設置&#xff1a; 可以通過以下步驟調整虛擬內存的設置&#xff1a; 右鍵點擊“此電腦”或“計算機”&#xff0c;選擇“屬性”。…

被迫學習一波Linux命令

事情起因 部署一個服務&#xff0c;人家說了最低配置是3G&#xff0c;我沒當回事&#xff0c;拿著個2G的服務器直接就上了&#xff0c;結果&#xff0c;哈哈&#xff0c;都能猜到結果&#xff1a;服務器內存爆了&#xff01;&#xff01;&#xff01;而且最可氣的是服務器還登…

ansible案列之LNMP分布式劇本

LNMP分布式劇本 一&#xff1a;環境設置二&#xff1a;編寫Nginx劇本準備nginx下載源準備配置文件并開放PHP的訪問路徑準備php測試頁面編寫nginx劇本 三&#xff1a;編寫Mysql劇本編寫密碼獲取腳本準備Mysql的yum源編寫mysql劇本 四&#xff1a;準備PHP劇本準備兩個配置文件編寫…

深入理解linux內核--塊設備驅動程序

塊設備的處理 塊設備驅動程序上的每個操作都涉及很多內核組件&#xff1b;其中最重要的一些如圖14-1所示。 例如&#xff0c;我們假設一個進程在某個磁盤文件上發出一個read()系統調用 ——我們將會看到處理write請求本質上采用同樣的方式。 下面是內核對進程請求給予回應的一…

煤礦調度IP語音對講廣播模塊一鍵求助對講礦用調度通信系統SIP語音對講求助終端

硬件接口描述 SV-2101VP/ SV-2103VP系列網絡音頻模塊&#xff0c;所有外部連接采用端子&#xff0c;電源采用2.0mm的端子&#xff0c;網絡采用標準RJ45連接器&#xff0c;其他都是1.25mm的連接器。 端口類型定義 P ———— 電源 AI ———— 模擬輸入&#xff08;在這里是音…

微信小程序前后端開發快速入門(完結篇)

這篇是微信小程序前后端快速入門完結篇了&#xff0c;今天利用之前學習過的所有知識做一個新的項目「群登記助手v1.0」小程序。 整體技術架構&#xff1a;小程序原生前端小程序云開發。 經歷了前面教程的學習&#xff0c;大家有了一定的基礎&#xff0c;所以本次分享重心主要是…

Ubuntu服務器service版本初始化

下載 下載路徑 官網&#xff1a;https://cn.ubuntu.com/ 下載路徑&#xff1a;https://cn.ubuntu.com/download 服務器&#xff1a;https://cn.ubuntu.com/download/server/step1 點擊下載&#xff08;22.04.3&#xff09;&#xff1a;https://cn.ubuntu.com/download/server…

【Python百日進階-Web開發-Peewee】Day271 - Peewee API文檔 - 字段(二)

文章目錄 11.3.17 class UUIDField11.3.18 class BinaryUUIDField11.3.19 class DateTimeField11.3.20 class DateField11.3.21 class TimeField11.3.22 class TimestampField11.3.23 class IPField11.3.24 class BooleanField11.3.25 class BareField11.3.26 class ForeignKey…

神經網絡基礎-神經網絡補充概念-06-計算圖

概念 “計算圖”&#xff08;Computational Graph&#xff09;是一種用于表示數學表達式計算過程的圖結構&#xff0c;廣泛用于深度學習和自動微分等領域。計算圖將復雜的數學表達式分解為一系列簡單的計算節點&#xff0c;這些節點之間通過邊連接&#xff0c;形成了一個有向無…

【jwt】JWT原理,JWT是用來解決什么問題的,如何自定義生成JWT數據,并且實現jwt數據的解碼

JWT&#xff1a; JSON Web Token 1. jwt概述 用戶登錄成功后&#xff0c;服務端 如何知道客戶端的每次請求對應的是哪個用戶呢&#xff1f;怎么做&#xff1a;目前有兩種方式實現. 1.1. 一是通過sessionId的方式&#xff0c;登錄成功后服務端返回sessionId給客戶端&#xff0…

【2023年11月第四版教材】《第5章-信息系統工程之數據工程(第三部分)》

《第5章-信息系統工程之數據工程&#xff08;第三部分&#xff09;》 2 數據工程2.1 數據建模2.2 數據標準化2.3 數據運維2.4 數據開發利用2.5 數據庫安全 2 數據工程 2.1 數據建模 1、根據模型應用目的不同&#xff0c;可以將數據模型劃分為三類:概念模型、邏輯模型和物理模…

【數據結構】棧與隊列

1 棧 1.1 棧的概念及結構 棧&#xff1a;一種特殊的線性表&#xff0c;其只允許在固定的一端進行插入和刪除元素操作。進行數據插入和刪除操作的一端稱為棧頂&#xff0c;另一端稱為棧底。棧中的數據元素遵守后進先出 LIFO (Last In First Out) 的原則。 壓棧&#xff1a;棧…

力扣75——圖廣度優先搜索

總結leetcode75中的圖廣度優先搜索算法題解題思路。 上一篇&#xff1a;力扣75——圖深度優先搜索 力扣75——圖廣度優先搜索 1 迷宮中離入口最近的出口2 腐爛的橘子1-2 解題總結 1 迷宮中離入口最近的出口 題目&#xff1a; 給你一個 m x n 的迷宮矩陣 maze &#xff08;下標…

Kafka中的 ISR 機制

ISR 是什么 ISR 的全稱叫做&#xff1a; In-Sync Replicas &#xff08;同步副本集&#xff09;, 可以理解為和 leader 保持同步的所有副本的集合。ISR 動態維護了一個和 leader 副本保持同步副本集合&#xff0c;ISR 中的副本全部都和 leader 的數據保持同步。 設一個場景&a…

JupyterHub實戰應用

一、JupyerHub jupyter notebook 是一個非常有用的工具&#xff0c;我們可以在瀏覽器中任意編輯調試我們的python代碼&#xff0c;并且支持markdown 語法&#xff0c;可以說是科研利器。但是這種情況適合個人使用&#xff0c;也就是jupyter notebook以我們自己的主機作為服務器…

PostgreSQL邏輯備份pg_dump使用及其原理解析

一、原理分析 1、循環調用getopt_long解析命令行參數&#xff0c;將參數保存到static DumpOptions dopt;中 2、判斷參數是否相容&#xff0c;不相容則退出&#xff1a; options -s/--schema-only and -a/--data-only cannot be used togetheroptions -c/--clean and -a/--data…

uni-app中監聽網絡狀態,并在嵌入webView頁面的組件中添加網絡監測

uni-app中監聽網絡狀態&#xff0c;并在嵌入webView頁面的組件中添加網絡監測 uni-app中監聽網絡狀態 下載插件 打開網絡異常組件頁面&#xff0c;點擊"下載插件并導入HBuilderX"按鈕&#xff0c;打開HBuilderX軟件后&#xff0c;選擇需要導入插件的項目&#xff…