Lab Overview
This lab analyzes the performance bottlenecks of a typical query, identifies candidate optimizations from its execution plan, and applies them to achieve a measurable improvement. The focus is on optimizing queries that involve distributed joins.
Lab Objective
Understand how distributed-join performance can be optimized by defining distribution columns appropriately.
Lab Steps
Step 1: Run the following query
Explain analyze select
l_orderkey,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
order by
o_orderdate limit 10;
Step 2: Change the distribution keys
Observe that the query joins three tables, customer, orders, and lineitem, with the join conditions c_custkey = o_custkey and l_orderkey = o_orderkey. Considering only this query, and provided neither column is significantly skewed, customer.c_custkey and lineitem.l_orderkey are the preferred distribution keys, because they reduce data redistribution across the DNs.
Create the customer and lineitem tables with the following statements, then load the data files customer.csv and lineitem.csv.
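The co-location reasoning behind this choice can be sketched in Python. This is a toy model only: the `dn_for` helper is invented for illustration and Python's built-in `hash` stands in for the database's real distribution hash. When two tables are hash-distributed on their join columns, matching rows land on the same DN, so the join runs locally with no network traffic.

```python
# Toy model of hash distribution (illustrative only; GaussDB's actual
# hash function and DN layout differ).
NUM_DNS = 4

def dn_for(key):
    """Map a distribution-column value to a datanode (hypothetical helper)."""
    return hash(key) % NUM_DNS

# customer distributed by c_custkey, orders distributed by o_custkey:
customers = [{"c_custkey": k} for k in range(100)]
orders = [{"o_orderkey": k, "o_custkey": k % 100} for k in range(300)]

# Because both sides are hashed on the join column, every matching pair
# of rows lives on the same DN, so c_custkey = o_custkey joins locally.
co_located = all(
    dn_for(c["c_custkey"]) == dn_for(o["o_custkey"])
    for c in customers
    for o in orders
    if c["c_custkey"] == o["o_custkey"]
)
print(co_located)  # True: no rows need to move for this join
```

The same argument applies to lineitem.l_orderkey = orders.o_orderkey: whichever side is not hashed on its join column must be redistributed or broadcast at query time.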
Drop table if exists customer;
CREATE TABLE CUSTOMER (
C_CUSTKEY INTEGER NOT NULL,
C_NAME VARCHAR(25) NOT NULL,
C_ADDRESS VARCHAR(400) NOT NULL,
C_NATIONKEY INTEGER NOT NULL,
C_PHONE CHAR(15) NOT NULL,
C_ACCTBAL DECIMAL(15,2) NOT NULL,
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(400) NOT NULL
)
DISTRIBUTE BY HASH(c_custkey);

Drop table if exists lineitem;
CREATE TABLE LINEITEM (
L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
L_SUPPKEY INTEGER NOT NULL,
L_LINENUMBER INTEGER NOT NULL,
L_QUANTITY DECIMAL(15,2) NOT NULL,
L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
L_DISCOUNT DECIMAL(15,2) NOT NULL,
L_TAX DECIMAL(15,2) NOT NULL,
L_RETURNFLAG CHAR(1) NOT NULL,
L_LINESTATUS CHAR(1) NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT CHAR(25) NOT NULL,
L_SHIPMODE CHAR(10) NOT NULL,
L_COMMENT VARCHAR(44) NOT NULL)
DISTRIBUTE BY HASH(l_orderkey);

COPY CUSTOMER FROM '/tmp/customer.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'GBK' CSV;
COPY LINEITEM FROM '/tmp/lineitem_.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'UTF-8' CSV;
Step 3: Try o_custkey as the distribution key of orders, then load the data file orders.csv.
Drop table if exists orders;
CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_custkey);

COPY ORDERS FROM '/tmp/orders.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'UTF-8' CSV;
Run the query from Step 1 after setting the following parameters:
set enable_fast_query_shipping = off;
set enable_stream_operator = on;
Both parameters are session-level and take effect only for the current session. Turning fast query shipping off forces the coordinator to generate a distributed plan, and turning the stream operator on allows stream nodes (GATHER, REDISTRIBUTE, BROADCAST) to appear in that plan.
jiang=# Explain analyze
jiang-# select
jiang-# l_orderkey,
jiang-# o_orderdate,
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;
 id |                   operation                   |      A-time       | A-rows | E-rows | Peak Memory | A-width | E-width | E-costs
----+-----------------------------------------------+-------------------+--------+--------+-------------+---------+---------+----------
  1 | ->  Limit                                     | 782.531           |     10 |     10 | 2KB         |         |      16 | 11271.18
  2 |   ->  Streaming (type: GATHER)                | 782.528           |     10 |     10 | 176KB       |         |      16 | 11271.35
  3 |     ->  Limit                                 | [753.865,768.892] |     30 |     18 | [2KB,2KB]   |         |      16 | 11270.60
  4 |       ->  Sort                                | [753.861,768.887] |     30 |     18 | [28KB,29KB] | [32,32] |      16 | 11270.60
  5 |         ->  Hash Join (6,7)                   | [752.256,767.242] |  25917 |     17 | [9KB,9KB]   |         |      16 | 11270.51
  6 |           ->  Seq Scan on lineitem            | [85.445,98.007]   | 565702 | 564893 | [41KB,41KB] |         |       4 | 10763.06
  7 |           ->  Hash                            | [594.646,597.109] | 713105 |     10 | [12MB,13MB] | [40,40] |      20 | 28.73
  8 |             ->  Streaming(type: REDISTRIBUTE) | [373.185,449.521] | 713105 |     10 | [69KB,69KB] |         |      20 | 28.73
  9 |               ->  Hash Join (10,11)           | [362.084,403.154] | 713105 |     10 | [9KB,9KB]   |         |      20 | 28.44
 10 |                 ->  Seq Scan on customer      | [14.668,18.917]   | 147100 |     30 | [34KB,35KB] |         |       4 | 14.14
 11 |                 ->  Hash                      | [199.532,206.435] | 727305 |     10 | [14MB,15MB] | [48,48] |      28 | 14.18
 12 |                   ->  Seq Scan on orders      | [145.599,150.525] | 727305 |     10 | [33KB,33KB] |         |      28 | 14.18
(12 rows)

 Predicate Information (identified by plan id)
-----------------------------------------------------------------------
   5 --Hash Join (6,7)
         Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
   6 --Seq Scan on lineitem
         Filter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized, Jit Execute)
         Rows Removed by Filter: 482874
   9 --Hash Join (10,11)
         Hash Cond: (customer.c_custkey = orders.o_custkey)
  12 --Seq Scan on orders
         Filter: (o_orderdate < '1995-03-15'::date)
         Rows Removed by Filter: 772695
(10 rows)

 Memory Information (identified by plan id)
-----------------------------------------------------------------------
 Coordinator Query Peak Memory:
         Query Peak Memory: 2MB
 Datanode:
         Max Query Peak Memory: 15MB
         Min Query Peak Memory: 14MB
   4 --Sort
         Sort Method: top-N heapsort  Memory: 26kB ~ 26kB
         Sort Method: top-N heapsort  Disk: 1024kB ~ 0kB
   7 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 13010kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 12984kB
  11 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 15310kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 14980kB
(14 rows)

 User Define Profiling
---------------------------------------------------------------------------
 Segment Id: 3  Track name: Datanode build connection
        (actual time=[0.203, 0.232], calls=[1, 1])
 Plan Node id: 2  Track name: coordinator get datanode connection
        (actual time=[0.029, 0.029], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator check and update node definition
        (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator serialize plan
        (actual time=[0.904, 0.904], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send query id with sync
        (actual time=[0.220, 0.220], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send begin command
        (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator start transaction and send query
        (actual time=[0.025, 0.025], calls=[1, 1])
 Plan Node id: 3  Track name: Datanode start up stream thread
        (actual time=[0.032, 0.049], calls=[1, 1])
(16 rows)

 ====== Query Summary =====
--------------------------------------------------------------------------------------------
 Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [7.810 ms,9.919 ms]
 Datanode executor run time [dn_6007_6008_6009, dn_6004_6005_6006]: [753.898 ms,768.921 ms]
 Datanode executor end time [dn_6007_6008_6009, dn_6001_6002_6003]: [0.082 ms,0.092 ms]
 Coordinator executor start time: 6.973 ms
 Coordinator executor run time: 782.544 ms
 Coordinator executor end time: 0.031 ms
 Planner runtime: 0.904 ms
 Plan size: 6167 byte
 Query Id: 72902018968525132
 Total runtime: 789.565 ms
(10 rows)
Here, because the join columns of orders and customer are their respective distribution keys, the two tables can be joined locally on each DN; the result set is then redistributed, using its join column with lineitem as the redistribution key.
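What that Streaming(type: REDISTRIBUTE) step costs can be sketched with a toy Python model (the `dn_for` helper and the row counts below are invented for illustration; in the actual plan above, 713105 intermediate rows pass through the redistribute node): rows placed by one column are re-hashed on another, and every row whose new DN differs from its current DN must cross the network.

```python
# Toy model of a REDISTRIBUTE stream: the customer x orders result is
# placed by o_custkey, but the join with lineitem needs it placed by
# o_orderkey, so rows are re-hashed and many change DN.
NUM_DNS = 4

def dn_for(key):
    """Hypothetical helper mapping a key to a datanode."""
    return hash(key) % NUM_DNS

# Invented intermediate rows, currently placed by o_custkey.
rows = [{"o_custkey": k % 50, "o_orderkey": k} for k in range(200)]

moved = sum(
    1 for r in rows if dn_for(r["o_custkey"]) != dn_for(r["o_orderkey"])
)
print(f"{moved}/{len(rows)} rows change DN when re-hashed on o_orderkey")
```

The smaller the row set reaching the redistribute node, the cheaper this step, which is exactly what the distribution-key change in Step 4 exploits.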
Step 4: Try o_orderkey as the distribution column of orders, then reload the data file orders.csv.
Drop table if exists orders;
CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)
WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_orderkey);

COPY ORDERS FROM '/tmp/orders.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'UTF-8' CSV;
Run the query from Step 1 again:
jiang=# Explain analyze
jiang-# select
jiang-# l_orderkey,
jiang-# o_orderdate,
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;
 id |                   operation                   |      A-time       | A-rows | E-rows |  Peak Memory  | A-width | E-width | E-costs
----+-----------------------------------------------+-------------------+--------+--------+---------------+---------+---------+----------
  1 | ->  Limit                                     | 419.741           |     10 |     10 | 2KB           |         |      16 | 13063.96
  2 |   ->  Streaming (type: GATHER)                | 419.738           |     10 |     10 | 176KB         |         |      16 | 13064.13
  3 |     ->  Limit                                 | [415.101,416.042] |     30 |     18 | [2KB,2KB]     |         |      16 | 13063.38
  4 |       ->  Sort                                | [415.097,416.037] |     30 |     18 | [28KB,29KB]   | [32,32] |      16 | 13063.38
  5 |         ->  Hash Join (6,7)                   | [413.825,414.770] |  25917 |     17 | [9KB,9KB]     |         |      16 | 13063.29
  6 |           ->  Seq Scan on customer            | [10.589,10.882]   | 147100 | 147100 | [34KB,35KB]   |         |       4 | 1684.33
  7 |           ->  Hash                            | [393.875,394.071] |  26500 |     17 | [840KB,840KB] | [44,44] |      24 | 11254.18
  8 |             ->  Streaming(type: REDISTRIBUTE) | [387.692,387.929] |  26500 |     17 | [70KB,70KB]   |         |      24 | 11254.18
  9 |               ->  Hash Join (10,11)           | [360.934,376.886] |  26500 |     17 | [10KB,10KB]   |         |      24 | 11253.60
 10 |                 ->  Seq Scan on lineitem      | [88.832,100.285]  | 565702 | 564893 | [41KB,41KB]   |         |       4 | 10763.06
 11 |                 ->  Hash                      | [199.541,202.174] | 727305 |     10 | [15MB,15MB]   | [48,48] |      28 | 14.18
 12 |                   ->  Seq Scan on orders      | [145.768,147.778] | 727305 |     10 | [33KB,33KB]   |         |      28 | 14.18
(12 rows)

 Predicate Information (identified by plan id)
---------------------------------------------------------------------
   5 --Hash Join (6,7)
         Hash Cond: (customer.c_custkey = orders.o_custkey)
   9 --Hash Join (10,11)
         Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
  10 --Seq Scan on lineitem
         Filter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized)
         Rows Removed by Filter: 482874
  12 --Seq Scan on orders
         Filter: (o_orderdate < '1995-03-15'::date)
         Rows Removed by Filter: 772695
(10 rows)

 Memory Information (identified by plan id)
-----------------------------------------------------------------------
 Coordinator Query Peak Memory:
         Query Peak Memory: 2MB
 Datanode:
         Max Query Peak Memory: 1MB
         Min Query Peak Memory: 1MB
   4 --Sort
         Sort Method: top-N heapsort  Memory: 26kB ~ 26kB
         Sort Method: top-N heapsort  Disk: 1024kB ~ 0kB
   7 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 530kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 511kB
  11 --Hash
         Max Buckets: 32768  Max Batches: 1  Max Memory Usage: 15162kB
         Min Buckets: 32768  Min Batches: 1  Min Memory Usage: 15137kB
(14 rows)

 User Define Profiling
---------------------------------------------------------------------------
 Segment Id: 3  Track name: Datanode build connection
        (actual time=[0.179, 0.226], calls=[1, 1])
 Plan Node id: 2  Track name: coordinator get datanode connection
        (actual time=[0.030, 0.030], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator check and update node definition
        (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator serialize plan
        (actual time=[0.881, 0.881], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send query id with sync
        (actual time=[0.246, 0.246], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator send begin command
        (actual time=[0.001, 0.001], calls=[1, 1])
 Plan Node id: 2  Track name: Coordinator start transaction and send query
        (actual time=[0.026, 0.026], calls=[1, 1])
 Plan Node id: 3  Track name: Datanode start up stream thread
        (actual time=[0.028, 0.030], calls=[1, 1])
(16 rows)

 ====== Query Summary =====
--------------------------------------------------------------------------------------------
 Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [0.337 ms,0.367 ms]
 Datanode executor run time [dn_6007_6008_6009, dn_6001_6002_6003]: [415.130 ms,416.059 ms]
 Datanode executor end time [dn_6001_6002_6003, dn_6004_6005_6006]: [0.069 ms,0.101 ms]
 Coordinator executor start time: 6.699 ms
 Coordinator executor run time: 419.754 ms
 Coordinator executor end time: 0.031 ms
 Planner runtime: 0.679 ms
 Plan size: 6325 byte
 Query Id: 72902018968532990
 Total runtime: 426.499 ms
(10 rows)
From the rows with id=9 through id=12 you can see that lineitem and orders are now joined locally, since both are distributed on their order-key columns. Their result set carries orders' o_custkey, so at id=8 it is redistributed on that column and then joined with customer, which is distributed by c_custkey. Only the small filtered join result (26500 rows) is redistributed instead of the 713105 rows in Step 3, and the total runtime drops from 789.565 ms to 426.499 ms.
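The improvement can be read straight off the two plans. A back-of-envelope comparison of the A-rows reported at the Streaming(type: REDISTRIBUTE) node (plan node id=8 in each run):

```python
# A-rows at the REDISTRIBUTE node, taken from the two plans above.
rows_step3 = 713105  # customer-orders join result, re-hashed on o_orderkey
rows_step4 = 26500   # lineitem-orders join result, re-hashed on o_custkey

reduction = rows_step3 / rows_step4
print(f"Step 4 redistributes ~{reduction:.1f}x fewer rows than Step 3")
```

Shipping roughly 27 times fewer rows through the stream operator is the main reason the total runtime fell from 789.565 ms to 426.499 ms.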
Lab Summary
In this lab, a query was tuned by adjusting the distribution keys of its tables. The choice of distribution key affects query performance, and the main principles are:
1) Choose a column without significant skew as the distribution column;
2) Prefer the join columns used in queries as the distribution keys;
3) When several columns satisfy the conditions above, prefer to redistribute or broadcast the smaller tables, and give priority to aligning the distribution keys of the large tables.
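Principle 1 (avoiding skew) can be checked mechanically. The sketch below is a toy model: the `skew_ratio` helper is invented for illustration, and Python's built-in `hash` stands in for the database's distribution hash. It compares a unique key against a low-cardinality column as candidate distribution columns.

```python
from collections import Counter

NUM_DNS = 4

def skew_ratio(values):
    """Hypothetical skew check: max rows per DN divided by the average.

    A ratio near 1.0 means the candidate column distributes evenly;
    larger ratios mean some DNs carry a disproportionate share.
    """
    counts = Counter(hash(v) % NUM_DNS for v in values)
    avg = len(values) / NUM_DNS
    return max(counts.values()) / avg

# A unique key (like c_custkey or l_orderkey) spreads rows evenly ...
print(skew_ratio(range(10000)))                   # 1.0

# ... while a two-valued column piles all rows onto two of four DNs.
print(skew_ratio([v % 2 for v in range(10000)]))  # 2.0
```

In practice this check corresponds to comparing per-DN row counts after loading, e.g. by querying the table's tuple counts on each datanode.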
Question
What are the principles for choosing the distribution column of a row-store table?
Answer:
The following guidelines apply when choosing a distribution column:
Choose a distribution column and distribution algorithm that spread the table data evenly across the DN nodes;
Prefer a column that is frequently used as a join column in SQL statements;
Minimize data access across DN nodes when the table is queried.