Flink Sql 按分鐘或日期統計數據量

一、環境版本

環境版本
Flink1.17.0
Kafka2.12
MySQL5.7.33

【注意】Flink 1.13版本增加Cumulate Window,之前版本Flink Sql 沒有 Trigger 功能,長時間的窗口不能在中途觸發計算,輸出中間結果。比如每 10S 更新一次截止到當前的pv、uv。只能用Trigger配合State實現,可參考如下實現方式:
Flink DataStream 按分鐘或日期統計數據量

二、MySQL建表腳本

create table user_log
(id      int auto_increment comment '主鍵'primary key,uid     int    not null comment '用戶id',event   int    not null comment '用戶行為',logtime bigint null comment '日志時間'
)comment '用戶日志表,作為驗證數據源';

三、用戶日志類

新建maven項目

用以定義Kafka和MySQL中Schema

/*** 用戶日志類*/
@Data
public class UserLog {//用戶uidprivate int uid;//用戶行為private int event;//日志時間private Date logtime;//獲取日期,用于按日期統計數據public String getFormatDate() {return DateUtil.format(logtime, "yyyyMMdd");}//獲取時間,精確到分鐘public String getFormatTime() {return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";}
}
}

四、用戶數據生成器

/*** 用戶數據生成器*/
public class UserLogGenerator {public static void main(String[] args) throws Exception {// 1.獲取執行環境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.自定義數據生成器SourceDataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(// 指定GeneratorFunction 實現類new GeneratorFunction<Long, UserLog>(){// 定義隨機數數據生成器public RandomDataGenerator generator;@Overridepublic void open(SourceReaderContext readerContext) throws Exception {generator = new RandomDataGenerator();}@Overridepublic UserLog map(Long aLong) throws Exception {UserLog userLog = new UserLog();//隨機生成用戶uiduserLog.setUid(generator.nextInt(1, 50));//隨機生成用戶行為userLog.setEvent(generator.nextInt(1, 2));//隨機生成用戶數據時間userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));return userLog;}},// 指定輸出數據的總行數
//                60 * 60 * 10,1200,// 指定每秒發射的記錄數RateLimiterStrategy.perSecond(10),// 指定返回值類型, 將Java的StockPrice封裝成到TypeInformationTypeInformation.of(UserLog.class));DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");//輸出生成數據
//        dataGeneratorSourceStream.print();//kafka數據寫入KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder().setBootstrapServers("hadoop01:9092").setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder().setTopic("userLog").setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes()).build()).build();dataGeneratorSourceStream.sinkTo(kafkaSink);//MySQL數據寫入,用以數據驗證SinkFunction<UserLog> jdbcSink = JdbcSink.sink("insert into user_log (uid, event, logtime) values (?, ?, ?)",new JdbcStatementBuilder<UserLog>() {@Overridepublic void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {preparedStatement.setInt(1, userLog.getUid());preparedStatement.setInt(2, userLog.getEvent());preparedStatement.setLong(3, userLog.getLogtime().getTime());}},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://192.168.31.116:3306/demo").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build());dataGeneratorSourceStream.addSink(jdbcSink);env.execute();}
}

五、Sql按分鐘或日期統計PV和UV

public class UserLogSql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);env.setParallelism(1);// 創建一個輸入表SourceTableString sourceDDL = "create table user_log\n" +"(\n" +"    uid  INT\n" +"    , event INT\n" +"    , logtime BIGINT\n" +"    , rowtime AS TO_TIMESTAMP_LTZ(logtime, 3)\n" +"    , WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND\n" +") with (\n" +"      'connector' = 'kafka'\n" +"      ,'topic' = 'userLog'\n" +"      ,'properties.bootstrap.servers' = 'hadoop01:9092'\n" +"      ,'scan.startup.mode' = 'latest-offset'\n" +"      ,'format' = 'json'\n" +");";tableEnv.executeSql(sourceDDL);// 統計每分鐘PV和UVString result = "select\n" +" date_format(window_start, 'yyyy-MM-dd') cal_day\n" +" , date_format(window_start, 'HH:mm:ss') start_time\n" +" , date_format(window_end, 'HH:mm:ss') end_time\n" +" , count(uid) pv\n" +" , count(distinct uid) uv\n" +"FROM TABLE(\n" +// 每隔10秒觸發一次計算,窗口大小為1天
//                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))\n" +// 每隔10秒觸發一次計算,窗口大小為10秒"    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '10' SECOND))\n" +"  GROUP BY window_start, window_end\n" +";";// 輸出sql執行結果tableEnv.executeSql(result).print();}
}

六、sql-client方式執行Sql

# 建表語句
create table user_log
(uid  INT,event INT,logtime BIGINT,rowtime AS TO_TIMESTAMP_LTZ(logtime, 3) ,WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) with ('connector' = 'kafka','topic' = 'userLog''properties.bootstrap.servers' = 'hadoop01:9092','scan.startup.mode' = 'latest-offset','format' = 'json',
);# pv、uv計算語句, 每隔10秒觸發一次計算,窗口大小為1天
selectdate_format(window_start, 'yyyy-MM-dd') cal_day,date_format(window_start, 'HH:mm:ss') start_time,date_format(window_end, 'HH:mm:ss') end_time,count(uid) pv,count(distinct uid) uv
FROM TABLE(CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))GROUP BY window_start, window_end;

七、數據驗證

  1. 啟動 UserLogGenerator
  2. 啟動 UserLogSql或在sql-client執行Sql
  3. 在MySQL中驗證查詢

轉換時間戳

時間戳轉換前轉換后
w_start2025-08-16 14:45:401755326740000
w_end2025-08-16 14:45:501755326750000
select count(distinct uid) from user_log where logtime< 1755326750000 and logtime>=1755326740000;
# 與MySql中輸出一致SQL Query Result (Table)                                                               Refresh: 1 s                                                      Page: Last of 1                                              Updated: 23:50:09.972 cal_day                     start_time                       end_time                   pv                   uv2025-08-15                       23:45:30                       23:45:40                   15                   152025-08-15                       23:45:40                       23:45:50                  101                   452025-08-15                       23:45:50                       23:46:00                  104                   422025-08-15                       23:46:00                       23:46:10                  100                   422025-08-15                       23:46:10                       23:46:20                   97                   452025-08-15                       23:46:20                       23:46:30                  104                   402025-08-15                       23:46:30                       23:46:40                   97                   422025-08-15                       23:46:40                       23:46:50                   99                   442025-08-15                       23:46:50                       23:47:00                  103                   442025-08-15                       23:47:00                       23:47:10                   97                   442025-08-15                       23:47:10                       23:47:20                  100                   43

八、常見問題

  1. sql-client執行查詢,缺少kafka包
# 運行SQL命令
Flink SQL> select * from user_log;
# 報錯
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

解決方法

# 下載flink對應版本的kafka包,放到flink的lib目錄下
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar -P ${FLINK_HOME}/lib/

九、參考鳴謝

Flink 實時統計歷史 pv、uv
Flink Cumulate Window

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/93347.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/93347.shtml
英文地址,請注明出處:http://en.pswp.cn/web/93347.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

LeetCode 2460.對數組執行操作

給你一個下標從 0 開始的數組 nums &#xff0c;數組大小為 n &#xff0c;且由 非負 整數組成。 你需要對數組執行 n - 1 步操作&#xff0c;其中第 i 步操作&#xff08;從 0 開始計數&#xff09;要求對 nums 中第 i 個元素執行下述指令&#xff1a; 如果 nums[i] nums[i …

深入解析 @nestjs/typeorm的 forRoot 與 forFeature

nestjs/typeorm 是 NestJS 與 TypeORM 集成的官方模塊&#xff0c;提供了 forRoot() 和 forFeature() 兩個核心靜態方法用于配置數據庫連接和實體注冊。本文將深入解析這兩個方法的機制、使用場景和最佳實踐。 一、TypeOrmModule.forRoot() - 全局數據庫配置 forRoot() 方法用于…

