Flink之狀態TTL機制內容詳解

1 狀態TTL機制

狀態的 TTL機制就是Flink提供的自動化刪除狀態中的過期數據,配置 TTL的 API可以做到對狀態中的數據進行冷熱數據分離,將熱數據一直保存在狀態存儲器中,將冷數據進行定期刪除.
1.1 API簡介

TTL常用API如下:

API注解
setTtl(Time.seconds(…))配置過期時長,當狀態中的數據到達這個時長則判定為過期數據,在new StateTtlConfig.Builder(Time.seconds(...))也可以配置,如果同時調用setTtl()方法則進行覆蓋
updateTtlOnCreateAndWrite()當該條數據在State中插入或者更新的時候,刷新計時,可用于冷熱數據分離
updateTtlOnReadAndWrite()讀或寫都刷新該數據的TTL計時,可用于冷熱數據分離
setStateVisibility(…)用于控制狀態中過期數據的可見性,當方法中設置StateTtlConfig.StateVisibility.NeverReturnExpired)時則不可見過期未被清理的數據,如果設置StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp則可見過期未被清理的數據.setStateVisibility(...)由異步線程執行,默認是NeverReturnExpired.
setTtlTimeCharacteristic(…)指定TTL的時間語義,默認是event time,可以配置process time,將StateTtlConfig.TtlTimeCharacteristic.ProcessingTime填入方法的參數即可.
disableCleanupInBackground()禁用后臺清理過期數據,使用后則不會清理過期數據
cleanupIncrementally(… , …)針對本地狀態后端,即HashMapStateBackend. 增量清理, 每當訪問狀態數據時都會驅動一次過期檢查,清除其中部分數據, 這也是HashMapBackend狀態后端唯一能真正清理過期數據的方法,cleanupIncrementally(... , ...)方法中需要傳入兩個參數int cleanupSizeboolean runCleanupForEveryRecord,cleanupSize是指key的數據量,runCleanupForEveryRecord是指是否清理所有過期數據,如果runCleanupForEveryRecord設置的值為true此時cleanupSize就會失效,但是狀態數據較多時會嚴重影響時效性.
cleanupFullSnapshot()針對快照數據,即checkpoint快照. 全量清理, 在做快照時將所有的過期數據進行清理保證快照中沒有過期數據,但是狀態后端中的過期數據沒有進行清理.
cleanupInRocksdbCompactFilter(xxx)針對于RocksdbStateBackend. 只生效于RocksDB狀態后端,通過Flink將CompactFilter傳給RocksDB,在RocksDBCompact過程中根據過濾條件將過期數據刪除,傳入的參數為過期時間.
1.2 代碼模板
  • 代碼

    class StateMapFunc2 implements MapFunction<String, String>, CheckpointedFunction {private ListState<String> listState;@Overridepublic String map(String s) throws Exception {// 將數據添加到狀態存儲器中,split[0]為用戶IDlistState.add(s);// 獲取狀態存儲器中的數據Iterable<String> iter = listState.get();StringBuffer buffer = new StringBuffer();for (String str : iter) {buffer.append(str);}// 將數據添加到狀態存儲中return buffer.toString();}@Overridepublic void snapshotState(FunctionSnapshotContext ctx) throws Exception {}@Overridepublic void initializeState(FunctionInitializationContext ctx) throws Exception {OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();// 配置State TTLStateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 設置數據存活時長,當該數據在State中存活時間超過10s時刪除該數據// 這個方法也是設置數據存活時長,和StateTtlConfig.Builder(Time.seconds(10))的作用一樣,可以不用這個方法,如果用了會覆蓋上面設置的時長.setTtl(Time.seconds(10))/*** updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二選一即可, 這兩個方法的主要作用就是配合setTtl方法將冷熱數據進行分離**/// 當該條數據在State中插入或者更新的時候,刷新計時.updateTtlOnCreateAndWrite()// 讀或寫都刷新該數據的TTL計時.updateTtlOnReadAndWrite()/*** setStateVisibility就是設置狀態的可見性,前面setTtl方法是設置刪除過期數據,刪除過期數據實際上是由另一個異步線程周期性(定時器)的完成,也就是說超過10s的數據不一定會馬上被刪除,但是* 獲取數據的時候底層會將超過存活時間的數據進行判斷過濾,setStateVisibility就是可以設置是否可以查詢到這些過期的數據,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二選一.**/// 不返回過期數據,這個也是默認策略.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 返回還沒有被清除的過期數據.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)// 指定TTL計時時間語義(默認處理時間).setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)// 禁用后臺清理過期數據.disableCleanupInBackground()/*** 針對本地狀態后端,即HashMapStateBackend* 增量清理, 每當獲取狀態數據時,迭代器都會向前推進。對遍歷的狀態數據進行檢查,并清理過期的數據* 參數1: 設置每次清理的key的數據量(copyOnWriteStateMap中的key的條目數量)* 參數2: 設置是否清理所有條目也就是key對應的數據,如果設置為true則參數1失效,在狀態數據較多時不建議設置為true,會嚴重影響時效性**/.cleanupIncrementally(10, false)/*** 針對快照數據,即checkpoint快照* 全量清理, 在做快照時將所有的過期數據進行清理保證快照中沒有過期數據,但是不會清狀態后端中的過期數據**/.cleanupFullSnapshot()/*** 針對于RocksdbStateBackend* 只生效于RocksDB狀態后端,通過Flink將CompactFilter傳給RocksDB,在RocksDB在Compact過程中根據過濾條件將過期數據刪除,傳入的參數為過期時間(也就是發生Compact時的過濾條件)**/.cleanupInRocksdbCompactFilter(10000).build();// 配置狀態描述,在ListStateDescriptor構造器中聲明數據類型,簡單類型可以使用xxx.class,符合類型需要使用到TypeInformation.of()ListStateDescriptor descriptor = new ListStateDescriptor("MapState", String.class);// 狀態描述器加載TTL配置descriptor.enableTimeToLive(ttlConfig);listState = operatorStateStore.getListState(descriptor);}
    }
    

    代碼中是以Operator State為例,如果是Keyed State則在open方法中配置TTL.

1.3 TTL機制詳解

在代碼模板中有API的使用方式,但是TTL機制不同的方法之間存在互斥或者互不影響的關系.

1.3.1 過期時間設置策略
過期時間設置有兩種方式:
  • new StateTtlConfig.Builder(Time.seconds(10))
  • setTtl(Time.seconds(10))

這兩種方式都是設置過期時間使用的,但是只需要選用其中一種即可,如果在創建StateTtlConfig對象時就設置了過期時間,又在setTtl方法中設置了過期時間,則會對過期時間進行覆蓋,本質上二者都是對同一個變量進行賦值.

  • 源碼

    new StateTtlConfig.Builder(Time.seconds(10))

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 調用Builder時對ttl變量進行了賦值public Builder(@Nonnull Time ttl) {this.ttl = ttl;}// ...
    }
    

    setTtl(Time.seconds(10))

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 這里同樣是對ttl進行了賦值@Nonnullpublic Builder setTtl(@Nonnull Time ttl) {this.ttl = ttl;return this;}// ...
    }
    

    通過源碼可以看出,使用此API時在創建StateTtlConfig對象時給一個過期時間即可,不需要再調用setTtl方法

1.3.2 過期時間刷新策略
過期時間刷新策略有兩種:
  • updateTtlOnCreateAndWrite()
  • updateTtlOnReadAndWrite()

這兩方法就是互斥的,只能生效一個,同樣是因為二者都是對同一個變量進行賦值,就是說在二者同時調用的情況下,誰在后面調用誰就生效,如代碼模板中線調用的updateTtlOnCreateAndWrite()后調用的updateTtlOnReadAndWrite()那么生效的就是updateTtlOnReadAndWrite()策略.

  • 源碼

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法給updateType進行賦值@Nonnullpublic Builder setUpdateType(UpdateType updateType) {this.updateType = updateType;return this;}/** 二者方法體中調用的都是同一個方法setUpdateType*/@Nonnullpublic Builder updateTtlOnCreateAndWrite() {return setUpdateType(UpdateType.OnCreateAndWrite);}@Nonnullpublic Builder updateTtlOnReadAndWrite() {return setUpdateType(UpdateType.OnReadAndWrite);}// ...
    }
    

    源碼可以看出二者調用同一個方法setUpdateType,而setUpdateType方法又是給updateType賦值的一個方法,所以再使用時要根據實際的業務場景選擇updateTtlOnCreateAndWrite()updateTtlOnReadAndWrite()中的一個.

1.3.3 返回過期數據策略
回返過期數據策略有兩種:
  • setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  • setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  • 源碼

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法給stateVisibility進行賦值@Nonnullpublic Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {this.stateVisibility = stateVisibility;return this;}// 下面兩個方法體中都是調用setStateVisibility方法@Nonnullpublic Builder returnExpiredIfNotCleanedUp() {return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);}@Nonnullpublic Builder neverReturnExpired() {return setStateVisibility(StateVisibility.NeverReturnExpired);}// ...
    }
    

    這二者同樣是互斥的原則,使用選其一即可,即使都調用也是后被調用者生效.

1.3.4 過期數據清除策略
過期數據清除策略有三種:
  • cleanupIncrementally(10, false)
  • cleanupFullSnapshot()
  • cleanupInRocksdbCompactFilter(10000)

這三種過期數據清除策略針對的是不同的場景(本地狀態后端、快照、RocksDB狀態后端),所以三者是可以同時使用的,不會存在同時調用后者會對前者進行覆蓋的問題,在API簡介章節介紹了這種三策略的作用,這里著重介紹cleanupIncrementally策略.

HashMapStateBackend使用的存儲結構是Flink團隊自己開發的一種數據存儲結構copyOnWriteStateMap,說這個存儲結構是因為cleanupIncrementally策略刪除過期數據的操作和這種結構息息相關.

關于copyOnWriteStateMap的結構可以簡單的理解為K,V形式存儲的結構,其中的Key就是使用keyBy時指定的key,如果沒有使用keyBy那么所有數據key都會給一個相同的默認值,其中的Value是指ListStateMapState等,也就是在構建狀態存儲器時候選擇存儲形式,如下圖:
ttl02

在本地狀態后端(HashMapStateBackend)中默認使用的就是cleanupIncrementally清除策略,默認值為cleanupIncrementally(5, false),也就是說只要設置了TTL的過期時間,HashMapStateBackend就會使用cleanupIncrementally策略來清理過期數據,只不過cleanupIncrementally對用戶提供了選擇方式,這里將結合圖解說明cleanupIncrementally如何清除過期數據的.
ttl02

  1. 只要訪問狀態數據就會觸發cleanupIncrementally執行.
  2. 如果用戶沒有設置cleanupIncrementally,TTL會根據cleanupIncrementally(5, false)來刪除過期數據,如果用戶指定了參數則按照用戶定義的參數刪除數據.
  3. 比如現在是cleanupIncrementally(10, false),迭代器會從k1開始,到k10結束,將這10個條目的key中的ListState中的過期數據進行清理.
  4. CopyOnWriteStateMap中的數據存放是無序的,而且Flink在創建CopyOnWriteStateMap時候給的默認大小是128,也就說處理數據中key的數量超過128,否則就算只有一個key,CopyOnWriteStateMap的大小也是128,迭代器最少也要迭代128次.
  5. 當設置cleanupIncrementally(10, false)時,假如數據中只有一個key,那么這個k -> ListState(...)CopyOnWriteStateMap中的存放位置是任意的,假設在CopyOnWriteStateMap中存放的位置是22,就會出現當第一次訪問狀態數據時,并不會刪除這個key對應的ListState中的數據,訪問狀態數據時同樣還是不會刪除過期數據,只有第三次訪問時,才會刪除過期數據,因為cleanupSize設置的大小為10,迭代器每次只會迭代10個條目的key,每當訪問狀態數據時,迭代器都會從最后一次迭代的指針位置開始繼續推進.
  6. 當迭代器的指針推進位置到128時,又會從0的位置從新開始推進(這里是指CopyOnWriteStateMap的大小是128),以此類推.
  7. 如果cleanupIncrementally(10, true)中的runCleanupForEveryRecordtrue時,那就是說每次訪問狀態數據迭代器都會把CopyOnWriteStateMap中的所有條目都清理一遍,所以說為true時第一個參數(cleanupSize)會失效.

cleanupIncrementally的執行機制就很好的解釋了,為什么在使用本地狀態后端(HashMapStateBackend)時經常會出現明明已經來了7,8條數據,數據過期數據還沒有清理到,或者距離上一次訪問狀態數據過了1h甚至更久都沒有清理過期數據的情況.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/164048.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/164048.shtml
英文地址,請注明出處:http://en.pswp.cn/news/164048.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker可視化管理界面工具Portainer安裝

Portainer是Docker容器管理界面工具&#xff0c;可以直觀的管理Docker。 部署也很簡單&#xff1a; 官方安裝文檔地址 1、創建數據卷 docker volume create portainer_data2、下載允許容器 docker run -d -p 8000:8000 -p 9443:9443 --name portainer --restartalways -v /v…

放棄無謂的「技術氛圍」幻想,準備戰斗

大型科技公司每年都招聘大量研發人才&#xff0c;這給了很多人一種錯覺&#xff0c;認為是「技術」導致了這些公司的成功&#xff0c;其實他們的成功是技術推動的市場戰略的成功&#xff0c;是市場需要某項服務&#xff0c;才需要研發人員夜以繼日的埋頭苦干。資本絕不會做虧本…

vue2 element el-transfer穿梭框組件支持拖拽及排序 已封裝,隨取隨用

項目場景&#xff1a; 項目中有個功能用到穿梭框組件&#xff0c;新版本需要支持穿梭框組件排序&#xff0c;由于element2版本中的穿梭框組件本身不支持排序功能 在此不僅需要支持隨意更換順序&#xff0c;還支持從一側拖拽至另一側&#xff0c;具體功能效果圖如下&#xff1…

為什么JSX只能在函數的返回語句中使用

JSX只能在函數的返回語句中使用&#xff0c;因為JSX本質上是一種聲明式的語法&#xff0c;用于描述React組件的結構和外觀。在函數的返回語句中使用JSX&#xff0c;可以將JSX表達式嵌入到組件的輸出中。 當我們編寫一個React組件時&#xff0c;我們通常需要定義一個Render函數…

消息中間件——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ!

前言 本章我們來一次快速入門RabbitMQ——生產者與消費者。需要構建一個生產端與消費端的模型。什么意思呢&#xff1f;我們的生產者發送一條消息&#xff0c;投遞到RabbitMQ集群也就是Broker。 我們的消費端進行監聽RabbitMQ&#xff0c;當發現隊列中有消息后&#xff0c;就進…

森利威爾SL4010 升壓恒壓 12V升壓24V 12V升壓36V 12V升壓48V

在當今的電子設備中&#xff0c;電源管理系統的設計是非常重要的。為了保證設備的穩定運行&#xff0c;升壓和恒壓電源的應用已經成為不可或缺的一部分。在這篇文章中&#xff0c;我們將介紹森利威爾SL4010升壓恒壓電源&#xff0c;它可以實現12V升壓24V、12V升壓36V、12V升壓4…

c 在文本終端中顯示yuv圖片

把yuv422 轉為rgb32 &#xff0c;利用framebuffer 顯示 #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h> #include <unistd.h> #include <sys/ioctl.h> #include <lin…

vue2.6源碼分析

vue相關文檔 vue-cli官方文檔 vuex官方文檔 vue-router 官方文檔 vue2.6源碼地址 如何調試源碼 package.json 添加了--sourcemap "scripts": {"dev": "rollup -w -c scripts/config.js --environment TARGET:web-full-dev --sourcemap" }新增…

linux apt update錯誤提示修復

錯誤提示&#xff1a; E: Release file for http://security.debian.org/dists/bullseye-security/InRelease is expired (invalid since 15d 14h 45min 26s). Updates for this repository will not be applied. E: Release file for http://ftp.jp.debian.org/debian/dists/b…

【Hello Go】Go語言并發編程

并發編程 概述基本概念go語言的并發優勢 goroutinegoroutine是什么創建goroutine如果主goroutine退出runtime包GoschedGoexitGOMAXPROCS channel無緩沖的channel有緩沖的channelrange和close單向channel 定時器TimerTicker Select超時 概述 基本概念 并行和并發概念 并行 &…

CVE-2023-6099:優卡特臉愛云一臉通智慧管理平臺SystemMng.ashx接口未授權漏洞復現

文章目錄 優卡特臉愛云一臉通智慧管理平臺未授權SystemMng.ashx接口漏洞復現&#xff08;CVE-2023-6099&#xff09; [附POC]0x01 前言0x02 漏洞描述0x03 影響版本0x04 漏洞環境0x05 漏洞復現1.訪問漏洞環境2.構造POC3.復現 0x06 修復建議 優卡特臉愛云一臉通智慧管理平臺未授權…

