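DataStream API Programming Model
1. [Flink-Scala] DataStream programming model: data sources, transformations, and data output
2. [Flink-scala] DataStream programming model: window assignment, time concepts, and window computation programs
3. [Flink-scala] DataStream programming model: watermarks
4. [Flink-scala] DataStream programming model: window computation, triggers, and evictors
Contents
- DataStream API Programming Model
- 1. Handling Late Data
- 1.1 Side output
- 1.2 Code example
- 1.2.1 Program output
- 1.2.2 Result analysis
- 1.3 Running the code
- 1.4 A side note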
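1. Handling Late Data
The previous post covered watermarks. By default, once the watermark has passed the end of a window, any element that still arrives for that window is dropped.
allowedLateness was introduced so that such late elements are not simply thrown away.
allowedLateness applies to event time: after the watermark has passed the end of a window, the window is kept for an extra period (also measured in event time) so that elements arriving late can still be processed.
For window computation, if allowedLateness is not set, the window is destroyed as soon as it has fired;
with allowedLateness set, the window is destroyed only once the watermark passes "window end + allowedLateness".
If allowedLateness is not specified, it defaults to 0.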
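The two rules above (when a window fires, and when it is destroyed) can be written down as two comparisons. The following is a minimal sketch in plain Scala with no Flink dependency; `Window`, `canFire` and `isExpired` are names I made up for illustration, and the "end minus 1 ms" detail reflects my understanding of how Flink treats the half-open interval [start, end), so treat it as an assumption rather than as Flink's actual code.

```scala
object LatenessRule {
  // A window covers [start, end); its last valid timestamp is end - 1 (milliseconds).
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
  }

  // The window may fire once the watermark has reached its last timestamp.
  def canFire(w: Window, watermark: Long): Boolean =
    w.maxTimestamp <= watermark

  // The window is destroyed once the watermark has reached
  // maxTimestamp + allowedLateness; elements arriving for it afterwards are late.
  def isExpired(w: Window, allowedLatenessMs: Long, watermark: Long): Boolean =
    w.maxTimestamp + allowedLatenessMs <= watermark

  def main(args: Array[String]): Unit = {
    val w = Window(0L, 3000L)
    println(canFire(w, 2999L))           // true: watermark reached the end of the window
    println(isExpired(w, 0L, 2999L))     // true: with allowedLateness = 0 it is destroyed at once
    println(isExpired(w, 2000L, 2999L))  // false: kept for another 2 s of event time
    println(isExpired(w, 2000L, 4999L))  // true: allowed lateness has run out
  }
}
```

With allowedLateness = 0 the two conditions coincide, which is why a window without allowed lateness disappears the moment it fires.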
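1.1 Side output
Usually users do want late data to be processed, but they do not want it mixed into the normal results; instead they want to store the late data, for example in a database, so it can be analyzed later. This is what "side output" (Side Output) is for.
Call sideOutputLateData(OutputTag) on the windowed stream to tag the elements that arrive too late (after the allowed lateness has run out), then call getSideOutput(lateOutputTag) on the result stream of the window operation to fetch the elements tagged with lateOutputTag as a separate DataStream and process them on their own.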
1.2 Code example
The code is as follows:
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object AllowedLatenessTest {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the parallelism
    env.setParallelism(1)
    // Create the data source
    val source = env.socketTextStream("localhost", 9999)
    // Transformation logic: parse each line into a StockPrice
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))
    // Assign timestamps and watermarks
    val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)
    // Window computation
    val lateData = new OutputTag[StockPrice]("late")
    val sumStream = watermarkDataStream
      .keyBy("stockId")
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .allowedLateness(Time.seconds(2L)) // note this line
      .sideOutputLateData(lateData)
      .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))
    // Print the results (the label means "window result")
    sumStream.print("window計算結果:")
    val late = sumStream.getSideOutput(lateData)
    // The label means "late data"
    late.print("遲到的數據:")
    // Name the job and trigger execution
    env.execute("AllowedLatenessTest")
  }

  // Watermark generation strategy
  class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {
    override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[StockPrice] = {
      new SerializableTimestampAssigner[StockPrice] {
        override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
          element.timeStamp // extract the timestamp from the incoming record
        }
      }
    }

    override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] = {
      new WatermarkGenerator[StockPrice]() {
        val maxOutOfOrderness = 10000L // maximum out-of-orderness: 10 seconds
        var currentMaxTimestamp: Long = 0L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def onEvent(element: StockPrice, eventTimestamp: Long, output: WatermarkOutput): Unit = {
          currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          output.emitWatermark(a)
          println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
        }

        override def onPeriodicEmit(output: WatermarkOutput): Unit = {
          // Watermarks are emitted per event in onEvent, so nothing is done here
        }
      }
    }
  }
}
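First, look at this part:
val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)
// Window computation
val lateData = new OutputTag[StockPrice]("late")
val sumStream = watermarkDataStream
  .keyBy("stockId")
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .allowedLateness(Time.seconds(2L)) // note this line
  .sideOutputLateData(lateData)
  .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))
Elements are allowed to arrive up to 2 seconds late, measured in event time by the watermark, not by the wall clock. That is, as long as the watermark has not yet passed the window end by more than 2 seconds, an element belonging to that window is still included in the window's computation.
.allowedLateness(Time.seconds(2L)) sets that period to 2 s.
The rest just generates timestamps and watermarks, which needs no further explanation here.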
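It is worth checking which 3-second window each timestamp actually lands in, because the windows do not start at the first event's timestamp. To my understanding, Flink aligns tumbling windows to the epoch when no offset is given, so window starts are multiples of the window size. Below is a minimal sketch of that calculation in plain Scala; `windowStart` and `windowOf` are my own helper names, not Flink API, and the sketch assumes non-negative timestamps.

```scala
object TumblingWindows {
  // Start of the epoch-aligned tumbling window containing `ts` (no offset, ts >= 0).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // The half-open window [start, end) that `ts` falls into.
  def windowOf(ts: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(ts, sizeMs)
    (start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    val size = 3000L // Time.seconds(3)
    val samples = Seq(1602031567000L, 1602031571000L, 1602031577000L,
                      1602031578000L, 1602031580000L, 1602031581000L, 1602031591000L)
    samples.foreach { ts =>
      val (start, end) = windowOf(ts, size)
      println(s"$ts -> [$start, $end)")
    }
  }
}
```

For the timestamps used in this post this gives, for example, [1602031566000, 1602031569000) for 1602031567000 and [1602031578000, 1602031581000) for 1602031578000 through 1602031580000.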
1.2.1 Program output
1. Enter the following in the nc terminal:
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
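Then start the program. Since socketTextStream connects to nc as a client, nc must already be listening on port 9999 at that point.
Analysis of the watermark generation strategy:
Each time a new event arrives, its timestamp is compared with the largest timestamp recorded so far, and currentMaxTimestamp is updated.
The watermark is set to currentMaxTimestamp - maxOutOfOrderness, i.e. up to 10 seconds of out-of-orderness is tolerated.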
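The watermark sequence can be reproduced without Flink. The sketch below replays only the generator logic of onEvent (running maximum minus 10 s) in plain Scala; `WatermarkReplay` and `watermarks` are illustrative names of mine, and the timestamps are the first six input lines.

```scala
object WatermarkReplay {
  // Watermark emitted after each event: (largest timestamp seen so far) - maxOutOfOrderness.
  // The running maximum starts at 0, like currentMaxTimestamp in the generator.
  def watermarks(timestamps: Seq[Long], maxOutOfOrdernessMs: Long): Seq[Long] =
    timestamps
      .scanLeft(0L)((maxSeen, ts) => math.max(maxSeen, ts))
      .tail // drop the initial 0
      .map(_ - maxOutOfOrdernessMs)

  def main(args: Array[String]): Unit = {
    val firstSix = Seq(1602031567000L, 1602031571000L, 1602031577000L,
                       1602031578000L, 1602031579000L, 1602031577000L)
    watermarks(firstSix, 10000L).foreach(println)
  }
}
```

Note the sixth event: its timestamp (…577000) is smaller than the current maximum (…579000), so the watermark does not move and stays at …569000.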
Now for the late data, using the same input as above.
An element counts as late once the watermark has already passed the end of its window plus the 2 s of allowed lateness. Because the watermark trails the largest timestamp seen so far by 10 s, this only happens after an event roughly 12 s newer than the window end has arrived; merely being older than the newest event does not make an element late.
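I had some doubts about what the textbook says, and the website that provides the environment was having problems, so I ran everything locally: I downloaded netcat for Windows, and the output was as follows: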
timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031591000|2020-10-07 08:46:31.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031580000|2020-10-07 08:46:20.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window計算結果:> StockPrice(stock_1,1602031567000,8.14)
window計算結果:> StockPrice(stock_1,1602031571000,8.23)
window計算結果:> StockPrice(stock_1,1602031577000,16.48)
window計算結果:> StockPrice(stock_1,1602031578000,25.970000000000002)
window計算結果:> StockPrice(stock_1,1602031578000,34.42)
window計算結果:> StockPrice(stock_1,1602031578000,42.75)
window計算結果:> StockPrice(stock_1,1602031578000,51.31)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
遲到的數據:> StockPrice(stock_1,1602031577000,8.32)
This is the output on Windows; it may differ slightly from what you get on Ubuntu.
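1.2.2 Result analysis
Below, timestamps and watermarks are abbreviated to the last three digits of their seconds value, so 1602031567000 is written 567. The 3-second tumbling windows are aligned to the epoch, not to the first event, so the windows involved are [566,569), [569,572), [575,578), [578,581), [581,584) and [590,593). A window fires when the watermark reaches its end (strictly, end minus 1 ms) and is destroyed when the watermark reaches end + 2 s. The console prints the result lines after the timestamp lines, presumably because the input was pasted in one go; the walkthrough lists each result at the event whose watermark triggers it.
- stock_1,1602031567000,8.14
Watermark: 557
Window: [566,569)
Window contents: stock_1,1602031567000,8.14
Nothing is emitted yet.
- stock_1,1602031571000,8.23
Watermark: 561
Window: [569,572)
Window contents: stock_1,1602031571000,8.23
- stock_1,1602031577000,8.24
Watermark: 567
Window: [575,578)
Window contents: stock_1,1602031577000,8.24
- stock_1,1602031578000,8.87
Watermark: 568
Window: [578,581)
Window contents: stock_1,1602031578000,8.87
- stock_1,1602031579000,8.55
Watermark: 569
Window: same as the previous one, [578,581)
Window contents (2 elements):
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
Watermark 569 reaches the end of [566,569), so that window fires:
window計算結果:> StockPrice(stock_1,1602031567000,8.14)
- stock_1,1602031577000,8.24
Watermark: 577 - 10 s would be 567, but the largest timestamp so far is 579, so the watermark stays at 569
Window: [575,578); the watermark has not reached 578, so the element is not late
Window contents (2 elements):
stock_1,1602031577000,8.24
stock_1,1602031577000,8.24
- stock_1,1602031581000,8.43
Watermark: 571
Window: [581,584)
Window contents: stock_1,1602031581000,8.43
Watermark 571 is at 569 + 2, so window [566,569) is destroyed here.
- stock_1,1602031582000,8.78
Watermark: 572
Window: [581,584)
Window contents:
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
Watermark 572 reaches the end of [569,572), so that window fires:
window計算結果:> StockPrice(stock_1,1602031571000,8.23)
- stock_1,1602031581000,8.76
Watermark: 572
Window: [581,584)
Window contents:
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
- stock_1,1602031579000,8.55
Watermark: 572
Window: [578,581)
Window contents (3 elements):
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031579000,8.55
- stock_1,1602031591000,8.13
Watermark: 581
Window: [590,593)
Window contents: stock_1,1602031591000,8.13
The watermark jumps from 572 to 581 and passes the end of both [575,578) and [578,581), so both fire:
8.24 + 8.24
window計算結果:> StockPrice(stock_1,1602031577000,16.48)
8.87 + 8.55 + 8.55
window計算結果:> StockPrice(stock_1,1602031578000,25.970000000000002)
[575,578) is also destroyed at this point (578 + 2 = 580 is below 581); [578,581) is kept until the watermark reaches 583.
- stock_1,1602031581000,8.34
Watermark: 581
Window: [581,584)
Window contents:
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031581000,8.34
The watermark has not reached 584, so nothing is emitted.
- stock_1,1602031580000,8.45
Watermark: 581
Window: [578,581)
The window has already fired, but the watermark (581) is still below 581 + 2 = 583, so the element is within the allowed lateness. It is added to the window, and the window fires again immediately:
25.97 + 8.45
window計算結果:> StockPrice(stock_1,1602031578000,34.42)
- stock_1,1602031579000,8.33
Watermark: 581
Window: [578,581); same situation, within the 2 s of allowed lateness, so it is not treated as late data
34.42 + 8.33
window計算結果:> StockPrice(stock_1,1602031578000,42.75)
- stock_1,1602031578000,8.56
Watermark: 581
Window: [578,581)
Watermark versus window: the watermark has passed the window end, but not by more than 2 s.
Is the event late? No. Windows are closed on the left and open on the right, so 578 is the first timestamp of [578,581), not the last timestamp of [575,578). The window is still alive, the element is added, and the window fires once more:
42.75 + 8.56
window計算結果:> StockPrice(stock_1,1602031578000,51.31)
I also asked ChatGPT about this event. It agreed that the event is not late, but its reasoning (that the timestamp sits exactly on the end boundary of a window [577,580)) does not match the actual window boundaries; the reason is simply that [578,581) has not been destroyed yet.
- stock_1,1602031577000,8.32
Watermark: 581
Window: [575,578)
578 + 2 = 580 < 581
The watermark has already passed the window end plus the allowed lateness, so window [575,578) no longer exists. The element is therefore late and goes to the side output:
遲到的數據:> StockPrice(stock_1,1602031577000,8.32)
After that there is no more output. Windows [581,584) and [590,593) have not fired, and the program keeps listening: the watermark stays at 581 until an event with a timestamp later than 591 arrives.
Note: in my first manual analysis (shown struck through in the original post) I tried to work out event by event how each element became late. Running the code showed that elements are processed one at a time: as long as an element's window has not been destroyed, the element is folded into that window's running sum right away, which is why the results printed after watermark 581 are cumulative.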
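To double-check the analysis without a Flink cluster, the whole run can be replayed in plain Scala. This is only a simplified model under my own assumptions, not Flink's implementation: a single key, epoch-aligned 3 s windows, the watermark advancing right after each element, a window firing once when the watermark reaches its last timestamp and again for every allowed-late element, and window state being dropped at end - 1 ms + 2 s. All names (`AllowedLatenessReplay`, `replay`, `input`) are made up for this sketch.

```scala
object AllowedLatenessReplay {
  final case class StockPrice(stockId: String, timeStamp: Long, price: Double)

  val WindowSizeMs = 3000L
  val AllowedLatenessMs = 2000L
  val MaxOutOfOrdernessMs = 10000L

  private def windowStart(ts: Long): Long = ts - ts % WindowSizeMs
  private def lastTs(start: Long): Long = start + WindowSizeMs - 1

  /** Returns (window results in emission order, elements sent to the late side output). */
  def replay(events: Seq[StockPrice]): (Vector[StockPrice], Vector[StockPrice]) = {
    var sums = Map.empty[Long, StockPrice] // window start -> running reduce result
    var fired = Set.empty[Long]            // windows that have already emitted a result
    var results = Vector.empty[StockPrice]
    var late = Vector.empty[StockPrice]
    var maxTs = 0L
    var watermark = Long.MinValue

    for (e <- events) {
      val start = windowStart(e.timeStamp)
      if (lastTs(start) + AllowedLatenessMs <= watermark) {
        late :+= e // window already destroyed: side output
      } else {
        val sum = sums.get(start) match {
          case Some(s) => StockPrice(s.stockId, s.timeStamp, s.price + e.price)
          case None    => e
        }
        sums += (start -> sum)
        // Window end already passed: the allowed-late element triggers an immediate firing.
        if (lastTs(start) <= watermark) { results :+= sum; fired += start }
      }
      // Advance the watermark, fire windows it has reached, then drop expired state.
      maxTs = math.max(maxTs, e.timeStamp)
      watermark = math.max(watermark, maxTs - MaxOutOfOrdernessMs)
      val ready = sums.keys.toVector.sorted.filter(s => lastTs(s) <= watermark && !fired(s))
      ready.foreach { s => results :+= sums(s); fired += s }
      sums = sums.filter { case (s, _) => lastTs(s) + AllowedLatenessMs > watermark }
    }
    (results, late)
  }

  // The 16 input lines: (last three digits of the seconds value, price).
  val input: Seq[StockPrice] = Seq(
    (567, 8.14), (571, 8.23), (577, 8.24), (578, 8.87), (579, 8.55), (577, 8.24),
    (581, 8.43), (582, 8.78), (581, 8.76), (579, 8.55), (591, 8.13), (581, 8.34),
    (580, 8.45), (579, 8.33), (578, 8.56), (577, 8.32)
  ).map { case (sec, p) => StockPrice("stock_1", 1602031000000L + sec * 1000L, p) }

  def main(args: Array[String]): Unit = {
    val (results, late) = replay(input)
    results.foreach(r => println("window result: " + r))
    late.foreach(l => println("late: " + l))
  }
}
```

Under these assumptions the replay yields seven window results and one late element, in the same order as the Flink output shown in 1.2.1. Windows [581,584) and [590,593) never emit anything, because the watermark stops at 581.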
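1.3 Running the code
I originally used the environment provided by the Touge (頭歌) platform, but it was broken, so I set one up by hand. Luckily the source is just a socket, and on Windows you can listen on your own machine, so here is the code!
pom.xml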
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>Flink_scala2.13</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.0</flink.version> <!-- Flink 1.11.0 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.16</scala.version>
    </properties>
    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
    </dependencies>
</project>
test1.scala (the AllowedLatenessTest program)
/*
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32*/
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class StockPrice(stockId: String, timeStamp: Long, price: Double)

object test1 {
  def main(args: Array[String]): Unit = {
    // *************************** Begin ****************************
    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the parallelism
    env.setParallelism(1)
    // Create the data source
    val source = env.socketTextStream("localhost", 9999)
    // Transformation logic: parse each line into a StockPrice
    val stockDataStream = source
      .map(s => s.split(","))
      .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))
    // Assign timestamps and watermarks
    val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)
    // Window computation
    val lateData = new OutputTag[StockPrice]("late")
    val sumStream = watermarkDataStream
      .keyBy("stockId")
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .allowedLateness(Time.seconds(2L))
      .sideOutputLateData(lateData)
      .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))
    // **************************** End *****************************
    // Print the results (the label means "window result")
    sumStream.print("window計算結果:")
    val late = sumStream.getSideOutput(lateData)
    // The label means "late data"
    late.print("遲到的數據:")
    // Name the job and trigger execution
    env.execute("AllowedLatenessTest")
  }

  // Watermark generation strategy
  class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {
    override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[StockPrice] = {
      new SerializableTimestampAssigner[StockPrice] {
        override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {
          element.timeStamp // extract the timestamp from the incoming record
        }
      }
    }

    override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] = {
      new WatermarkGenerator[StockPrice]() {
        val maxOutOfOrderness = 10000L // maximum out-of-orderness: 10 seconds
        var currentMaxTimestamp: Long = 0L
        var a: Watermark = null
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

        override def onEvent(element: StockPrice, eventTimestamp: Long, output: WatermarkOutput): Unit = {
          currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          output.emitWatermark(a)
          println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)
        }

        override def onPeriodicEmit(output: WatermarkOutput): Unit = {
          // Watermarks are emitted per event in onEvent, so nothing is done here
        }
      }
    }
  }
}
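This is the pom.xml provided by Touge (頭歌). Either of the two should work, but you need to download Flink and Scala yourself (I won't go into that here), and the versions have to match.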
<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>wordcount_myID</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.12.2</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop.version>2.7.7</hadoop.version>
        <flink.version>1.11.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
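1.4 A side note
The textbook I am using is 《Flink編程基礎》 (Flink Programming Basics) by Lin Ziyu of Xiamen University; my copy is the first edition, September 2021. In Section 5.6 on late data handling (page 201), the late element at the end is printed as 8.43, but it should be 8.32. It was precisely this misprint that made me suspicious and led me to look into the problem.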