關于simplifyweibo_4_moods數據集的分類問題

本來打算用情感分類數據集拿Transformer模型來練練手&#xff0c;發現訓練效果并不好。當我分析了這個數據集的標簽后發現問題了&#xff1a; 查看標簽的分布&#xff1a; import pandas as pd# 先直接讀取數據&#xff0c;不進行后續處理 data_file ~/data/simplifyweibo_4_m…

Custom SRP - Baked Light

https://catlikecoding.com/unity/tutorials/custom-srp/baked-light/本篇教程介紹將靜態光照烘焙到 light map 和 light prob 中.首先貼上我遇到的問題,希望遇到的同學幫忙解答:實踐本教程過程中,定義的 MetaPass 沒有效果, Unity 始終在使用默認的 meta pass,我使用的是 unit…

[Python]PTA:實驗2-3-1-for 求1到100的和

本題要求編寫程序&#xff0c;計算表達式 1 2 3 ... 100 的值。輸入格式&#xff1a;本題無輸入。輸出格式&#xff1a;按照以下格式輸出&#xff1a;sum 累加和代碼如下&#xff1a;x0 for i in range(1,101,1):xi print("sum {}".format(x))

【解決筆記】MyBatis-Plus 中無 selectList 方法

MyBatis-Plus 中無 selectList 方法的解決筆記 核心前提 MyBatis-Plus 的 BaseMapper 接口內置了 selectList 等基礎查詢方法&#xff0c;繼承該接口可直接使用&#xff0c;無需手動實現。 無 selectList 方法的兩種情況及解決方式 1. 未繼承 BaseMapper&#xff08;推薦方案&a…

一周學會Matplotlib3 Python 數據可視化-繪制箱線圖(Box)

鋒哥原創的Matplotlib3 Python數據可視化視頻教程&#xff1a; 2026版 Matplotlib3 Python 數據可視化 視頻教程(無廢話版) 玩命更新中~_嗶哩嗶哩_bilibili 課程介紹 本課程講解利用python進行數據可視化 科研繪圖-Matplotlib&#xff0c;學習Matplotlib圖形參數基本設置&…

4.4 vue3生命周期函數

vue3生命周期函數生命周期鉤子名稱對比表階段Vue 2 選項式 APIVue 3 組合式 API說明創建前beforeCreateonBeforeCreate&#xff08;已廢棄&#xff09;Vue 3 中 setup() 替代創建完成createdsetup()&#xff08;替代&#xff09;setup 是入口&#xff0c;代替 beforeCreate 和 …

無腦整合springboot2.7+nacos2.2.3+dubbo3.2.9實現遠程調用及配置中心

簡介&#xff1a; 好久沒有寫博客了&#xff0c;最近辭職了有時間進行一次分享&#xff0c;今天我們主要是使用單體服務springboot整合nacos實現配置中心&#xff0c;然后整合dubbo來實現遠程的rpc調用。如下是本地案例架構圖&#xff0c;生產者和消費者的配置在nacos配置中心上…

騰訊位置商業授權微信小程序逆地址解析(坐標位置描述)

微信小程序JavaScript SDK 開發指南 逆地址解析(坐標位置描述) reverseGeocoder(options:Object) 本接口提供由坐標到坐標所在位置的文字描述的轉換&#xff0c;輸入坐標返回地理位置信息和附近poi列表。 注&#xff1a;坐標系采用gcj02坐標系 options屬性說明 屬性類型必填…

3D商品展示:技術狂歡下的普及困局

當微軟推出Copilot 3D——僅需一張照片即可生成可編輯的3D模型時&#xff0c;業界曾歡呼“建模門檻徹底消失”。然而技術的美好愿景卻撞上現實的銅墻鐵壁&#xff1a;當前電商平臺3D商品加載卡頓導致用戶跳出率超60%&#xff0c;企業3D化滲透率仍不足34%。絢爛的技術煙花下&…

