Kubernetes Data Platform (KDP) is an open-source, lightweight Kubernetes-native data platform developed by LinkTime Cloud (智領云). It gives users a one-stop cloud-native data integration and development platform on Kubernetes. With the latest v1.1.0 release, users can use the out-of-the-box open-source components on KDP, such as Airflow, AirByte, Flink, Kafka, MySQL, ClickHouse, and Superset, to quickly build real-time, near-real-time, or batch pipelines for data ingestion, processing, and analysis, together with visual dashboards. A sample visualization is shown below:
The following sections walk through the full process of building a real-time order data pipeline, from data ingestion through data processing to visualization.
1. Pipeline Design
Real-time data ingestion, processing, and visual analysis are built with the KDP open-source components Airflow, MySQL, Flink, Kafka, ClickHouse, and Superset. The architecture is as follows:
1.1 Data Flow
Flink is used directly to build the real-time data warehouse: it cleanses, transforms, and aggregates the data, and writes the result set of each layer into Kafka.
ClickHouse subscribes to each layer's topic in Kafka and persists the data of every layer, so it can be queried and analyzed later.
1.2 Data Tables
The analysis is based on mock data and covers real-time ingestion, processing, and visual analysis. Two tables are used:
Customer table: customers

| Field | Description |
| --- | --- |
| id | Customer ID |
| name | Customer name |
| age | Age |
| gender | Gender |
Order table: orders

| Field | Description |
| --- | --- |
| order_id | Order ID |
| order_revenue | Order amount |
| order_region | Order region |
| customer_id | Customer ID |
| create_time | Order creation time |
1.3 Environment
Install the following components from the KDP page and complete each component's QuickStart:
MySQL: source of the real-time data and metadata database for Superset/Airflow; binlog must be enabled at installation time (a quick check is sketched after this list)
Kafka: sink for the ingested data
Flink: data ingestion and processing
ClickHouse: data storage
Superset: data visualization
Airflow: job scheduling
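As a sanity check (not part of the original steps), you can confirm on the MySQL instance that binlog is enabled with row format, which the Flink mysql-cdc connector relies on:

SHOW VARIABLES LIKE 'log_bin';        -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW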
2. Data Integration and Processing
Adjust the account and password used in this article to match your actual cluster configuration.
2.1 Create MySQL Tables
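The original post does not list the MySQL DDL, so the following is a minimal sketch reconstructed from the field descriptions in section 1.2; column types and defaults are assumptions:

-- Hypothetical DDL derived from the tables described in section 1.2.
CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;

CREATE TABLE IF NOT EXISTS customers (
    id     INT PRIMARY KEY AUTO_INCREMENT,  -- customer ID
    name   VARCHAR(100),                    -- customer name
    age    FLOAT,                           -- age (FLOAT to match the Flink source schema)
    gender VARCHAR(10)                      -- gender
);

CREATE TABLE IF NOT EXISTS orders (
    order_id      INT PRIMARY KEY AUTO_INCREMENT,      -- order ID
    order_revenue FLOAT,                                -- order amount
    order_region  VARCHAR(40),                          -- order region
    customer_id   INT,                                  -- customer ID
    create_time   TIMESTAMP DEFAULT CURRENT_TIMESTAMP   -- order creation time
);

The auto-increment key and the create_time default are assumed so that the Airflow DAG in section 2.5, which only supplies order_revenue, order_region, and customer_id, can insert rows without errors.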
2.2 Create Kafka Topics
Enter the Kafka broker pod and create the topics from the command line (they can also be created through the Kafka Manager page). The following shows the commands run inside the pod:
export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092"

bin/kafka-topics.sh --create \
  --topic ods-order \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP

bin/kafka-topics.sh --create \
  --topic ods-customers \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP

bin/kafka-topics.sh --create \
  --topic dwd-order-customer-valid \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP

bin/kafka-topics.sh --create \
  --topic dws-agg-by-region \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP
2.3 Create ClickHouse Tables
Enter the ClickHouse pod and run the following DDL with `clickhouse-client`:
CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;

-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (
    order_id Int32,
    order_revenue Float32,
    order_region String,
    create_time DateTime,
    customer_id Int32,
    customer_age Float32,
    customer_name String,
    customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;

CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (
    order_id Int32,
    order_revenue Float32,
    order_region String,
    create_time DateTime,
    customer_id Int32,
    customer_age Float32,
    customer_name String,
    customer_gender String
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    kafka_topic_list = 'dwd-order-customer-valid',
    kafka_group_name = 'clickhouse_group',
    kafka_format = 'JSONEachRow',
    kafka_row_delimiter = '\n';

CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECT
    order_id,
    order_revenue,
    order_region,
    create_time,
    customer_id,
    customer_age,
    customer_name,
    customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;

-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (
    order_region String,
    order_cnt Int64,
    order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;

CREATE TABLE kdp_demo.kafka_dws_agg_by_region (
    order_region String,
    order_cnt Int64,
    order_total_revenue Float32
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    kafka_topic_list = 'dws-agg-by-region',
    kafka_group_name = 'clickhouse_group',
    kafka_format = 'JSONEachRow',
    kafka_row_delimiter = '\n';

CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECT
    order_region,
    order_cnt,
    order_total_revenue
FROM kdp_demo.kafka_dws_agg_by_region;
2.4 Create the Flink SQL Job
2.4.1 SQL
CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;

-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    `create_time` TIMESTAMP,
    PRIMARY KEY(`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kdp-data-mysql',
    'port' = '3306',
    'username' = 'bdos_dba',
    'password' = 'KdpDba!mysql123',
    'database-name' = 'kdp_demo',
    'table-name' = 'orders'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (
    `id` INT NOT NULL,
    `age` FLOAT NOT NULL,
    `name` STRING NOT NULL,
    `gender` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kdp-data-mysql',
    'port' = '3306',
    'username' = 'bdos_dba',
    'password' = 'KdpDba!mysql123',
    'database-name' = 'kdp_demo',
    'table-name' = 'customers'
);

-- create ods, dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` VARCHAR(40),
    `customer_id` INT,
    `create_time` TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods-order',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    `gender` STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods-customers',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` STRING,
    `create_time` TIMESTAMP,
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    `customer_gender` STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd-order-customer-valid',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (
    `order_region` VARCHAR(40),
    `order_cnt` BIGINT,
    `order_total_revenue` FLOAT,
    PRIMARY KEY (order_region) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dws-agg-by-region',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

USE kdp_demo;

-- EXECUTE STATEMENT SET
-- BEGIN

INSERT INTO ods_order_table SELECT * FROM orders_src;

INSERT INTO ods_customers_table SELECT * FROM customers_src;

INSERT INTO dwd_order_customer_valid
SELECT
    o.order_id,
    o.order_revenue,
    o.order_region,
    o.create_time,
    c.id AS customer_id,
    c.age AS customer_age,
    c.name AS customer_name,
    c.gender AS customer_gender
FROM
    customers_src c
    JOIN orders_src o ON c.id = o.customer_id
WHERE
    c.id <> -1;

INSERT INTO dws_agg_by_region
SELECT
    order_region,
    count(*) AS order_cnt,
    sum(order_revenue) AS order_total_revenue
FROM
    dwd_order_customer_valid
GROUP BY
    order_region;

-- END;
2.4.2 Create the Flink SQL Job with StreamPark
For detailed usage, refer to the StreamPark documentation.
Maven dependency:
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>3.0.1</version>
</dependency>
2.5 Create the Airflow DAG
2.5.1 DAG File
import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'kdp_demo_order_data_insert',
    description='Insert into orders by using random data',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    start_date=days_ago(1),
    catchup=False,
    tags=['kdp-example'],
)

# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'

# Candidate regions for the mock orders
cities = ["北京", "上海", "廣州", "深圳", "成都", "杭州", "重慶", "武漢", "西安", "蘇州",
          "天津", "南京", "鄭州", "長沙", "東莞", "青島", "寧波", "沈陽", "昆明", "合肥",
          "大連", "廈門", "哈爾濱", "福州", "濟南", "溫州", "佛山", "南昌", "長春", "貴陽",
          "南寧", "金華", "石家莊", "常州", "泉州", "南通", "太原", "徐州", "嘉興", "烏魯木齊",
          "惠州", "珠海", "揚州", "蘭州", "煙臺", "汕頭", "濰坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)

# BashOperator that inserts one randomly generated row into the orders table
insert_data_orders = BashOperator(
    task_id='insert_data_orders',
    bash_command=f'''mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"''',
    dag=dag,
)
insert_data_orders
2.5.2 DAG Notes and Execution
The current Airflow installation requires a reachable Git repository to be configured, so the DAG file must be committed to that Git repository before it can be scheduled. Once it runs, the DAG inserts one row into the orders table every minute. Note that it only populates orders; the customers table should be seeded separately, for example as sketched below.
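The original walkthrough does not show how customers is populated; the following is a hypothetical seed with made-up values, included only so that the join in the dwd layer produces output. Since the DAG draws customer_id uniformly from 1 to 100, spreading the ids across that range yields more matched orders:

-- Hypothetical mock rows for the customers table (names and values are illustrative only).
INSERT INTO kdp_demo.customers (id, name, age, gender) VALUES
    (1,   'Alice', 28, 'female'),
    (25,  'Bob',   35, 'male'),
    (50,  'Carol', 42, 'female'),
    (75,  'Dave',  31, 'male'),
    (100, 'Eve',   24, 'female');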
2.6 Data Verification
Verify the data in ClickHouse:
(1) Enter the ClickHouse client
clickhouse-client
# default password: ckdba.123
(2) Run the queries
SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT count(*) FROM kdp_demo.dwd_order_customer_valid;
(3) Check against MySQL that the row counts are consistent
select count(*) from kdp_demo.orders;
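Optionally (not part of the original steps), the aggregate layer can be checked as well to confirm that the dws topic is being consumed into ClickHouse:

-- Per-region order count and revenue; FINAL collapses duplicate versions kept by ReplacingMergeTree.
SELECT * FROM kdp_demo.dws_agg_by_region FINAL ORDER BY order_total_revenue DESC;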
3. Data Visualization
After the data is verified in 2.6, it can be visualized with Superset. Log in to the Superset page with the account `admin/admin` (remember to add a local host entry for the domain): http://superset-kdp-data.kdp-e2e.io
3.1 Create Charts
Import the dashboards we have prepared:
Download the dashboard bundle: https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip
Import the dashboard
(1) Select the downloaded file and import it
(2) Enter the default password `ckdba.123` of the ClickHouse user `default`:
3.2 Results
The final real-time order dashboards are shown below. As the order data is updated, the charts refresh in real time:
Quick Start
🚀 GitHub project:
https://github.com/linktimecloud/kubernetes-data-platform
You are welcome to take part in building the open-source community 🤝