Flink TableAPI 按分鐘統計數據量

一、環境版本

環境版本
Flink1.17.0
Kafka2.12
MySQL5.7.33

二、MySQL建表腳本

create table user_log
(id      int auto_increment comment '主鍵'primary key,uid     int    not null comment '用戶id',event   int    not null comment '用戶行為',logtime bigint null comment '日志時間'
)comment '用戶日志表,作為驗證數據源';

三、用戶日志類

新建maven項目

用以定義Kafka和MySQL中Schema

/*** 用戶日志類*/
@Data
public class UserLog {//用戶uidprivate int uid;//用戶行為private int event;//日志時間private Date logtime;
}

四、用戶數據生成器

/*** 用戶數據生成器*/
public class UserLogGenerator {public static void main(String[] args) throws Exception {// 1.獲取執行環境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.自定義數據生成器SourceDataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(// 指定GeneratorFunction 實現類new GeneratorFunction<Long, UserLog>(){// 定義隨機數數據生成器public RandomDataGenerator generator;@Overridepublic void open(SourceReaderContext readerContext) throws Exception {generator = new RandomDataGenerator();}@Overridepublic UserLog map(Long aLong) throws Exception {UserLog userLog = new UserLog();//隨機生成用戶uiduserLog.setUid(generator.nextInt(1, 100000));//隨機生成用戶行為userLog.setEvent(generator.nextInt(1, 2));//隨機生成用戶數據時間userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));return userLog;}},// 指定輸出數據的總行數60 * 60 * 10,// 指定每秒發射的記錄數RateLimiterStrategy.perSecond(10),// 指定返回值類型, 將Java的StockPrice封裝成到TypeInformationTypeInformation.of(UserLog.class));DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");//輸出生成數據
//        dataGeneratorSourceStream.print();//kafka數據寫入KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder().setBootstrapServers("hadoop01:9092").setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder().setTopic("userLog").setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes()).build()).build();dataGeneratorSourceStream.sinkTo(kafkaSink);//MySQL數據寫入,用以數據驗證SinkFunction<UserLog> jdbcSink = JdbcSink.sink("insert into user_log (uid, event, logtime) values (?, ?, ?)",new JdbcStatementBuilder<UserLog>() {@Overridepublic void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {preparedStatement.setInt(1, userLog.getUid());preparedStatement.setInt(2, userLog.getEvent());preparedStatement.setLong(3, userLog.getLogtime().getTime());}},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/demo").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("你的用戶名").withPassword("你的密碼").build());dataGeneratorSourceStream.addSink(jdbcSink);env.execute();}
}

五、TableAPI 10秒鐘內用戶的訪問量

/*** 10秒鐘內用戶的訪問量*/
public class UserLogCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);env.setParallelism(1);//1.定義table的schemafinal Schema schema = Schema.newBuilder().column("uid", DataTypes.INT()).column("event", DataTypes.INT()).column("logtime", DataTypes.BIGINT())//將logtime轉換為flink使用的timsstamp格式.columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(logtime, 3)")//定義水位線.watermark("rowtime", "rowtime - INTERVAL '5' SECOND").build();//2.創建Kafka source tabletableEnv.createTable("user_log", TableDescriptor.forConnector("kafka").schema(schema).format("json")
//                .option("json.timestamp-format.standard", "ISO-8601").option("json.ignore-parse-errors", "true").option("topic", "userLog").option("properties.bootstrap.servers", "hadoop01:9092").option("scan.startup.mode", "latest-offset").build());//3.創建一個滾動窗口表Table pvTable = tableEnv.from("user_log")//定義一個10秒鐘的滾動窗口.window(Tumble.over(lit(10).seconds()).on($("rowtime")).as("w")).groupBy($("w")).select($("w").start().as("w_start"),$("w").end().as("w_end"),//$("uid").count().distinct().as("uv")),$("uid").count().as("pv"));pvTable.execute().print();}
}

六、數據驗證

  1. 啟動 UserLogGenerator
  2. 啟動 UserLogCount
+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:11:50.000 | 2025-08-11 15:12:00.000 |                   10 |
| +I | 2025-08-11 15:12:00.000 | 2025-08-11 15:12:10.000 |                   95 |
| +I | 2025-08-11 15:12:10.000 | 2025-08-11 15:12:20.000 |                  104 |
| +I | 2025-08-11 15:12:20.000 | 2025-08-11 15:12:30.000 |                  104 |
| +I | 2025-08-11 15:12:30.000 | 2025-08-11 15:12:40.000 |                   94 |
| +I | 2025-08-11 15:12:40.000 | 2025-08-11 15:12:50.000 |                  104 |
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 |
| +I | 2025-08-11 15:13:00.000 | 2025-08-11 15:13:10.000 |                  100 |
  1. 在MySQL中驗證查詢

選取數據

+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 

轉換時間戳

