Accelerating Time-Series Data Merge Scenarios: Analysis and Implementation - Composite Indexes, Accelerated Windowed Group Queries, Extreme Recursive Acceleration
Author
digoal
Date
2016-11-28
Tags
PostgreSQL , data merge , time-series data , composite index , window query
Background
Data merging is a common requirement in many scenarios.
For example, a table's change details (insert, update, delete) may be recorded, and the details must be merged so that the latest value for each PK can be retrieved quickly.
Or many sensors continuously report data, and the latest state of each sensor must be fetched quickly.
A window query can satisfy this kind of requirement, but how do we speed it up, and how do we fetch batches of rows quickly?
There are tricks to this optimization.
Sensor Example
Suppose sensors report data continuously, and users need to query the latest value reported by each sensor.
Create a test table as follows:
create unlogged table sort_test(
  id serial8 primary key,  -- primary key
  c2 int,                  -- sensor ID
  c3 int                   -- sensor value
);

Insert 10 million rows of sensor test data:
postgres=# insert into sort_test (c2,c3) select random()*100000, random()*100 from generate_series(1,10000000);
INSERT 0 10000000
The query is as follows:
postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
                                                                            QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=10001512045.83..10001837045.83 rows=50000 width=16) (actual time=23865.363..44033.984 rows=100001 loops=1)
   Output: t.id, t.c2, t.c3
   Filter: (t.rn = 1)
   Rows Removed by Filter: 9899999
   Buffers: shared hit=54055, temp read=93801 written=93801
   ->  WindowAgg  (cost=10001512045.83..10001712045.83 rows=10000000 width=24) (actual time=23865.351..41708.460 rows=10000000 loops=1)
         Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?)
         Buffers: shared hit=54055, temp read=93801 written=93801
         ->  Sort  (cost=10001512045.83..10001537045.83 rows=10000000 width=16) (actual time=23865.335..31540.089 rows=10000000 loops=1)
               Output: sort_test.id, sort_test.c2, sort_test.c3
               Sort Key: sort_test.c2, sort_test.id DESC
               Sort Method: external merge  Disk: 254208kB
               Buffers: shared hit=54055, temp read=93801 written=93801
               ->  Seq Scan on public.sort_test  (cost=10000000000.00..10000154055.00 rows=10000000 width=16) (actual time=0.021..1829.135 rows=10000000 loops=1)
                     Output: sort_test.id, sort_test.c2, sort_test.c3
                     Buffers: shared hit=54055
 Planning time: 0.194 ms
 Execution time: 44110.560 ms
(18 rows)
Optimization: add a composite index to avoid the SORT. Note that id must be indexed in descending order.
postgres=# create index sort_test_1 on sort_test(c2,id desc);
CREATE INDEX
SQL performance after optimization:
postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
                                                                            QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=0.43..542565.80 rows=50000 width=16) (actual time=0.048..33844.843 rows=100001 loops=1)
   Output: t.id, t.c2, t.c3
   Filter: (t.rn = 1)
   Rows Removed by Filter: 9899999
   Buffers: shared hit=10029020 read=1
   ->  WindowAgg  (cost=0.43..417564.59 rows=10000097 width=24) (actual time=0.042..30490.662 rows=10000000 loops=1)
         Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?)
         Buffers: shared hit=10029020 read=1
         ->  Index Scan using sort_test_1 on public.sort_test  (cost=0.43..242562.89 rows=10000097 width=16) (actual time=0.030..18347.482 rows=10000000 loops=1)
               Output: sort_test.id, sort_test.c2, sort_test.c3
               Buffers: shared hit=10029020 read=1
 Planning time: 0.216 ms
 Execution time: 33865.321 ms
(13 rows)
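As an aside not benchmarked in the original article, PostgreSQL's DISTINCT ON extension can express the same "latest row per sensor" query more compactly, and with the same (c2, id desc) index the planner can often satisfy it without a sort; a minimal sketch:

```sql
-- Equivalent "latest row per group" query using DISTINCT ON
-- (a PostgreSQL extension to standard SQL, not benchmarked here):
-- for each sensor c2, keep the row with the largest id.
select distinct on (c2) id, c2, c3
from sort_test
order by c2, id desc;
```

The ORDER BY must start with the DISTINCT ON expressions; the first row seen per c2 (here, the one with the highest id) is the one returned.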
If the fetched rows need further processing, use a cursor and fetch them in batches. Since no explicit sort is required, each batch returns quickly, which speeds up the overall processing.
\timing
begin;
declare c1 cursor for select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
postgres=# fetch 100 from c1;
   id    | c2 | c3
---------+----+-----
 9962439 |  0 | 93
 9711199 |  1 | 52
 9987709 |  2 | 65
 9995611 |  3 | 34
 9998766 |  4 | 12
 9926693 |  5 | 81
....
 9905064 | 98 | 44
 9991592 | 99 | 99
(100 rows)
Time: 31.408 ms  -- returns almost immediately
Before the optimization, an explicit SORT was required, so a cursor does not help: the first row is only returned after the sort completes.
drop index sort_test_1;
begin;
declare c1 cursor for select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
postgres=# fetch 100 from c1;
....
Time: 22524.783 ms  -- rows start returning only after the sort finishes; very slow
Incremental Merge and Data Synchronization Example
This is similar to Oracle materialized views: when applying changes, the intermediate updates of a given record do not all need to be executed; only the last one does.
So the same technique applies: take the last entry per group.
create extension hstore;

create unlogged table sort_test1(
  id serial8 primary key,  -- primary key
  c2 int,    -- target table PK
  c3 text,   -- insert or update or delete
  c4 hstore  -- row data
);

create index idx_sort_test1_1 on sort_test1(c2,id desc);

select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1;

postgres=# explain select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1;
                                            QUERY PLAN
---------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=0.15..46.25 rows=4 width=68)
   Filter: (t.rn = 1)
   ->  WindowAgg  (cost=0.15..36.50 rows=780 width=84)
         ->  Index Scan using idx_sort_test1_1 on sort_test1  (cost=0.15..22.85 rows=780 width=76)
(4 rows)
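To illustrate how the merged change set might then be applied to a target table, here is a sketch; the target table, its columns, and the hstore key 'val' are hypothetical assumptions, not from the original article, and the upsert requires PostgreSQL 9.5+:

```sql
-- Hypothetical target table; assumes c4 stores the row payload as hstore,
-- e.g. '"val"=>"abc"'.
create table target(pk int primary key, val text);

-- Apply only the final change per PK: delete rows whose last action is
-- 'delete', then upsert the rest from the hstore payload.
with last_change as (
  select c2, c3, c4 from (
    select c2, c3, c4,
           row_number() over (partition by c2 order by id desc) rn
    from sort_test1
  ) t where rn = 1
), del as (
  delete from target
  where pk in (select c2 from last_change where c3 = 'delete')
)
insert into target (pk, val)
select c2, c4->'val' from last_change where c3 <> 'delete'
on conflict (pk) do update set val = excluded.val;
```

This mirrors the materialized-view apply semantics described above: intermediate versions of a record are skipped, and only its final state is written.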
An Extreme Optimization for Sparse Columns
Note that the previous optimization only eliminated the SORT; it did not reduce the number of blocks scanned.
When there are only a few groups, i.e. the column is sparse, there is an even more extreme optimization: a recursive query.
The technique is similar to the one in this article:
《distinct xx和count(distinct xx)的變態遞歸優化方法》 (an extreme recursive optimization for distinct xx and count(distinct xx))
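The core idea is a recursive "skip scan" that jumps from one distinct c2 value to the next via the index, touching only a handful of blocks per group instead of every row. A minimal sketch of the pattern, shown here only for fetching the distinct values themselves:

```sql
-- Minimal recursive skip scan over the sparse column c2:
-- each recursion step uses the index on c2 to jump directly
-- to the next distinct value greater than the current one.
with recursive d as (
  (select min(c2) as c2 from sort_test)
  union all
  (select (select min(c2) from sort_test where c2 > d.c2)
   from d where d.c2 is not null)
)
select c2 from d where c2 is not null;
```

With ~100,001 distinct values this performs on the order of one index probe per group, rather than scanning all 10 million rows.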
Example
create type r as (c2 int, c3 int);

postgres=# explain (analyze,verbose,timing,costs,buffers)
with recursive skip as (
  (
    select (c2,c3)::r as r from sort_test where id in
      (select id from sort_test where c2 is not null order by c2,id desc limit 1)
  )
  union all
  (
    select (
      select (c2,c3)::r as r from sort_test where id in
        (select id from sort_test t where t.c2 > (s.r).c2 and t.c2 is not null order by c2,id desc limit 1)
    )
    from skip s where (s.r).c2 is not null
    -- the "where (s.r).c2 is not null" is required; without it the recursion never terminates
  )
)
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;
                                                                            QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTE Scan on skip t  (cost=302.97..304.99 rows=100 width=8) (actual time=0.077..4184.770 rows=100001 loops=1)
   Output: (t.r).c2, (t.r).c3
   Filter: (t.* IS NOT NULL)
   Rows Removed by Filter: 1
   Buffers: shared hit=800947, temp written=476
   CTE skip
     ->  Recursive Union  (cost=0.91..302.97 rows=101 width=32) (actual time=0.066..3970.580 rows=100002 loops=1)
           Buffers: shared hit=800947
           ->  Nested Loop  (cost=0.91..2.95 rows=1 width=32) (actual time=0.064..0.066 rows=1 loops=1)
                 Output: ROW(sort_test_1.c2, sort_test_1.c3)::r
                 Buffers: shared hit=8
                 ->  HashAggregate  (cost=0.47..0.48 rows=1 width=8) (actual time=0.044..0.044 rows=1 loops=1)
                       Output: sort_test_2.id
                       Group Key: sort_test_2.id
                       Buffers: shared hit=4
                       ->  Limit  (cost=0.43..0.46 rows=1 width=12) (actual time=0.036..0.036 rows=1 loops=1)
                             Output: sort_test_2.id, sort_test_2.c2
                             Buffers: shared hit=4
                             ->  Index Only Scan using sort_test_1 on public.sort_test sort_test_2  (cost=0.43..267561.43 rows=10000000 width=12) (actual time=0.034..0.034 rows=1 loops=1)
                                   Output: sort_test_2.id, sort_test_2.c2
                                   Index Cond: (sort_test_2.c2 IS NOT NULL)
                                   Heap Fetches: 1
                                   Buffers: shared hit=4
                 ->  Index Scan using sort_test_pkey on public.sort_test sort_test_1  (cost=0.43..2.45 rows=1 width=16) (actual time=0.011..0.012 rows=1 loops=1)
                       Output: sort_test_1.id, sort_test_1.c2, sort_test_1.c3
                       Index Cond: (sort_test_1.id = sort_test_2.id)
                       Buffers: shared hit=4
           ->  WorkTable Scan on skip s  (cost=0.00..29.80 rows=10 width=32) (actual time=0.037..0.038 rows=1 loops=100002)
                 Output: (SubPlan 1)
                 Filter: ((s.r).c2 IS NOT NULL)
                 Rows Removed by Filter: 0
                 Buffers: shared hit=800939
                 SubPlan 1
                   ->  Nested Loop  (cost=0.92..2.96 rows=1 width=32) (actual time=0.034..0.035 rows=1 loops=100001)
                         Output: ROW(sort_test.c2, sort_test.c3)::r
                         Buffers: shared hit=800939
                         ->  HashAggregate  (cost=0.49..0.50 rows=1 width=8) (actual time=0.023..0.023 rows=1 loops=100001)
                               Output: t_1.id
                               Group Key: t_1.id
                               Buffers: shared hit=400401
                               ->  Limit  (cost=0.43..0.48 rows=1 width=12) (actual time=0.021..0.021 rows=1 loops=100001)
                                     Output: t_1.id, t_1.c2
                                     Buffers: shared hit=400401
                                     ->  Index Only Scan using sort_test_1 on public.sort_test t_1  (cost=0.43..133557.76 rows=3333333 width=12) (actual time=0.019..0.019 rows=1 loops=100001)
                                           Output: t_1.id, t_1.c2
                                           Index Cond: ((t_1.c2 > (s.r).c2) AND (t_1.c2 IS NOT NULL))
                                           Heap Fetches: 100000
                                           Buffers: shared hit=400401
                         ->  Index Scan using sort_test_pkey on public.sort_test  (cost=0.43..2.45 rows=1 width=16) (actual time=0.006..0.007 rows=1 loops=100000)
                               Output: sort_test.id, sort_test.c2, sort_test.c3
                               Index Cond: (sort_test.id = t_1.id)
                               Buffers: shared hit=400538
 Planning time: 0.970 ms
 Execution time: 4209.026 ms
(54 rows)
Fast FETCH is still supported:
postgres=# begin;
BEGIN
Time: 0.079 ms
postgres=# declare cur cursor for
with recursive skip as (
  (
    select (c2,c3)::r as r from sort_test where id in
      (select id from sort_test where c2 is not null order by c2,id desc limit 1)
  )
  union all
  (
    select (
      select (c2,c3)::r as r from sort_test where id in
        (select id from sort_test t where t.c2 > (s.r).c2 and t.c2 is not null order by c2,id desc limit 1)
    )
    from skip s where (s.r).c2 is not null
    -- the "where (s.r).c2 is not null" is required; without it the recursion never terminates
  )
)
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;
DECLARE CURSOR
Time: 1.240 ms
postgres=# fetch 100 from cur;
    r
----------
 (0,93)
 (1,52)
 (2,65)
.....
 (97,78)
 (98,44)
 (99,99)
(100 rows)
Time: 4.314 ms
With the extreme recursive optimization, performance improved about 10x: filtering the latest values out of 10 million records took only about 4 seconds.