Flink 物理分區算子(Physical Partitioning)
在Flink中,常見的物理分區策略有:隨機分配(Random)、輪詢分配(Round-Robin)、重縮放(Rescale)和廣播(Broadcast)。
接下來,我們通過源碼和Demo分別了解每種物理分區算子的作用和區別。
(1) 隨機分區(shuffle)
最簡單的重分區方式就是直接“洗牌”。通過調用 DataStream 的.shuffle()方法,將數據隨機地分配到下游算子的并行任務中去。
隨機分區服從均勻分布(uniform distribution),所以可以把流中的數據隨機打亂,均勻地傳遞到下游任務分區。因為是完全隨機的,所以對于同樣的輸入數據, 每次執行得到的結果也不會相同。
經過隨機分區之后,得到的依然是一個 DataStream。
我們可以做個簡單測試:將數據讀入之后直接打印到控制臺,將輸出的并行度設置為 2,
中間經歷一次 shuffle。執行多次,觀察結果是否相同。
package com.flink.DataStream.PhysicalPartitioning;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** flink 常用物理分區算子-shuffle:隨機分區-洗牌*/
public class flinkShuffle {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> socketDataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);// TODO 隨機分區socketDataStreamSource.shuffle().print();// TODO 輪詢分區//socketDataStreamSource.rebalance().print();// TODO 重縮放分區//socketDataStreamSource.rescale().print();// TODO 廣播//socketDataStreamSource.broadcast().print();// TODO 全局分區//socketDataStreamSource.global().print();streamExecutionEnvironment.execute();}
}
查看執行結果
2> 12> 21> 31> 11> 22> 3
在上述實驗中,我們設置全局env的并行度為2,嘗試執行2次job,發現2次執行的結果不一致,因為shuffle的完全隨機性,將輸入流分配到不同的分區中,且每次分配可能不一樣。
(2) 輪詢分區(Round-Robin)
輪詢,簡單來說就是“發牌”,按照先后順序將數據做依次分發。通過調用 DataStream的.rebalance()方法,就可以實現輪詢重分區。
rebalance 使用的是 Round-Robin 負載均衡算法,可以將輸入流數據平均分配到下游的并行任務中去。
stream.reblance()
設置全局env的并行度為2,嘗試執行3次job,發現3次執行的結果一致
1> 1
2> 21> 1
2> 21> 1
2> 21> 1
2> 2
(3) 重縮放分區(rescale)
重縮放分區和輪詢分區非常相似。當調用 rescale()方法時,其實底層也是使用 Round-Robin 算法進行輪詢,但是只會將數據輪詢發送到下游并行任務的一部分中。
rescale 的做法是分成小團體,發牌人只給自己團體內的所有人輪流發牌。
stream.rescale()
設置全局env的并行度為2,嘗試執行3次job,發現3次執行的結果一致
1> 1
2> 21> 1
2> 21> 1
2> 21> 1
2> 2
(4) 廣播(broadcast)
這種方式其實不應該叫做“重分區”,因為經過廣播之后,數據會在不同的分區都保留一份,可能進行重復處理。
可以通過調用 DataStream 的 broadcast()方法,將輸入數據復制并發送到下游算子的所有并行任務中去。
stream.broadcast()
將輸入數據復制并發送到下游算子的所有并行任務中去
2> 1
1> 12> 2
1> 2
(5) 全局分區(global)
全局分區也是一種特殊的分區方式。這種做法非常極端,通過調用.global()方法,會將所有的輸入流數據都發送到下游算子的第一個并行子任務中去。
這就相當于強行讓下游任務并行度變成了1,所以使用這個操作需要非常謹慎,可能對程序造成很大的壓力。
stream.global()
將所有的輸入流數據都發送到下游算子的第一個并行子任務中去
強行讓下游任務并行度變成了1,即使你并行度設置為了2
1> 1
1> 21> 1
1> 21> 1
1> 2