Flink ClickHouse 連接器數據寫入源碼深度解析

一、引言

在大數據處理的實際應用場景中,數據的高效存儲與處理至關重要。Flink 作為一款強大的流式計算框架,能夠對海量數據進行實時處理;而 ClickHouse 作為高性能的列式數據庫,擅長處理大規模數據分析任務。Flink ClickHouse 連接器則將二者的優勢結合起來,允許用戶將 Flink 處理后的數據高效地寫入 ClickHouse 數據庫。下面我們將深入剖析其數據寫入的源碼實現,探究其背后的工作原理和設計思路。

二、整體架構概述

Flink ClickHouse 連接器的數據寫入主要圍繞 AbstractClickHouseOutputFormat 及其子類展開。AbstractClickHouseOutputFormat 作為抽象基類,定義了寫入數據的基本流程和核心方法,為后續的具體實現提供了統一的框架。具體的寫入邏輯由其子類 ClickHouseBatchOutputFormatClickHouseShardOutputFormat 實現,它們分別適用于不同的場景,以滿足多樣化的需求。

三、核心類及方法詳細解析
1. ClickHouseConnectionOptions
// For testing.
@VisibleForTestingpublic ClickHouseConnectionOptions(String url) {this(url, null, null, null, null);}

這個構造函數是專門為測試目的而設計的。在測試環境中,為了簡化測試用例的編寫,我們可能只需要關注 URL 參數,而不需要設置其他復雜的配置。通過這個構造函數,我們可以方便地創建一個僅包含 URL 的 ClickHouseConnectionOptions 對象,從而更專注于對特定功能的測試。

2. AbstractClickHouseOutputFormat.Builder

AbstractClickHouseOutputFormat.Builder 類采用了建造者模式,用于構建 AbstractClickHouseOutputFormat 的實例。它提供了一系列的 withXXX 方法,允許用戶通過鏈式調用的方式設置各種配置參數,最后通過 build 方法創建具體的輸出格式實例。這種設計模式使得代碼更加簡潔易讀,同時也提高了代碼的可維護性。

public Builder withOptions(ClickHouseDmlOptions options) {this.options = options;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}

這些 withXXX 方法通過將傳入的參數賦值給 Builder 類的成員變量,并返回 this 指針,實現了鏈式調用的效果。例如,用戶可以這樣使用:

AbstractClickHouseOutputFormat.Builder builder = new AbstractClickHouseOutputFormat.Builder();
builder.withOptions(options).withConnectionProperties(connectionProperties);
public AbstractClickHouseOutputFormat build() {Preconditions.checkNotNull(options);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(primaryKeys);Preconditions.checkNotNull(partitionKeys);if (primaryKeys.length > 0) {LOG.warn("If primary key is specified, connector will be in UPSERT mode.");LOG.warn("The data will be updated / deleted by the primary key, you will have significant performance loss.");}ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(options, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),options.getDatabaseName(),options.getTableName());boolean isDistributed = engineFullSchema != null;return isDistributed && options.isUseLocal()? createShardOutputFormat(connectionProvider.getOrCreateConnection(), engineFullSchema): createBatchOutputFormat();} catch (Exception exception) {throw new RuntimeException("Build ClickHouse output format failed.", exception);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}

build 方法中,首先會對必要的參數進行非空檢查,確保所有必需的配置都已正確設置。如果指定了主鍵,會發出警告,因為使用主鍵會使連接器進入 UPSERT 模式,這可能會導致性能下降。接著,會創建 ClickHouseConnectionProvider 對象,用于管理與 ClickHouse 數據庫的連接。然后,嘗試獲取分布式引擎的完整信息,判斷當前表是否為分布式表。根據是否為分布式表以及是否使用本地表,選擇創建 ClickHouseShardOutputFormatClickHouseBatchOutputFormat 實例。最后,無論創建過程是否成功,都會關閉 ClickHouseConnectionProvider 以釋放連接資源。

3. ClickHouseBatchOutputFormat 和 ClickHouseShardOutputFormat

ClickHouseBatchOutputFormat 用于批量寫入數據,它將多條記錄打包成一個批次,一次性發送到 ClickHouse 數據庫,從而減少了與數據庫的交互次數,提高了寫入性能。而 ClickHouseShardOutputFormat 用于分片寫入數據,適用于分布式表。在分布式環境中,數據會被分散存儲在多個分片上,ClickHouseShardOutputFormat 會根據分片策略將數據正確地分發到相應的分片上。

private ClickHouseBatchOutputFormat createBatchOutputFormat() {return new ClickHouseBatchOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),fieldNames,primaryKeys,partitionKeys,logicalTypes,options);
}private ClickHouseShardOutputFormat createShardOutputFormat(ClickHouseConnection connection, DistributedEngineFull engineFullSchema)throws SQLException {SinkShardingStrategy shardingStrategy;List<FieldGetter> fieldGetters = null;if (options.isShardingUseTableDef()) {Expression shardingKey = engineFullSchema.getShardingKey();if (shardingKey instanceof FieldExpr) {shardingStrategy = SinkShardingStrategy.VALUE;FieldGetter fieldGetter =getFieldGetterOfShardingKey(((FieldExpr) shardingKey).getColumnName());fieldGetters = singletonList(fieldGetter);} else if (shardingKey instanceof FunctionExpr&& "rand()".equals(shardingKey.explain())) {shardingStrategy = SinkShardingStrategy.SHUFFLE;fieldGetters = emptyList();} else if (shardingKey instanceof FunctionExpr&& "javaHash".equals(((FunctionExpr) shardingKey).getFunctionName())&& ((FunctionExpr) shardingKey).getArguments().stream().allMatch(expression -> expression instanceof FieldExpr)) {shardingStrategy = SinkShardingStrategy.HASH;fieldGetters = parseFieldGetters((FunctionExpr) shardingKey);} else {throw new RuntimeException("Unsupported sharding key: " + shardingKey.explain());}} else {shardingStrategy = options.getShardingStrategy();if (shardingStrategy.shardingKeyNeeded) {fieldGetters =options.getShardingKey().stream().map(this::getFieldGetterOfShardingKey).collect(toList());}}ClusterSpec clusterSpec = getClusterSpec(connection, engineFullSchema.getCluster());return new ClickHouseShardOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),clusterSpec,engineFullSchema,fieldNames,primaryKeys,partitionKeys,logicalTypes,shardingStrategy.provider.apply(fieldGetters),options);
}

createShardOutputFormat 方法中,會根據配置選擇不同的分片策略,如 VALUESHUFFLEHASH。對于不同的分片策略,會解析相應的分片鍵,并創建 FieldGetter 列表。例如,如果分片策略為 VALUE,會根據分片鍵的字段名創建一個 FieldGetter;如果為 SHUFFLE,則不需要 FieldGetter;如果為 HASH,會解析函數表達式中的字段名并創建相應的 FieldGetter 列表。最后,會獲取集群信息,并創建 ClickHouseShardOutputFormat 實例。

四、寫入流程總結
  1. 配置參數:使用 AbstractClickHouseOutputFormat.BuilderwithXXX 方法設置寫入選項、連接屬性、字段信息等參數。這些參數將決定數據寫入的行為和方式。
  2. 構建輸出格式:調用 build 方法,根據是否為分布式表以及是否使用本地表,選擇創建 ClickHouseBatchOutputFormatClickHouseShardOutputFormat 實例。這個過程中會進行參數檢查、連接創建和分片策略解析等操作。
  3. 數據寫入:通過創建的輸出格式實例,將數據批量或分片寫入 ClickHouse 數據庫。在寫入過程中,會根據配置的批量大小和刷新間隔進行數據的緩存和批量提交,以提高寫入性能。
  4. 資源管理:在寫入完成后,關閉 ClickHouseConnectionProvider 以釋放連接資源,避免資源泄漏。
五、優化建議
  1. 合理配置批量大小和刷新間隔:根據實際的業務場景和硬件資源,合理調整 sink.batch-sizesink.flush-interval 參數,以平衡寫入性能和內存使用。
  2. 避免使用主鍵進行 UPSERT 操作:如果不是必要情況,盡量避免指定主鍵,因為 UPSERT 操作會帶來較大的性能開銷。
  3. 選擇合適的分片策略:根據數據的特點和分布情況,選擇合適的分片策略,如 VALUESHUFFLEHASH,以確保數據均勻分布在各個分片上。
六、結論

