![]() | 博主歷時三年精心創作的《大數據平臺架構與原型實現:數據中臺建設實戰》一書現已由知名IT圖書品牌電子工業出版社博文視點出版發行,點擊《重磅推薦:建大數據平臺太難了!給我發個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側二維碼進入京東手機購書頁面。 |
1. 多表公共配置和差異配置的兩種處理方式
由于 Hudi 的 HoodieMultiTableStreamer
/ HoodieMultiTableDeltaStreamer
是一次處理多張 Hudi 表的寫入,這些表既會有如 hoodie.deltastreamer.source.kafka.value.deserializer.class
這樣相同的公共配置,也會有如 hoodie.datasource.write.recordkey.field
這樣每張表每張表都不同的個性化配置,為此,HoodieMultiTableStreamer
/ HoodieMultiTableDeltaStreamer
給出的解決方案是:將公共配置提取到一個配置文件,將每張表的個性化配置放置到多個對應文件中,至于如何將每張表的表名和它的配置文件映射起來,Hudi 提供兩種方案:
方式一:
在公共配置文件中通過 hoodie.deltastreamer.ingestion.<db>.<table>.configFile
顯式指定 <db>.<table>
對應的配置文件,以下是一個示例:
hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db2.table2
hoodie.deltastreamer.ingestion.db1.table1.configFile=/tmp/config_table1.properties
hoodie.deltastreamer.ingestion.db2.table2.configFile=/tmp/config_table2.properties
方式二:
將所有表的配置文件統一放置到一個文件夾,并按照 <database>_<table>_config.properties
形式統一命名,通過 --config-folder
參數指明文件夾的路徑后,Hudi 就能根據文件名自動映射到對應表,不必再向方式一那樣顯式配置。這是使用了“約定大約配置”的處理方式,方式二更加簡潔,是首選的配置方式,我們接下來就詳細介紹一下。
2. 首選方式:使用約定的多表文件命名規則簡化配置
這一配置方式可簡述為:將所有表的配置文件統一放置到一個文件夾下,并按照 <database>_<table>_config.properties
形式統一命名,同時,在公共配置文件中通過 hoodie.deltastreamer.ingestion.tablesToBeIngested
配置項以 <db1>.<table1>,<db2>.<table2>,...
的形式列出所有表,最后,在命令行中通過參數 --config-folder
指明文件夾的路徑,這樣 Hudi 就能根據約定的命名規則找到每張表的對應配置文件,那就不必再通過 hoodie.streamer.ingestion.<database>.<table>.configFile
顯式地逐一配置。以下是一個示例:
1. common.properties
hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db2.table2
2. config folder 目錄結構
/tmp
├── db1_table1_config.properties
├── db2_table2_config.properties
3. 作業提交命令
spark-submit \...--props file://common.properties \--config-folder file://tmp \...
3. 啟用 Schema Registry 時多個 Topic 的 Schema URL 的配置方法
另一個涉及多表特化配置的地方是在 HoodieMultiTableStreamer 攝取 Debezium CDC 數據寫入 Hudi 表時,由于 Hudi 的 Streamer 在處理 Debezium CDC 時強依賴 Confluent Schema Registry,在攝取每一張表對應的 Topic 時都需要指定 Topic 的 Schema Url,為了避免大量的手動配置,HoodieMultiTableStreamer 再次使用了“約定大約配置”的處理方式,它通過hoodie.streamer.schemaprovider.registry.baseUrl
指定 url 的 base 部分,通過 hoodie.streamer.schemaprovider.registry.urlSuffix
指定 url 的后綴部分,中間部分是 Topic 的名稱,由 Hudi 自動拼接,這樣動態地獲得了每張表對應 Topic 的 Schema Url。
4. 重點參數
我們上面提到的幾個重點參數再集中梳理一下:
4.1 命令行中的重要參數
--base-path-prefix
指定攝取數據后 Hudi 數據集存放的 base 目錄,數據集將按照:<base-path-prefix>/<database>/<table>
格式存放--config-folder
在HoodieMultiTableStreamer
下專門用于指定存放所有表配置文件的路徑,配置約定的文件命名 pattern:<database>_<table>_config.properties
,Hudi 就能自動找到每張表的配置文件,那不必再通過hoodie.streamer.ingestion.<database>.<table>.configFile
單獨配置
4.2 配置文件中的重要參數
-
hoodie.streamer.ingestion.tablesToBeIngested
:需要被實時攝取并同步的表,單表使用<database>.<table>
形式,多表用逗號分隔,例如:db1.table1,db1.table2
-
hoodie.streamer.ingestion.<database>.<table>.configFile
:每張表需要提供的 Hudi 配置文件的存放路徑。由于數據表可能非常多,逐一配置所有的表非常繁瑣,因此 Hudi Streamer 提供一種文件命名模式:<database>_<table>_config.properties
,只要我們將對應表的配置文件以此模式命名并放置于--config-folder
配置的文件夾下,Hudi 就能自動映射為對應表的配置,不必再顯式地配置這一項! -
hoodie.streamer.schemaprovider.registry.url
是給單表(HoodieStreamer)用的 -
hoodie.streamer.schemaprovider.registry.baseUrl
+hoodie.streamer.schemaprovider.registry.urlSuffix
聯合起來給多表 用的!!
5. 完整示例
最后,我們引用《CDC 數據入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》一文第 6 節給出一個完整示例作為一個參考:
tee global-config.properties << EOF
# deltastreamer props
hoodie.deltastreamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter
hoodie.deltastreamer.ingestion.tablesToBeIngested=inventory.orders
hoodie.deltastreamer.schemaprovider.class=org.apache.hudi.utilities.schema.SchemaRegistryProvider
hoodie.deltastreamer.schemaprovider.registry.baseUrl=${SCHEMA_REGISTRY_URL}/subjects/
hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
# kafka props
bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS
auto.offset.reset=earliest
# schema registry props
schema.registry.url=http://10.0.13.30:8085
EOFtee inventory_orders_config.properties << EOF
include=global-config.properties
hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders
hoodie.datasource.write.recordkey.field=order_number
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.hive_sync.database=inventory
hoodie.datasource.hive_sync.table=orders
hoodie.datasource.hive_sync.partition_fields=order_date
EOFaws s3 rm --recursive $APP_S3_HOME/inventory_ordersspark-submit \--master yarn \--deploy-mode client \--jars /usr/lib/spark/connector/lib/spark-avro.jar \--class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \/usr/lib/hudi/hudi-utilities-bundle.jar \--props file://$HOME/global-config.properties \--table-type COPY_ON_WRITE \--op UPSERT \--config-folder file://$HOME \--base-path-prefix $APP_S3_HOME \--target-table inventory.orders \--continuous \--min-sync-interval-seconds 60 \--source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \--payload-class org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload \--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
關聯閱讀:
-
《CDC 數據入湖方案:Kafka Connect > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》
-
《CDC 數據入湖方案:Flink CDC > Kafka + Schema Registry > Hudi ( HoodieMultiTableStreamer ) 》