一、引言
在大數據處理的實際應用場景中,數據的高效存儲與處理至關重要。Flink 作為一款強大的流式計算框架,能夠對海量數據進行實時處理;而 ClickHouse 作為高性能的列式數據庫,擅長處理大規模數據分析任務。Flink ClickHouse 連接器則將二者的優勢結合起來,允許用戶將 Flink 處理后的數據高效地寫入 ClickHouse 數據庫。下面我們將深入剖析其數據寫入的源碼實現,探究其背后的工作原理和設計思路。
二、整體架構概述
Flink ClickHouse 連接器的數據寫入主要圍繞 AbstractClickHouseOutputFormat
及其子類展開。AbstractClickHouseOutputFormat
作為抽象基類,定義了寫入數據的基本流程和核心方法,為后續的具體實現提供了統一的框架。具體的寫入邏輯由其子類 ClickHouseBatchOutputFormat
和 ClickHouseShardOutputFormat
實現,它們分別適用于不同的場景,以滿足多樣化的需求。
三、核心類及方法詳細解析
1. ClickHouseConnectionOptions
// For testing.
@VisibleForTestingpublic ClickHouseConnectionOptions(String url) {this(url, null, null, null, null);}
這個構造函數是專門為測試目的而設計的。在測試環境中,為了簡化測試用例的編寫,我們可能只需要關注 URL 參數,而不需要設置其他復雜的配置。通過這個構造函數,我們可以方便地創建一個僅包含 URL 的 ClickHouseConnectionOptions
對象,從而更專注于對特定功能的測試。
2. AbstractClickHouseOutputFormat.Builder
AbstractClickHouseOutputFormat.Builder
類采用了建造者模式,用于構建 AbstractClickHouseOutputFormat
的實例。它提供了一系列的 withXXX
方法,允許用戶通過鏈式調用的方式設置各種配置參數,最后通過 build
方法創建具體的輸出格式實例。這種設計模式使得代碼更加簡潔易讀,同時也提高了代碼的可維護性。
public Builder withOptions(ClickHouseDmlOptions options) {this.options = options;return this;
}public Builder withConnectionProperties(Properties connectionProperties) {this.connectionProperties = connectionProperties;return this;
}
這些 withXXX
方法通過將傳入的參數賦值給 Builder
類的成員變量,并返回 this
指針,實現了鏈式調用的效果。例如,用戶可以這樣使用:
AbstractClickHouseOutputFormat.Builder builder = new AbstractClickHouseOutputFormat.Builder();
builder.withOptions(options).withConnectionProperties(connectionProperties);
public AbstractClickHouseOutputFormat build() {Preconditions.checkNotNull(options);Preconditions.checkNotNull(fieldNames);Preconditions.checkNotNull(fieldTypes);Preconditions.checkNotNull(primaryKeys);Preconditions.checkNotNull(partitionKeys);if (primaryKeys.length > 0) {LOG.warn("If primary key is specified, connector will be in UPSERT mode.");LOG.warn("The data will be updated / deleted by the primary key, you will have significant performance loss.");}ClickHouseConnectionProvider connectionProvider = null;try {connectionProvider =new ClickHouseConnectionProvider(options, connectionProperties);DistributedEngineFull engineFullSchema =getDistributedEngineFull(connectionProvider.getOrCreateConnection(),options.getDatabaseName(),options.getTableName());boolean isDistributed = engineFullSchema != null;return isDistributed && options.isUseLocal()? createShardOutputFormat(connectionProvider.getOrCreateConnection(), engineFullSchema): createBatchOutputFormat();} catch (Exception exception) {throw new RuntimeException("Build ClickHouse output format failed.", exception);} finally {if (connectionProvider != null) {connectionProvider.closeConnections();}}
}
在 build
方法中,首先會對必要的參數進行非空檢查,確保所有必需的配置都已正確設置。如果指定了主鍵,會發出警告,因為使用主鍵會使連接器進入 UPSERT 模式,這可能會導致性能下降。接著,會創建 ClickHouseConnectionProvider
對象,用于管理與 ClickHouse 數據庫的連接。然后,嘗試獲取分布式引擎的完整信息,判斷當前表是否為分布式表。根據是否為分布式表以及是否使用本地表,選擇創建 ClickHouseShardOutputFormat
或 ClickHouseBatchOutputFormat
實例。最后,無論創建過程是否成功,都會關閉 ClickHouseConnectionProvider
以釋放連接資源。
3. ClickHouseBatchOutputFormat 和 ClickHouseShardOutputFormat
ClickHouseBatchOutputFormat
用于批量寫入數據,它將多條記錄打包成一個批次,一次性發送到 ClickHouse 數據庫,從而減少了與數據庫的交互次數,提高了寫入性能。而 ClickHouseShardOutputFormat
用于分片寫入數據,適用于分布式表。在分布式環境中,數據會被分散存儲在多個分片上,ClickHouseShardOutputFormat
會根據分片策略將數據正確地分發到相應的分片上。
private ClickHouseBatchOutputFormat createBatchOutputFormat() {return new ClickHouseBatchOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),fieldNames,primaryKeys,partitionKeys,logicalTypes,options);
}private ClickHouseShardOutputFormat createShardOutputFormat(ClickHouseConnection connection, DistributedEngineFull engineFullSchema)throws SQLException {SinkShardingStrategy shardingStrategy;List<FieldGetter> fieldGetters = null;if (options.isShardingUseTableDef()) {Expression shardingKey = engineFullSchema.getShardingKey();if (shardingKey instanceof FieldExpr) {shardingStrategy = SinkShardingStrategy.VALUE;FieldGetter fieldGetter =getFieldGetterOfShardingKey(((FieldExpr) shardingKey).getColumnName());fieldGetters = singletonList(fieldGetter);} else if (shardingKey instanceof FunctionExpr&& "rand()".equals(shardingKey.explain())) {shardingStrategy = SinkShardingStrategy.SHUFFLE;fieldGetters = emptyList();} else if (shardingKey instanceof FunctionExpr&& "javaHash".equals(((FunctionExpr) shardingKey).getFunctionName())&& ((FunctionExpr) shardingKey).getArguments().stream().allMatch(expression -> expression instanceof FieldExpr)) {shardingStrategy = SinkShardingStrategy.HASH;fieldGetters = parseFieldGetters((FunctionExpr) shardingKey);} else {throw new RuntimeException("Unsupported sharding key: " + shardingKey.explain());}} else {shardingStrategy = options.getShardingStrategy();if (shardingStrategy.shardingKeyNeeded) {fieldGetters =options.getShardingKey().stream().map(this::getFieldGetterOfShardingKey).collect(toList());}}ClusterSpec clusterSpec = getClusterSpec(connection, engineFullSchema.getCluster());return new ClickHouseShardOutputFormat(new ClickHouseConnectionProvider(options, connectionProperties),clusterSpec,engineFullSchema,fieldNames,primaryKeys,partitionKeys,logicalTypes,shardingStrategy.provider.apply(fieldGetters),options);
}
在 createShardOutputFormat
方法中,會根據配置選擇不同的分片策略,如 VALUE
、SHUFFLE
或 HASH
。對于不同的分片策略,會解析相應的分片鍵,并創建 FieldGetter
列表。例如,如果分片策略為 VALUE
,會根據分片鍵的字段名創建一個 FieldGetter
;如果為 SHUFFLE
,則不需要 FieldGetter
;如果為 HASH
,會解析函數表達式中的字段名并創建相應的 FieldGetter
列表。最后,會獲取集群信息,并創建 ClickHouseShardOutputFormat
實例。
四、寫入流程總結
- 配置參數:使用
AbstractClickHouseOutputFormat.Builder
的withXXX
方法設置寫入選項、連接屬性、字段信息等參數。這些參數將決定數據寫入的行為和方式。 - 構建輸出格式:調用
build
方法,根據是否為分布式表以及是否使用本地表,選擇創建ClickHouseBatchOutputFormat
或ClickHouseShardOutputFormat
實例。這個過程中會進行參數檢查、連接創建和分片策略解析等操作。 - 數據寫入:通過創建的輸出格式實例,將數據批量或分片寫入 ClickHouse 數據庫。在寫入過程中,會根據配置的批量大小和刷新間隔進行數據的緩存和批量提交,以提高寫入性能。
- 資源管理:在寫入完成后,關閉
ClickHouseConnectionProvider
以釋放連接資源,避免資源泄漏。
五、優化建議
- 合理配置批量大小和刷新間隔:根據實際的業務場景和硬件資源,合理調整
sink.batch-size
和sink.flush-interval
參數,以平衡寫入性能和內存使用。 - 避免使用主鍵進行 UPSERT 操作:如果不是必要情況,盡量避免指定主鍵,因為 UPSERT 操作會帶來較大的性能開銷。
- 選擇合適的分片策略:根據數據的特點和分布情況,選擇合適的分片策略,如
VALUE
、SHUFFLE
或HASH
,以確保數據均勻分布在各個分片上。
六、結論
通過對 Flink ClickHouse 連接器數據寫入源碼的深入分析,我們了解了其核心類和方法的實現細節,以及數據寫入的整體流程。這有助于我們在實際應用中更好地配置和優化數據寫入過程,提高寫入性能和可靠性。同時,我們也可以根據具體的業務需求對源碼進行擴展和定制,以滿足更多復雜的場景。