通過對 Flink ClickHouse 連接器數據寫入源碼的深入分析,我們了解了其核心類和方法的實現細節,以及數據寫入的整體流程。這有助于我們在實際應用中更好地配置和優化數據寫入過程,提高寫入性能和可靠性。同時,我們也可以根據具體的業務需求對源碼進行擴展和定制,以滿足更多復雜的場景。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/90216.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/90216.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/90216.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OpenCV 人臉分析------面部關鍵點檢測類cv::face::FacemarkLBF

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 使用 Local Binary Features (LBF) 算法進行面部關鍵點檢測&#xff08;facial landmark detection&#xff09;。該算法通過級聯回歸樹預測人臉的…

Netstat高級分析工具:Windows與Linux雙系統兼容的精準篩查利器

Netstat高級分析工具&#xff1a;Windows與Linux雙系統兼容的精準篩查利器在網絡安全運維中&#xff0c;快速識別可疑連接是防御入侵的關鍵一步。本文將介紹一款我本人開發的原創高效的雙系統兼容Netstat信息分析工具&#xff0c;大幅提升惡意連接篩查效率。一、Netstat分析在安…

Bright Data MCP+Trae :快速構建電商導購助手垂直智能體

聲明&#xff1a;本測試報告系作者基于個人興趣及使用場景開展的非專業測評&#xff0c;測試過程中所涉及的方法、數據及結論均為個人觀點&#xff0c;不代表任何官方立場或行業標準。 文章目錄 一、引言1.1 當前AI智能體的趨勢1.2 構建智能體面臨的最大挑戰&#xff1a;數據來…

plantuml用法總結

時序圖 參考 https://blog.csdn.net/vitaviva/article/details/120735745用PlantUML簡化復雜時序圖的秘訣 startuml skin rose actor User as user participant "Component A" as A participant "Component B" as Buser -> A: Request data activate …

基于自研心電芯片國產化手持單導/6導/12導心電解決方案

蘇州唯理作為國內心電芯片國產化廠商&#xff0c;面向家用場景&#xff0c;推出了國產化的手持單導/6導/12導心電儀技術解決方案&#xff0c;可以讓家用心電圖儀成本可控&#xff0c;信號鏈路質量更佳穩定。該方案已在多家客戶中實現批量出貨。唯理科技同樣提供了醫療級的心電圖…

Sass詳解:功能特性、常用方法與最佳實踐

Sass詳解&#xff1a;功能特性、常用方法與最佳實踐 Sass&#xff08;Syntactically Awesome Style Sheets&#xff09;作為CSS預處理器領域的先驅&#xff0c;自2006年由Hampton Catlin創建以來&#xff0c;已成為現代前端開發中不可或缺的工具。它通過引入變量、嵌套、混合宏…

vulnhub靶機滲透:PWNLAB: INIT

