Flink實時流量統計:基于窗口函數與Redis Sink的每小時PV監控系統(學習記錄)

題目:

利用flink統計網站瀏覽量,并寫入redis。

利用窗口函數以及算子實現每小時PV(網站的頁面瀏覽量)統計,對統計后結果數據格式進行設計,存儲至Redis中(利用sink將處理后結果數據輸出到redis數據庫中)。

操作步驟:

1.redis在虛擬機上的安裝

(1)下載redis安裝包

cd /opt/software?# 進入一個用于存放安裝包的目錄,可自行選擇

wget http://download.redis.io/releases/redis-6.2.6.tar.gz

# 下載Redis 6.2.6版本,可根據需求更換版本號

(2)解壓安裝包——使用tar命令解壓下載好的安裝包

tar -zxvf redis-6.2.6.tar.gz

解壓后會生成一個名為redis-6.2.6的目錄。

(3)安裝gcc編譯工具

yum install -y gcc gcc-c++ make

(4)編譯和安裝 Redis

cd redis-6.2.6

make?# 編譯Redis,此過程可能需要一些時間,取決于虛擬機性能

make install?# 安裝Redis,默認會安裝到/usr/local/bin目錄下

(5)配置redis——?Redis 默認沒有生成配置文件,需要手動創建相關目錄和文件:

mkdir /etc/redis?# 創建存放配置文件的目錄

cp /opt/redis-6.2.6/redis.conf /etc/redis/ # 將示例配置文件復制到新目錄下,路徑根據實際解壓位置調整

如圖所示,redis安裝成功并且進行了正常訪問。

此處還需要關閉虛擬機上的防火墻,使redis的6379端口可以被正常訪問。

2.代碼展示

項目創建

