1. Environment Versions

| Environment | Version |
| --- | --- |
| Flink | 1.17.0 |
| Kafka | 2.12 |
| MySQL | 5.7.33 |
2. MySQL Table Creation Script

```sql
create table user_log
(
    id      int auto_increment comment 'primary key' primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user action',
    logtime bigint null comment 'log time'
) comment 'user log table, used as the verification data source';
```
3. User Log Class

Create a new Maven project. The class below defines the schema shared by Kafka and MySQL.

```java
import lombok.Data;

import java.util.Date;

/**
 * User log class.
 */
@Data
public class UserLog {
    // user id
    private int uid;
    // user action
    private int event;
    // log time
    private Date logtime;
}
```
4. User Data Generator

```java
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * User data generator.
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Custom data generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {
                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user uid
                        userLog.setUid(generator.nextInt(1, 100000));
                        // random user action
                        userLog.setEvent(generator.nextInt(1, 2));
                        // random log time, offset up to +/- 2 seconds from now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND,
                                generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
                60 * 60 * 10,
                // records emitted per second
                RateLimiterStrategy.perSecond(10),
                // return type: wrap the Java UserLog class in TypeInformation
                TypeInformation.of(UserLog.class));
        DataStreamSource<UserLog> dataGeneratorSourceStream =
                env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print the generated data
        // dataGeneratorSourceStream.print();

        // write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder()
                        .setTopic("userLog")
                        .setValueSerializationSchema(
                                (SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                        .build())
                .build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // write to MySQL, used for data verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("your-username")
                        .withPassword("your-password")
                        .build());
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}
```
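Worth noting: with Hutool's default JSONConfig (no date format set), java.util.Date fields are serialized as epoch milliseconds, which is what lets the Flink table in section 5 declare logtime as BIGINT and convert it with TO_TIMESTAMP_LTZ. Below is a minimal sketch to check the serialized shape; the class name UserLogJsonCheck and the values are illustrative, not part of the original project.

```java
import cn.hutool.json.JSONUtil;

import java.util.Date;

// Hypothetical helper, only to inspect the JSON the generator writes to Kafka.
public class UserLogJsonCheck {
    public static void main(String[] args) {
        UserLog userLog = new UserLog();
        userLog.setUid(73412);           // illustrative value
        userLog.setEvent(1);             // illustrative value
        userLog.setLogtime(new Date());
        // With Hutool's default JSONConfig, Date is serialized as epoch
        // milliseconds, e.g. {"uid":73412,"event":1,"logtime":1754896370123}
        System.out.println(JSONUtil.toJsonStr(userLog));
    }
}
```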
5. Table API: User Visit Count Within 10 Seconds

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * User visit count within 10-second windows.
 */
public class UserLogCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // 1. Define the table schema
        final Schema schema = Schema.newBuilder()
                .column("uid", DataTypes.INT())
                .column("event", DataTypes.INT())
                .column("logtime", DataTypes.BIGINT())
                // convert logtime to Flink's timestamp format
                .columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(logtime, 3)")
                // define the watermark
                .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
                .build();

        // 2. Create the Kafka source table
        tableEnv.createTable("user_log", TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("json")
                // .option("json.timestamp-format.standard", "ISO-8601")
                .option("json.ignore-parse-errors", "true")
                .option("topic", "userLog")
                .option("properties.bootstrap.servers", "hadoop01:9092")
                .option("scan.startup.mode", "latest-offset")
                .build());

        // 3. Aggregate over a tumbling window
        Table pvTable = tableEnv.from("user_log")
                // define a 10-second tumbling window
                .window(Tumble.over(lit(10).seconds()).on($("rowtime")).as("w"))
                .groupBy($("w"))
                .select(
                        $("w").start().as("w_start"),
                        $("w").end().as("w_end"),
                        // $("uid").count().distinct().as("uv"),
                        $("uid").count().as("pv"));

        pvTable.execute().print();
    }
}
```
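For comparison, the same window aggregation can be expressed in Flink SQL with the TUMBLE windowing TVF, the SQL counterpart of the Table API Tumble window. A minimal sketch, assuming it runs on the same tableEnv after the user_log table has been created:

```java
// SQL counterpart of the Table API window above (a sketch, not the original job)
tableEnv.sqlQuery(
        "SELECT window_start, window_end, COUNT(uid) AS pv " +
        "FROM TABLE(TUMBLE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND)) " +
        "GROUP BY window_start, window_end"
).execute().print();
```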
6. Data Verification

- Start UserLogGenerator
- Start UserLogCount

```
+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:11:50.000 | 2025-08-11 15:12:00.000 |                   10 |
| +I | 2025-08-11 15:12:00.000 | 2025-08-11 15:12:10.000 |                   95 |
| +I | 2025-08-11 15:12:10.000 | 2025-08-11 15:12:20.000 |                  104 |
| +I | 2025-08-11 15:12:20.000 | 2025-08-11 15:12:30.000 |                  104 |
| +I | 2025-08-11 15:12:30.000 | 2025-08-11 15:12:40.000 |                   94 |
| +I | 2025-08-11 15:12:40.000 | 2025-08-11 15:12:50.000 |                  104 |
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 |
| +I | 2025-08-11 15:13:00.000 | 2025-08-11 15:13:10.000 |                  100 |
```
- Verify the result in MySQL

Pick one window from the Flink output:

```
+----+-------------------------+-------------------------+----------------------+
| op |                 w_start |                   w_end |                   pv |
+----+-------------------------+-------------------------+----------------------+
| +I | 2025-08-11 15:12:50.000 | 2025-08-11 15:13:00.000 |                   96 |
```
Convert the window bounds to epoch timestamps:

| Timestamp | Before conversion | After conversion |
| --- | --- | --- |
| w_start | 2025-08-11 15:12:50.000 | 1754896370000 |
| w_end | 2025-08-11 15:13:00.000 | 1754896380000 |
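The conversion itself can be reproduced in MySQL with UNIX_TIMESTAMP. Note that it interprets the datetime in the session time zone, assumed here to be UTC+8 so that the results match the values above. A sketch:

```sql
-- session time zone assumed to be UTC+8, matching the Flink output above
select unix_timestamp('2025-08-11 15:12:50') * 1000;  -- 1754896370000
select unix_timestamp('2025-08-11 15:13:00') * 1000;  -- 1754896380000
```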
Query in MySQL:

```sql
# returns 96, consistent with the Flink result
select count(*)
from user_log
where logtime >= 1754896370000 and logtime < 1754896380000;
```
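Because Flink's tumbling windows are aligned to the epoch, each w_start is a multiple of 10 000 ms, so several windows can be cross-checked at once by bucketing logtime directly in MySQL. A sketch:

```sql
-- count events per 10-second bucket; w_start is the bucket start in epoch ms
select floor(logtime / 10000) * 10000 as w_start, count(*) as pv
from user_log
group by w_start
order by w_start;
```

The first and last buckets, and any records the watermark classified as late, may not match the Flink output exactly.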
7. POM File

```xml
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>dblab</groupId>
    <artifactId>demo</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>central-repos</id>
            <name>Central Repository</name>
            <url>http://repo.maven.apache.org/maven2</url>
        </repository>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <properties>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- <dependency> -->
        <!--     <groupId>org.apache.flink</groupId> -->
        <!--     <artifactId>flink-connector-files</artifactId> -->
        <!--     <version>${flink.version}</version> -->
        <!-- </dependency> -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- <dependency> -->
        <!--     <groupId>org.apache.flink</groupId> -->
        <!--     <artifactId>flink-csv</artifactId> -->
        <!--     <version>${flink.version}</version> -->
        <!-- </dependency> -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.39</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
8. Common Issues

8.1 Watermark Not Defined

```
Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:327)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:307)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:300)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:265)
	at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:262)
	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:641)
	at UserLogCount.main(UserLogCount.java:42)
```

If no watermark is defined in the Table API schema, the rowtime column is not recognized as a time attribute, so Flink cannot resolve the window's timestamp. The fix is to declare the watermark when building the schema:

```java
// define the watermark
.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
```