深入(流批【牛批】框架)Flink的機制

flink本身是專注有狀態的無限流處理,有限流處理【batch批次】是無限流處理的一中特殊情況!

應用場景

  • 實時ETL
集成流計算現有的諸多數據通道和SQL靈活的加工能力,對流式數據進行實時清洗、歸并和結構化
處理;同時,對離線數倉進行有效的補充和優化,并為數據實時傳輸提供可計算通道。
  • 實時報表
實時化采集、加工流式數據存儲;實時監控和展現業務、客戶各類指標,讓數據化運營實時化。
如通過分析訂單處理系統中的數據獲知銷售增長率;
通過分析分析運輸延遲原因或預測銷售量調整庫存;
  • 監控預警
對系統和用戶行為進行實時監測和分析,以便及時發現危險行為,如計算機網絡入侵、詐騙預警等
  • 在線系統
實時計算各類數據指標,并利用實時結果及時調整在線系統的相關策略,在各類內容投放、智能推
送領域有大量的應用,如在客戶瀏覽商品的同時推薦相關商品等

flink主要角色

JobManager

協調分布式執行,它們用來調度task,協調檢查點(CheckPoint),協調失敗時恢復等

TaskManager

也稱之為worker,主要職責是接收jobmanager協調的task,部署和啟動任務,接收上游的數據并處理。同時向resourcemanager反注冊自己的資源信息。

ResourceManager

管理集群資源,如Yarn

Dispatcher

作用:提供一個REST接口來讓我們提交需要執行的應用。一旦一個應用提交執行,Dispatcher會啟動一個JobManager,并將應用轉交給他。
Dispatcher還會啟動一個webUI來提供有關作業執行信息
注意:某些應用的提交執行的方式,有可能用不到Dispatcher
Task
一個完整的處理階段(如map階段),由多個相同功能的SubTask組成?
SubTask
Task的并行實例,是實際執行的最小單元。例如設置并行度為3的map操作會產生3個SubTask?
slot:
TaskManager中的資源格子,每個Slot有獨立的內存分配(如4GB)?

一個Slot可以運行多個SubTask(線程級共享)

不同Job的Slot內存隔離,但共享網絡等資源?

slot sharing:?

默認行為?:同一Job的上下游SubTask可共享Slot(如map和filter擠在一個Slot)?

優勢?:減少數據傳輸延遲(同Slot內直接內存交換)?提高資源利用率(輕量級操作不獨占Slot)

程序架構主要部分

source

本地集合:fromCollection(seq); fromElements();

文件:readTextFile(path);

socket:socketTextStream();

自定義:StreamExecutionEnvironment.addSource(sourceFunction),flink本身提供了許多源,也可以implements SourceFunction方法是為非并行源,或者為并行源 implements ParallelSourceFunction接口,或者extends RichParallelSourceFunction。

RichParallelSourceFunction與ParallelSourceFunction是Flink中用于實現自定義數據源的兩種關鍵接口,主要區別如下:

  1. ?功能擴展性?

    • RichParallelSourceFunction繼承了RichFunction,提供了open()close()方法,支持訪問運行時上下文(如并行度、任務ID等),便于資源管理(如數據庫連接)7。
    • ParallelSourceFunction僅標記接口,無額外方法,需自行實現資源管理邏輯2。
  2. ?并行度支持?

    • 兩者均支持并行執行(通過實現ParallelSourceFunction標記接口)26。
    • 但RichParallelSourceFunction通過運行時上下文可動態分配數據分片(如MySQL分頁查詢)7。
  3. ?狀態管理?

    • RichParallelSourceFunction可結合檢查點機制實現狀態一致性,適合有狀態數據源(如偏移量記錄)5。
    • ParallelSourceFunction需自行處理狀態持久化4。
  4. ?典型應用場景?

    • RichParallelSourceFunction適用于需要復雜初始化或狀態管理的場景(如連接外部系統)
    • ParallelSourceFunction適合簡單無狀態數據源(如內存集合)3。
  5. ?實現復雜度?

    • RichParallelSourceFunction需實現更多生命周期方法,但開發更規范6。
    • ParallelSourceFunction更輕量,但靈活性較低2。

總結:優先選擇RichParallelSourceFunction以獲得更完善的開發支持,僅在無狀態且無需資源管理時考慮ParallelSourceFunction

  • 需要訪問運行時上下文(如獲取并行任務ID)25
  • 需管理資源(如數據庫連接、文件句柄)310
  • 需結合檢查點機制實現狀態一致性(如Kafka偏移量記錄)58
  • 典型場景:MySQL分頁查詢、Kafka消費者、分布式日志采集

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.configuration.Configuration
import java.sql.{Connection, DriverManager, ResultSet}class MySQLRichParallelSource extends RichParallelSourceFunction[String] {private var connection: Connection = _private var isRunning = trueoverride def open(parameters: Configuration): Unit = {// 初始化數據庫連接connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")}override def run(ctx: SourceFunction.SourceContext[String]): Unit = {val stmt = connection.createStatement()val rs = stmt.executeQuery("SELECT * FROM sensor_data")while (isRunning && rs.next()) {ctx.collect(rs.getString("value")) // 發射數據}}override def cancel(): Unit = isRunning = falseoverride def close(): Unit = {if (connection != null) connection.close()}
}
  • 簡單無狀態數據生成(如內存集合、隨機數流)914
  • 無需資源初始化/清理的并行任務115
  • 典型場景:模擬傳感器數據、內存集合并行讀取

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import scala.util.Randomclass RandomParallelSource extends ParallelSourceFunction[Int] {private var isRunning = trueoverride def run(ctx: SourceFunction.SourceContext[Int]): Unit = {val rand = new Random()while (isRunning) {ctx.collect(rand.nextInt(100)) // 生成0-100隨機數Thread.sleep(500)}}override def cancel(): Unit = isRunning = false
}

transformation

  1. ?Map?
    功能:對數據流中的每個元素進行一對一轉換14

val dataStream: DataStream[Int] = env.fromElements(1, 2, 3) val mappedStream = dataStream.map(_ * 2) // 輸出2,4,6

  1. ?FlatMap?
    功能:將每個元素轉換為0個、1個或多個輸出元素17

val words = env.fromElements("hello world", "flink streaming") val splitWords = words.flatMap(_.split(" ")) // 輸出各單詞

  1. ?Filter?
    功能:根據條件過濾數據元素24

val numbers = env.fromElements(1, 2, 3, 4) val evens = numbers.filter(_ % 2 == 0) // 輸出2,4

  1. ?KeyBy?
    功能:按指定key對數據進行分區25

case class Sensor(id: String, temp: Double) val sensors = env.fromElements(Sensor("s1", 35.2), Sensor("s2", 28.5)) val keyedStream = sensors.keyBy(_.id)

  1. ?Reduce?
    功能:對分組數據流進行聚合操作59

keyedStream.reduce((s1, s2) => Sensor(s1.id, s1.temp + s2.temp)) // 相同id的溫度累加

  1. ?Aggregations?
    功能:內置聚合函數(sum/min/max等)25

keyedStream.sum("temp") // 按key求溫度總和

  1. ?Window?
    功能:在數據流上定義時間或計數窗口58

sensors.keyBy(_.id) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum("temp")

  1. ?Union?
    功能:合并多個同類型數據流48

val stream1 = env.fromElements(1, 2) val stream2 = env.fromElements(3, 4) val merged = stream1.union(stream2) // 輸出1,2,3,4

  1. ?Join?
    功能:基于key連接兩個數據流89

val streamA = env.fromElements(("a", 1), ("b", 2)) val streamB = env.fromElements(("a", 3), ("b", 4)) streamA.join(streamB) .where(_._1).equalTo(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply((a,b) => (a._1, a._2 + b._2)) // 輸出("a",4),("b",6)

  1. ?CoFlatMap?
    功能:連接兩個流并共享狀態8

val controlStream = env.fromElements("HIGH", "LOW") val dataStream = env.fromElements(1.2, 3.4, 5.6) controlStream.connect(dataStream) .flatMap( (ctrl: String, out: Collector[Double]) => {}, (value: Double, out: Collector[Double]) => { if (currentThreshold == "HIGH") out.collect(value) } )

  1. ?ProcessFunction?
    功能:提供對時間和狀態的底層訪問

class TempAlertFunction extends ProcessFunction[Sensor, String] { override def processElement( sensor: Sensor, ctx: ProcessFunction[Sensor, String]#Context, out: Collector[String]): Unit = { if (sensor.temp > 100) { out.collect(s"Alert! ${sensor.id} temp=${sensor.temp}") } } }

  1. ?Iterate?
    功能:創建迭代流處理循環

val numbers = env.fromElements(1, 2, 3, 4) val iterated = numbers.iterate( iteration => { val minusOne = iteration.map(_ - 1) val stillGreaterThanZero = minusOne.filter(_ > 0) val lessThanZero = minusOne.filter(_ <= 0) (stillGreaterThanZero, lessThanZero) } )

  1. ?Window Apply?
    功能:對窗口數據應用自定義函數

sensors.keyBy(_.id) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply { (key, window, vals, out: Collector[String]) => out.collect(s"$key: ${vals.map(_.temp).sum}") }

  1. ?Side Output?
    功能:將數據分流到側輸出流

val outputTag = OutputTag[String]("side-output") val mainStream = numbers.process(new ProcessFunction[Int, Int] { override def processElement( value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { if (value % 2 != 0) { ctx.output(outputTag, s"Odd: $value") } out.collect(value) } }) val sideStream = mainStream.getSideOutput(outputTag)

  1. ?Broadcast?
    功能:將流廣播到所有并行任務8

val ruleStream = env.fromElements("rule1", "rule2").broadcast val dataStream = env.fromElements(1, 2, 3) dataStream.connect(ruleStream) .process(new BroadcastProcessFunction[Int, String, String] { override def processElement( value: Int, ctx: BroadcastProcessFunction[Int, String, String]#ReadOnlyContext, out: Collector[String]): Unit = { val rules = ctx.getBroadcastState(...) // 使用廣播規則處理數據 } })

完整項目實現示例:


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collectorobject FlinkOperatorsDemo {case class SensorReading(id: String, timestamp: Long, temperature: Double)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 數據源val sensorData = env.fromElements(SensorReading("sensor1", 1L, 35.6),SensorReading("sensor2", 2L, 28.3),SensorReading("sensor1", 3L, 37.2))// 2. 算子演示val processed = sensorData.filter(_.temperature > 30)  // Filter.map(r => (r.id, r.temperature))  // Map.keyBy(_._1)  // KeyBy.timeWindow(Time.seconds(5))  // Window.reduce((r1, r2) => (r1._1, r1._2 + r2._2))  // Reduceprocessed.print()env.execute("Flink Operators Demo")}
}

代碼說明:

  1. 包含Flink核心算子鏈式調用演示
  2. 使用case class定義數據類型
  3. 展示從數據源到窗口聚合的完整流程

其他重要算子補充說明:

  • ?Fold?:已棄用,推薦使用Reduce
  • ?WindowAll?:非分組全局窗口
  • ?Project?:選擇部分字段(僅DataSet API)
  • ?Cross?:兩個流的笛卡爾積
  • ?CoGroup?:分組連接兩個數據集

sink

文件: writeAsText(path)

HDFS:?


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import java.time.ZoneIdobject FlinkToHdfs {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 創建測試數據源val dataStream = env.fromElements("2025-08-20 15:00:00,user1,click","2025-08-20 15:01:00,user2,purchase","2025-08-20 15:02:00,user3,view")// 2. 配置HDFS輸出路徑val hdfsPath = "hdfs://namenode:9000/flink/output"// 3. 創建StreamingFileSinkval sink = StreamingFileSink.forRowFormat(new Path(hdfsPath),new SimpleStringEncoder[String]("UTF-8")).withBucketAssigner(new DateTimeBucketAssigner[String]("yyyy-MM-dd--HH",ZoneId.of("UTC"))).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(60 * 60 * 1000) // 1小時滾動一次.withInactivityInterval(15 * 60 * 1000) // 15分鐘不活動則滾動.withMaxPartSize(128 * 1024 * 1024) // 128MB.build()).build()// 4. 添加sink到數據流dataStream.addSink(sink)env.execute("Flink to HDFS Example")}
}

kafka:?FlinkKafkaProducer

redis:RedisSink

hbase:?

自定義:SinkToMySql extends RichSinkFunction

Chain

Operator Chain

數據傳輸

Flink在處理任務間的數據傳輸過程中,采用了緩沖區機制。

Yarn部署機制

(1)啟動一個YARN?yarn-session.sh -h????????
/export/servers/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
# -n 表示申請2個容器,這里指的就是多少個taskmanager
# -s 表示每個TaskManagerslots數量
# -tm 表示每個TaskManager的內存大小
# -d 表示以后臺程序方式運行
如果不想讓Flink YARN客戶端始終運行,那么也可以啟動分離的 YARN會話。該參數被稱為-d--detached。在這種情況下,Flink YARN客戶端只會將Flink提交給集群,然后關閉它自己
(2)直接在YARN上提交運行Flink作業(Run a Flink job on YARN)

bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /export/servers/flink/examples/batch/WordCount.jar

# -m jobmanager的地址

# -yn 表示TaskManager的個數
停止yarn-cluster
yarn application -kill application_1527077715040_0003
rm -rf /tmp/.yarn-properties-root

窗口

通俗講,Window是用來對一個無限的流設置一個有限的集合,從而在有界的數據集上進行操作的一種機制。流上的集合由Window來劃定范圍,比如計算過去10分鐘或者最后50個元素的和
分為:
時間窗口(TimeWindow)
數量窗口(CountWindow)

滾動窗口

時間驅動:keyedStream.timeWindow(Time.seconds(10))
數量驅動:keyedStream.countWindow(3)

滑動窗口

時間驅動:keyedStream.timeWindow(Time.seconds(10), Time.seconds(5))
數量驅動:keyedStream.countWindow(3, 2)

會話窗口

keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

全局窗口

EventTime[事件時間]

事件發生的時間,例如:點擊網站上的某個鏈接的時間,每一條日志都會記錄自己的生成時間。如果以EventTime為基準來定義時間窗口那將形成EventTimeWindow,要求消息本身就應該攜帶EventTime。如果要使用事件時間,要設置:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //設置使用事件時間

IngestionTime[攝入時間]

數據進入Flink的時間,如某個Flink節點的source operator接收到數據的時間,例如:某個source消費到kafka中的數據。如果以IngesingtTime為基準來定義時間窗口那將形成IngestingTimeWindow,sourcesystemTime為準

ProcessingTime[處理時間]

某個Flink節點執行某個operation的時間,例如:timeWindow處理數據時的系統時間,默認的時間屬性就是Processing Time如果以ProcessingTime基準來定義時間窗口那將形成ProcessingTimeWindow,以operator的systemTime為準

水印(水位線)

水位線是為了控制這個事件能不能被處理,如果在水位線允許的事件范圍之外,肯定是不會被當前窗口處理的。emitWatermark(當前最大時間 - 亂序允許時間)這個方法200ms周期執行,就是告訴框架只能提交這個時間及之后的時間的事件過來。?當水位線超過窗口結束時間時,對應的窗口才會被觸發計算。這種機制確保了:即使數據亂序到達,只要在允許的時間范圍內,仍然能被正確處理;超過容忍時間的遲到數據會被丟棄。

窗口的創建是根據事件時間來的,是獨立的任務來創建,框架會根據事件時間,將當前事件歸到那個窗口。水位線是觸發窗口計算,跟窗口創建沒關系。

例如:窗口[10:00:00 - 10:00:10),當窗口接收到第一條10:00:10事件,就會創建下一個[10:00:10- 10:00:20),但是這個時候因為水位線,上一個窗口還沒被觸發計算,只有當水位線時間 >= 10:00:10時,才會被觸發計算。

水位線生成器: 事件處理方法,給下游發送水位時間方法(200ms執行一次)

事件時間提取器:提取事件時間

new WatermarkStrategy[OrderDetail] {override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[OrderDetail] = {new WatermarkGenerator[OrderDetail] {//每來一條數據,都會調用onEventvar maxTimestamp = Long.MinValuevar maxOutOfOrderness = 500L;override def onEvent(event: OrderDetail, eventTimestamp: Long, output: WatermarkOutput): Unit = {maxTimestamp = Math.max(maxTimestamp, format.parse(event.orderCreateTime).getTime)}override def onPeriodicEmit(output: WatermarkOutput): Unit = {output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness))}}}//1.老版本  2. lambda}.withTimestampAssigner(new SerializableTimestampAssigner[OrderDetail] {override def extractTimestamp(element: OrderDetail, recordTimestamp: Long): Long = {format.parse(element.orderCreateTime).getTime}}
  1. 整體流程?:

    • 數據流中每個事件會先經過extractTimestamp提取時間戳
    • 然后觸發onEvent方法處理
    • 最后周期性調用onPeriodicEmit生成水位線
  2. ?方法調用時機詳解?:

    (1)?extractTimestamp方法:

    • ?調用時機?:每條數據到達時立即調用
    • ?作用?:就像快遞員拆包裹看發貨日期,從數據中提取事件時間戳
    • ?示例?:MyEvent("A", 1630000000000)會提取出1630000000000

    (2)?onEvent方法:

    • ?調用時機?:緊接在extractTimestamp之后
    • ?作用?:像記錄最高水位的標尺,比較并保存當前最大時間戳
    • ?示例?:當收到時間戳1000的事件,會更新currentMaxTimestamp = 1000

    (3)?onPeriodicEmit方法:

    • ?調用時機?:默認每200ms自動調用一次(類似心跳機制)
    • ?作用?:發出水位線 = 當前最大時間戳 - 允許亂序時間
    • ?示例?:如果currentMaxTimestamp=5000maxOutOfOrderness=3000,則發出水位線2000
  3. ?參數說明?:

    • maxOutOfOrderness:相當于"寬容度",設置允許遲到數據的最長時間(如設為3000表示允許3秒內的遲到數據)
    • currentMaxTimestamp:動態變化的變量,始終記錄已見數據的最大時間戳
  4. ?工作類比?:
    想象老師在批改時間亂序提交的作業:

    • extractTimestamp:查看每份作業的提交日期
    • onEvent:記錄目前看到的最晚提交日期
    • onPeriodicEmit:每隔一段時間宣布:"現在開始只接受比(最晚日期-3天)更早的作業"
遲到數據處理方法:允許延遲,側輸出流,自定義觸發器,全局窗口+延遲合并

遲到數據的處理機制?

1. ?允許延遲(Allowed Lateness)?

這是最常用的方式,?擴展窗口等待時間?:

javaCopy Code

.window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(5)) // 允許5秒延遲

  • ?運作方式?:
    • 水位線到達?窗口結束時間?時觸發?初次計算?
    • 在?[窗口結束時間, 窗口結束時間+允許延遲]?區間內:
      • 新到達的屬于該窗口的數據?重新觸發窗口計算?
      • 窗口狀態持續更新
  • ?示例?:

    mermaidCopy Code

    timeline title 10秒窗口 + 5秒允許延遲 section 事件時間 00:13 : 水位線到達00:10 → 觸發初次計算 00:14 : 遲到數據(事件時間00:09)到達 → 重新觸發計算 00:15 : 遲到數據(事件時間00:08)到達 → 再次觸發計算 00:16 : 水位線到達00:11 → 窗口正式關閉

2. ?側輸出流(Side Output)?

捕獲?超過允許延遲的遲到數據?:

javaCopy Code

OutputTag<Event> lateTag = new OutputTag<>("late-data"); DataStream<Event> main = stream .keyBy(...) .window(...) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateTag) // 捕獲超時遲到數據 .process(...); // 獲取遲到數據流 DataStream<Event> lateStream = main.getSideOutput(lateTag);

  • ?應用場景?:
    • 關鍵業務數據補全(如金融交易)
    • 延遲監控和告警
    • 數據質量分析
3. ?觸發器(Trigger)自定義?

精細控制計算觸發邏輯:

javaCopy Code

.trigger(new ContinuousEventTimeTrigger(20) { @Override public TriggerResult onElement(...) { // 自定義遲到數據處理邏輯 if (isLateData(element)) { updateWindowState(element); // 手動更新窗口狀態 return TriggerResult.FIRE; // 立即觸發計算 } return super.onElement(...); } })

  • ?適用場景?:
    • 特殊業務規則(如證券交易中的尾單處理)
    • 動態延遲策略(基于數據量調整)
4. ?全局窗口(GlobalWindow)+ 延遲合并?

處理?無界遲到數據?:

javaCopy Code

.window(GlobalWindows.create()) .trigger(PurgingTrigger.of( new LateDataFireTrigger() // 自定義處理所有遲到數據 ))


?遲到數據處理流程?

mermaidCopy Code

flowchart TD A[數據到達] --> B{事件時間是否在<br/>當前水位線之前?} B -->|是| C[遲到數據] B -->|否| D[正常處理] C --> E{是否在允許延遲內?} E -->|是| F[更新窗口狀態并重新觸發計算] E -->|否| G{是否配置側輸出?} G -->|是| H[輸出到側輸出流] G -->|否| I[直接丟棄]


?實際案例:電商訂單統計?

?場景?:統計每10分鐘的訂單總額,容忍3秒延遲,超時訂單單獨記錄

javaCopy Code

// 定義遲到標簽 OutputTag<Order> lateOrders = new OutputTag<>("late-orders"); SingleOutputStreamOperator<Double> result = orders .keyBy(Order::getCategory) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateOrders) .aggregate(new OrderSumAggregator()); // 處理主結果 result.print(); // 處理超時訂單 result.getSideOutput(lateOrders) .map(order -> "LATE_ORDER: " + order) .addSink(new LateOrderSink());

?數據處理效果?:

數據事件時間處理水位線處理方式
12:15:0712:15:10進入正常窗口 [12:10,12:20)
12:19:5812:20:05允許延遲期內 → 更新窗口結果
12:20:0212:20:08允許延遲期內 → 再次更新結果
12:20:0512:20:13超過延遲期 → 輸出到側輸出流

?最佳實踐建議?

  1. ?允許延遲設置?:

    • 一般設為網絡延遲峰值的 ?2-3倍?(如Kafka延遲監控的P99值)
    • 不宜過大(避免狀態膨脹)
  2. ?監控配置?:

    javaCopy Code

    // 監控遲到數據比例 lateDataStream .map(_ => 1L).windowAll(TumblingProcessingTimeWindows.of(Time.hours(1))) .sum(0) .addSink(new LateDataMetricSink());

  3. ?狀態清理優化?:

    javaCopy Code

    .withLateFiredPurgingTrigger() // 及時清理已完成窗口狀態

  4. ?動態延遲策略?(高級):

    javaCopy Code

    .allowedLateness(ctx => { if (ctx.element().isPriority()) return Time.minutes(5); return Time.seconds(30); })

💡 ?核心原則?:水位線決定?何時觸發計算?,允許延遲決定?等待遲到數據多久?,側輸出決定?超時數據的歸宿?。三者協同實現完整的遲到數據處理機制

狀態

Flink中的狀態(State)是指算子任務在運行過程中需要記住的信息,這些信息可以用來實現復雜的有狀態計算。狀態是Flink實現精確一次(exactly-once)語義和故障恢復的基礎。

keyed state 鍵控狀態 只能用于keyedStream上

ValueState,MapState,ReducingState,AggregatingState


import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._object KeyedStateExamples {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 示例數據流val input = env.fromElements(("user1", 10),("user2", 5),("user1", 20),("user3", 8),("user2", 15))// 1. ValueState示例val valueStateResult = input.keyBy(_._1).map(new ValueStateExample).print("ValueState結果")// 2. ListState示例val listStateResult = input.keyBy(_._1).map(new ListStateExample).print("ListState結果")// 3. MapState示例val mapStateResult = input.keyBy(_._1).map(new MapStateExample).print("MapState結果")// 4. ReducingState示例val reducingStateResult = input.keyBy(_._1).map(new ReducingStateExample).print("ReducingState結果")// 5. AggregatingState示例val aggregatingStateResult = input.keyBy(_._1).map(new AggregatingStateExample).print("AggregatingState結果")env.execute("Keyed State Examples")}
}// 1. ValueState實現
class ValueStateExample extends RichMapFunction[(String, Int), (String, Int)] {private var state: ValueState[Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new ValueStateDescriptor[Int]("valueState", TypeInformation.of(classOf[Int]))state = getRuntimeContext.getState(stateDesc)}override def map(value: (String, Int)): (String, Int) = {val current = Option(state.value()).getOrElse(0)val newValue = current + value._2state.update(newValue)(value._1, newValue)}
}// 2. ListState實現
class ListStateExample extends RichMapFunction[(String, Int), (String, List[Int])] {private var state: ListState[Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new ListStateDescriptor[Int]("listState", TypeInformation.of(classOf[Int]))state = getRuntimeContext.getListState(stateDesc)}override def map(value: (String, Int)): (String, List[Int]) = {state.add(value._2)(value._1, state.get().asScala.toList)}
}// 3. MapState實現
class MapStateExample extends RichMapFunction[(String, Int), (String, Map[String, Int])] {private var state: MapState[String, Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new MapStateDescriptor[String, Int]("mapState", TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int]))state = getRuntimeContext.getMapState(stateDesc)}override def map(value: (String, Int)): (String, Map[String, Int]) = {state.put(value._1, value._2)(value._1, state.entries().asScala.map(e => e.getKey -> e.getValue).toMap)}
}// 4. ReducingState實現
class ReducingStateExample extends RichMapFunction[(String, Int), (String, Int)] {private var state: ReducingState[Int] = _override def open(parameters: Configuration): Unit = {val stateDesc = new ReducingStateDescriptor[Int]("reducingState",(a: Int, b: Int) => a + b,  // ReduceFunctionTypeInformation.of(classOf[Int]))state = getRuntimeContext.getReducingState(stateDesc)}override def map(value: (String, Int)): (String, Int) = {state.add(value._2)(value._1, state.get())}
}// 5. AggregatingState實現
class AggregatingStateExample extends RichMapFunction[(String, Int), (String, Double)] {private var state: AggregatingState[Int, Double] = _override def open(parameters: Configuration): Unit = {val stateDesc = new AggregatingStateDescriptor[Int, (Int, Int), Double]("aggregatingState",new AverageAggregate,TypeInformation.of(classOf[(Int, Int)]),TypeInformation.of(classOf[Double]))state = getRuntimeContext.getAggregatingState(stateDesc)}override def map(value: (String, Int)): (String, Double) = {state.add(value._2)(value._1, state.get())}
}// 用于AggregatingState的聚合函數
class AverageAggregate extends AggregateFunction[Int, (Int, Int), Double] {override def createAccumulator(): (Int, Int) = (0, 0)override def add(value: Int, accumulator: (Int, Int)): (Int, Int) = {(accumulator._1 + value, accumulator._2 + 1)}override def getResult(accumulator: (Int, Int)): Double = {accumulator._1.toDouble / accumulator._2}override def merge(a: (Int, Int), b: (Int, Int)): (Int, Int) = {(a._1 + b._1, a._2 + b._2)}
}

算子狀態


import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunctionclass BufferingSource(threshold: Int) extends SourceFunction[String] with CheckpointedFunction {private var isRunning = trueprivate var bufferedElements = List[String]()private var checkpointedState: ListState[String] = _override def run(ctx: SourceFunction.SourceContext[String]): Unit = {while (isRunning) {bufferedElements ::= s"element-${System.currentTimeMillis()}"if (bufferedElements.size >= threshold) {bufferedElements.reverse.foreach(ctx.collect)bufferedElements = Nil}Thread.sleep(100)}}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.update(bufferedElements.asJava)}override def initializeState(context: FunctionInitializationContext): Unit = {checkpointedState = context.getOperatorStateStore.getListState(new ListStateDescriptor[String]("buffered-elements", classOf[String]))if (context.isRestored) {bufferedElements = checkpointedState.get().asScala.toList}}
}

class UnionListSource extends SourceFunction[Int] with CheckpointedFunction {private var checkpointedState: ListState[Int] = _private var toEmit = (1 to 100).toListoverride def run(ctx: SourceFunction.SourceContext[Int]): Unit = {toEmit.foreach { num =>ctx.collect(num)Thread.sleep(10)}}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.update(toEmit.asJava)}override def initializeState(context: FunctionInitializationContext): Unit = {checkpointedState = context.getOperatorStateStore.getUnionListState(new ListStateDescriptor[Int]("union-state", classOf[Int]))if (context.isRestored) {toEmit = checkpointedState.get().asScala.toList}}
}

import org.apache.flink.api.common.state.{MapStateDescriptor, BroadcastState}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collectorclass DynamicFilterFunction extends BroadcastProcessFunction[String, String, String] {private final val ruleDescriptor = new MapStateDescriptor[String, String]("rules", classOf[String], classOf[String])override def processElement(value: String,ctx: ReadOnlyContext,out: Collector[String]): Unit = {val rule = ctx.getBroadcastState(ruleDescriptor).get("filter")if (rule == null || value.contains(rule)) {out.collect(value)}}override def processBroadcastElement(rule: String,ctx: Context,out: Collector[String]): Unit = {ctx.getBroadcastState(ruleDescriptor).put("filter", rule)}
}

廣播狀態

狀態存儲

狀態生存時間

CEP

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/919671.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/919671.shtml
英文地址,請注明出處:http://en.pswp.cn/news/919671.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Git 2.15.0 64位安裝步驟Windows詳細教程從下載到驗證(附安裝包下載)

一、下載后雙擊運行 安裝包下載&#xff1a;https://pan.quark.cn/s/7200b32a1ecf&#xff0c;找到下載好的文件&#xff1a;?Git-2.15.0-64-bit.exe?雙擊這個文件&#xff0c;就會彈出安裝向導窗口&#xff0c;點 ??“Next”&#xff08;下一步&#xff09;?? 二、選擇…

在職老D滲透日記day23:sqli-labs靶場通關(第29關-31關)http參數過濾

5.29.第29關 http參數過濾 閉合5.29.1.手動注入&#xff08;1&#xff09;判斷注入類型、注入點閉合&#xff08;2&#xff09;有回顯&#xff0c;優先用聯合查詢注入&#xff0c;判讀字段數?id1&id2 order by 3 -- ?id1&id2 order by 4 --&#xff08;3&#xff09;…

Spring Boot整合Amazon SNS實戰:郵件訂閱通知系統開發

Spring Boot整合Amazon SNS實戰引言配置服務總結新用戶可獲得高達 200 美元的服務抵扣金 亞馬遜云科技新用戶可以免費使用亞馬遜云科技免費套餐&#xff08;Amazon Free Tier&#xff09;。注冊即可獲得 100 美元的服務抵扣金&#xff0c;在探索關鍵亞馬遜云科技服務時可以再額…

LeetCode_動態規劃1

動態規劃1.動態規劃總結1.1 01背1.1.1 二維數組1.1.2 一維數組1.2 完全背包2.斐波那契數(力扣509)3.爬樓梯(力扣70)4.使用最小花費爬樓梯(力扣746)5.不同路徑(力扣62)6.不同路徑 II(力扣63)7.整數拆分(力扣343)8.不同的二叉搜索樹(力扣96)9.分割等和子集(力扣416)10.最后一塊石…

【STM32】HAL庫中的實現(九):SPI(串行外設接口)

SPI 接口通信原理 SPI&#xff08;Serial Peripheral Interface&#xff09;是全雙工主從通信協議&#xff0c;特點是&#xff1a; 信號線功能SCK串行時鐘MOSI主設備輸出&#xff0c;從設備輸入MISO主設備輸入&#xff0c;從設備輸出CS&#xff08;NSS&#xff09;片選信號&am…

Git常用操作大全(附git操作命令)

Git常用操作大全 一、基礎配置 1.1 設置用戶名和郵箱 git config --global user.name "你的名字" git config --global user.email "你的郵箱"1.2 查看配置 git config --list二、倉庫管理 2.1 初始化本地倉庫 git init2.2 克隆遠程倉庫 git clone <倉庫…

詳解flink table api基礎(三)

文章目錄1.使用flink的原因&#xff1a;2. Flink支持兩種模式&#xff1a;3. flink table api工作原理&#xff1a;4. Flink table api 使用5. select語句&flink table api&#xff1a;6. 使用flink table api 創建table7. 使用flink table api 寫流式數據輸出到表或sink8.…

Vue2+Vue3前端開發_Day5

參考課程: 【黑馬程序員 Vue2Vue3基礎入門到實戰項目】 [https://www.bilibili.com/video/BV1HV4y1a7n4] ZZHow(ZZHow1024) 自定義指令 基本語法&#xff08;全局 & 局部注冊&#xff09; 介紹&#xff1a;自己定義的指令&#xff0c;可以封裝一些 DOM 操作&#xff0c…

機器學習--決策樹2

目錄 第一代裁判&#xff1a;ID3 與信息增益的 “偏愛” 第二代裁判&#xff1a;C4.5 用 “增益率” 找平衡 第三代裁判&#xff1a;CART 的 “基尼指數” 新思路 遇到連續值&#xff1f;先 “砍幾刀” 再說 給決策樹 “減肥”&#xff1a;剪枝的學問 動手試試&#xff1…

yggjs_react使用教程 v0.1.1

yggjs_react是一個用于快速創建React項目的工具&#xff0c;它集成了Vite、TypeScript、Zustand和React Router等現代前端技術棧&#xff0c;幫助開發者快速搭建高質量的React應用。 快速入門 快速入門部分將指導您如何安裝yggjs_react工具、創建新項目并啟動開發服務器。 安…

vulhub可用的docker源

這一塊不太容易找&#xff0c;我試了好幾個源&#xff0c;下面是20250820測試可用源 編輯方法sudo mkdir -p /etc/docker sudo vim /etc/docker/daemon.json 配置內容 [1] {"registry-mirrors" : ["https://docker.registry.cyou", "https://docker-…

基于YOLOv8-SEAttention與LLMs融合的農作物害蟲智能診斷與防控決策系統

1. 引言 1.1 研究背景與意義 農作物蟲害是制約農業產量與質量的重要因素。據FAO報告&#xff0c;全球每年因病蟲害造成的糧食損失高達 20%–40%。傳統人工巡查與經驗診斷具有時效性差、成本高與專業人才不足等缺陷。近年來&#xff0c;計算機視覺特別是目標檢測技術在農業檢測…

從零開始構建GraphRAG紅樓夢知識圖譜問答項目(三)

文章結尾有CSDN官方提供的學長的聯系方式&#xff01;&#xff01; 歡迎關注B站從零開始構建一個基于GraphRAG的紅樓夢項目 第三集01 搭建后端服務 創建一個python文件server.py 完整源碼放到文章最后了。 1.1 graphrag 相關導入 # GraphRAG 相關導入 from graphrag.query.cont…

S32K328(Arm Cortex-M7)適配CmBacktrace錯誤追蹤

CmBacktrace 相當于重寫了hard_fault函數&#xff0c;在hard_fault函數里面去分析SCB寄存器的信息和堆棧信息&#xff0c;然后把這些信息打印出來(或者寫到flash)&#xff1b;通過使用串口輸出產生hard_fault的堆棧信息&#xff0c;然后利用addr2line工具反推出具體的代碼執行函…

AI研究引擎的簡單技術實現步驟

產品愿景與核心功能 1.1 產品使命 “洞見 Weaver”是一個全棧AI Web應用,旨在將用戶的復雜研究問題,通過AI驅動的動態思維導圖和結構化報告,轉化為一次沉浸式的、可追溯的視覺探索之旅。我們的使命是,將AI復雜的推理過程透明化,將人類的探索直覺與AI的分析能力無縫結合,…

open webui源碼分析5-Tools

本文從最簡單的時間工具入手&#xff0c;分析Tools相關的代碼。一、安裝工具git clone https://github.com/open-webui/openapi-servers cd openapi-servers# 進入時間工具目錄 cd servers/timepip install -r requirements.txt# 啟動服務 uvicorn main:app --host 0.0.0.0 --r…

windows下通過vscode遠程調試linux c/cpp程序配置

windows下通過vscode遠程調試linux c/cpp程序配置vscode插件配置linux依賴工具安裝launch.json配置vscode插件配置 CodeLLDB插件需要提前下載&#xff1a; linux依賴工具安裝 sudo apt update sudo apt install cmake clangdlaunch.json配置 {"version": "0…

IDEA報JDK版本問題

解決思路&#xff1a;1.找到配置jdk的IDEA配置位置settings和project structure2.先配置setting3.再修改項目結構

VirtualBox 安裝 Ubuntu Server 系統及 Ubuntu 初始配置

文章目錄簡介VirtualBoxUbuntu Server 簡介Ubuntu Server 下載安裝 Ubuntu Server首選項配置導入系統鏡像配置系統用戶配置內存 CPU 虛擬硬盤開始安裝 Ubuntu安裝完成登錄系統配置網絡Ubuntu 系統配置安裝常用工具安裝 SSH設置 root 密碼配置 IP 地址&#xff08;推薦自動分配I…

Milvus 可觀測性最佳實踐

Milvus 介紹 Milvus 是一個開源的向量數據庫&#xff0c;專為處理大規模、高維度向量數據而設計&#xff0c;廣泛應用于人工智能、推薦系統、圖像檢索、自然語言處理等場景。它支持億級向量的高效存儲與快速檢索&#xff0c;內置多種相似度搜索算法&#xff08;如 HNSW、IVF、…