【Flink-scala】DataStream編程模型之延遲數據處理

DataStream API編程模型

1.【Flink-Scala】DataStream編程模型之數據源、數據轉換、數據輸出
2.【Flink-scala】DataStream編程模型之 窗口的劃分-時間概念-窗口計算程序
3.【Flink-scala】DataStream編程模型之水位線
4.【Flink-scala】DataStream編程模型之窗口計算-觸發器-驅逐器


文章目錄

  • DataStream API編程模型
  • 一、延遲數據處理
    • 1.1 側輸出
    • 1.2代碼實例
      • 1.2.1 代碼運行結果
      • 1.2.2 結果分析
    • 1.3代碼運行展示
    • 1.4 題外話


一、延遲數據處理

前一小節已經講了水位線的相關概念,默認情況下,當水位線超過窗口結束時間之后,再有之前的數據到達時,這些數據會被刪除。

為了避免有些遲到的數據被刪除,因此產生了allowedLateness的概念。

allowedLateness就是針對事件時間而言,對于水位線超過窗口結束時間之后,還允許有一段時間(也是以事件時間來衡量)來等待之前的數據到達,以便再次處理這些數據。

對于窗口計算而言,如果沒有設置allowedLateness,窗口觸發計算以后就會被銷毀

設置了allowedLateness以后,只有水位線大于“窗口結束時間+allowedLateness”時,窗口才會被銷毀。

當沒有指定allowedLateness,默認值為0.

1.1 側輸出

通常情況下,用戶雖然希望對遲到的數據進行窗口計算,但并不想將結果混入正常的計算流程中,而是想將延遲數據和結果保存到數據庫中,便于后期對延時數據進行分析。對這種情況,就需要借助于“側輸出”(Side Output)來處理

用sideOutputLateData(OutputTag)來標記遲到數據計算的結果,然后再使用getSideOutput(lateOutputTag)從窗口中獲取lateOutputTag標簽對應的數據,之后轉成獨立的DataStream數據集進行處理

1.2代碼實例

如下:

import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Timecase class StockPrice(stockId:String,timeStamp:Long,price:Double)object AllowedLatenessTest {def main(args: Array[String]): Unit = {//設定執行環境val env = StreamExecutionEnvironment.getExecutionEnvironment//設定時間特性為事件時間env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//設定程序并行度env.setParallelism(1)//創建數據源val source = env.socketTextStream("localhost", 9999)//指定針對數據流的轉換操作邏輯val stockDataStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//為數據流分配時間戳和水位線val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//執行窗口計算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L))//注意這里.sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))//打印輸出sumStream.print("window計算結果:")val late = sumStream.getSideOutput(lateData)late.print("遲到的數據:")//指定名稱并觸發流計算env.execute("AllowedLatenessTest")}//指定水位線生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {element.timeStamp //從到達消息中提取時間戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={new WatermarkGenerator[StockPrice](){val maxOutOfOrderness = 10000L //設定最大延遲為10秒var currentMaxTimestamp: Long = 0Lvar a: Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)}
override def onPeriodicEmit(output:WatermarkOutput): Unit = {// 沒有使用周期性發送水印,因此這里沒有執行任何操作}}}}
}

先注意這里:

  val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//執行窗口計算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L))   //注意這里.sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

允許元素延遲到達最多 2 秒。即,如果一個元素的時間戳在窗口結束后的 2 秒內到達,它仍然會被包含在窗口的計算中。

 .allowedLateness(Time.seconds(2L))  設置時間為2s。

其余的就是生成水位線和時間戳了,這就不用解釋啦。

1.2.1 代碼運行結果

1.在nc端輸入
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
然后啟動程序

在這里插入圖片描述
分析:水位線生成策略:

每次收到一個新的事件,都會比較當前事件的時間戳和之前記錄的最大時間戳,更新 currentMaxTimestamp。
水位線被設定為 currentMaxTimestamp - maxOutOfOrderness,即允許最大 10 秒的延遲

再來看遲到數據:

輸入數據是:
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32

數據超出窗口結算數據后2s到達算遲到,任何時間戳比水位線晚 10 秒的事件都會被視為遲到。
之前我看書上寫的有點疑問,然后擁有環境的網站也有點問題,我就自己本地了,windows 下載了個netcat,運行結果如下:
在這里插入圖片描述

timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031591000|2020-10-07 08:46:31.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031580000|2020-10-07 08:46:20.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window計算結果:> StockPrice(stock_1,1602031567000,8.14)
window計算結果:> StockPrice(stock_1,1602031571000,8.23)
window計算結果:> StockPrice(stock_1,1602031577000,16.48)
window計算結果:> StockPrice(stock_1,1602031578000,25.970000000000002)
window計算結果:> StockPrice(stock_1,1602031578000,34.42)
window計算結果:> StockPrice(stock_1,1602031578000,42.75)
window計算結果:> StockPrice(stock_1,1602031578000,51.31)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
遲到的數據:> StockPrice(stock_1,1602031577000,8.32)

這里是windows運行結果,可能稍微和ubuntu不一樣

1.2.2 結果分析

  • stock_1,1602031567000,8.14
    水位線 557
    窗口時間**[567,570)**
    當前事件:stock_1,1602031567000,8.14
window計算結果:> StockPrice(stock_1,1602031567000,8.14)
  • stock_1,1602031571000,8.23
    水位線: 561
    窗口時間:[571,573)
    當前窗口事件 stock_1,1602031571000,8.23
window計算結果:> StockPrice(stock_1,1602031571000,8.23)
  • stock_1,1602031577000,8.24
    水位線:567
    窗口時間:[577,580)
    當前窗口事件:stock_1,1602031577000,8.24
    8.24+8.23
window計算結果:> StockPrice(stock_1,1602031577000,16.48)
  • stock_1,1602031578000,8.87
    水位線:568
    窗口時間:和上一個相同 [577,580)
    當前窗口事件:stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24
    16.48+8.87
window計算結果:> StockPrice(stock_1,1602031578000,25.970000000000002)
  • stock_1,1602031579000,8.55
    水位線:569
    窗口時間:和上一個相同 [577,580)
    當前窗口事件(3個)
    stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24

    stock_1,1602031579000,8.55
    25.97+8.55
window計算結果:> StockPrice(stock_1,1602031578000,34.42)
  • stock_1,1602031577000,8.24
    水位線:這里-10s為567,最大水位線為569,那么最大水位線569
    窗口時間: 屬于[567,570) 沒有遲到
    當前窗口事件:
    stock_1,1602031567000,8.14
    stock_1,1602031577000,8.24
    34.42+8.24
window計算結果:> StockPrice(stock_1,1602031578000,42.75)
  • stock_1,1602031581000,8.43
    水位線:571
    窗口時間:[581,584)
    當前窗口事件 stock_1,1602031581000,8.43
    注意 571水位線,第一個事件事件窗口時間[567,570),這個就應該結束計算了、
window計算結果:> StockPrice(stock_1,1602031578000,51.31)
之后就沒了
為什么沒了呢,窗口還沒計算完呢,一直在監聽啊。
水位線也不漲了
  • stock_1,1602031582000,8.78
    水位線:572
    窗口時間:[581,584)
    當前窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78

  • stock_1,1602031581000,8.76
    水位線:572
    窗口時間:[581,584)
    當前窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78

    stock_1,1602031581000,8.76

  • stock_1,1602031579000,8.55
    水位線:572
    窗口時間:[577,580)
    當前窗口事件:(4個)
    stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24
    stock_1,1602031579000,8.55

    stock_1,1602031579000,8.55

  • stock_1,1602031591000,8.13
    水位線:581
    窗口時間:[591,594)
    當前窗口事件:
    stock_1,1602031591000,8.13

  • stock_1,1602031581000,8.34
    -水位線:581
    窗口時間:[581,584)
    (此時:[577,580)就應該結束了)
    窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78
    stock_1,1602031581000,8.76

    stock_1,1602031581000,8.34

  • stock_1,1602031580000,8.45
    水位線:581
    窗口時間 [580,583)
    窗口事件:stock_1,1602031580000,8.45

  • stock_1,1602031579000,8.33
    水位線:581
    窗口時間:[577,580) 水位線在581的時候就結束了,但它在 allowedLateness 的 2 秒內到達,因此它 不會被視為遲到數據。

  • stock_1,1602031578000,8.56
    水位線:581
    窗口時間屬于 [577,580)
    判斷水位線和時間戳的關系:水位線超過窗口結束時間
    事件是否遲到:允許超出2s,正好就在580的邊界,根據 Flink 的延遲數據規則,任何事件的時間戳如果大于當前窗口結束時間,并且超出了 allowedLateness(即 2 秒),就會被視為遲到數據。這個事件 stock_1,1602031578000,8.56 的時間戳剛好等于窗口 [577, 580) 的結束時間,并且它的時間戳在當前窗口結束后剛好到達,所以 它不會被視為遲到數據。

找了chatgpt問了一下:

這個事件不算遲到數據。盡管它的時間戳正好是窗口結束的邊界,但它仍然屬于這個窗口 [577, 580) 的一部分,不超出窗口時間范圍,也沒有超過 allowedLateness 設置的 2 秒,所以它被處理為正常數據。
為什么不算遲到?
Flink 的窗口操作是基于事件時間戳的,窗口 [577, 580) 的結束時間是 1602031578000。當水位線達到 581 時,窗口已經結束,但如果事件的時間戳恰好是窗口的結束時間,它依然被認為是屬于該窗口的。
遲到數據的定義:遲到數據是指事件的時間戳超過了當前窗口結束時間,并且超出了允許的遲到時間。在這種情況下,事件時間戳與窗口結束時間對齊,因此并不算遲到。

  • stock_1,1602031577000,8.32
    水位線 581
    窗口時間:[577,580)
    577+2<581
    當前事件的時間戳 1602031577000 恰好等于窗口的起始時間 577,但水位線已經推進到了 581。
    窗口已經結束:水位線達到 581 時,窗口 [577, 580) 的數據已經完全處理完畢,因此任何時間戳在 580 之后到達的事件,都被視為遲到數據。

  • 說明:其中的刪除線,我是手動分析的他們如何遲到,但是后來代碼發現,數據是一個個來的,你沒有超出水位線,那么我就立馬把你這條數據計算了,因此輸出結果就是累加起來的。

1.3代碼運行展示

本來使用的是頭歌平臺提供的環境,但是環境有問題了,自己手動搭建了一下。幸虧是socket,windows還能實現監聽自己,就把代碼放在這里啦!
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Flink_scala2.13</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.11.0</flink.version> <!-- 使用 Flink 1.15.0 --><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.16</scala.version></properties><dependencies><!-- 引入 Flink 相關依賴--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency></dependencies>
</project>

test1.scala (AllowedLatenessTest函數)


/*
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32*/
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object test1 {def main(args: Array[String]): Unit = {// *************************** Begin ****************************//設定執行環境val env = StreamExecutionEnvironment.getExecutionEnvironment//設定時間特性為事件時間env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//設定程序并行度env.setParallelism(1)//創建數據源val source = env.socketTextStream("localhost", 9999)//指定針對數據流的轉換操作邏輯val stockDataStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//為數據流分配時間戳和水位線val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//執行窗口計算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L)).sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))// **************************** End *****************************//打印輸出sumStream.print("window計算結果:")val late = sumStream.getSideOutput(lateData)late.print("遲到的數據:")//指定名稱并觸發流計算env.execute("AllowedLatenessTest")}//指定水位線生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {element.timeStamp //從到達消息中提取時間戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={new WatermarkGenerator[StockPrice](){val maxOutOfOrderness = 10000L //設定最大延遲為10秒var currentMaxTimestamp: Long = 0Lvar a: Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)}override def onPeriodicEmit(output:WatermarkOutput): Unit = {// 沒有使用周期性發送水印,因此這里沒有執行任何操作}}}}
}

這個是頭歌提供的pom.xml,二者應該都能使用,但是你需要下載flink和scala,這不再多說,版本還要匹配哦

<project><groupId>cn.edu.xmu.dblab</groupId><artifactId>wordcount_myID</artifactId><modelVersion>4.0.0</modelVersion><name>WordCount</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.12.2</scala.version><scala.binary.version>2.12</scala.binary.version><hadoop.version>2.7.7</hadoop.version><flink.version>1.11.2</flink.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.0</version><scope>compile</scope></dependency></dependencies>
<build><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

1.4 題外話

我使用的課本是廈門大學林子雨老師編寫的《Flink編程基礎》,當前我用的書是2021年9月第一版,第5.6章節的延遲數據處理中(201頁),最后遲到的數據8.43寫錯了,應該是:8.32。正是書上寫錯了,我才有點疑問發現此處的問題。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/63019.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/63019.shtml
英文地址,請注明出處:http://en.pswp.cn/web/63019.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

react useRef、useContext、useReducer使用中遇到的問題及解決辦法

在 React 中&#xff0c;useRef、useContext 和 useReducer 是三個非常有用的 Hook&#xff0c;它們可以幫助我們更好地管理組件的狀態和行為。然而&#xff0c;在使用這些 Hook 時&#xff0c;可能會遇到一些問題和困惑。本文將詳細解釋這三個 Hook 的工作原理&#xff0c;并提…

2024告別培訓班 數通、安全、云計算、云服務、存儲、軟考等1000G資源分享

大類有&#xff1a;軟考初級 軟考中級 軟考高級 華為認證 華三認證&#xff1a; 軟考初級&#xff1a; 信息處理技術員 程序員 網絡管理員 軟考中級&#xff1a; 信息安全工程師 信息系統監理師 信息系統管理工程師 嵌入式系統設計時 數據庫系統工程師 電子商務設…

《操作系統 - 清華大學》8 -1:進程的組成

文章目錄 1. 進程的組成2. 進程與程序的聯系3. 進程與程序的區別4. 進程與程序關系 1. 進程的組成 進程具體包含哪些東西&#xff1a; 首先要執行相應的代碼&#xff0c;所以執行代碼需要放到內存中代碼執行需要處理數據&#xff0c;數據需要放到內存中需要知道現在要執行哪條…

【Java】String類API

創建字符串 字符串字面量"Hello"高效&#xff0c;常量池復用常見、簡單的字符串創建 new 關鍵字new String("Hello")每次創建新對象&#xff0c;性能開銷較高顯式創建新對象 字符數組new String(char[])轉換字符數組字符數組轉字符串 StringBuilder/St…

數據結構初階---二叉樹---堆

一、樹 1.樹的概念 樹是一種非線性的數據結構&#xff0c;由n(n≥0)個有限結點組成的一個有層次關系的集合。形狀類似一棵倒掛的樹&#xff0c;根朝上&#xff0c;分支向下。 根結點沒有前驅結點&#xff0c;可以有n(n≥0)個后繼結點。 其余結點被分為M個互不相交的集合&am…

CocosCreator對配置文件加密

一、加密 1.首先假設你已經將Excel表格數據導出為了json數據 2.然后可以通關nodejs對其進行xor加密 const fs require(fs);// 讀取配置文件 const path "hero_info.json"; const data fs.readFileSync(path, utf-8); const jsonObject JSON.parse(data);// XO…

學習 Dockerfile 常用指令

學習 Dockerfile 常用指令 在構建 Docker 鏡像時&#xff0c;Dockerfile 文件是一份至關重要的配置文件&#xff0c;它定義了構建鏡像的所有步驟。通過在 Dockerfile 中使用不同的指令&#xff08;命令&#xff09;&#xff0c;我們可以控制鏡像的構建過程、設置環境、指定執行…

D95【python 接口自動化學習】- pytest進階之fixture用法

day95 pytest的fixture詳解&#xff08;二&#xff09; 學習日期&#xff1a;20241210 學習目標&#xff1a;pytest基礎用法 -- pytest的fixture詳解&#xff08;二&#xff09; 學習筆記&#xff1a; fixture(autouseTrue) func的autouse是TRUE時&#xff0c;所有函數方法…

C語言 字符串輸入輸出函數、scanf(“%[^\n]“,)可輸入空格 、fgets刪除換行符

字符串輸入函數&#xff1a; scanf&#xff08;"%s"&#xff0c;數組名&#xff09; gets&#xff08;數組名&#xff09; fgets&#xff08;&#xff09; --- 文件流輸入函數 函數原型&#xff1a; int scanf( const char *format, ...…

深度學習camp-第J4周:ResNet與DenseNet結合探索

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 本周任務&#xff1a; 探索ResNet和DenseNet的結合可能性本周任務較難&#xff0c;我們在chatGPT的幫助下完成 一、網絡的構建 設計一種結合 ResNet 和 Den…

「iOS」通過CoreLocation Framework深入了解MVC架構

「iOS」通過CoreLocation Framework重新了解多界面傳值以及MVC架構 文章目錄 「iOS」通過CoreLocation Framework重新了解多界面傳值以及MVC架構前言CoreLocation了解根據需求建模設計屬性方法設計協議傳值Block傳值KVONotification通知方式 總結參考文章 前言 在這個學期的前…

Debezium系列之:使用Debezium采集oceanbase數據庫

Debezium系列之:使用Debezium采集oceanbase數據庫 一、oceanbase數據庫二、安裝OceanBase三、安裝oblogproxy四、基于Docker的簡單采集案例五、生產實際應用案例Debezium 是一個開源的分布式平臺,用于監控數據庫變化和捕捉數據變動事件,并以事件流的形式導出到各種消費者。D…

線程sleep的時候會釋放鎖嗎

來看一段代碼&#xff1a; void task1(mutex &m) {cout << "thread 1 init..." << endl;{std::unique_lock<mutex> lock(m);cout << "thread 1 getLock" << endl;sleep(5);}cout << "thread 1 freeLock&quo…

wordpress建站--如何用Let‘s Encrypt給網站添加免費ssl證書,支持https訪問

本文首發網站&#xff1a;https://www.click234.com 默認情況下我們的網站是http訪問&#xff0c;為了增加訪問安全性&#xff0c;我們需要添加ssl證書&#xff0c;支持采用https方式訪問&#xff0c;今天我們來看下怎么創建免費的ssl證書--Lets Encrypt 使用 Certbot 自動化工…

青少年編程與數學 02-004 Go語言Web編程 02課題、依賴管理

青少年編程與數學 02-004 Go語言Web編程 02課題、依賴管理 課題摘要:一、項目結構各目錄說明&#xff1a; 二、依賴項三、依賴管理任務四、依賴管理步驟1. 初始化Go Modules項目2. 添加依賴3. 指定依賴版本4. 更新依賴5. 清理未使用的依賴6. 離線工作7. 模塊隔離8. 可重現構建 …

Debezium OracleConnection 分析

Debezium OracleConnection 分析 目錄 1. 概述2. 核心功能3. 實現分析4. 使用場景5. 示例分析6. 最佳實踐7. 總結1. 概述 OracleConnection 是 Debezium Oracle 連接器中的數據庫連接管理組件,主要負責: 數據庫連接的建立和管理事務控制查詢執行元數據操作LogMiner 會話管理…

【每日一練 基礎題】[藍橋杯 2022 省 A] 求和

[藍橋杯 2022 省 A] 求和 暴力破解會超時,用因式分解的平方差公式 a2 2abb2(a)2 a-2abb2(a-b)2 輸出整數((a1a2a3…an)-a1-a2-a3-…-an)/2 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner scanner new Scanner(System.in);l…

ArrayList源碼分析、擴容機制面試題,數組和List的相互轉換,ArrayList與LinkedList的區別

目錄 1.java集合框架體系 2. 前置知識-數組 2.1 數組 2.1.1 定義&#xff1a; 2.1.2 數組如何獲取其他元素的地址值&#xff1f;&#xff08;尋址公式&#xff09; 2.1.3 為什么數組索引從0開始呢&#xff1f;從1開始不行嗎&#xff1f; 3. ArrayList 3.1 ArrayList和和…

【C++】- 掌握STL List類:帶你探索雙向鏈表的魅力

文章目錄 前言&#xff1a;一.list的介紹及使用1. list的介紹2. list的使用2.1 list的構造2.2 list iterator的使用2.3 list capacity2.4 list element access2.5 list modi?ers2.6 list的迭代器失效 二.list的模擬實現1. list的節點2. list的成員變量3.list迭代器相關問題3.1…

Docker--Docker Container(容器) 之容器實戰

對docker容器的前兩篇文章 Docker–Docker Container(容器) 之 操作實例 Docker–Docker Container(容器&#xff09; Mysql容器化安裝 我們可以先在Docker Hub上查看對應的Mysql鏡像,拉取對應的鏡像&#xff1a; 拉取mysql5.7版本的鏡像&#xff1a; docker pull mysql:5.7…