Assignment:
Use Flink to compute website page views (PV) and write the results to Redis.
Use window functions and operators to compute hourly PV (the site's page view count), design a format for the resulting statistics, and store them in Redis (using a sink to write the processed results to the Redis database).
Steps:
1. Installing Redis on the virtual machine
(1) Download the Redis package
cd /opt/software    # change into a directory for installation packages; any directory works
wget http://download.redis.io/releases/redis-6.2.6.tar.gz
# download Redis 6.2.6; replace the version number if you need a different release
(2) Extract the package with the tar command
tar -zxvf redis-6.2.6.tar.gz
Extraction produces a directory named redis-6.2.6.
(3) Install the gcc build tools
yum install -y gcc gcc-c++ make
(4) Build and install Redis
cd redis-6.2.6
make            # compile Redis; this can take some time depending on VM performance
make install    # install Redis; binaries go to /usr/local/bin by default
(5) Configure Redis — no configuration file is installed under /etc by default, so create the directory and copy the sample file from the source tree:
mkdir /etc/redis                                        # directory for the configuration file
cp /opt/software/redis-6.2.6/redis.conf /etc/redis/     # copy the sample config; adjust the path to wherever you extracted the archive
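Before starting the server, a few settings in /etc/redis/redis.conf are usually adjusted so that Redis can be reached from outside the virtual machine; the values below are common lab-environment choices, not requirements:
bind 0.0.0.0          # listen on all interfaces instead of only 127.0.0.1
protected-mode no     # allow password-less remote connections (lab use only)
daemonize yes         # run redis-server in the background
Then start Redis with this configuration and verify it responds:
redis-server /etc/redis/redis.conf
redis-cli ping        # should print PONG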
As shown in the figure, Redis is installed and can be accessed normally.
The firewall on the virtual machine also needs to be opened up (or stopped) so that Redis's port 6379 is reachable from outside, for example:
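On a CentOS-style VM with firewalld, either stop the firewall entirely or open just port 6379 (a sketch, assuming firewalld is the active firewall):
systemctl stop firewalld && systemctl disable firewalld     # option 1: turn the firewall off
# option 2: keep the firewall and open only the Redis port
firewall-cmd --zone=public --add-port=6379/tcp --permanent
firewall-cmd --reload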
2. Code
Project setup
In IntelliJ IDEA, choose File -> New -> Project, select Maven, and follow the wizard to create a new Maven project. Add the following to pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-pv-redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <redis.clients.version>3.8.0</redis.clients.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Redis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.clients.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <!-- Aliyun mirror -->
    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>Aliyun public repository</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Defining the data type
Create a Java class UserBehavior to represent one record. Each line of UserBehavior.csv contains five comma-separated fields: userId, itemId, categoryId, behavior, and timestamp.
package Bean;

public class UserBehavior {
    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;   // behavior type: "pv" means page view
    private Long timestamp;    // timestamp in milliseconds

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public Long getItemId() { return itemId; }
    public void setItemId(Long itemId) { this.itemId = itemId; }
    public Integer getCategoryId() { return categoryId; }
    public void setCategoryId(Integer categoryId) { this.categoryId = categoryId; }
    public String getBehavior() { return behavior; }
    public void setBehavior(String behavior) { this.behavior = behavior; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}
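For orientation, here are hypothetical lines in the format the parser in the next step expects (the actual contents of UserBehavior(1).csv will differ; a header line, if present, is skipped by the program's filter):
userId,itemId,categoryId,behavior,timestamp
543462,1715,1464116,pv,1511658000000
662867,2244074,1575622,buy,1511658060000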
Writing the Flink program
Create a main class, for example PvStatisticsToRedis.java, with the Flink program that computes hourly PV and writes the results to Redis.
import Bean.UserBehavior;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class PvStatisticsToRedis {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read UserBehavior(1).csv and parse the five comma-separated fields;
        // lines that do not start with a digit (e.g. a header row) are skipped
        DataStream<UserBehavior> userBehaviorDataStream = env
                .readTextFile("/mnt/UserBehavior(1).csv")
                .filter(line -> !line.isEmpty() && Character.isDigit(line.charAt(0)))
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] parts = line.split(",");
                        return new UserBehavior(
                                Long.parseLong(parts[0]),      // userId
                                Long.parseLong(parts[1]),      // itemId
                                Integer.parseInt(parts[2]),    // categoryId
                                parts[3],                      // behavior
                                Long.parseLong(parts[4]));     // timestamp
                    }
                });

        // Keep only "pv" events and map each one to ("pv", userId)
        SingleOutputStreamOperator<Tuple2<String, Long>> pvStream = userBehaviorDataStream
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .map(behavior -> Tuple2.of("pv", behavior.getUserId()))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Key by the constant "pv" and count events in 1-hour tumbling processing-time windows
        KeyedStream<Tuple2<String, Long>, String> keyedStream = pvStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Map<String, Object>> pvCountStream = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, Map<String, Object>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Tuple2<String, Long>> elements,
                                        Collector<Map<String, Object>> out) {
                        long pvCount = 0L;
                        for (Tuple2<String, Long> ignored : elements) {
                            pvCount++;
                        }
                        Map<String, Object> result = new HashMap<>();
                        result.put("window_end_time", ctx.window().getEnd());
                        result.put("pv_count", pvCount);
                        out.collect(result);
                    }
                });

        // Convert each window result into a readable string
        SingleOutputStreamOperator<String> resultStream = pvCountStream
                .map(new MapFunction<Map<String, Object>, String>() {
                    @Override
                    public String map(Map<String, Object> value) throws Exception {
                        return "window_end_time: " + value.get("window_end_time")
                                + ", pv_count: " + value.get("pv_count");
                    }
                });

        // Configure the Redis connection
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        // Build a Redis sink: LPUSH every result string onto the "pv_statistics" list
        RedisSink<String> redisSink = new RedisSink<>(conf, new RedisMapper<String>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.LPUSH);
            }

            @Override
            public String getKeyFromData(String data) {
                return "pv_statistics";
            }

            @Override
            public String getValueFromData(String data) {
                return data;
            }
        });

        // Write the results to Redis
        resultStream.addSink(redisSink);

        // Print the results to the console as well (optional)
        resultStream.addSink(new PrintSinkFunction<>());

        // Execute the Flink job
        env.execute("PV Statistics to Redis");
    }
}
Add a log4j.properties file to the resources directory to configure logging:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Run result:
3. Sink output to the Redis database
Core idea
Flink's RedisSink writes data to Redis through a user-defined RedisMapper, which mainly specifies the following (illustrated with redis-cli right after the list):
Redis command: e.g. SET (store a string key/value), HSET (store into a hash), LPUSH (push onto a list), and so on.
Key: identifies the record (for hourly PV statistics the window_end_time can serve as the key).
Value: the statistic to store (e.g. the PV count).
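The difference between these commands is easiest to see directly in redis-cli. A short illustration (the key names follow the formats used later in this section; the counts and timestamps are made up):
SET pv_statistics:2025060110 12345          # String: one key per hour
HSET pv:hour 2025060110 12345               # Hash: one hash, one field per hour
LPUSH pv_statistics "window_end_time: 1748763000000, pv_count: 12345"   # List: append each result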
Implementation steps
Assuming the statistic is the hourly PV count, the storage format is designed as follows (an example query follows the list):
Redis key: pv_statistics:{window_end_time} (where window_end_time is the window end timestamp, truncated to the hour).
Redis value: the PV total for that hour (e.g. 12345).
Data structure: the String type (written with SET), which keeps later queries and aggregation simple.
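With this layout a stored result can be read back directly from redis-cli, for example (the timestamps are illustrative):
GET pv_statistics:1748763000000     # String design described above
HGET pv:hour 2025060110             # Hash variant used by the RedisMapper implementation below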
RedisMapper is the core interface between Flink and Redis; three methods must be implemented:
getCommandDescription(): specifies the Redis command (e.g. SET).
getKeyFromData(): extracts the Redis key from a result record.
getValueFromData(): extracts the Redis value from a result record.
Code implementation (this variant stores the results in a Hash named pv:hour, using the window hour as the field and the PV count as the value):
RedisMapper<PvResult> redisMapper = new RedisMapper<PvResult>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // HSET into the hash "pv:hour"; each field is one hour, the value is that hour's PV count
        return new RedisCommandDescription(RedisCommand.HSET, "pv:hour");
    }

    @Override
    public String getKeyFromData(PvResult data) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
        String key = sdf.format(new Date(data.getWindowStart()));
        System.out.println("Redis Key: " + key);
        return key;
    }

    @Override
    public String getValueFromData(PvResult data) {
        return String.valueOf(data.getCount());
    }
};
package Bean;
public class PvResult {
    private long windowStart;  // window start time (ms)
    private long windowEnd;    // window end time (ms)
    private long count;        // PV count for this window

    public PvResult() {}

    public PvResult(long windowStart, long windowEnd, long count) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    // Getters & setters
    public long getWindowStart() { return windowStart; }
    public void setWindowStart(long windowStart) { this.windowStart = windowStart; }
    public long getWindowEnd() { return windowEnd; }
    public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
Configuring the Redis connection
The Redis connection (host, port, password, and so on) is configured with FlinkJedisPoolConfig:
FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
        .setHost("192.168.100.20")   // IP of the Redis instance on the virtual machine
        .setPort(6379)
        .build();
Note that this connects to the Redis instance on the virtual machine; replace the host with your own VM's IP address.
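Putting the pieces together, the mapper and the connection config are passed to a RedisSink, which is then attached to the aggregated stream. A minimal sketch, assuming pvResultStream is the DataStream<PvResult> produced by the hourly window (the variable name is an assumption):
// redisMapper and redisConfig are the objects defined above;
// pvResultStream (DataStream<PvResult>) is assumed to be the output of the hourly PV window
RedisSink<PvResult> pvRedisSink = new RedisSink<>(redisConfig, redisMapper);
pvResultStream.addSink(pvRedisSink);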
Result: