動態過濾器
動態過濾器能夠實時過濾數據流,并允許定義傳入數據必須滿足的條件才能進行處理。
動態過濾器demo
CREATE TABLE sales(id int ,profit_margin double ,PRIMARY KEY (id)
);CREATE TABLE products(product_name string ,product_profit double);--返回products中利潤率大于sales表中記錄的最大利潤率的所有的product_name
CREATE MATERIALIZED VIEW product_profit_v AS
WITH max_profit AS (
SELECT max(profit_margin) max FROM sales
)
SELECT product_name FROM products, max_profit
WHERE product_profit > max;--sink
CREATE SINK product_profit_v_sink FROM product_profit_v
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='product_profit_v_sink_t'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);
測試數據
INSERT INTO sales values(1,10);
INSERT INTO sales values(2,20);
INSERT INTO sales values(3,30);INSERT INTO sales values(2,8);
INSERT INTO sales values(1,8);INSERT INTO products values('a',10);
INSERT INTO products values('b',15);
消費
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t product_profit_v_sink_t -C -J
時間過濾器
時間過濾器允許根據特定時間(例如當前時間、特定日期或日期范圍)過濾數據
CREATE TABLE t_minutes(id integer ,minute timestamp) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
alter database dev set timezone='PRC'; -- dbname為數據庫名稱
show timezone;
--select pg_typeof(now());--查看數據類型--用法一:刪除并清理過期
--篩選出一周之內的數據
SELECT * FROM t_minutes where minute > NOW() - INTERVAL '1 day';--用法二:延遲表變更
--以minute字段為基準延遲一分鐘輸出
CREATE MATERIALIZED VIEW timer_t_minutes AS
SELECT * FROM t_minutes where minute + INTERVAL '1 minute' <= now(); --在這里需要注意時區務必要對其--sink
CREATE SINK timer_t_minutes_sink FROM timer_t_minutes
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='timer_t_minutes_sink_t'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);
測試數據
INSERT INTO t_minutes values(2,'2020-01-01 00:00:00'::TIMESTAMP);
INSERT INTO t_minutes values(2,'2023-01-01 00:00:00'::TIMESTAMP);
INSERT INTO t_minutes values(3,cast(now() as timestamp) - INTERVAL '1 hour');
INSERT INTO t_minutes values(4,cast(now() as timestamp) + INTERVAL '1 minute');
INSERT INTO t_minutes values(5,cast(now() as timestamp));
消費
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t timer_t_minutes_sink_t -C
參考:
Dynamic filters
Temporal filters