mysql字符串轉為數字的三種方法、字符串轉日期

隱式轉換 在MySQL中&#xff0c;使用0運算符可以將一個非數字的值隱式地轉換為數字。這在進行數學運算或比較操作時非常有用。 需要注意的是&#xff0c;在使用0進行隱式轉換時&#xff0c;MySQL會盡可能將字符串轉換為數字。如果字符串不能轉換為數字&#xff0c;則會返回0。…

【解決】HDFS JournalNode啟動慢問題排查

文章目錄 一. 問題描述二. 問題分析1. 排查機器性能2. DNS的問題 三. 問題解決 一句話&#xff1a;因為dns的問題導致journalnode啟動時很慢&#xff0c;通過修復dns對0.0.0.0域名解析&#xff0c;修復此問題。 一. 問題描述 從journalnode啟動到服務可用&#xff0c;完成RPC…

使用Python將圖片轉換為PDF

將圖片轉為 PDF 的主要原因之一是為了方便共享和傳輸。此外&#xff0c;將多張圖片合并成一個 PDF 文件還可以簡化文件管理。之前文章詳細介紹過如何使用第三方庫Spire.PDF for Python將PDF文件轉為圖片&#xff0c;那么本文介紹使用同樣工具在Python中實現圖片轉PDF文件的功能…

【OpenCV+OCR】計算機視覺:識別圖像驗證碼中指定顏色文字

文章目錄 1. 寫在前面2. 讀取驗證碼圖像3. 生成顏色掩碼4. 生成黑白結果圖5. OCR文字識別6. 測試結果 【作者主頁】&#xff1a;吳秋霖 【作者介紹】&#xff1a;Python領域優質創作者、阿里云博客專家、華為云享專家。長期致力于Python與爬蟲領域研究與開發工作&#xff01; 【…

Spring Security(安全框架,必須登錄成功才能訪問指定資源)

一、背景知識 1、Spring Security 是一個能夠為基于Spring的企業應用系統提供聲明式的安全訪問控制解決方案的安全框架。它提供了一組可以在Spring應用上下文中配置的Bean&#xff0c;充分利用了Spring IoC&#xff0c;DI&#xff08;IOC: 控制反轉Inversion of Control ,DI:D…

24路電磁鎖控板的特點和主要參數

智能快遞柜、智能生鮮柜、電子存儲柜、超市寄存柜、智能送餐柜、電子更衣柜、檔案柜等物聯網終端設備&#xff0c;都是采用電磁鎖控制&#xff0c;這種電磁鎖控制板俗稱鎖控板。鎖控板可以遠程控制儲物柜的開關以及遠程監控并提供鎖的反饋信號。沐渥開發的24路電磁鎖控板可以控…

AI:87-基于深度學習的街景圖像地理位置識別

?? 本文選自專欄:人工智能領域200例教程專欄 從基礎到實踐,深入學習。無論你是初學者還是經驗豐富的老手,對于本專欄案例和項目實踐都有參考學習意義。 ??? 每一個案例都附帶有在本地跑過的代碼,詳細講解供大家學習,希望可以幫到大家。歡迎訂閱支持,正在不斷更新中,…

OpenAI 曾收到 AI 重大突破警告;半獨立的 OpenAI 比與微軟合并更好丨 RTE 開發者日報 Vol.91

開發者朋友們大家好&#xff1a; 這里是 「RTE 開發者日報」 &#xff0c;每天和大家一起看新聞、聊八卦。我們的社區編輯團隊會整理分享 RTE &#xff08;Real Time Engagement&#xff09; 領域內「有話題的 新聞 」、「有態度的 觀點 」、「有意思的 數據 」、「有思考的 文…

ubuntu下docker環境使用GPU配置

本文主要講述整個命令流程&#xff0c;具體講解請看官網nvidia-容器工具包和一篇總結得很詳細的博文docker使用GPU總結 docker的版本必須安裝19.0版本以上的&#xff0c;這里也只講19.0版本以上的使用方法 首先設置一下網絡信息 curl -fsSL https://nvidia.github.io/libnvi…