Flink Window DEMO 學習

該文檔演示了fink windows的操作DEMO

環境準備:

  • kafka本地運行:kafka部署
  • 自動生成名字代碼:隨機名
  • 自動生成隨機IP代碼:隨機IP
  • Flink 1.18

測試數據

自動向kafka推送數據

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.example.dto.KafkaPvDto;
import com.wfg.flink.example.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)) {int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i);CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();Random random = new Random();int randomNumber = random.nextInt(7); // 生成一個0到6的隨機數Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(Producer<String, String> producer) {String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TOPIC_NAME, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg() {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
//        DateTime begin = DateUtil.beginOfDay(new Date());
//        String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");String timeStr = DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}

注意:

  • kafka本地運行:kafka部署
  • 自動生成名字代碼:隨機名
  • 自動生成隨機IP代碼:隨機IP

FLINK 數據


/**** @author wfg*/
@Slf4j
public class DataSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);if (data != null) {collector.collect(new Tuple2<>(data.getUserName(), 1));}}
}

基于時間窗口

*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于時間窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute("flink window example");}
}

基于滑動時間窗口

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于滑動時間窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute("flink window example");}
}

基于事件數量窗口

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件數量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute("flink window example");}
}

基于事件數量滑動窗口

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件數量滑動窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute("flink window example");}
}

基于會話時間窗口

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于會話時間窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 內沒出現數據則認為超出會話時長,然后計算這個窗口的和.sum(1).print();env.execute("flink window example");}
}

滾動窗口(Tumbling Window)

滾動窗口(Tumbling Window)

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//滾動窗口(Tumbling Window) 基于處理時間的 30 秒滾動窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute("flink window example");}
}

基于事件時間

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件時間的 30 秒滾動窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配時間戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute("flink window example");}
}

滑動窗口(Sliding Window)

基于處理時間

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于處理時間的 30 秒滑動窗口,滑動間隔為 10 秒data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}

基于事件時間

