【圖解大數據技術】流式計算:Spark Streaming、Flink
- 批處理 VS 流式計算
- Spark Streaming
- Flink
- Flink簡介
- Flink入門案例
- Streaming Dataflow
- Flink架構
- Flink任務調度與執行
- task slot 和 task
- EventTime、Windows、Watermarks
- EventTime
- Windows
- Watermarks
批處理 VS 流式計算
計算存儲介質上的大規模數據,這類計算叫大數據批處理計算。數據是以批為單位進行計算,比如一天的訪問日志、歷史上所有的訂單數據等。這些數據通常通過 HDFS 存儲在磁盤上,使用 MapReduce 或者 Spark 這樣的批處理大數據計算框架進行計算,一般完成一次計算需要花費幾分鐘到幾小時的時間。
還有一種是針對實時產生的大規模數據進行即時計算處理,比如攝像頭采集的實時視頻數據、淘寶實時產生的訂單數據等。實時處理最大的不同就是這類數據,是實時傳輸過來的針對這類大數據的實時處理系統也叫大數據流計算系統。
Spark Streaming
Spark是一個批處理大數據計算引擎,而 Spark Steaming 則利用了 Spark 的分片和快速計算的特性,把實時傳輸過來的數據按時間范圍進行分段,轉成一個個的小批,再交給 Spark 去處理。因此 Spark Streaming 的原理是流轉批,Spark Streaming 不是真正意義上的實時計算框架,它是一個準實時的計算框架。
Flink
Flink簡介
Flink 和 Spark Streaming 不一樣,Flink 一開始設計就是為了做實時流式計算的。它可以監聽消息隊列獲取數據流,也可以用于計算存儲在 HDFS 等存儲系統上的數據(Flink 把 這些靜態數據當做數據流來進行處理)。
然后 Flink 計算后生成的結果流,也可以發送到其他存儲系統。
Flink入門案例
public static void main(String[] args) throws Exception {// 初始化一個流執行環境final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 利用這個執行環境構建數據流 DataStream(source操作)DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));// 執行各種數據轉換操作(transformation)DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});// 打印結果(sink類型操作)adults.print();// 執行env.execute();}
首先構建一個執行環境env,然后通過執行環境env構建數據流DataStream(這就是source操作),然對這個數據流進行各種轉換操作(transformation),最后跟上一個sink類型操作(類似是Spark的action操作),然后調用env的execute()啟動計算。
上面是流計算的例子,如果要進行批計算,則要構建ExecutionEnvironment類型的執行環境,然后使用ExecutionEnvironment執行環境構建一個DataSet。
Streaming Dataflow
Flink程序代碼會被映射為Streaming Dataflow(類似于DAG)。一個Streaming Dataflow是由一組Stream(流)和Operator(算子)組成,并且始于一個或多個Source Operator,結束于一個或多個Sink Operator,中間有一個或多個Transformation Operator。
Source Operator:
DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));
Transformation Operator:
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});
Sink Operator:
adults.print();
由于Flink是分布式并行的,因此在程序執行期間,一個Stream流會有多個Stream Partition(流分區),一個Operator也會有多個Operator Subtask(算子子任務)。
兩個 operator 之間傳遞的時候有兩種模式:
- One to One 模式:像Source到map這種傳遞模式,不會改變數據的分區特性。
- Redistributing (重新分配)模式:像map到keyBy這種傳遞模式,會根據key的hashcode進行重寫分區,改變分區特性的。
Flink還會進行優化,將緊密度高的算子結合成一個Operator Chain(算子鏈)。
比如Source操作和map操作可以結合成一個Operator Chain,結合成Operator Chain后就在一個task中由一個thread完成。
Flink架構
Flink任務調度與執行
- 我們的代碼會被Flink解析成一個DAG圖,當我們調用env.execute()方法后,該DAG圖就會被打包通過Akka客戶端發送到JobManager。
- JobManager會通過調度器,把task調度到TaskManager上執行。
- TaskManager接收到task后,task將會在一個task slot中執行。
task slot 和 task
我們看到在TaskManager上有一個個的task slot被劃分出來,task slot的數量是在TaskManager創建之初就設置好的。每個task(正確來說應該是subtask)都會調度到一個task slot上執行。task slot的作用主要是進行內存隔離,比如TaskManager設置了3個task slot的數量,那么每個task slot占用TaskManager三分之一的內存,task在task slot執行時,task與task之間將不會有內存資源競爭的情況發生。
EventTime、Windows、Watermarks
由于Flink處理的是流式計算,數據是以流的形式源源不斷的流過來的,也就是說數據是沒有邊界的,但是對數據的計算必須在一個范圍內進行,比如實時統計高速公路過去一個小時里的車流量。
那么就需要給源源不斷流過來的數據劃分邊界,我們可以根據時間段或數據量來劃分邊界。
如果要按照時間段來劃分邊界,那么是通過時間字段進行劃分。
EventTime
Flink有三種類型的時間:
- Event Time
- Ingestion Time
- Processing Time
一般用的較多的時Event Time,因為Event Time是固定不變的,不管什么時候計算,都會得到相同的輸出結果。
Windows
有了時間字段后,就可以根據時間劃分時間窗,比如下面就是劃分1分鐘為一個時間窗,然后就可以對時間窗內的數據做計算。
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
TumblingEventTimeWindows是滾動時間窗:
還有SlidingEventTimeWindows滑動時間窗:
// 沒10秒計算前1分鐘窗口內的數據
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
以及EventTimeSessionWindows會話時間窗:
// 間隔超過5s的話,下一達到的事件在新的窗口內計算,否則在同一窗口內計算
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
上面設置的會話時間窗表示如果兩個事件間的間隔超過5秒,那么后一個事件就會在新的窗口中計算;如果兩個事件間隔沒有超過5秒,那么就在同一窗口內計算。
Watermarks
但是事件流并不一定是有序的,它有可能是無序,有可能早發生的事件反而比晚發生的事件更晚到達。這時Flink需要等待較早發生的事件都到達了,才能進行一個時間窗的計算。
但是Flink無法得知什么時候邊界內的所有事件都達到,因此必須有一種機制控制Flink什么時候停止等待。
這時候就要使用watermarks ,Flink接收到每一條數據時,會使用watermark生成器根據EventTime計算出一個watermark然后插入到數據中。當我們設置watermark的延遲時長是t時,那么watermark就等于當前所有達到數據中的EventTime中的最大值(maxEventTime)減去時間t,代表EventTime在 maxEventTime - t 之前的數據都已達到,結束時間為 maxEventTime - t 的時間窗可以進行計算。
比如上面的例子,我們設置wartemark的延時時間t為2,那么當EventTime為7的事件到達時,該事件的watermark就是5(maxEventTime = 7, t = 2, watermark = maxEventTime - t = 7 - 2 = 5),那么表示Flink認定EventTime在5或5之前的時間都已經達到了,那么如果有一個窗口的結束時間為5的話,該窗口就會觸發計算。
watermarks的使用:
DataStream<Event> stream = ...;WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.timestamp);DataStream<Event> withTimestampsAndWatermarks =stream.assignTimestampsAndWatermarks(strategy);
當然,使用了watermarks之后,也不一定就能保證百分之一百準確。當我們把延時時間t設置的較短時,就能獲取更低的延遲,但是準確性也相對下降;而如果我們把t設的較大,那么延遲就更大,但是準確性就想對較高。