通過本文可以解決以下3個問題。
- 了解flink內存和配置項相關概念。
- 清楚UI中TM和JM各內存組件實際內存值的計算規則。
- 根據實際情況對內存進行調整。
1. Flink進程內存
TM和JM二者均為JVM進程(JVM通常分成堆內和堆外兩部分)。TM和JM的內存定義為進程總內存(total process memory
),從功能上分為flink應用程序使用的內存(堆內、堆外)和JVM運行進程使用的內存(堆外)兩大部分。
- flink應用程序使用內存(
total flink momory
),下圖虛線內部分 - JVM運行進程使用內存(JVM Metaspace和JVM Overhead),下圖虛線外部分
設置JM或TM內存最簡單的方式是設置total process memory
或total flink momory
之一。其他內存組件將根據默認值和額外配置自動調整。不建議同時顯式設置total process memory
和total flink momory
,可能會造成內存配置沖突導致部署失敗。配置其他內存組成部分時也需要注意可能產生的沖突配置。
JM和TM的total process memory
和total flink momory
的配置項分別為jobmanager.memory.process.size
、jobmanager.memory.flink.size
和 taskmanager.memory.process.size
、taskmanager.memory.flink.size
。該4個參數均無默認值。
設置JM或TM內存的另一種方法是配置total flink momory
中以下內存組件的大小。
- JM:
jobmanager.memory.heap.size
(上圖藍色部分)。 - TM:
taskmanager.memory.task.heap.size
(上圖藍色部分) 和taskmanager.memory.managed.size
(上圖綠色部分)。
上述3個參數無默認值,當手動配置這些參數后,建議既不要配置total process memory
,也不要配置total flink momory
,否則很容易導致內存配置沖突。
在內存組件中以兩部分內存按比例分配,同時受最大、最小值限制。
- JVM Overhead(JM、TM) 按比例分配
total process memory
的一部分 - Network memory(TM) 按比例分配
total flink momory
的一部分
示例如下
total Process memory = 1000MB
JVM Overhead min = 64MB
JVM Overhead max = 128MB
JVM Overhead fraction = 0.1
上述配置下,JVM Overhead的內存大小為1000*0.1=100MB,在64~128之間。
如果將min和max設置成相同的值,則會將內存固定為該大小。如果按比例計算出的內存小于最小值,則實際的內存大小將為最小值的大小。如果設置了total process memory
和其他內存組件的大小,可能會忽略比例配置的情況,這時JVM Overhead則為total process memory
的剩余部分,但結果仍然受最大最小值的限制,否則配置將失敗。
1.1. JVM參數
Flink將在啟動進程時根據配置或派生的內存組件值明確添加如下與內存相關的JVM參數
-Xmx and -Xms
TM:Framework + Task Heap Memory。
JM:JVM Heap Memory-XX:MaxDirectMemorySize
始終僅為TM添加,JM時只有當設置了jobmanager.memory.enable-jvm-direct-memory-limit
參數時該JVM參數才會添加到JM中
TM:Framework + Task Off-heap + Network Memory
JM:Off-heap Memory
注意:用戶代碼使用的native non-direct內存也可算作堆外內存的一部分-XX:MaxMetaspaceSize
TM:JVM Metaspace
JM:JVM Metaspace
2. TaskManager內存
2.1. 內存構成
- 應用程序使用內存,圖中虛線部分
- JVM heap,虛線內藍色部分
- managed memory,虛線內綠色部分
- other direct(or native) memory,虛線內黃色部分
- frameworkd off-heap內存,flink框架使用的堆外內存
- task off-heap內存,task使用的堆外內存
- network memory,網絡內存
- JVM運行進程內存,圖中JVM specific memory部分(JVM Metaspace和JVM Overhead)
內存模型明細
可以將native non-direct memory(堆外非直接內存)使用量算作框架堆外或任務堆外內存的一部分,但是這樣會導致直接內存限制更高。
JVM內存通常情況下分成heap memory 和 off-heap memory,即堆內存和堆外內存。堆內存是JVM管理的主要內存區域,用于存儲對象和類實例。堆外內存通常關注與直接內存和手動管理的內存區域。
native memeory、direct memory、non-direct memeory(本地內存、直接內存和非直接內存)?
本地內存指的是JVM外部所有的內存,包括堆外內存等。
直接內存由java.nio包中的ByteBuffer
類通過allocateDirect()
方法分配的內存(JVM堆外內存),直接分配在操作系統的內存中。大小受限于操作系統的可用內存。
非直接內存由ByteBuffer
類的allocate()
方法分配的內存。這部分內存通常是在堆內的,由JVM管理。直接內存可以直接與操作系統的本地I/O操作交互,避免了在Java堆和本地操作系統內存之間的數據復制,因此數據處理起來非常高效,但是由于在堆外,不受JVM垃圾回收機制的直接管理,因此使用起來需要小心。
非直接內存在堆內,優缺點同直接內存相反。
2.2. TM中所有內存組成部分參數
- 框架堆內存(Framework Heap Memory)
taskmanager.memory.framework.heap.size
默認值128MB。
分配給 Flink 框架的 JVM 堆內存(進階配置)。不建議手動配置 - 任務堆內存(Task Heap Memory)
taskmanager.memory.task.heap.size
無默認值。
分配給 Flink 應用程序運行算子和用戶代碼的 JVM 堆內存。 - 管理內存(Managed memory)
taskmanager.memory.managed.size
無默認值。
taskmanager.memory.managed.fraction
默認值0.4(total flink momory
占比)
由 Flink 管理的本地內存(堆外),用于排序、哈希表、緩存中間結果及 RocksDB State Backend。 - 框架堆外內存(Framework Off-heap Memory)
taskmanager.memory.framework.off-heap.size
默認值128MB。不建議手動配置
用于 Flink 框架的堆外(直接或本地)內存((進階配置)。 - 任務堆外內存(Task Off-heap Memory)
taskmanager.memory.task.off-heap.size
默認值0bytes。
分配給 Flink 應用的算子的堆外(直接或本地)內存。 - 網絡內存(Network Memory)
taskmanager.memory.network.min
默認值64MB。
taskmanager.memory.network.max
默認值無窮大。
taskmanager.memory.network.fraction
默認值0.1(total flink momory
占比)
用于任務之間數據傳輸的本地內存(例如網絡傳輸緩沖)。這塊內存被用于分配網絡緩沖。 - JVM 空間(JVM Metaspace)
taskmanager.memory.jvm-metaspace.size
默認值256MB。
Flink JVM 進程的 Metaspace。 - JVM 開銷(JVM Overhead)
taskmanager.memory.jvm-overhead.min
默認值192MB。
taskmanager.memory.jvm-overhead.max
默認值1GB。
taskmanager.memory.jvm-overhead.fraction
默認值0.1(total process memory
占比)
用于JVM 其他開銷的本地內存,例如棧、代碼緩存、垃圾回收空間等。
當在IDE中本地啟動flink應用程序時,則只有taskmanager.memory.task.heap.size
、taskmanager.memory.task.off-heap.size
、taskmanager.memory.managed.size
、taskmanager.memory.network.min
、taskmanager.memory.network.max
參數會起作用。
2.3. 內存配置
配置內存最簡單的方式是配置total process memory
或total flink momory
,前文已提到。
除此之外設置內存的另一種方式是設置Task Heap Memory和Managed Memory。當手動設置這兩部分內存后,建議即不要配置total process memory
,也不要配置total flink momory
,否則很容易導致內存配置沖突。
Managed memory顧名思義即由flink自己管理的內存(off-heap)。以下內容使用Manager memory
- 流處理作業中用于RocksDB方式的State Backend。
- 流處理和批處理作業中用于排序、哈希表及緩存中間結果。
- 流處理和批處理作業中用于在python進程中執行UDF。
可以通過taskmanager.memory.managed.size
或taskmanager.memory.managed.fraction
參數來設置managed memory內存的大小,前者直接設定內存大小的絕對值,后者通過total Flink memory
內存的百分比來計算大小。如果二者同時設定占比的方式將會被覆蓋。如果二者都未設置將默認使用占比的形式。
flink將框架堆外內存和任務堆外內存包含在JVM直接內存限制中。雖然本機非直接內存使用量可以算作框架堆外內存或任務堆外內存的一部分,但在這種情況下會導致更高的 JVM 直接內存限制。網路內存也是JVM直接內存的一部分,但它由flink管理,并永遠保證不會超過其配置的大小。因此在這種情況下調整網絡內存大小不會有幫助。
舉個例子。
官方版本中,采用容器化部署方式,假設容器申請4GB內存,JVM Metaspace和JVM Overhead默認配置下,total flink momory
的可用大小為
taskmanager.memory.process.size=4GB
JVM Metaspace=256MB
JVM Overhead=4GB*0.1=409MB
taskmanager.memory.flink.size=taskmanager.memory.process.size-JVM Metaspace-JVM Overhead=3431MB。
3. JobManager內存
JobManager的內存與TaskManager內存相似,但是結構更簡單。
3.1. 內存組成
- JVM heap
jobmanager.memory.heap.size
無默認值。
堆內存,用于flink框架和作業提交期間(如某些批處理源)或檢查點完成回調中執行的用戶代碼 - off-heap
jobmanager.memory.off-heap.size
默認值128mb。
JM的堆外內存,包括堆外內促你的直接內存和本地內存 - JVM Metaspace
jobmanager.memory.jvm-metaspace.size
默認值256mb
flink JVM進程的元空間 - JVM Overhead
jobmanager.memory.jvm-overhead.min
默認值192mb
jobmanager.memory.jvm-overhead.max
默認值1gb
jobmanager.memory.jvm-overhead.fraction
默認值0.1(total process memory
占比)
用于其他JVM開銷的本地內存,如stack、代碼緩存、垃圾收集器空間等
3.2. 內存配置
配置內存最簡單的方式是配置total process memory
或total flink momory
,前文已提到。
除此之外設置內存的另一種方式是配置JVM Heap Memory。當手動配置改內存后,建議既不要配置total process memory
,也不要配置total flink momory
,否則很容易導致內存配置沖突。
當在IDE中本地啟動應用程序時,不需要配置內存選項,并且JM的內存選項都將不生效。
4. 內存調優指南
- standalone部署模式
建議通過配置total flink momory
的方式為flink自身配置內存值。
如果因JVM metaspace 導致問題,也可以調整該配置項。
由于JVM Overhead并不由flink或部署環境控制因此total process memory
是無關緊要的。這種情況下只有執行機器的物理資源才重要。
- 容器化部署
建議配置total process memory
,它聲明應為flink進程分配多少內存,并與請求的容器大小相對應。
如果配置了total flink momory
,flink將隱式的添加JVM內存組件來計算total process memory
的值,并請求具有相同內存大小的容器。
如果flink或用戶代碼分配了超過容器大小的非托管堆外內存,則作業可能會失敗。因為部署環境可能會殺死有問題的容器。
- state backend
僅和TM相關
使用HashMap state backend時將managed memory大小設置為0,從而使用戶代碼可以分配到最大的堆內存。
使用rocksDB state backend時,在默認情況下rocksDB設置為native內存分配限制為managed memory內存大小。因此為狀態保留足夠大的managed memory內存非常重要。
- 批作業
僅和TM相關
flink批作業運算符利用managed memory內存來提高運行效率。這樣,某些操作可以直接在原始數據上執行,無需反序列化為Java對象。
對批作業flink將在不超過managed memory限制的前提下,嘗試分配和使用盡可能多的managed memory。當managed memory內存不足時,將會溢出到磁盤。