一、Environment Versions
Environment | Version |
---|---|
Flink | 1.17.0 |
Kafka | 2.12 |
MySQL | 5.7.33 |
[Note] Flink 1.13 introduced the Cumulate Window. Earlier versions of Flink SQL have no trigger mechanism, so a long window cannot fire mid-way and emit intermediate results, for example updating the cumulative PV and UV every 10 seconds. Before 1.13 this could only be done in the DataStream API with a Trigger plus State; see the referenced implementation below, followed by a rough sketch:
Flink DataStream: counting data by minute or by date
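For a rough idea of what that pre-1.13 workaround looks like, here is a minimal sketch (not the exact code from the referenced article): a day-long window whose trigger fires every 10 seconds, so intermediate PV/UV are emitted before the window closes. It assumes a DataStream&lt;UserLog&gt; named userLogStream (for example, the generator stream built in section 四) and uses processing time for brevity.

userLogStream
        // one long window per day
        .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
        // fire the window every 10 seconds so intermediate results are emitted
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
        .process(new ProcessAllWindowFunction<UserLog, String, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<UserLog> elements, Collector<String> out) {
                long pv = 0;
                Set<Integer> uids = new HashSet<>();
                for (UserLog userLog : elements) {
                    pv++;
                    uids.add(userLog.getUid());
                }
                // cumulative PV/UV for the day so far
                out.collect("pv=" + pv + ", uv=" + uids.size());
            }
        })
        .print();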
二、MySQL Table Creation Script
create table user_log
(
    id      int auto_increment comment 'primary key' primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user action',
    logtime bigint null comment 'log time'
) comment 'user log table, used as the verification data source';
三、User Log Class
Create a new Maven project. The UserLog class below defines the schema shared by the Kafka messages and the MySQL table.
import cn.hutool.core.date.DateUtil;
import lombok.Data;

import java.util.Date;

/**
 * User log class
 */
@Data
public class UserLog {
    // user id
    private int uid;
    // user action/event
    private int event;
    // log time
    private Date logtime;

    // date string, used for per-day aggregation
    public String getFormatDate() {
        return DateUtil.format(logtime, "yyyyMMdd");
    }

    // time string, truncated to the minute
    public String getFormatTime() {
        return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";
    }
}
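A quick, illustrative check of the helper methods (a standalone snippet, assuming hutool and Lombok are on the classpath; the printed values are examples only):

UserLog userLog = new UserLog();
userLog.setUid(7);
userLog.setEvent(1);
userLog.setLogtime(new Date());
// date used for per-day aggregation, e.g. 20250816
System.out.println(userLog.getFormatDate());
// time truncated to the minute, e.g. 2025-08-16 14:45:00
System.out.println(userLog.getFormatTime());
// the JSON form is what the generator later writes to Kafka
System.out.println(JSONUtil.toJsonStr(userLog));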
四、User Data Generator
/**
 * User data generator
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Custom data generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {
                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user uid
                        userLog.setUid(generator.nextInt(1, 50));
                        // random user event
                        userLog.setEvent(generator.nextInt(1, 2));
                        // random log time within +/- 2 seconds of now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND,
                                generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
                // 60 * 60 * 10,
                1200,
                // number of records emitted per second
                RateLimiterStrategy.perSecond(10),
                // wrap UserLog into TypeInformation as the return type
                TypeInformation.of(UserLog.class));
        DataStreamSource<UserLog> dataGeneratorSourceStream =
                env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print the generated data
        // dataGeneratorSourceStream.print();

        // write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder()
                        .setTopic("userLog")
                        .setValueSerializationSchema(
                                (SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                        .build())
                .build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // write to MySQL as well, for data verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.31.116:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build());
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}
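Note that hutool's JSONUtil.toJsonStr serializes the logtime Date as epoch milliseconds by default (i.e. with no custom date format configured in JSONConfig), the same value written to MySQL via getTime(). That is exactly what the logtime BIGINT column and the TO_TIMESTAMP_LTZ(logtime, 3) expression in the SQL table below expect; if you customize the JSON date format, the DDL has to change accordingly.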
五、SQL: PV and UV by Minute or by Day
public class UserLogSql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // create the source table
        String sourceDDL = "create table user_log\n" +
                "(\n" +
                "    uid INT\n" +
                "    , event INT\n" +
                "    , logtime BIGINT\n" +
                "    , rowtime AS TO_TIMESTAMP_LTZ(logtime, 3)\n" +
                "    , WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND\n" +
                ") with (\n" +
                "    'connector' = 'kafka'\n" +
                "    ,'topic' = 'userLog'\n" +
                "    ,'properties.bootstrap.servers' = 'hadoop01:9092'\n" +
                "    ,'scan.startup.mode' = 'latest-offset'\n" +
                "    ,'format' = 'json'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // PV and UV per window
        String result = "select\n" +
                "    date_format(window_start, 'yyyy-MM-dd') cal_day\n" +
                "    , date_format(window_start, 'HH:mm:ss') start_time\n" +
                "    , date_format(window_end, 'HH:mm:ss') end_time\n" +
                "    , count(uid) pv\n" +
                "    , count(distinct uid) uv\n" +
                "FROM TABLE(\n" +
                // fire every 10 seconds, window size of 1 day
                // "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))\n" +
                // fire every 10 seconds, window size of 10 seconds
                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '10' SECOND))\n" +
                "GROUP BY window_start, window_end";
        // print the query result
        tableEnv.executeSql(result).print();
    }
}
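A note on the two CUMULATE variants: with a step of 10 seconds and a maximum window size of 1 day (the commented-out line), window_start stays fixed at the start of the day while window_end advances by 10 seconds on every fire, so each emitted row carries the cumulative PV/UV since midnight. With step and size both set to 10 seconds, every fire also closes the window, so the query behaves like a plain 10-second tumbling window, which is what the sample output in section 七 shows.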
六、Running the SQL via sql-client
-- table DDL
create table user_log
(
    uid INT,
    event INT,
    logtime BIGINT,
    rowtime AS TO_TIMESTAMP_LTZ(logtime, 3),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) with (
    'connector' = 'kafka',
    'topic' = 'userLog',
    'properties.bootstrap.servers' = 'hadoop01:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- PV/UV query: fire every 10 seconds, window size of 1 day
select
    date_format(window_start, 'yyyy-MM-dd') cal_day,
    date_format(window_start, 'HH:mm:ss') start_time,
    date_format(window_end, 'HH:mm:ss') end_time,
    count(uid) pv,
    count(distinct uid) uv
FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))
GROUP BY window_start, window_end;
七、Data Verification
- Start UserLogGenerator
- Start UserLogSql, or run the SQL in sql-client
- Verify the results with a query in MySQL
Convert the window timestamps to epoch milliseconds:
Window boundary | Datetime | Epoch (ms) |
---|---|---|
w_start | 2025-08-16 14:45:40 | 1755326740000 |
w_end | 2025-08-16 14:45:50 | 1755326750000 |
select count(distinct uid) from user_log where logtime < 1755326750000 and logtime >= 1755326740000;
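The pv column can be cross-checked the same way by counting all rows in the same range (uid is NOT NULL, so count(*) matches count(uid)):
select count(*) from user_log where logtime < 1755326750000 and logtime >= 1755326740000;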
# Consistent with the result queried from MySQL
SQL Query Result (Table)
Refresh: 1 s    Page: Last of 1    Updated: 23:50:09.972

   cal_day  start_time  end_time   pv  uv
2025-08-15    23:45:30  23:45:40   15  15
2025-08-15    23:45:40  23:45:50  101  45
2025-08-15    23:45:50  23:46:00  104  42
2025-08-15    23:46:00  23:46:10  100  42
2025-08-15    23:46:10  23:46:20   97  45
2025-08-15    23:46:20  23:46:30  104  40
2025-08-15    23:46:30  23:46:40   97  42
2025-08-15    23:46:40  23:46:50   99  44
2025-08-15    23:46:50  23:47:00  103  44
2025-08-15    23:47:00  23:47:10   97  44
2025-08-15    23:47:10  23:47:20  100  43
八、Common Issues
- Running a query in sql-client fails because the Kafka connector jar is missing
# Run a SQL query
Flink SQL> select * from user_log;
# Error message
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Solution
# Download the Kafka SQL connector jar matching the Flink version and place it in Flink's lib directory
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar -P ${FLINK_HOME}/lib/
九、References and Acknowledgements
Flink real-time historical PV/UV statistics
Flink Cumulate Window