一、說明
衡量網站流量一個最簡單的指標,就是網站的頁面瀏覽量(Page View,PV)。用戶每次打開一個頁面便記錄1次PV,多次打開同一頁面則瀏覽量累計。
一般來說,PV與來訪者的數量成正比,但是PV并不直接決定頁面的真實來訪者數量,如同一個來訪者通過不斷的刷新頁面,也可以制造出非常高的PV。接下來我們就用Flink算子來實現PV的統計。
二、測試數據準備
把數據文件 UserBehavior 復制到project的input目錄下
用于封裝數據的JavaBean類
package com.atguigu.flink.java.chapter_6;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @Author lizhenchao@atguigu.cn* @Date 2020/12/10 19:32*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {private Long userId;private Long itemId;private Integer categoryId;private String behavior;private Long timestamp;
}
三、代碼
pv實現思路1: WordCount
package com.lyh.flink06;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class PVcount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.readTextFile("input/UserBehavior.csv").map(line -> { // 對數據切割, 然后封裝到POJO中String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),String.valueOf(split[3]),Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())) //過濾出pv行為.map(behavior -> Tuple2.of("pv", 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG)) // 使用Tuple類型, 方便后面求和.keyBy(value -> value.f0) // keyBy: 按照key分組.sum(1) // 求和.print();env.execute();}
}
pv實現思路2: process
package com.lyh.flink06;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class PVprocess {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.readTextFile("input/UserBehavior.csv").map(line -> {String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),String.valueOf(split[3]),Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())).keyBy(UserBehavior::getBehavior).process(new KeyedProcessFunction<String, UserBehavior, Long>() {long count = 0;@Overridepublic void processElement(UserBehavior userBehavior,Context ctx,Collector<Long> out) throws Exception {count++;out.collect(count);}}).print();env.execute();}
}