在 IntelliJ IDEA 中,選擇File?->?New?->?Project,然后選擇Maven,按照向導創建一個新的 Maven 項目。在pom.xml文件中添加以下依賴:

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-pv-redis</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.13.6</flink.version><redis.clients.version>3.8.0</redis.clients.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- Flink 核心依賴 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><!-- Flink Redis 連接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><!-- Redis 客戶端依賴 --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${redis.clients.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><!-- 阿里云鏡像 --><repositories><repository><id>aliyunmaven</id><name>阿里云公共倉庫</name><url>https://maven.aliyun.com/repository/public</url></repository></repositories><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin></plugins></build>
</project>

定義數據類型

創建一個 Java 類來表示UserBehavior,UserBehavior.csv文件的每一行包含userId、behavior等五個字段,以逗號分隔。

package Bean;public class UserBehavior {private Long userId;private Long itemId;private Integer categoryId;private String behavior;  // 行為類型:"pv"為頁面瀏覽private Long timestamp;   // 時間戳(毫秒)public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {this.userId = userId;this.itemId = itemId;this.categoryId = categoryId;this.behavior = behavior;this.timestamp = timestamp;}public Long getUserId() {return userId;}public void setUserId(Long userId) {this.userId = userId;}public Long getItemId() {return itemId;}public void setItemId(Long itemId) {this.itemId = itemId;}public Integer getCategoryId() {return categoryId;}public void setCategoryId(Integer categoryId) {this.categoryId = categoryId;}public String getBehavior() {return behavior;}public void setBehavior(String behavior) {this.behavior = behavior;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}}

編寫Flink程序

創建一個主類,比如PvStatisticsToRedis.java,編寫 Flink 程序來統計每小時 PV 并寫入 Redis。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.connector.redis.sink.RedisSink;
import org.apache.flink.connector.redis.sink.RedisSinkFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;public class PvStatisticsToRedis {public static void main(String[] args) throws Exception {// 創建 Flink 執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從 CSV 文件讀取數據,假設第一行是表頭,這里簡單跳過DataStream<UserBehavior> userBehaviorDataStream = env.readTextFile("/mnt/UserBehavior(1).csv").skip(1).map(new MapFunction<String, UserBehavior>() {@Overridepublic UserBehavior map(String line) throws Exception {String[] parts = line.split(",");long userId = Long.parseLong(parts[0]);String behavior = parts[1];return new UserBehavior(userId, behavior);}});// 過濾出 pv 行為的數據SingleOutputStreamOperator<Tuple2<String, Long>> pvStream = userBehaviorDataStream.filter(behavior -> "pv".equals(behavior.getBehavior())).map(behavior -> Tuple2.of("pv", behavior.getUserId())).returns(Types.TUPLE(Types.STRING, Types.LONG));// 按照 "pv" 進行分組,并使用滑動窗口統計每小時的 PV 數量KeyedStream<Tuple2<String, Long>, String> keyedStream = pvStream.keyBy(t -> t.f0);SingleOutputStreamOperator<Map<String, Object>> pvCountStream = (SingleOutputStreamOperator<Map<String, Object>>) keyedStream.window(TumblingProcessingTimeWindows.of(Time.hours(1))).process(new KeyedProcessFunction<String, Tuple2<String, Long>, Map<String, Object>>() {private transient HashSet<Long> userIds;@Overridepublic void open(Configuration parameters) throws Exception {userIds = new HashSet<>();}@Overridepublic void processElement(Tuple2<String, Long> value, Context ctx, Collector<Map<String, Object>> out) throws Exception {userIds.add(value.f1);Map<String, Object> result = new HashMap<>();result.put("window_end_time", ctx.timerService().currentProcessingTime());result.put("pv_count", userIds.size());out.collect(result);}@Overridepublic void close() throws Exception {userIds.clear();}});// 將統計結果轉換為字符串格式SingleOutputStreamOperator<String> resultStream = pvCountStream.map(new MapFunction<Map<String, Object>, String>() {@Overridepublic String map(Map<String, Object> value) throws Exception {return "window_end_time: " + value.get("window_end_time") + ", pv_count: " + value.get("pv_count");}});// 配置 Redis 連接FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();// 創建 Redis SinkRedisSinkFunction<String> redisSinkFunction = new RedisSinkFunction<>(conf, new RedisMapper<String>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH, "pv_statistics");}@Overridepublic String getKeyFromData(String data) {return null;}@Overridepublic String getValueFromData(String data) {return data;}});RedisSink<String> redisSink = new RedisSink<>(redisSinkFunction);// 將結果寫入 RedisresultStream.addSink(redisSink);// 打印結果到控制臺(可選)resultStream.addSink(new PrintSinkFunction<>());// 執行 Flink 任務env.execute("PV Statistics to Redis");}
}

資源文件中加載日志log4j.poverities,代碼如下:

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

運行效果:

3.sink輸出到redis數據庫

核心原理

Flink 的RedisSink通過自定義RedisMapper實現數據寫入 Redis,主要需指定:

Redis 命令:如SET(存儲鍵值對)、HSET(存儲哈希)、LPUSH(存儲列表)等。

鍵(Key):用于標識數據的唯一性(如按小時的 PV 統計可將window_end_time作為 Key)。

值(Value):需要存儲的具體統計結果(如 PV 數量)。

具體實現步驟

假設統計結果為每小時的 PV 數,設計存儲格式如下:

Redis 鍵(Key):pv_statistics:{window_end_time}(其中window_end_time為窗口結束時間戳,精確到小時)。

Redis 值(Value):該小時的 PV 總數(如12345)。

數據結構:采用String類型(通過SET命令存儲),便于后續查詢和聚合。

RedisMapper是 Flink 與 Redis 交互的核心接口,需實現 3 個方法:

getCommandDescription():指定 Redis 命令(如SET)。

getKeyFromData():從統計結果中提取 Redis 的 Key。

getValueFromData():從統計結果中提取 Redis 的 Value。

代碼實現

RedisMapper<PvResult> redisMapper = new RedisMapper<PvResult>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "pv:hour");}@Overridepublic String getKeyFromData(PvResult data) {SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");String key = sdf.format(new Date(data.getWindowStart()));System.out.println("Redis Key: " + key);return key;}@Overridepublic String getValueFromData(PvResult data) {return String.valueOf(data.getCount());}
};
package Bean;
public class PvResult {private long windowStart; // 窗口開始時間(毫秒)private long windowEnd;   // 窗口結束時間(毫秒)private long count;       // 該窗口的 PV 數public PvResult() {}public PvResult(long windowStart, long windowEnd, long count) {this.windowStart = windowStart;this.windowEnd = windowEnd;this.count = count;}// Getters & Setterspublic long getWindowStart() { return windowStart; }public void setWindowStart(long windowStart) { this.windowStart = windowStart; }public long getWindowEnd() { return windowEnd; }public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }public long getCount() { return count; }public void setCount(long count) { this.count = count; }
}

配置 Redis 連接

通過FlinkJedisPoolConfig配置 Redis 連接信息(如主機、端口、密碼等):

FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.100.20") // 虛擬機Redis IP.setPort(6379).build();

注意,這里連接的是虛擬機上的redis,需要更改為虛擬機上的IP地址。

效果展示:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/90222.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/90222.shtml
英文地址,請注明出處:http://en.pswp.cn/web/90222.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

使用Imgui和SDL2做的一個彈球小游戲-Bounze

使用Imgui和SDL2做的一個彈球小游戲-Bounze 油管上面TheCherno博主分享的一個視頻FIRST GAME in C! Did He Do a Good Job? // Code Review (C/SDL2)里面分享了一個Github項目&#xff1a; https://github.com/staticaron/Bounze 使用了Imgui和SDL2&#xff0c;并且可以設置音…

SQL 中 CASE WHEN 及 SELECT CASE WHEN 的用法

SQL 中 CASE WHEN 及 SELECT CASE WHEN 的用法 CASE WHEN 是 SQL 中非常實用的條件表達式&#xff0c;它允許你在查詢中實現條件邏輯。以下是詳細的用法說明&#xff1a; 1. 基本語法結構 CASE WHEN condition1 THEN result1WHEN condition2 THEN result2...ELSE default_resul…

CentOS 7 Linux 基礎知識點匯總

&#x1f427; CentOS 7 Linux 基礎知識點匯總為方便初學者快速掌握 CentOS 7 系統的核心操作&#xff0c;本文檔整理了常用系統命令、快捷鍵、目錄結構及文件后綴名等基礎內容&#xff0c;適合入門參考。 一、常見系統命令 &#x1f50d; 命令行提示符說明 終端中的提示符包含…

突發限制下的破局之路:國產之光 Lynx 重構 AI 開發安全壁壘

繼 Pro 套餐 “明升暗降” 爭議后&#xff0c;Cursor 本周再掀波瀾 —— 包括 Claude 系列、GPT-4 在內的主流模型一夜之間對中國用戶全面封禁。開發者社群瞬間沸騰&#xff0c;“付費卻用不了”“項目數據導不出” 的焦慮刷屏&#xff0c;境外工具的政策波動再次給行業敲響警鐘…

滲透測試實戰 | docker復雜環境下的內網打點

本文作者&#xff1a;Track-syst1m一.前言本文涉及的相關漏洞均已修復、本文中技術和方法僅用于教育目的&#xff1b;文中討論的所有案例和技術均旨在幫助讀者更好地理解相關安全問題&#xff0c;并采取適當的防護措施來保護自身系統免受攻擊。二.大概流程1. 外網打點漏洞利用?…

阿里云服務器 CentOS 7 安裝 MySQL 8.4 超詳細指南

阿里云服務器 CentOS 7 安裝 MySQL 8.4 超詳細指南 一、準備工作 系統要求&#xff1a; CentOS 7.9 64位2 核&#xff08;vCPU&#xff09;2 GiBroot 用戶權限 服務器連接工具&#xff1a; FinalShell 下載安裝包&#xff1a; 訪問 MySQL 官網選擇版本&#xff1a;MySQL 8.4.0…

解決 Electron 中 window.open 打開新窗口的各種“坑”

嘿&#xff0c;各位開發者們&#xff01;今天我們要聊聊在使用 Electron 時遇到的一個經典問題&#xff1a;如何正確地使用 window.open 來打開新窗口&#xff1f; 這聽起來似乎很簡單&#xff0c;但實際上卻充滿了各種“驚喜”&#xff08;或者說“驚嚇”&#xff09;。別擔心…

朝歌智慧盤古信息:以IMS MOM V6重構國產化智能終端新生態

隨著5G、云計算、AI、大數據等技術深度滲透&#xff0c;智能終端行業正迎來場景化創新的爆發期。面對市場需求升級與技術迭代壓力&#xff0c;國產化智能終端領域領軍企業——廣東朝歌智慧互聯科技有限公司&#xff08;以下簡稱“朝歌智慧”&#xff09;&#xff0c;基于集團“…

docker 離線安裝postgres+postgis實踐

文章目錄前言一、離線安裝docker二、導出導入PG鏡像1.導出2.導入三、啟動容器四、驗證與測試前言 在企業內網環境中部署地理信息系統&#xff08;GIS&#xff09;時&#xff0c;常常面臨網絡隔離導致無法在線拉取 Docker 鏡像的問題。 本文將詳細介紹如何通過離線方式完成 Pos…

視頻、音頻錄制

1&#xff0c;項目介紹。 實現全屏錄屏、選擇區域錄屏、攝像頭錄像、麥克風錄音、主板音頻錄音、截屏畫板的自由組合。并通過FFmpeg完成音頻與視頻的合并。 功能界面 畫板畫筆 參考的項目 https://github.com/yangjinming1062/RecordWin 本項目是在此項目的基礎上修復了部…

Linux文件系統理解1

目錄一、初步理解系統層面的文件1. 文件操作的本質2. 進程管理文件核心思想二、系統調用層1. 打開關閉文件函數2. 讀寫文件函數三、操作系統文件管理1. 文件管理機制2. 硬件管理機制四、理解重定向1. 文件描述符分配規則2. 重定向系統調用3. 重定向命令行調用五、理解緩沖區1. …

科技向善,銀發向暖:智慧養老與經濟共筑適老未來

人口老齡化是當今中國社會面臨的重大課題&#xff0c;也是推動社會變革與經濟轉型的重要引擎。隨著數字技術的飛速發展&#xff0c;“智慧養老”正以科技向善的溫度&#xff0c;為老年群體構建更舒適、更安全、更有尊嚴的晚年生活&#xff0c;同時為銀發經濟注入蓬勃活力&#…

numpy庫 降維,矩陣創建與元素的選取,修改

目錄 1.降維函數ravel()和flatten ravel(): flatten(): 2.矩陣存儲與內存結構 3.修改矩陣形狀的方法 4.特殊矩陣創建 全零矩陣: 如np.zeros(5) 創建含5個零的一維數組&#xff0c;輸出中零后的點&#xff08;如 0.&#xff09;表示浮點數類型。 全一矩陣&#xff1a;如n…

SpringCloud seata全局事務

項目https://github.com/apache/incubator-seata docker拉取啟動server $ docker run --name seata-server -p 8091:8091 apache/seata-server:2.1.0 seata注冊到nacos <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-…

OpenLayers 快速入門(八)事件系統

看過的知識不等于學會。唯有用心總結、系統記錄&#xff0c;并通過溫故知新反復實踐&#xff0c;才能真正掌握一二 作為一名摸爬滾打三年的前端開發&#xff0c;開源社區給了我飯碗&#xff0c;我也將所學的知識體系回饋給大家&#xff0c;助你少走彎路&#xff01; OpenLayers…

【Linux | 網絡】應用層(HTTPS)

目錄一、HTTPS的概念二、準備概念2.1 什么是加密和解密2.2 為什么要加密2.3 常見的加密方式2.3.1 對稱加密2.3.1 非對稱加密2.4 數據摘要&&數據指紋三、HTTPS理解過程3.1 只使用對稱加密3.2 只使用非對稱加密3.3 雙方都使用非對稱加密3.4 對稱加密 非對稱加密3.5 中間…

GRE協議

一、實驗拓撲二、實驗配置1、靜態路由實現GRERT1配置&#xff1a;RT1(config)# int fa1/0RT1(config-if)# ip add 192.168.20.1 255.255.255.0RT1(config-if)# no shutdownRT1(config)# int fa0/0RT1(config-if)# ip add 172.1.1.2 255.255.255.0RT1(config-if)# no shutdownRT…

JDialong彈窗

public class DialogDemo extends JFrame {public DialogDemo(){this.setVisible(true);this.setSize(700,500);this.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);//JFrame 放東西&#xff0c;容器Container contentPane this.getContentPane();//絕對布局conte…

tlias智能學習輔助系統--違紀處理(實戰)

目錄 1.StudentController.java 2.interface StudentService 3.StudentServiceImpl.java 4.interface StudentMapper 1.StudentController.java // 違紀處理PutMapping("/violation/{id}/{score}")Operation(summary "違紀處理")public Result violat…

傳統RNN模型筆記:輸入數據長度變化的結構解析

一、案例背景 本案例通過PyTorch的nn.RNN構建單隱藏層RNN模型&#xff0c;重點展示RNN對變長序列數據的處理能力&#xff08;序列長度從1變為20&#xff09;&#xff0c;幫助理解RNN的輸入輸出邏輯。 二、核心代碼與結構拆解 def dm_rnn_for_sequencelen():# 1. 定義RNN模型rnn…