時間戳轉換前轉換后
w_start2025-08-11 15:12:50.0001754896370000
w_end2025-08-11 15:13:00.0001754896380000

MySQL中查詢

# 輸出96與Flink結果一致
select count(*) 
from user_log 
where logtime>= 1754896370000 and logtime < 1754896380000;

七、POM文件

<project><groupId>dblab</groupId><artifactId>demo</artifactId><modelVersion>4.0.0</modelVersion><name> </name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>central-repos</id><name>Central Repository</name><url>http://repo.maven.apache.org/maven2</url></repository><repository><id>alimaven</id><name>aliyun maven</name><url>https://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-connector-files</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-connector-files</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
<!--    <dependency>-->
<!--      <groupId>org.apache.flink</groupId>-->
<!--      <artifactId>flink-csv</artifactId>-->
<!--      <version>${flink.version}</version>-->
<!--    </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version><scope>provided</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.39</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

八、常見問題

8.1 未定義水位線

Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:327)at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:307)at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:300)at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:265)at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:262)at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:641)at UserLogCount.main(UserLogCount.java:42)

當TableAPI中未定義水位線時,會導致Flink無法識別窗口的時間戳

//定義水位線
.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/93295.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/93295.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/93295.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

18.13 《3倍效率提升!Hugging Face datasets.map高級技巧實戰指南》

3倍效率提升!Hugging Face datasets.map高級技巧實戰指南 實戰項目:使用 datasets.map 進行高級數據處理 在大模型訓練過程中,數據預處理的質量直接決定了模型最終的表現。Hugging Face Datasets 庫提供的 datasets.map 方法是處理復雜數據場景的瑞士軍刀,本章將深入解析…

實體店獲客新引擎:數據大集網如何破解傳統門店引流難題

在商業競爭日益激烈的當下&#xff0c;實體店的生存與發展正面臨前所未有的挑戰。無論是街邊的小型便利店&#xff0c;還是大型購物中心的連鎖品牌&#xff0c;都在為"如何吸引顧客進店"而絞盡腦汁。傳統廣告投放效果不佳、線下流量持續萎縮、客戶轉化率難以提升………

LeetCode 分類刷題:2302. 統計得分小于 K 的子數組數目

題目一個數組的 分數 定義為數組之和 乘以 數組的長度。比方說&#xff0c;[1, 2, 3, 4, 5] 的分數為 (1 2 3 4 5) * 5 75 。給你一個正整數數組 nums 和一個整數 k &#xff0c;請你返回 nums 中分數 嚴格小于 k 的 非空整數子數組數目。子數組 是數組中的一個連續元素序…

TDengine IDMP 基本功能(1.界面布局和操作)

UI 布局和操作說明 TDengine IDMP 的用戶界面&#xff08;UI&#xff09;設計旨在提供直觀、易用的操作體驗。下面介紹 UI 的主要區域和典型操作&#xff1a; 主要區域 IDMP 的用戶界面是完全基于瀏覽器的。登錄后的典型 UI 界面具有幾個區域&#xff1a; 主菜單&#xff1a;AI…

QT(概述、基礎函數、界面類、信號和槽)

一、概述1、QTQT是一個c的第三方庫&#xff0c;是專門用來進行界面編程的一個庫 1. QT本身實現了多種軟件&#xff1a; 2. ubuntu系統中所有界面都是QT做的 3. 最新版本的QQ也是QT做的 4. 嵌入式編程中&#xff0c;幾乎所有的上位機&#xff0c;都可以使用QT來做 QT本身除了實現…

【從零開始java學習|第六篇】運算符的使用與注意事項

目錄 一、算術運算符 1. 基本算術運算符&#xff08;二元&#xff09; 2. 自增 / 自減運算符&#xff08;一元&#xff09; 二、類型轉換&#xff08;隱式與強制&#xff09; 1. 隱式轉換&#xff08;自動類型轉換&#xff09; ?編輯 2. 強制轉換&#xff08;顯式類型轉…

shellgpt

一、介紹 官網&#xff1a;https://github.com/TheR1D/shell_gpt ShellGPT&#xff08;shell_gpt&#xff09; 是一款把 GPT 系列大模型能力直接搬到終端 的開源命令行生產力工具。用日常英語或中文描述需求&#xff0c;就能幫你 生成、解釋甚至自動執行 Shell 命令&#xff…

geoserver sql視圖調用Postgis自定義函數問題記錄

一、問題描述&#xff1a;geoserver sql視圖調用Postgis自定義函數對點圖層增加一條記錄時&#xff0c;返回結果主鍵自增ID加了2&#xff0c;但表中數據只增加一條記錄。 但在pgAdmin中直接寫SQL調用Postgis自定義函數對點圖層增加一條記錄時&#xff0c;返回結果主鍵自增ID只加…

#T1224. 最大子矩陣

題目傳送 題目描述 已知矩陣的大小定義為矩陣中所有元素的和。給定一個矩陣&#xff0c;你的任務是找到最大的非空(大小至少是11)子矩陣。 比如&#xff0c;如下44的矩陣 0 -2 -7 09 2 -6 2 -4 1 -4 1-1 8 0 -2的最大子矩陣是 9 2-4 1-1 8這…

2025年大模型安全崗的面試匯總(題目+回答)

安全領域各種資源&#xff0c;學習文檔&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各種好玩的項目及好用的工具&#xff0c;歡迎關注。 目錄 1. Transformer核心機制及其對LLM突破的基石作用 2. LLM能力邊界評估框架設計 3. 模型層級安全風險分析 …

《關于省級政務云服務費支出預算標準的規定》豫財預〔2024〕106號解讀

《關于省級政務云服務費支出預算標準的規定》豫財預〔2024〕106號文件由河南省財政廳編制經省政府同意后于2024年12月3日印發執行&#xff0c;規定作為省級政務云服務費支出預算編制和審核的依據&#xff0c;旨在加強省級部門預算管理&#xff0c;規范政務云服務費支出預算編制…

使用HalconDotNet實現異步多相機采集與實時處理

文章目錄 一、核心功能與原理 功能目標: 工作原理: 關鍵機制: 二、完整C#實現代碼 三、關鍵實現解析 1. 零拷貝圖像傳輸 2. 動態幀率控制 3. HALCON并行優化 4. 異常隔離機制 四、高級優化策略 1. 硬件加速配置 2. 內存池管理 3. 實時性保障 一、核心功能與原理 功能目標:…

《瘋狂Java講義(第3版)》學習筆記ch4

ch4流程控制與數組1.switch語句后的expression表達式的數據類型只能是byte、short、char、int四種證書類型。2.建議不要在循環體內修改循環變量&#xff08;也叫循環計數器&#xff09;的值&#xff0c;否則會增加程序出錯的可能性。3.定義數組推薦語法格式&#xff1a;type[] …

COLMAP進行密集重建,三維重建的步驟

密集重建是在稀疏重建的基礎上進行的 稀疏重建見&#xff1a;用 COLMAP GUI 在 Windows 下一步步完成 相機位姿估計&#xff08;SfM&#xff09; 和 稀疏點云重建的詳細步驟&#xff1a;_colmap database導入圖片位姿-CSDN博客 完成稀疏重建后直接進入以下步驟進行密集重建&am…

基于飛算JavaAI實現Reactor模式服務器的深度實踐

一、飛算JavaAI技術概述 1.1 飛算JavaAI平臺簡介飛算JavaAI是飛算科技推出的智能化Java開發平臺&#xff0c;通過AI技術賦能傳統軟件開發流程&#xff0c;為開發者提供從需求分析到代碼實現的全流程智能化解決方案。該平臺深度融合了人工智能技術與軟件開發實踐&#xff0c;具備…

量子人工智能

量子人工智能&#xff08;QAI&#xff09;是量子計算與人工智能的強大融合。這一領域旨在將量子系統獨特的計算能力與人工智能的模式識別和學習能力相結合&#xff0c;以更快、更高效地解決問題。 量子人工智能與常規人工智能的區別是什么&#xff1f;常規人工智能在經典計算機…

算法題Day1

1. 練習1&#xff1a;Hello,World!解題步驟:using namespace std; int main() {cout<<"Hello,World!"<<endl;return 0; }2. 練習2&#xff1a;打印飛機解題步驟:#include <iostream> using namespace std; int main() {cout << " …

Cypher注入詳解:原理、類型與測試方法

Cypher&#xff0c;全稱為 (Open) Cypher Query Language&#xff0c;是一種專為圖數據庫設計的聲明式查詢語言。它以直觀的模式匹配方式&#xff0c;幫助開發者和數據分析師從復雜的圖結構數據中檢索、創建和修改信息。如果說 SQL 是關系型數據庫的語言&#xff0c;那么 Cyphe…

PG靶機 - Pelican

一、 初步偵察與服務探測 1.1 端口掃描與服務識別 首先&#xff0c;對目標主機 192.168.163.98 進行全面的端口掃描&#xff0c;以識別所有開放的服務。 sudo nmap 192.168.163.98 -p- --min-rate5000 -A圖 1: Nmap 掃描結果&#xff0c;顯示多個開放端口 掃描結果表明&#xf…

【1】Transformers快速入門:自然語言處理(NLP)是啥?

第一章&#xff1a;自然語言處理&#xff08;NLP&#xff09;是啥&#xff1f;一句話解釋&#xff1a; NLP 教電腦聽懂人話、說人話的技術 &#xff08;比如讓手機聽懂你說話、讓翻譯軟件變聰明&#xff09;NLP發展史&#xff1a;電腦學人話的 “翻車史” 第一階段&#xff08…