基于Flink SQL的實時指標多維分析模型

數據流程介紹

1.創建源表kafka接入消息隊列數據,定義字段映射規則;
2.創建目標表es_sink配置Elasticsearch輸出;
3.通過多級視圖(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)實現數據清洗、去重、狀態計算;
4.使用ROLLUP進行多維聚合統計;
5.最終計算結果寫入ES,包含成功率等衍生指標。
在這里插入圖片描述

Flink SQL 邏輯

SET table.exec.state.ttl=2592000s; --30 days,默認: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- 單位:ms, 10天
--SET table.exec.state.ttl = 864000000CREATE TABLE kafkaTable (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,id AS IF(cur['id'] IS NOT NULL , cur['id'], src ['id']),task_id AS IF(cur['task_id'] IS NOT NULL , cur['task_id'], src ['task_id']),account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type']),retry_status AS IF(cur['retry_status'] IS NOT NULL , cur['retry_status'], src ['retry_status']),update_time as IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']),event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)proctime AS PROCTIME()
--                           WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 'xxx','jdq.client.id' = 'xxx','jdq.password' = 'xxx','jdq.domain' = 'xxx','scan.startup.mode' = 'group-offsets', --  default: group-offsets,other: latest-offset,earliest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果為false,則在發生checkpoint時觸發offset提交'format' = 'binlog');CREATE TABLE es_sink(send_type      STRING,task_id        STRING,month_dim      STRING,day_dim        STRING,grouping_id    INTEGER,init           INTEGER,cancel         INTEGER,succ           INTEGER,fail           INTEGER,cancel_rate    float,succ_rate      float,fail_rate      float,update_date    STRING,PRIMARY KEY (grouping_id,send_type,month_dim,day_dim,task_id) NOT ENFORCED
)with ('connector' = 'elasticsearch-6','index' = 'index01','document-type' = 'type01','hosts' = 'xx','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb');
-- 維度:
--   - send_type, 發送類型
--   - month_dim,月份維度
--   - day_dim,天維度
--   - task_id,任務IDCREATE view  tmp as
selectsend_type,task_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and msg_status='4' then 1 else 0 end AS cancel,case when UPPER(opt) = 'UPDATE' and msg_status='1' then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and msg_status='2' then 1 else 0 end AS fail,update_time,opt,ts,id,proctime,SUBSTRING(publish_time,1,7) as month_dim,SUBSTRING(publish_time,1,10) as day_dim
FROM kafkaTable
where trim(retry_status) = '0'and publish_time >= '2025-01-01 00:00:00'and(    (UPPER(opt) = 'INSERT' and msg_status='0' and position( '_R' in task_id) = 0)or   (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4') and position( '_R' in task_id) = 0)or   (UPPER(opt) = 'UPDATE' and msg_status='1' and position( '_R' in task_id) > 0));--去重模式,去重是指對在列的集合內重復的行進行刪除,只保留第一行或最后一行數據。在聚合sum或count時,Flink回撤流會對數據進行回撤處理
create view tmp_dedup as
select * from(select *,row_number() over(partition by id,msg_status order by proctime desc) as rnfrom tmp) t
where rn=1;CREATE view tmp1 as
selectsend_type,task_id,month_dim,day_dim,init,case when cancel = 1 and update_time <= publish_time then 1 else 0 end AS cancel,succ,case when cancel = 1 and update_time > publish_time then 1 else fail end AS fail,update_time
from tmp_dedup
where position( '_R' in task_id) = 0;CREATE view tmp2 as
selectsend_type,SPLIT_INDEX(task_id,'_R',0) AS task_id,month_dim,day_dim,init,cancel,succ,-1 AS fail,update_time
from tmp_dedup
where position( '_R' in task_id) > 0
and   succ = 1 ;CREATE view tmp3 as
selectsend_type,task_id,month_dim,day_dim,init,cancel,succ,fail
from tmp1
UNION ALL
selectsend_type,task_id,month_dim,day_dim,init,cancel,succ,fail
from tmp2;CREATE view  tmp_groupby as
select
--/*+ STATE_TTL('tmp' = '10d') */COALESCE(send_type,'N') AS send_type,COALESCE(month_dim,'N') AS month_dim,COALESCE(day_dim,'N') AS day_dim,COALESCE(task_id,'N') AS task_id,case when send_type is null and month_dim is null and day_dim is null and task_id is null then 1when send_type is not null and month_dim is null and day_dim is null and task_id is null then 2when send_type is not null and month_dim is not null and day_dim is null and task_id is null then 3when send_type is not null and month_dim is not null and day_dim is not null and task_id is null then 4when send_type is not null and month_dim is not null and day_dim is not null and task_id is not null then 5end grouping_id,sum(init) as init,sum(cancel) as cancel,sum(succ) as succ,sum(fail) as fail
from tmp3
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,month_dim,day_dim,task_id); --等同于以上INSERT INTO es_sink
selectcase when trim(send_type) = '1'  then '發送類型1'when trim(send_type) = '2'  then '發送類型2'else send_type end AS send_type,task_id,month_dim,day_dim,grouping_id,init,cancel,succ,fail,ROUND(cancel*100.0/init,2) AS cancel_rate,ROUND(succ*100.0/(init - cancel),2) AS succ_rate,ROUND(fail*100.0/(init - cancel),2) AS fail_rate,CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp_groupby
where init > 0
and (init - cancel) > 0;

es mapping

#POST index01/type01/_mapping
{"type01": {"properties": {"grouping_id": {"type": "byte"},"send_type": {"type": "keyword","ignore_above": 256},"month_dim": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM","ignore_malformed":"true" --忽略錯誤的各式}}},"day_dim": {"type": "keyword","fields": {"text": {"type": "keyword"},"date": {"type": "date","format": "yyyy-MM-dd","ignore_malformed":"true"}}},"task_id": {"type": "keyword"},"init": {"type": "integer"},"cancel": {"type": "integer"},"succ": {"type": "integer"},"fail": {"type": "integer"},"cancel_rate": {"type": "float"},"succ_rate": {"type": "float"},"fail_rate": {"type": "float"},"update_date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/72230.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/72230.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/72230.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【vscode-01】vscode不同項目不同語言擴展插件隔離方案

vscode不同項目不同語言擴展插件隔離方案 1. 背景2. vscode 擴展插件隔離方案2.1 code-profile 配置文件2.2 配合extensions.json 1. 背景 最近打開vscode 發現越來越卡&#xff0c;這是一個輕量級代碼編輯器&#xff0c;怎么會如此占用內存呢&#xff1f; 我使用了‘code --l…

《基于大數據的營養果蔬推薦系統的設計與實現》開題報告

目錄 一、選題的理論意義現實意義及應用價值 &#xff08;一&#xff09;理論意義 &#xff08;二&#xff09;現實意義 1.用戶價值提升 2.效率提升 3.經濟效益提升 &#xff08;三&#xff09;應用價值 1.提升用戶健康水平 2.優化購物體驗 3.支持健康決策 4.促進農業…

《C#上位機開發從門外到門內》2-4:Modbus協議

文章目錄 一、引言二、Modbus協議概述2.1 Modbus協議的起源與發展2.2 Modbus協議的基本特點2.3 應用領域 三、Modbus通信原理詳解3.1 Modbus RTU原理3.1.1 數據幀結構3.1.2 數據傳輸與時序3.1.3 錯誤檢測 3.2 Modbus TCP原理3.2.1 數據封裝3.2.2 通信機制3.2.3 與RTU模式的區別…

觀成科技:?加密C2框架Platypus流量分析

一、工具介紹 Platypus 是一款支持多會話的交互式反向 Shell 管理器。在實際的滲透測試中&#xff0c;為了解決 Netcat/Socat 等工具在文件傳輸、多會話管理方面的不足,該工具在多會話管理的基礎上增加了在滲透測試中能更好發揮作用的功能&#xff08;如&#xff1a;交互式 Sh…

OpenBMC:BmcWeb 處理http請求

OpenBMC:BmcWeb 讀取http請求頭-CSDN博客 介紹了,在讀取完http頭后,將調用Connection::handle處理http請求 1.Connection::handle void handle() {...req = std::make_shared<crow::Request>(parser->release(), reqEc);...req->session = userSession;accept …

MySQL事務深度解析:ACID特性、隔離級別與MVCC機制

引言 在數據庫系統中&#xff0c;?事務是保障數據一致性與完整性的核心機制。MySQL通過ACID特性、多級隔離策略和MVCC&#xff08;多版本并發控制&#xff09;實現了高性能與高可靠性的平衡。本文將從底層原理出發&#xff0c;系統解析事務的四大特性、隔離級別的實現邏輯&am…

WireShark自動抓包

背景 異常流量檢測是當前保護網絡空間安全的重要檢測方法。 對流量的研究&#xff0c;首先需要在系統中進行抓包&#xff0c;并對包進行分析。 這里對WireShark自動抓包進行簡要介紹。 操作步驟 1、選擇“捕獲”>“選項”。 2、在Input下&#xff0c;選擇要抓包的網絡接…

Android 自定義View之底部導航欄

文章目錄 Android 自定義View之底部導航欄概述代碼定義TabIndex定義Tab定義TabView定義NavigationBarFragmentSwitchHelper管理類使用 源碼下載 Android 自定義View之底部導航欄 概述 封裝一個通用的底部導航欄控件。 代碼 定義TabIndex Retention(AnnotationRetention.SOU…

西門子S7-1200 PLC遠程調試技術方案(巨控GRM532模塊)

三步快速實現遠程調試 硬件部署 準備西門子S7-1200 PLC、巨控GRM552YW-C模塊及編程電腦。GRM552YW-C通過網口與PLC連接&#xff0c;支持4G/5G/Wi-Fi/有線網絡接入&#xff0c;無需復雜布線。 軟件配置 安裝GVCOM3配置軟件&#xff0c;注冊模塊&#xff08;輸入唯一序列號與密…

上下文學習思維鏈COTPrompt工程

一、上下文學習 上下文學習強調在學習過程中考慮問題所處的上下文環境。 1.1 上下文學習的分類 零樣本&#xff08;Zero-Shot&#xff09;上下文學習單樣本&#xff08;One-Shot&#xff09;上下文學習少樣本&#xff08;Few-Shot&#xff09;上下文學習 1.2 示例選擇方法 …

node.js-WebScoket心跳機制(服務器定時發送數據,檢測連接狀態,重連)

1.WebScoket心跳機制是&#xff1f; 基于上一篇文章&#xff0c;WebScoket在瀏覽器和服務器間完成一次握手&#xff0c;兩者間創建持久性連接&#xff0c;并進行雙向數據連接。node.js-node.js作為服務器&#xff0c;前端使用WebSocket&#xff08;單個TCP連接上進行全雙工通訊…

若依RuoYi-Cloud-Plus微服務版(完整版)前后端部署

一.目標 在瀏覽器上成功登錄進入 二.源碼下載 后端源碼&#xff1a;前往Gitee下載頁面(https://gitee.com/dromara/RuoYi-Cloud-Plus)下載解壓到工作目錄。 前端源碼&#xff1a; 前往Gitee下載頁面(https://gitee.com/JavaLionLi/plus-ui)下載解壓到工作目錄。 文檔地址&a…

Nginx 多協議代理功能(Nginx Multi Protocol Proxy Function)

前言 Nginx 作為高性能的反向代理和負載均衡工具&#xff0c;廣泛應用于 HTTP 和 HTTPS 協議的代理。但你知道嗎&#xff1f;Nginx 還可以代理其他協議&#xff0c;比如 TCP 和 UDP&#xff01;這些功能讓它在多協議支持方面表現出色&#xff0c;可以用于數據庫代理、流媒體服…

MistralAI挑戰DeepSeek:開源模型能否顛覆行業巨頭

在2025年&#xff0c;世界移動通信大會的展臺上&#xff0c;MistralAI的創始人ArthurMensch對著鏡頭&#xff0c;露出了溫和的笑容。不過他隨后講出的話&#xff0c;就仿佛一顆重磅炸彈&#xff0c;在AI領域引發了巨大的動蕩——他們即將推出的開源模型&#xff0c;據傳能夠超越…

代碼隨想錄第五十二天| 101.孤島的總面積 102.沉沒孤島 103.水流問題 104.建造最大島嶼

孤島的總面積 題目描述 給定一個由 1&#xff08;陸地&#xff09;和 0&#xff08;水&#xff09;組成的矩陣&#xff0c;島嶼指的是由水平或垂直方向上相鄰的陸地單元格組成的區域&#xff0c;且完全被水域單元格包圍。孤島是那些位于矩陣內部、所有單元格都不接觸邊緣的島…

八叉樹地圖的原理與實現

八叉樹與體素圖 八叉樹地圖 八叉樹地圖是可變分辨率的三維柵格地圖&#xff0c;可以自由調整分辨率&#xff0c;如下所示&#xff1a; 根據點云的數量或密度決定每個葉子方塊是否被占據 體素圖 體素就是固定分辨率的三維柵格地圖&#xff0c;如下所示&#xff1a; 根據點云…

最節省服務器,手搓電子證書查詢系統

用戶預算150元&#xff0c;想要一個最簡單證書查詢系統。前臺能查詢證書、后臺管理員能登錄能修改密碼&#xff0c;證書能夠手動輸入修改刪除、批量導入導出刪除數據、查詢搜索。能夠兼容蘋果、安卓、PC三端瀏覽器&#xff0c;最后幫忙部署到云服務器上。 用戶預算不多&#xf…

什么是全棧?

&#x1f91f;致敬讀者 &#x1f7e9;感謝閱讀&#x1f7e6;笑口常開&#x1f7ea;生日快樂?早點下班 &#x1f4d8;博主相關 &#x1f7e7;博主信息&#x1f7e8;博客首頁&#x1f7eb;專欄推薦&#x1f7e5;活動信息 &#x1f4c3;文章前言 &#x1f537;文章均為學習工…

作物移栽機器人的結構設計的介紹

作物移栽機器人的結構設計是一個復雜的機械與電子結合的系統工程&#xff0c;單純用代碼來實現整個結構設計是不現實的&#xff0c;因為結構設計更多涉及到機械結構、硬件選型等物理層面的內容。不過&#xff0c;我們可以通過代碼來模擬作物移栽機器人的部分功能&#xff0c;例…

【文獻閱讀】SPRec:用自我博弈打破大語言模型推薦的“同質化”困境

&#x1f4dc;研究背景 在如今的信息洪流中&#xff0c;推薦系統已經成為了我們生活中的“貼心小助手”&#xff0c;無論是看電影、聽音樂還是購物&#xff0c;推薦系統都在努力為我們提供個性化的內容。但這些看似貼心的推薦背后&#xff0c;其實隱藏著一個嚴重的問題——同質…