/*** Desc: Flink Window 學習*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件時間的 30 秒滑動窗口,滑動間隔為 10 秒  data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配時間戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}

注意:

  • kafka本地運行:kafka部署
  • 自動生成名字代碼:隨機名
  • 自動生成隨機IP代碼:隨機IP

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/37692.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/37692.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/37692.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

技術賦能教育:校園3D電子地圖與AR導航解決方案

隨著高考的落幕&#xff0c;又一批新鮮血液即將注入大學校園。面對陌生的環境&#xff0c;如何快速適應、準確找到目標地點&#xff0c;成為新生們的一大難題。同時&#xff0c;對于學校而言&#xff0c;如何向報考人員直觀展示校園環境&#xff0c;提供沉浸式參觀體驗&#xf…

Mybatis-Plus學習|快速入門CRUD、主鍵生成策略(雪花算法、主鍵自增等)、自動填充、樂觀鎖、分頁插件、邏輯刪除

MyBatisPlus概述 為什么要學習它呢?MyBatisPlus可以節省我們大量工作時間&#xff0c;所有的CRUD代碼它都可以自動化完成! JPA、tk-mapper、MyBatisPlus 偷懶的! MyBatis-Plus(簡稱 MP)是一個 MyBatis 的增強工具&#xff0c;在 MyBatis 的基礎上只做增強不做改變&#xff…

Pytorch學習之torch.nn.functional.pad()函數

PyTorch學習之torch.nn.functional.pad函數 一、簡介 torch.nn.functional.pad 是 PyTorch 中用于對張量進行填充操作的函數。填充操作在處理圖像、序列數據等任務時非常常見&#xff0c;它可以在張量的指定維度兩端添加一定數量的元素&#xff0c;填充方式多樣&#xff0c;包…

Git的基本使用方法

Git的基本使用方法 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01;今天我們將深入探討Git的基本使用方法&#xff0c;Git作為目前最流行的版本控制系統之一&…

Day 48 消息隊列集群RabbitMQ

消息隊列集群-RabbitMQ 一、消息中間件 中間件 tomcat java web中間件 web容器 mysql php php mysql uwsgi python mysql mycat 數據庫中間件 rabbitMQ 消息中間件 1、簡介 MQ 全稱為&#xff08;Message Queue消息隊列&#xff09;。是一種應用程序對應用程序的通信方…

【全球首個開源AI數字人】DUIX數字人-打造你的AI伴侶!

目錄 1. 引言1.1 數字人技術的發展背景1.2 DUIX數字人項目的開源意義1.3 DUIX數字人技術的獨特價值1.4 本文目的與結構 2. DUIX數字人概述2.1 定義與核心概念2.2 硅基智能與DUIX的關系2.3 技術架構2.4 開源優勢2.5 應用場景2.6 安全與合規性 3. DUIX數字人技術特點3.1 開源性與…

【Java Gui精美界面】IDEA安裝及配置SwingX

SwingX 是一個基于 Swing 的 Java GUI 庫&#xff0c;旨在為 Swing 提供額外的功能和豐富的組件 特點描述基于 Swing繼承了 Swing 的所有特性和功能。豐富組件SwingX 提供了一組高級 UI 組件&#xff0c;例如 TreeTable仍在發展中不活躍的發展ing。。。支持搜索高亮如 TreeTab…

【分布式系列】分布式鎖的設計與實現

&#x1f49d;&#x1f49d;&#x1f49d;歡迎來到我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦:kwan 的首頁,持續學…

steam社區加載異常、加載失敗、無法加載、黑屏的解決方法

隨著steam夏季特賣的臨近&#xff0c;最近幾天開啟史低折扣的大作已經越來越少了&#xff0c;不過也并不是沒有。最經典的知名大作文明6之前已經打到1折的骨折價了&#xff0c;沒想到也能背刺&#xff0c;現在是新史低價0.5折11元&#xff0c;很多玩家入手后紛紛前往社區看新手…

ZABBIX-7.0LTS在線部署部署教程

ZABBIX-7.0LTS在線部署部署教程 環境&#xff1a; 操作系統&#xff1a; ubuntu 22.04zabbix-server版本&#xff1a; 7.0LTS系統配置[需結合監控的業務量提供配置]&#xff1a; 建議2C(CPU)8G(運行) 100GB(存儲)架構&#xff1a;LNMP 第一步&#xff1a; 系統初始化 1.配置…

計算機網絡知識整理筆記

目錄 1.對網絡協議的分層&#xff1f; 2.TCP/IP和UDP之間的區別&#xff1f; 3.建立TCP連接的三次握手&#xff1f; 4.斷開TCP連接的四次揮手&#xff1f; 5.TCP協議如何保證可靠性傳輸&#xff1f; 6.什么是TCP的擁塞控制&#xff1f; 7.什么是HTTP協議&#xff1f; 8…

MySQL InnoDB支持幾種行格式

數據庫表的行格式決定了一行數據是如何進行物理存儲的&#xff0c;進而影響查詢和DML操作的性能。 在InnoDB中&#xff0c;常見的行格式有4種&#xff1a; 1、COMPACT&#xff1a;是MySQL 5.0之前的默認格式&#xff0c;除了保存字段值外&#xff0c;還會利用空值列表保存null…

快速傅里葉變換(Fast Fourier Transform,FFT)

快速傅里葉變換&#xff08;Fast Fourier Transform&#xff0c;FFT&#xff09;是一種算法&#xff0c;用于快速計算離散傅里葉變換&#xff08;DFT&#xff09;及其逆變換。傅里葉變換將時間或空間域的信號轉換為頻率域的信號&#xff0c;便于分析信號的頻率特性。FFT顯著提高…

動手學深度學習(Pytorch版)代碼實踐 -卷積神經網絡-20填充與步幅

20填充與步幅 import torch from torch import nn# 此函數初始化卷積層權重&#xff0c;并對輸入和輸出提高和縮減相應的維數 def comp_conv2d(conv2d, X):# 這里的&#xff08;1&#xff0c;1&#xff09;表示批量大小和通道數都是1#將輸入張量 X 的形狀調整為 (1, 1, height,…

Grafana-11.0.0 在線部署教程

Grafana-11.0.0 在線部署教程 環境&#xff1a; 操作系統&#xff1a; ubuntugrafana版本&#xff1a; 11.0.0 &#xff08;建議不要按照最新版&#xff09;grafana要求的系統配置不高&#xff0c;建議直接部署在監控服務器上&#xff0c;比如zabbix服務器、prometheus服務器…

從菌群代謝到健康影響——認識腸道丙酸和丁酸

谷禾健康 短鏈脂肪酸這一詞經常出現在谷禾的文章和報告中&#xff0c;那你真的了解短鏈脂肪酸嗎&#xff1f;短鏈脂肪酸(SCFA)主要是腸道微生物群在結腸內通過發酵碳水化合物(包括膳食和內源性碳水化合物&#xff0c;主要是抗性淀粉和膳食纖維)和一些微生物可利用的蛋白質而產生…

光線追蹤:原理與實現

版權聲明 本文為“優夢創客”原創文章&#xff0c;您可以自由轉載&#xff0c;但必須加入完整的版權聲明文章內容不得刪減、修改、演繹本文視頻版本&#xff1a;見文末 各位同學大家好&#xff0c;今天我要給大家分享的是光線追蹤的原理和實現大家知道在過往很多年里面&#x…

超簡單的nodejs使用log4js保存日志到本地(可直接復制使用)

引入依賴 npm install log4js 新建配置文件logUtil.js const log4js require(log4js);// 日志配置 log4js.configure({appenders: {// 控制臺輸出consoleAppender: { type: console },// 文件輸出fileAppender: {type: dateFile,filename: ./logs/default, //日志文件的存…

如何從0構建一款類似pytest的工具

Pytest主要模塊 Pytest 是一個強大且靈活的測試框架&#xff0c;它通過一系列步驟來發現和運行測試。其核心工作原理包括以下幾個方面&#xff1a;測試發現&#xff1a;Pytest 會遍歷指定目錄下的所有文件&#xff0c;找到以 test_ 開頭或 _test.py 結尾的文件&#xff0c;并且…

python 實例002 - 數據轉換

題目&#xff1a; 有一組用例數據如下&#xff1a; cases [[case_id, case_title, url, data, excepted],[1, 用例1, www.baudi.com, 001, ok],[4, 用例4, www.baudi.com, 002, ok],[2, 用例2, www.baudi.com, 002, ok],[3, 用例3, www.baudi.com, 002, ok],[5, 用例5, www.ba…