KDP數據分析實戰:從0到1完成數據實時采集處理到可視化

智領云自主研發的開源輕量級Kubernetes數據平臺,即Kubernetes Data Platform (簡稱KDP),能夠為用戶提供在Kubernetes上的一站式云原生數據集成與開發平臺。在最新的v1.1.0版本中,用戶可借助 KDP 平臺上開箱即用的?Airflow、AirByte、Flink、Kafka、MySQL、ClickHouse、Superset?等開源組件快速搭建實時、半實時或批量采集、處理、分析的數據流水線以及可視化報表展示,可視化展示效果如下:

247984561aa6a7370c0c0741a330aded.png

以下我們將介紹一個實時訂單數據流水線從數據采集到數據處理,最后到可視化展示的詳細建設流程。

?1.流水線設計

借助 KDP 平臺的開源組件 Airflow、MySQL、Flink、Kafka、ClickHouse、Superset 完成數據實時采集處理及可視化分析,架構如下:?

8ea91c86309be540823ed486fe2b0dce.jpeg

1.1 數據流

  • 直接使用Flink構建實時數倉,由Flink進行清洗加工轉換和聚合匯總,將各層結果集寫入Kafka中;

  • ClickHouse從Kafka分別訂閱各層數據,將各層數據持久化到ClickHouse中,用于之后的查詢分析。

1.2 數據表

本次分析數據基于mock數據,包含數據實時采集處理及可視化分析:

  • 消費者表:customers

字段

字段說明

id

用戶ID

name

姓名

age

年齡

gender

性別

  • 訂單表:orders

字段

字段說明

order_id

訂單ID

order_revenue

訂單金額

order_region

下單地區

customer_id

用戶ID

create_time

下單時間

1.3 環境說明

在 KDP 頁面安裝如下組件并完成組件的 QuickStart:

  • MySQL: 實時數據數據源及 Superset/Airflow 元數據庫,安裝時需要開啟binlog

  • Kafka: 數據采集sink

  • Flink: 數據采集及數據處理

  • ClickHouse: 數據存儲

  • Superset: 數據可視化

  • Airflow: 作業調度

2. 數據集成與處理

文中使用的賬號密碼信息請根據實際集群配置進行修改。

2.1 創建MySQL表

2.2 創建 Kafka Topic

進入Kafka broker pod,執行命令創建 Topic,也可以通過Kafka manager 頁面創建,以下為進入pod并通過命令行創建的示例:

export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092" bin/kafka-topics.sh --create \--topic ods-order \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAP bin/kafka-topics.sh --create \--topic ods-customers \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dwd-order-customer-valid \--replication-factor 3 \--partitions 10 \--bootstrap-server $BOOTSTRAPbin/kafka-topics.sh --create \--topic dws-agg-by-region \--replication-factor 3 \--partitions 10 \--bootstrap-server?$BOOTSTRAP

2.3 創建 ClickHouse 表

進入clickhouse pod,使用`clickhouse-client`執行命令創建表,以下為建表語句:

CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (order_id Int32,order_revenue Float32,order_region String,create_time DateTime,customer_id Int32,customer_age Float32,customer_name String,customer_gender String
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dwd-order-customer-valid',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECTorder_id,order_revenue,order_region,create_time,customer_id,customer_age,customer_name,customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;CREATE TABLE kdp_demo.kafka_dws_agg_by_region (order_region String,order_cnt Int64,order_total_revenue Float32
) ENGINE = Kafka
SETTINGSkafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',kafka_topic_list = 'dws-agg-by-region',kafka_group_name = 'clickhouse_group',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n';CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECTorder_region,order_cnt,order_total_revenue
FROM?kdp_demo.kafka_dws_agg_by_region;

2.4 創建 Flink SQL 作業

2.4.1 SQL部分

CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(`order_id` INT NOT NULL,`order_revenue` FLOAT NOT NULL,`order_region` STRING NOT NULL,`customer_id` INT NOT NULL,`create_time` TIMESTAMP,PRIMARY KEY(`order_id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'orders'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (`id` INT NOT NULL,`age` FLOAT NOT NULL,`name` STRING NOT NULL,`gender` STRING NOT NULL,PRIMARY KEY(`id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'kdp-data-mysql','port' = '3306','username' = 'bdos_dba','password' = 'KdpDba!mysql123','database-name' = 'kdp_demo','table-name' = 'customers'
);-- create ods dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (`order_id` INT,`order_revenue` FLOAT,`order_region` VARCHAR(40),`customer_id` INT,`create_time` TIMESTAMP,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-order','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`gender` STRING,PRIMARY KEY (customer_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'ods-customers','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (`order_id` INT,`order_revenue` FLOAT,`order_region` STRING,`create_time` TIMESTAMP,`customer_id` INT,`customer_age` FLOAT,`customer_name` STRING,`customer_gender` STRING,PRIMARY KEY (order_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dwd-order-customer-valid','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (`order_region` VARCHAR(40),`order_cnt` BIGINT,`order_total_revenue` FLOAT,PRIMARY KEY (order_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'dws-agg-by-region','properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092','key.format' = 'json','value.format' = 'json'
);USE kdp_demo;
-- EXECUTE STATEMENT SET
-- BEGIN
INSERT INTO ods_order_table SELECT * FROM orders_src;
INSERT INTO ods_customers_table SELECT * FROM customers_src;
INSERT INTOdwd_order_customer_valid
SELECTo.order_id,o.order_revenue,o.order_region,o.create_time,c.id as customer_id,c.age as customer_age,c.name as customer_name,c.gender as customer_gender
FROMcustomers_src cJOIN orders_src o ON c.id = o.customer_id
WHEREc.id <> -1;
INSERT INTOdws_agg_by_region
SELECTorder_region,count(*) as order_cnt,sum(order_revenue) as order_total_revenue
FROMdwd_order_customer_valid
GROUP BYorder_region;
--?END;

2.4.2 使用?StreamPark 創建 Flink SQL 作業

具體使用參考?StreamPark?文檔。

maven 依賴:

<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version>
</dependency>

2.5 創建 Airflow DAG

2.5.1 DAG 文件部分

import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_agodefault_args = {'owner': 'admin','depends_on_past': False,'email_on_failure': False,'email_on_retry': False,'retries': 1,
}dag = DAG('kdp_demo_order_data_insert',description='Insert into orders by using random data',schedule_interval=timedelta(minutes=1),start_date=days_ago(1),catchup=False,tags=['kdp-example'],
)# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'
cities = ["北京", "上海", "廣州", "深圳", "成都", "杭州", "重慶", "武漢", "西安", "蘇州", "天津", "南京", "鄭州","長沙", "東莞", "青島", "寧波", "沈陽", "昆明", "合肥", "大連", "廈門", "哈爾濱", "福州", "濟南", "溫州","佛山", "南昌", "長春", "貴陽", "南寧", "金華", "石家莊", "常州", "泉州", "南通", "太原", "徐州", "嘉興","烏魯木齊", "惠州", "珠海", "揚州", "蘭州", "煙臺", "汕頭", "濰坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)
# 插入數據的 BashOperator
insert_data_orders = BashOperator(task_id='insert_data_orders',bash_command=f'''mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"''',dag=dag,
)
insert_data_orders

2.5.2 DAG 說明及執行

當前Airflow安裝時,需要指定可訪問的git 倉庫地址,因此需要將 Airflow DAG 提交到 Git 倉庫中。每分鐘向orders表插入一條數據。

2.6 數據驗證

使用ClickHouse驗證數據:

(1)進入ClickHouse客戶端

clickhouse-client 
# default pass: ckdba.123

(2)執行查詢

SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT?count(*)?FROM?kdp_demo.dwd_order_customer_valid;

(3)對比驗證MySQL中數據是否一致

select?count(*)?from?kdp_demo.orders;

3. 數據可視化

在2.6中數據驗證通過后,可以通過Superset進行數據可視化展示。使用賬號`admin/admin`登錄Superset頁面(注意添加本地 Host 解析):http://superset-kdp-data.kdp-e2e.io

3.1 創建圖表

導入我們制作好的圖表:

  1. 下載面板:https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip

  2. 導入面板

(1)選擇下載的文件導入

eed49ffb69952693ad5a46da6be81f08.png

(2)輸入 ClickHouse 的用戶`default`的默認密碼`ckdba.123`:

4c07d54c7ad0f2f66085481d5bfe77ab.png

3.2 效果展示

最終的實時訂單數據圖表展示如下,隨著訂單數據的更新,圖表中的數據也會實時更新:

57bd6fa097e66e1da7dc8aa103a599b4.png

快速體驗

🚀GitHub項目:

https://github.com/linktimecloud/kubernetes-data-platform

歡迎您參與開源社區的建設🤝

?-?FIN?-?? ? ??

1ad0c68fe5ea3d59a392d306012eef0b.png

更多精彩推

  • 我們開源啦!一鍵部署免費使用!Kubernetes上直接運行大數據平臺!

  • 開源 KDP ?v1.1.0 版本正式發布,新增數據集成開發應用場景

  • 在 KubeSphere 上快速安裝和使用 KDP 云原生數據平臺

  • 在 Rancher 上快速安裝和使用 KDP 云原生數據平臺

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/44336.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/44336.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/44336.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL數據庫中利用定時作業去殺死長時查詢以防止數據庫死鎖風險

MySQL數據庫中沒有SQLServer數據庫中那種傳統的定時作業的概念。但是提供了一種【事件】的東西&#xff0c;基本和定時作業貌離神合。 下面我們在MySQL中創建一個事件&#xff0c;它的作用是去監測時間很長的異常查詢&#xff0c;并且去主動殺掉該線程以防止數據庫發生死鎖的風…

探索Perl的自動清潔工:垃圾收集機制全解析

&#x1f9f9; 探索Perl的自動清潔工&#xff1a;垃圾收集機制全解析 Perl是一種高級編程語言&#xff0c;以其強大的文本處理能力而聞名。在Perl中&#xff0c;內存管理對于開發高效且穩定的應用程序至關重要。Perl提供了自動垃圾收集機制&#xff0c;幫助開發者管理內存&…

關于原型和原型鏈的學習和實踐

在前端面試中&#xff0c;原型和原型鏈始終是一個避不開的問題&#xff0c;今天就弄明白! 原型和原型鏈 對象的創建方式工廠模式構造函數模式原型模式 原型和原型鏈實踐 對象的創建方式 原型和原型鏈都是關于對象的內容&#xff0c;先來看一下JavaScript中對象的構建方式。 工…

代碼隨想錄(day3)有序數組的平方

暴力求解法&#xff1a; 注意&#xff1a;需要確定范圍&#xff0c;比如nums.sort()是在for循環之外&#xff0c;根據函數的功能來確定 return返回的是nums&#xff0c;而不是nums[i]因為返回的是整個數組 class Solution(object):def sortedSquares(self, nums):for i in r…

人話學Python-基礎篇-數字計算

一&#xff1a;數字類型 對于最常見的數據類型,數字在Python中分為三類&#xff1a; 整型(int) 表示的是整數類型的所有數字&#xff0c;包括正整數&#xff0c;負整數和0。和C語言不同的是&#xff0c;Python中的int型沒有范圍的限制&#xff0c;理論上可以從無限小的整數取到…

RedHat運維-Ansible自動化運維基礎22-rhel-system-roles

1. system_roles的官方文檔的位置是___________________________________&#xff1b; 2. system_roles的官方文檔的位置是___________________________________&#xff1b; 3. system_roles的官方文檔的位置是___________________________________&#xff1b; 4. 安裝rhel-s…

react基礎語法,模板語法,ui渲染,jsx,useState狀態管理

創建一個react應用 這里使用create-react-app的腳手架構建項目&#xff08;結構簡潔&#xff0c;基于webpack-cli&#xff09;&#xff0c; npx create-react-app [項目名稱] 使用其他腳手架構建項目可以參考&#xff1a;react框架&#xff0c;使用vite和nextjs構建react項目…

數學建模國賽入門指南

文章目錄 認識數學建模及國賽認識數學建模什么是數學建模&#xff1f;數學建模比賽 國賽參賽規則、評獎原則如何評省、國獎評獎規則如何才能獲獎 國賽賽題分類及選題技巧國賽賽題特點賽題分類 國賽歷年題型及優秀論文數學建模分工技巧數模必備軟件數模資料文獻數據收集資料收集…

力扣題解(乘積為正數的最長子數組長度)

1567. 乘積為正數的最長子數組長度 已解答 中等 給你一個整數數組 nums &#xff0c;請你求出乘積為正數的最長子數組的長度。 一個數組的子數組是由原數組中零個或者更多個連續數字組成的數組。 請你返回乘積為正數的最長子數組長度。 本題要求乘積為正數&#xff0c;而整…

白蛇插畫:成都亞恒豐創教育科技有限公司

白蛇插畫&#xff1a;古韻今風&#xff0c;情深意長 在浩瀚的藝術長河中&#xff0c;插畫作為一種獨特的藝術形式&#xff0c;以其生動形象的畫面、豐富多彩的色彩和深邃悠遠的意境&#xff0c;成都亞恒豐創教育科技有限公司深受人們喜愛。而“白蛇插畫”&#xff0c;作為融合…

bug - while parsing file included at

bug 如下 找到這個對應文件tb_top.sv的對應行&#xff0c;發現是一個 include "inc_tb_tests_xxx.sv" 問題點&#xff1a;頭文件&#xff0c;重復定義&#xff0c;那么 解決方法- 在被include的文件首尾加入 ifndef MY_TRANSACTION__SV define MY_TRANSACTION__SV …

GenAI 技術堆棧架構師指南 - 十種工具

這篇文章于 2024 年 6 月 3 日首次出現在 The New Stack 上。 我之前寫過關于現代數據湖參考架構的文章&#xff0c;解決了每個企業面臨的挑戰——更多的數據、老化的Hadoop工具&#xff08;特別是HDFS&#xff09;以及對RESTful API&#xff08;S3&#xff09;和性能的更大需求…

《javascript語言精粹》學習筆記之函數特性

分析javascript javascript比較好的思想&#xff1a;函數、弱類型、動態對象、對象字面量表示法 不好的思想&#xff1a;基于全局變量的編程模型 函數 函數對象 函數就是對象&#xff0c;新創建的函數會連接到Function.prototype上&#xff0c;沒和函數創建時附帶有兩個隱藏…

前端--第一個前端程序

第一個前端程序 第一步&#xff1a; 使用記事本&#xff0c;編寫代碼 在你的一個磁盤里面創建一個文件夾&#xff0c;名為前端&#xff0c;然后在里面新建一個記事本&#xff0c;在里面寫如下代碼&#xff0c;注意一定要使用英文&#xff0c;然后把后綴名稱改為.html。 第二…

你明白C++中的多態嗎?(暑假提升-多態專題)

內不欺己&#xff0c;外不欺人。———孔子 有趣的多態 1、前言2、概念3、多態定義與產生條件4、多態的重要組成成員-(虛函數)5、虛函數的重寫(覆蓋)6、輔助關鍵字override與final(了解即可)7、重載&#xff0c;重定義(隱藏)&#xff0c;重寫(覆蓋)8、抽象類9、多態的原理9、1、…

PHP老照片修復文字識別圖像去霧一鍵摳圖微信小程序源碼

&#x1f50d;解鎖復古魅力&#xff0c;微信小程序黑科技大揭秘&#xff01;老照片修復&更多神奇功能等你來試&#xff01; &#x1f4f8; 【老照片修復&#xff0c;時光倒流的美顏術】 你是否珍藏著一堆泛黃的老照片&#xff0c;卻因歲月侵蝕而模糊不清&#xff1f;現在…

實驗02 黑盒測試(組合測試、場景法)

1. 組合測試用例設計技術 指出等價類劃分法和邊界值分析法通常假設輸入變量相互獨立&#xff0c;但實際情況中變量間可能存在關聯。全面測試&#xff1a;覆蓋所有輸入變量的所有可能組合&#xff0c;測試用例數量隨輸入變量的增加而指數增長。 全面測試需要對所有輸入的各個取…

2008年上半年軟件設計師【上午題】真題及答案

文章目錄 2008年上半年軟件設計師上午題--真題2008年上半年軟件設計師上午題--答案 2008年上半年軟件設計師上午題–真題 2008年上半年軟件設計師上午題–答案

按模版批量生成定制合同

提出問題 一個儀器設備采購公司&#xff0c;商品合同采購需要按模版生成的固定的文件&#xff0c;模板是固定的&#xff0c;只是每次需要替換信息&#xff0c;然后打印出來寄給客戶。 傳統方法 如果手工來做這個事情&#xff0c;準備好數據之后&#xff0c;需要從Excel表格中…

Qt5 Ubuntu18 QStackedWidget

1、在實際項目開發過程遇到&#xff0c;如果通過UI插件的屬性設置&#xff0c;通過對默認的兩個頁面進行提升需要切換操作的對象&#xff0c;如果該對象需要外部接口傳入數據&#xff0c;實現界面信息的實時刷新&#xff0c;這樣會失敗&#xff0c;失敗的原因很好理解&#xff…