1 狀態TTL機制
1.1 API簡介
TTL
常用API
如下:
API | 注解 |
---|---|
setTtl(Time.seconds(…)) | 配置過期時長,當狀態中的數據到達這個時長則判定為過期數據,在new StateTtlConfig.Builder(Time.seconds(...)) 也可以配置,如果同時調用setTtl() 方法則進行覆蓋 |
updateTtlOnCreateAndWrite() | 當該條數據在State中插入或者更新的時候,刷新計時,可用于冷熱數據分離 |
updateTtlOnReadAndWrite() | 讀或寫都刷新該數據的TTL計時,可用于冷熱數據分離 |
setStateVisibility(…) | 用于控制狀態中過期數據的可見性,當方法中設置StateTtlConfig.StateVisibility.NeverReturnExpired) 時則不可見過期未被清理的數據,如果設置StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 則可見過期未被清理的數據.setStateVisibility(...) 由異步線程執行,默認是NeverReturnExpired . |
setTtlTimeCharacteristic(…) | 指定TTL 的時間語義,默認是event time ,可以配置process time ,將StateTtlConfig.TtlTimeCharacteristic.ProcessingTime 填入方法的參數即可. |
disableCleanupInBackground() | 禁用后臺清理過期數據,使用后則不會清理過期數據 |
cleanupIncrementally(… , …) | 針對本地狀態后端,即HashMapStateBackend. 增量清理, 每當訪問狀態數據時都會驅動一次過期檢查,清除其中部分數據, 這也是HashMapBackend 狀態后端唯一能真正清理過期數據的方法,cleanupIncrementally(... , ...) 方法中需要傳入兩個參數int cleanupSize 和boolean runCleanupForEveryRecord ,cleanupSize 是指key 的數據量,runCleanupForEveryRecord 是指是否清理所有過期數據,如果runCleanupForEveryRecord 設置的值為true 此時cleanupSize 就會失效,但是狀態數據較多時會嚴重影響時效性. |
cleanupFullSnapshot() | 針對快照數據,即checkpoint快照. 全量清理, 在做快照時將所有的過期數據進行清理保證快照中沒有過期數據,但是狀態后端中的過期數據沒有進行清理. |
cleanupInRocksdbCompactFilter(xxx) | 針對于RocksdbStateBackend. 只生效于RocksDB 狀態后端,通過Flink將CompactFilter 傳給RocksDB ,在RocksDB 在Compact 過程中根據過濾條件將過期數據刪除,傳入的參數為過期時間. |
1.2 代碼模板
-
代碼
class StateMapFunc2 implements MapFunction<String, String>, CheckpointedFunction {private ListState<String> listState;@Overridepublic String map(String s) throws Exception {// 將數據添加到狀態存儲器中,split[0]為用戶IDlistState.add(s);// 獲取狀態存儲器中的數據Iterable<String> iter = listState.get();StringBuffer buffer = new StringBuffer();for (String str : iter) {buffer.append(str);}// 將數據添加到狀態存儲中return buffer.toString();}@Overridepublic void snapshotState(FunctionSnapshotContext ctx) throws Exception {}@Overridepublic void initializeState(FunctionInitializationContext ctx) throws Exception {OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();// 配置State TTLStateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 設置數據存活時長,當該數據在State中存活時間超過10s時刪除該數據// 這個方法也是設置數據存活時長,和StateTtlConfig.Builder(Time.seconds(10))的作用一樣,可以不用這個方法,如果用了會覆蓋上面設置的時長.setTtl(Time.seconds(10))/*** updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二選一即可, 這兩個方法的主要作用就是配合setTtl方法將冷熱數據進行分離**/// 當該條數據在State中插入或者更新的時候,刷新計時.updateTtlOnCreateAndWrite()// 讀或寫都刷新該數據的TTL計時.updateTtlOnReadAndWrite()/*** setStateVisibility就是設置狀態的可見性,前面setTtl方法是設置刪除過期數據,刪除過期數據實際上是由另一個異步線程周期性(定時器)的完成,也就是說超過10s的數據不一定會馬上被刪除,但是* 獲取數據的時候底層會將超過存活時間的數據進行判斷過濾,setStateVisibility就是可以設置是否可以查詢到這些過期的數據,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二選一.**/// 不返回過期數據,這個也是默認策略.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 返回還沒有被清除的過期數據.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)// 指定TTL計時時間語義(默認處理時間).setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)// 禁用后臺清理過期數據.disableCleanupInBackground()/*** 針對本地狀態后端,即HashMapStateBackend* 增量清理, 每當獲取狀態數據時,迭代器都會向前推進。對遍歷的狀態數據進行檢查,并清理過期的數據* 參數1: 設置每次清理的key的數據量(copyOnWriteStateMap中的key的條目數量)* 參數2: 設置是否清理所有條目也就是key對應的數據,如果設置為true則參數1失效,在狀態數據較多時不建議設置為true,會嚴重影響時效性**/.cleanupIncrementally(10, false)/*** 針對快照數據,即checkpoint快照* 全量清理, 在做快照時將所有的過期數據進行清理保證快照中沒有過期數據,但是不會清狀態后端中的過期數據**/.cleanupFullSnapshot()/*** 針對于RocksdbStateBackend* 只生效于RocksDB狀態后端,通過Flink將CompactFilter傳給RocksDB,在RocksDB在Compact過程中根據過濾條件將過期數據刪除,傳入的參數為過期時間(也就是發生Compact時的過濾條件)**/.cleanupInRocksdbCompactFilter(10000).build();// 配置狀態描述,在ListStateDescriptor構造器中聲明數據類型,簡單類型可以使用xxx.class,符合類型需要使用到TypeInformation.of()ListStateDescriptor descriptor = new ListStateDescriptor("MapState", String.class);// 狀態描述器加載TTL配置descriptor.enableTimeToLive(ttlConfig);listState = operatorStateStore.getListState(descriptor);} }
代碼中是以
Operator State
為例,如果是Keyed State
則在open
方法中配置TTL
.
1.3 TTL機制詳解
在代碼模板中有API
的使用方式,但是TTL
機制不同的方法之間存在互斥或者互不影響的關系.
1.3.1 過期時間設置策略
- new StateTtlConfig.Builder(Time.seconds(10))
- setTtl(Time.seconds(10))
這兩種方式都是設置過期時間使用的,但是只需要選用其中一種即可,如果在創建StateTtlConfig
對象時就設置了過期時間,又在setTtl
方法中設置了過期時間,則會對過期時間進行覆蓋,本質上二者都是對同一個變量進行賦值.
-
源碼
new StateTtlConfig.Builder(Time.seconds(10))
public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 調用Builder時對ttl變量進行了賦值public Builder(@Nonnull Time ttl) {this.ttl = ttl;}// ... }
setTtl(Time.seconds(10))
public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 這里同樣是對ttl進行了賦值@Nonnullpublic Builder setTtl(@Nonnull Time ttl) {this.ttl = ttl;return this;}// ... }
通過源碼可以看出,使用此
API
時在創建StateTtlConfig
對象時給一個過期時間即可,不需要再調用setTtl
方法
1.3.2 過期時間刷新策略
- updateTtlOnCreateAndWrite()
- updateTtlOnReadAndWrite()
這兩方法就是互斥的,只能生效一個,同樣是因為二者都是對同一個變量進行賦值,就是說在二者同時調用的情況下,誰在后面調用誰就生效,如代碼模板中線調用的updateTtlOnCreateAndWrite()
后調用的updateTtlOnReadAndWrite()
那么生效的就是updateTtlOnReadAndWrite()
策略.
-
源碼
public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法給updateType進行賦值@Nonnullpublic Builder setUpdateType(UpdateType updateType) {this.updateType = updateType;return this;}/** 二者方法體中調用的都是同一個方法setUpdateType*/@Nonnullpublic Builder updateTtlOnCreateAndWrite() {return setUpdateType(UpdateType.OnCreateAndWrite);}@Nonnullpublic Builder updateTtlOnReadAndWrite() {return setUpdateType(UpdateType.OnReadAndWrite);}// ... }
源碼可以看出二者調用同一個方法
setUpdateType
,而setUpdateType
方法又是給updateType
賦值的一個方法,所以再使用時要根據實際的業務場景選擇updateTtlOnCreateAndWrite()
和updateTtlOnReadAndWrite()
中的一個.
1.3.3 返回過期數據策略
- setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
-
源碼
public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法給stateVisibility進行賦值@Nonnullpublic Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {this.stateVisibility = stateVisibility;return this;}// 下面兩個方法體中都是調用setStateVisibility方法@Nonnullpublic Builder returnExpiredIfNotCleanedUp() {return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);}@Nonnullpublic Builder neverReturnExpired() {return setStateVisibility(StateVisibility.NeverReturnExpired);}// ... }
這二者同樣是互斥的原則,使用選其一即可,即使都調用也是后被調用者生效.
1.3.4 過期數據清除策略
- cleanupIncrementally(10, false)
- cleanupFullSnapshot()
- cleanupInRocksdbCompactFilter(10000)
這三種過期數據清除策略針對的是不同的場景(本地狀態后端、快照、RocksDB狀態后端),所以三者是可以同時使用的,不會存在同時調用后者會對前者進行覆蓋的問題,在API
簡介章節介紹了這種三策略的作用,這里著重介紹cleanupIncrementally
策略.
HashMapStateBackend
使用的存儲結構是Flink團隊自己開發的一種數據存儲結構copyOnWriteStateMap
,說這個存儲結構是因為cleanupIncrementally
策略刪除過期數據的操作和這種結構息息相關.
關于copyOnWriteStateMap
的結構可以簡單的理解為K,V
形式存儲的結構,其中的Key
就是使用keyBy
時指定的key
,如果沒有使用keyBy
那么所有數據key
都會給一個相同的默認值,其中的Value
是指ListState
、MapState
等,也就是在構建狀態存儲器時候選擇存儲形式,如下圖:
在本地狀態后端(HashMapStateBackend
)中默認使用的就是cleanupIncrementally
清除策略,默認值為cleanupIncrementally(5, false)
,也就是說只要設置了TTL
的過期時間,HashMapStateBackend
就會使用cleanupIncrementally
策略來清理過期數據,只不過cleanupIncrementally
對用戶提供了選擇方式,這里將結合圖解說明cleanupIncrementally
如何清除過期數據的.
- 只要訪問狀態數據就會觸發
cleanupIncrementally
執行. - 如果用戶沒有設置
cleanupIncrementally
,TTL
會根據cleanupIncrementally(5, false)
來刪除過期數據,如果用戶指定了參數則按照用戶定義的參數刪除數據. - 比如現在是
cleanupIncrementally(10, false)
,迭代器會從k1
開始,到k10
結束,將這10個條目的key
中的ListState
中的過期數據進行清理. CopyOnWriteStateMap
中的數據存放是無序的,而且Flink在創建CopyOnWriteStateMap
時候給的默認大小是128
,也就說處理數據中key
的數量超過128
,否則就算只有一個key
,CopyOnWriteStateMap
的大小也是128
,迭代器最少也要迭代128次.- 當設置
cleanupIncrementally(10, false)
時,假如數據中只有一個key
,那么這個k -> ListState(...)
在CopyOnWriteStateMap
中的存放位置是任意的,假設在CopyOnWriteStateMap
中存放的位置是22,就會出現當第一次訪問狀態數據時,并不會刪除這個key
對應的ListState
中的數據,訪問狀態數據時同樣還是不會刪除過期數據,只有第三次訪問時,才會刪除過期數據,因為cleanupSize
設置的大小為10
,迭代器每次只會迭代10
個條目的key
,每當訪問狀態數據時,迭代器都會從最后一次迭代的指針位置開始繼續推進. - 當迭代器的指針推進位置到
128
時,又會從0
的位置從新開始推進(這里是指CopyOnWriteStateMap
的大小是128
),以此類推. - 如果
cleanupIncrementally(10, true)
中的runCleanupForEveryRecord
為true
時,那就是說每次訪問狀態數據迭代器都會把CopyOnWriteStateMap
中的所有條目都清理一遍,所以說為true
時第一個參數(cleanupSize
)會失效.
cleanupIncrementally
的執行機制就很好的解釋了,為什么在使用本地狀態后端(HashMapStateBackend
)時經常會出現明明已經來了7,8條數據,數據過期數據還沒有清理到,或者距離上一次訪問狀態數據過了1h甚至更久都沒有清理過期數據的情況.