一、信息收集1、主機發現2、端口掃描PORT STATE SERVICE VERSION 80/tcp open http Apache httpd 2.4.10 ((Debian))111/tcp open rpcbind 2-4 (RPC #100000)3306/tcp open mysql MySQL 5.5.47-0deb8u151649/tcp open status 1 (RPC #100024)3、目錄掃描&…

LiveKit 本地部署全流程指南(含 HTTPS/WSS)

1. 環境準備 操作系統&#xff1a;Windows 10/11 或 Linux/Mac需有本地公網/內網 IP&#xff08;如 192.168.x.x&#xff09;推薦瀏覽器&#xff1a;Chrome/Edge/Firefox/Safari端口未被占用&#xff0c;防火墻允許相關端口 2. 目錄結構建議 livekit/livekit-server.execonf…

NumPy-統計函數詳解

NumPy-統計函數詳解一、基礎統計函數&#xff1a;均值、方差、標準差1. 全局統計&#xff1a;忽略維度的整體計算2. 按軸統計&#xff1a;指定維度方向的計算二、位置統計&#xff1a;中位數、分位數、百分位數1. 中位數計算2. 分位數與百分位數三、離散程度&#xff1a;極差、…

音頻被動降噪技術

音頻被動降噪技術 音頻被動降噪技術是一種通過物理結構和材料設計來減少或隔離外部噪聲的降噪方式,其核心原理是通過物理屏障或吸聲材料來阻斷或吸收聲波,從而降低環境噪聲對聽覺體驗的影響。以下將從技術原理、應用場景、優缺點及與其他降噪技術的對比等方面進行詳細分析。…

中國蟻劍使用方法

找到mysql配置文件 secure-file-priv工作目錄 D:\tool\huli\gui_webshell\AntSword\AntSword\antSword-master重點是tool目錄后面 前面大家可能都不一樣 添加數據一句話木馬 3C3F706870206576616C28245F504F53545B22636D64225D293B3F3E 翻譯過來 <?php eval($_POST["c…

8.1 prefix Tunning與Prompt Tunning模型微調方法

1 prefix Tunning 鏈接&#xff1a;https://blog.csdn.net/m0_66890670/article/details/142942034 這里有基礎的細節介紹。我下面直接總結。 連接2 &#xff1a;https://zhuanlan.zhihu.com/p/1899112824342577371&#xff0c;簡單明了 prefix Tunning改變了什么呢&#xff…

FlashAttention 深入淺出

一 標準Attention的計算 1.1 標準Attention機制詳解 標準Attention&#xff08;注意力&#xff09;機制是深度學習&#xff0c;尤其是在自然語言處理領域中一項革命性的技術&#xff0c;它允許模型在處理序列數據時&#xff0c;動態地將焦點放在輸入序列的不同部分&#xff0c;…

C/C++ inline-hook(x86)高級函數內聯鉤子

&#x1f9f5; C/C inline-hook&#xff08;x86&#xff09;高級函數內聯鉤子 引用&#xff1a; fetch-x86-64-asm-il-sizeC i386/AMD64平臺匯編指令對齊長度獲取實現 &#x1f9e0; 一、Inline Hook技術體系架構 Inline Hook是一種二進制指令劫持技術&#xff0c;通過修改目…

云服務器的安全防護指南:從基礎安全設置到高級威脅防御

隨著云計算的廣泛應用&#xff0c;云服務器已成為企業和個人存儲數據、運行應用的重要基礎設施。然而&#xff0c;隨之而來的安全威脅也日益增多——從常見的網絡攻擊&#xff08;如 DDoS、SQL 注入&#xff09;到復雜的惡意軟件和零日漏洞&#xff0c;無一不考驗著系統的安全性…

狀態機管家:MeScroll 的交互秩序維護

一、核心架構設計與性能基石 MeScroll作為高性能滾動解決方案&#xff0c;其架構設計遵循"分層解耦、精準控制、多端適配"的原則&#xff0c;通過四大核心模塊實現流暢的滾動體驗&#xff1a; 事件控制層&#xff1a;精準捕獲觸摸行為&#xff0c;區分滾動方向與距…

數據出海的隱形冰山:企業如何避開跨境傳輸的“合規漩渦”?

首席數據官高鵬律師數字經濟團隊創作&#xff0c;AI輔助凌晨三點的寫字樓&#xff0c;某跨境電商的技術總監盯著屏幕上的報錯提示&#xff0c;指尖懸在鍵盤上遲遲沒落下。剛從新加坡服務器調取的用戶行為數據&#xff0c;在傳輸到國內分析系統時被攔截了——系統提示“不符合跨…

【Rust base64庫】Rust bas64編碼解碼詳細解析與應用實戰

?? 歡迎大家來到景天科技苑?? ???? 養成好習慣,先贊后看哦~???? ?? 作者簡介:景天科技苑 ??《頭銜》:大廠架構師,華為云開發者社區專家博主,阿里云開發者社區專家博主,CSDN全棧領域優質創作者,掘金優秀博主,51CTO博客專家等。 ??《博客》:Rust開發…

如何利用AI大模型對已有創意進行評估,打造殺手級的廣告創意

摘要 廣告創意是影響廣告效果的最重要的因素之一&#xff0c;但是如何評估和優化廣告創意&#xff0c;一直是一個難題。傳統的方法&#xff0c;如人工評審、A/B測試、點擊率等&#xff0c;都有各自的局限性和缺陷。本文將介紹一種新的方法&#xff0c;即利用人工智能大模型&am…

OSCP - HTB - Cicada

主要知識點 SMB 用戶爆破Backup Operator 組提權 具體步驟 nmap掃描一下先&#xff0c;就像典型的windows 靶機一樣&#xff0c;開放了N多個端口 Nmap scan report for 10.10.11.35 Host is up (0.19s latency). Not shown: 65522 filtered tcp ports (no-response) PORT …