一、數據模型簡介
在 Doris 中,數據以表(Table)的形式進行邏輯上的描述。 一張表包括行(Row)和列(Column)。Row 即用戶的一行數據。Column 用于描述一行數據中不同的字段。
Column 可以分為兩大類:Key 和 Value。從業務角度看,Key 和 Value 可以分別對應維度列和指標列。Doris的key列是建表語句中指定的列,建表語句中的關鍵字'unique key'或'aggregate key'或'duplicate key'后面的列就是 Key 列,除了 Key 列剩下的就是 Value 列。
Doris 的數據模型主要分為3類:
- Aggregate
- Unique
- Duplicate
下面我們分別介紹。
二、明細模型【Duplicate】
建表
CREATE TABLE order_info (order_date date NOT NULL COMMENT '下單日期',order_id int(11) NOT NULL COMMENT '訂單id',buy_num tinyint(4) NULL COMMENT '購買件數',user_id int(11) NULL COMMENT '[-1223371, 1223371]',create_time datetime NULL COMMENT '創建時間',update_time datetime NULL COMMENT '更新時間'
) ENGINE=OLAP
DUPLICATE KEY(order_date, order_id)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(order_id) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
通過Flink Sql自帶的datagen生成測試數據:
CREATE TABLE order_info_source (order_date DATE,order_id INT,buy_num INT,user_id INT,create_time TIMESTAMP(3),update_time TIMESTAMP(3)
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.order_id.min' = '99999','fields.order_id.max' = '10001','fields.user_id.min' = '10001','fields.user_id.max' = '90001','fields.buy_num.min' = '1','fields.buy_num.max' = '200','number-of-rows' = '10000'
)
datagen參數
'rows-per-second' = '1000' : 每秒發送1000條數據
'fields.order_id.min' = '99999': order_id最小值為99999
'fields.order_id.max' = '10001': order_id最大值為10001
'fields.user_id.min' = '10001': user_id最小值為10001
'fields.user_id.max' = '90001': user_id最大值為90001
'fields.buy_num.min' = '1': buy_num最小值為1
'fields.buy_num.max' = '200': buy_num最大值為200
'number-of-rows' = '10000': 共發送10000條數據, 不設置的話會無限量發送數據
參考文檔:DataGen | Apache FlinkDataGen SQL Connector # Scan Source: Bounded Scan Source: UnBoundedThe DataGen connector allows for creating tables based on in-memory data generation. This is useful when developing queries locally without access to external systems such as Kafka. Tables can include Computed Column syntax which allows for flexible record generation.The DataGen connector is built-in, no additional dependencies are required.Usage # By default, a DataGen table will create an unbounded number of rows with a random value for each column.https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/datagen/
注冊Sink 表
CREATE TABLE order_info_sink (
order_date DATE,
order_id INT,
buy_num INT,
user_id INT,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3)
)
WITH (
'connector' = 'doris',
'fenodes' = '192.168.56.XXX:8030',
'table.identifier' = 'test.order_info_example',
'username' = 'test123',
'password' = 'passwd123',
'sink.label-prefix' = 'sink_doris_label_8'
)
寫入Sink 表
insert into order_info_sink select * from order_info_source
通過mysql客戶端查看Doris Sink表數據?
mysql> select * from test.order_info_example limit 10;
+------------+----------+---------+---------+---------------------+---------------------+
| order_date | order_id | buy_num | user_id | create_time | update_time |
+------------+----------+---------+---------+---------------------+---------------------+
| 2024-02-22 | 30007 | 10 | 10560 | 2024-02-22 07:42:21 | 2024-02-22 07:42:21 |
| 2024-02-22 | 30125 | 16 | 17591 | 2024-02-22 07:42:26 | 2024-02-22 07:42:26 |
| 2024-02-22 | 30176 | 17 | 10871 | 2024-02-22 07:42:24 | 2024-02-22 07:42:24 |
| 2024-02-22 | 30479 | 16 | 19847 | 2024-02-22 07:42:25 | 2024-02-22 07:42:25 |
| 2024-02-22 | 30128 | 16 | 19807 | 2024-02-22 07:42:24 | 2024-02-22 07:42:24 |
| 2024-02-22 | 30039 | 13 | 18237 | 2024-02-22 07:42:28 | 2024-02-22 07:42:28 |
| 2024-02-22 | 30060 | 10 | 18309 | 2024-02-22 07:42:24 | 2024-02-22 07:42:24 |
| 2024-02-22 | 30246 | 18 | 10855 | 2024-02-22 07:42:24 | 2024-02-22 07:42:24 |
| 2024-02-22 | 30288 | 19 | 12347 | 2024-02-22 07:42:26 | 2024-02-22 07:42:26 |
| 2024-02-22 | 30449 | 17 | 11488 | 2024-02-22 07:42:23 | 2024-02-22 07:42:23 |
+------------+----------+---------+---------+---------------------+---------------------+
10 rows in set (0.05 sec)
三、Unique模型
當用戶有數據更新需求時,可以選擇使用Unique數據模型。Unique模型能夠保證Key的唯一性,當用戶更新一條數據時,新寫入的數據會覆蓋具有相同key的舊數據。
Unique模型提供了兩種實現方式:
- 讀時合并(merge-on-read)。在讀時合并實現中,用戶在進行數據寫入時不會觸發任何數據去重相關的操作,所有數據去重的操作都在查詢或者compaction時進行。因此,讀時合并的寫入性能較好,查詢性能較差,同時內存消耗也較高。
- 寫時合并(merge-on-write)。在1.2版本中,我們引入了寫時合并實現,該實現會在數據寫入階段完成所有數據去重的工作,因此能夠提供非常好的查詢性能。
Unique建表
CREATE TABLE IF NOT EXISTS example_db.example_tbl_unique
(`user_id` LARGEINT NOT NULL COMMENT "用戶id",`username` VARCHAR(50) NOT NULL COMMENT "用戶昵稱",`city` VARCHAR(20) COMMENT "用戶所在城市",`age` SMALLINT COMMENT "用戶年齡",`sex` TINYINT COMMENT "用戶性別",`phone` LARGEINT COMMENT "用戶電話",`address` VARCHAR(500) COMMENT "用戶地址",`register_time` DATETIME COMMENT "用戶注冊時間"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
?這是一個典型的用戶基礎信息表。這類數據沒有聚合需求,只需保證主鍵唯一性。(這里的主鍵為 user_id + username)。
寫時合并
CREATE TABLE IF NOT EXISTS example_db.example_tbl_unique_merge_on_write
(`user_id` LARGEINT NOT NULL COMMENT "用戶id",`username` VARCHAR(50) NOT NULL COMMENT "用戶昵稱",`city` VARCHAR(20) COMMENT "用戶所在城市",`age` SMALLINT COMMENT "用戶年齡",`sex` TINYINT COMMENT "用戶性別",`phone` LARGEINT COMMENT "用戶電話",`address` VARCHAR(500) COMMENT "用戶地址",`register_time` DATETIME COMMENT "用戶注冊時間"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
【注意】
- Unique表的實現方式只能在建表時確定,無法通過schema change進行修改。
- 舊的Merge-on-read的實現無法無縫升級到Merge-on-write的實現(數據組織方式完全不同),如果需要改為使用寫時合并的實現版本,需要手動執行
insert into unique-mow-table select * from source table
.
四、聚合模型【Aggregate】?
CREATE DATABASE IF NOT EXISTS example_db;CREATE TABLE IF NOT EXISTS example_db.example_tbl_agg1
(`user_id` LARGEINT NOT NULL COMMENT "用戶id",`date` DATE NOT NULL COMMENT "數據灌入日期時間",`city` VARCHAR(20) COMMENT "用戶所在城市",`age` SMALLINT COMMENT "用戶年齡",`sex` TINYINT COMMENT "用戶性別",`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用戶最后一次訪問時間",`cost` BIGINT SUM DEFAULT "0" COMMENT "用戶總消費",`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用戶最大停留時間",`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用戶最小停留時間"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
表中的列按照是否設置了?AggregationType
,分為 Key (維度列) 和 Value(指標列)。沒有設置?AggregationType
?的,如?user_id
、date
、age
?... 等稱為?Key,而設置了?AggregationType
?的稱為?Value。
當我們導入數據時,對于 Key 列相同的行會聚合成一行,而 Value 列會按照設置的?AggregationType
?進行聚合。?AggregationType
?目前有以下幾種聚合方式和agg_state:
- SUM:求和,多行的 Value 進行累加。
- REPLACE:替代,下一批數據中的 Value 會替換之前導入過的行中的 Value。
- MAX:保留最大值。
- MIN:保留最小值。
- REPLACE_IF_NOT_NULL:非空值替換。和 REPLACE 的區別在于對于null值,不做替換。
- HLL_UNION:HLL 類型的列的聚合方式,通過 HyperLogLog 算法聚合。
- BITMAP_UNION:BIMTAP 類型的列的聚合方式,進行位圖的并集聚合。
數據導入
insert into example_db.example_tbl_agg1 values
(10000,"2017-10-01","北京",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","北京",20,0,"2017-10-01 07:00:00",15,2,2),
(10001,"2017-10-01","北京",30,1,"2017-10-01 17:05:45",2,22,22),
(10002,"2017-10-02","上海",20,1,"2017-10-02 12:59:12",200,5,5),
(10003,"2017-10-02","廣州",32,0,"2017-10-02 11:20:00",30,11,11),
(10004,"2017-10-01","深圳",35,0,"2017-10-01 10:00:15",100,3,3),
(10004,"2017-10-03","深圳",35,0,"2017-10-03 10:20:22",11,6,6);
?查詢數據
?五、數據模型的選擇建議
因為數據模型在建表時就已經u企鵝人,且無法修改。所以,選擇一個何時的數據模型非常重要。
- Aggregate模型可以通過預聚合,極大的降低聚合查詢時所需掃描的數據量和查詢的計算量,非常適合有固定模式的報表查詢場景。但是該模型對count(*)查詢不是很友好。同時因為固定了Value列上的聚合方式,在進行其他類型的聚合查詢時,需要考慮語意正確性。
- Unique模型針對需要唯一主鍵約束的場景,可以保證主鍵唯一性約束。但是無法利用RollUp等預聚合帶來的查詢優勢。對于聚合查詢有較高性能需求的用戶,推薦使用1.2版本加入的寫時合并實現。
- Duplicate適合任意維度的ad-hoc查詢。雖然同樣無法利用預聚合的特性,但是不受聚合模型的約束,可以發揮列存模型的優勢(只讀取相關列,而不需要讀取所有的列)。