(Arxiv-2025)Stand-In:一種輕量化、即插即用的身份控制方法用于視頻生成

Stand-In&#xff1a;一種輕量化、即插即用的身份控制方法用于視頻生成 paper是WeChat發布在Arxiv 2025的工作 paper title:Stand-In: A Lightweight and Plug-and-Play Identity Control for Video Generation Code&#xff1a;鏈接 圖1&#xff1a;給定一張參考圖像&#xff…

數據科學與爬蟲技術學習筆記

數據科學與爬蟲技術學習筆記 一、數據科學基礎庫 1. NumPy&#xff1a;數值計算的基石 NumPy 是 Python 科學計算的核心庫&#xff0c;專為數組和矩陣操作設計&#xff0c;能大幅簡化循環操作&#xff0c;提供豐富的數學函數。 核心優勢&#xff1a;高效處理同類型元素的多維…

學習嵌入式之硬件——I2C

一、I2C1.定義內部集成電路的簡稱&#xff0c;半雙工串行同步通信&#xff0c;是芯片和芯片之間的通信方式&#xff1b;通常只有一個主機&#xff0c;多個從機&#xff0c;采用主從應答的方式上圖所示是IIC的總線的使用場景&#xff0c;所有掛載在IIC總線上的設備都有兩根信號線…

使用websockt

封裝websocktHooksimport { ref, onMounted, onUnmounted } from vue;/*** webSocket的Hooks* param {string} websocket鏈接地址* */ export function useWebSocket(url: string) {// 核心狀態 const data: Ref<any> ref(null);//收到websocket返回的數據const socke…

Jmeter自定義腳本

目錄 log&#xff1a;輸出類 Label&#xff1a;你自定義的組件的名稱 FileName&#xff1a;添加的腳本文件的文件名 Parameters&#xff1a;你傳入的參數&#xff0c;是一個字符串 args&#xff1a;你傳入的參數&#xff0c;是一個數組 Parameters和args的異同&#xff1…

飛算 JavaAI 電商零售場景實踐:從訂單峰值到供應鏈協同的全鏈路技術革新

目錄 一、電商核心場景的技術攻堅 1.1 分布式訂單系統的事務一致性設計 1.1.1 TCC 模式下的訂單創建流程 1.1.2 訂單狀態機的可靠流轉 1.2 高并發秒殺系統的架構設計 1.2.1 多級限流與流量削峰 1.2.2 庫存防超賣機制 1.3 智能推薦與用戶行為分析 1.3.1 用戶行為實時采…

51單片機-51單片機介紹

51單片機介紹單片機簡介什么是單片機呢&#xff1f;單片機是一種集成電路芯片&#xff0c;采用超大規模集成電路技術將中央處理器&#xff08;CPU&#xff09;、隨機存儲器&#xff08;RAM&#xff09;、只讀存儲器&#xff08;ROM&#xff09;、多種I/O口、中斷系統、定時器/計…

8月AI面試工具測評:破解規模化招聘難題

金秋校招臨近&#xff0c;企業面臨“百萬簡歷涌入VS面試官團隊告急”的典型困境。傳統線下面試效率低下、標準參差&#xff0c;難以應對短時間內爆發式的人才篩選需求。AI面試工具憑借自動化與智能化特性成為破局關鍵&#xff0c;但市面上產品良莠不齊——究竟哪款能兼顧效率與…

Debian新一代的APT軟件源配置文件格式DEB822詳解

Debian 的 DEB822 格式詳解&#xff1a;新一代 APT 源配置 DEB822 是一種基于 RFC 822 數據格式的配置文件語法&#xff0c;Debian 新一代的 APT 軟件源配置文件格式就采用了 DEB822。DEB822 格式從 Debian 11 (Bullseye) 開始被引入&#xff0c;并在 Debian 12 (Bookworm) 中成…