【圖解大數據技術】流式計算:Spark Streaming、Flink

【圖解大數據技術】流式計算:Spark Streaming、Flink

  • 批處理 VS 流式計算
  • Spark Streaming
  • Flink
    • Flink簡介
      • Flink入門案例
      • Streaming Dataflow
    • Flink架構
      • Flink任務調度與執行
      • task slot 和 task
    • EventTime、Windows、Watermarks
      • EventTime
      • Windows
      • Watermarks

批處理 VS 流式計算

計算存儲介質上的大規模數據,這類計算叫大數據批處理計算。數據是以批為單位進行計算,比如一天的訪問日志、歷史上所有的訂單數據等。這些數據通常通過 HDFS 存儲在磁盤上,使用 MapReduce 或者 Spark 這樣的批處理大數據計算框架進行計算,一般完成一次計算需要花費幾分鐘到幾小時的時間。

在這里插入圖片描述

還有一種是針對實時產生的大規模數據進行即時計算處理,比如攝像頭采集的實時視頻數據、淘寶實時產生的訂單數據等。實時處理最大的不同就是這類數據,是實時傳輸過來的針對這類大數據的實時處理系統也叫大數據流計算系統。

在這里插入圖片描述

Spark Streaming

在這里插入圖片描述

Spark是一個批處理大數據計算引擎,而 Spark Steaming 則利用了 Spark 的分片和快速計算的特性,把實時傳輸過來的數據按時間范圍進行分段,轉成一個個的小批,再交給 Spark 去處理。因此 Spark Streaming 的原理是流轉批,Spark Streaming 不是真正意義上的實時計算框架,它是一個準實時的計算框架。

Flink

Flink簡介

Flink 和 Spark Streaming 不一樣,Flink 一開始設計就是為了做實時流式計算的。它可以監聽消息隊列獲取數據流,也可以用于計算存儲在 HDFS 等存儲系統上的數據(Flink 把 這些靜態數據當做數據流來進行處理)。

在這里插入圖片描述

然后 Flink 計算后生成的結果流,也可以發送到其他存儲系統。

在這里插入圖片描述

Flink入門案例

    public static void main(String[] args) throws Exception {// 初始化一個流執行環境final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 利用這個執行環境構建數據流 DataStream(source操作)DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));// 執行各種數據轉換操作(transformation)DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});// 打印結果(sink類型操作)adults.print();// 執行env.execute();}

在這里插入圖片描述

首先構建一個執行環境env,然后通過執行環境env構建數據流DataStream(這就是source操作),然對這個數據流進行各種轉換操作(transformation),最后跟上一個sink類型操作(類似是Spark的action操作),然后調用env的execute()啟動計算。

上面是流計算的例子,如果要進行批計算,則要構建ExecutionEnvironment類型的執行環境,然后使用ExecutionEnvironment執行環境構建一個DataSet。

在這里插入圖片描述

Streaming Dataflow

Flink程序代碼會被映射為Streaming Dataflow(類似于DAG)。一個Streaming Dataflow是由一組Stream(流)和Operator(算子)組成,并且始于一個或多個Source Operator,結束于一個或多個Sink Operator,中間有一個或多個Transformation Operator。

Source Operator:

        DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));

Transformation Operator:

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});

Sink Operator:

	adults.print();

在這里插入圖片描述

由于Flink是分布式并行的,因此在程序執行期間,一個Stream流會有多個Stream Partition(流分區),一個Operator也會有多個Operator Subtask(算子子任務)。

在這里插入圖片描述

兩個 operator 之間傳遞的時候有兩種模式:

  • One to One 模式:像Source到map這種傳遞模式,不會改變數據的分區特性。
  • Redistributing (重新分配)模式:像map到keyBy這種傳遞模式,會根據key的hashcode進行重寫分區,改變分區特性的。

Flink還會進行優化,將緊密度高的算子結合成一個Operator Chain(算子鏈)。

在這里插入圖片描述

比如Source操作和map操作可以結合成一個Operator Chain,結合成Operator Chain后就在一個task中由一個thread完成。

Flink架構

Flink任務調度與執行

在這里插入圖片描述

  1. 我們的代碼會被Flink解析成一個DAG圖,當我們調用env.execute()方法后,該DAG圖就會被打包通過Akka客戶端發送到JobManager。
  2. JobManager會通過調度器,把task調度到TaskManager上執行。
  3. TaskManager接收到task后,task將會在一個task slot中執行。

task slot 和 task

我們看到在TaskManager上有一個個的task slot被劃分出來,task slot的數量是在TaskManager創建之初就設置好的。每個task(正確來說應該是subtask)都會調度到一個task slot上執行。task slot的作用主要是進行內存隔離,比如TaskManager設置了3個task slot的數量,那么每個task slot占用TaskManager三分之一的內存,task在task slot執行時,task與task之間將不會有內存資源競爭的情況發生。

在這里插入圖片描述

EventTime、Windows、Watermarks

由于Flink處理的是流式計算,數據是以流的形式源源不斷的流過來的,也就是說數據是沒有邊界的,但是對數據的計算必須在一個范圍內進行,比如實時統計高速公路過去一個小時里的車流量。

在這里插入圖片描述

那么就需要給源源不斷流過來的數據劃分邊界,我們可以根據時間段或數據量來劃分邊界。

如果要按照時間段來劃分邊界,那么是通過時間字段進行劃分。

EventTime

在這里插入圖片描述

Flink有三種類型的時間:

  • Event Time
  • Ingestion Time
  • Processing Time

一般用的較多的時Event Time,因為Event Time是固定不變的,不管什么時候計算,都會得到相同的輸出結果。

Windows

有了時間字段后,就可以根據時間劃分時間窗,比如下面就是劃分1分鐘為一個時間窗,然后就可以對時間窗內的數據做計算。

.window(TumblingEventTimeWindows.of(Time.minutes(1)))

TumblingEventTimeWindows是滾動時間窗:

在這里插入圖片描述

還有SlidingEventTimeWindows滑動時間窗:

// 沒10秒計算前1分鐘窗口內的數據
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))

在這里插入圖片描述

以及EventTimeSessionWindows會話時間窗:

// 間隔超過5s的話,下一達到的事件在新的窗口內計算,否則在同一窗口內計算
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))

上面設置的會話時間窗表示如果兩個事件間的間隔超過5秒,那么后一個事件就會在新的窗口中計算;如果兩個事件間隔沒有超過5秒,那么就在同一窗口內計算。

在這里插入圖片描述

Watermarks

但是事件流并不一定是有序的,它有可能是無序,有可能早發生的事件反而比晚發生的事件更晚到達。這時Flink需要等待較早發生的事件都到達了,才能進行一個時間窗的計算。

但是Flink無法得知什么時候邊界內的所有事件都達到,因此必須有一種機制控制Flink什么時候停止等待。

這時候就要使用watermarks ,Flink接收到每一條數據時,會使用watermark生成器根據EventTime計算出一個watermark然后插入到數據中。當我們設置watermark的延遲時長是t時,那么watermark就等于當前所有達到數據中的EventTime中的最大值(maxEventTime)減去時間t,代表EventTime在 maxEventTime - t 之前的數據都已達到,結束時間為 maxEventTime - t 的時間窗可以進行計算。

在這里插入圖片描述

比如上面的例子,我們設置wartemark的延時時間t為2,那么當EventTime為7的事件到達時,該事件的watermark就是5(maxEventTime = 7, t = 2, watermark = maxEventTime - t = 7 - 2 = 5),那么表示Flink認定EventTime在5或5之前的時間都已經達到了,那么如果有一個窗口的結束時間為5的話,該窗口就會觸發計算。

watermarks的使用:

DataStream<Event> stream = ...;WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.timestamp);DataStream<Event> withTimestampsAndWatermarks =stream.assignTimestampsAndWatermarks(strategy);

當然,使用了watermarks之后,也不一定就能保證百分之一百準確。當我們把延時時間t設置的較短時,就能獲取更低的延遲,但是準確性也相對下降;而如果我們把t設的較大,那么延遲就更大,但是準確性就想對較高。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/44822.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/44822.shtml
英文地址,請注明出處:http://en.pswp.cn/web/44822.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何查找電腦的MAC地址

一. 什么是mac地址&#xff1f; mac地址本質上幫助我們連接到我們遇到的大多數本地網絡。每個網絡適配器通常由網絡接口??控制器(NIC) 制造商分配一個唯一的 mac 地址。 二. 如何查找mac地址 1.點擊網絡和Internet設置 2.點擊WLAN點擊硬件屬性 3.即可查看mac地址

智慧城市3d數據可視化系統提升信息匯報的時效和精準度

在信息大爆炸的時代&#xff0c;數據的力量無可估量。而如何將這些數據以直觀、高效的方式呈現出來&#xff0c;成為了一個亟待解決的問題。為此&#xff0c;我們推出了全新的3D可視化數據大屏系統&#xff0c;讓數據“躍然屏上”&#xff0c;助力您洞察先機&#xff0c;決勝千…

從零開始實現大語言模型(五):縮放點積注意力機制

1. 前言 縮放點積注意力機制(scaled dot-product attention)是OpenAI的GPT系列大語言模型所使用的多頭注意力機制(multi-head attention)的核心,其目標與前文所述簡單自注意力機制完全相同,即輸入向量序列 x 1 , x 2 , ? ? , x n x_1, x_2, \cdots, x_n x

pytorch訓練的時候 shm共享內存不足,導致訓練停止

1.查看shm情況 df -h /dev/shm內存已經滿了&#xff0c;因為之前訓練多次訓練意外停止到shm中的緩存不能及時被清理 2、手動清理shm 依然沒被釋放 3、查看關聯的進程&#xff0c;一個一個kill lsof |grep deletedkill -9 46619 44618 44617 。。。。。4、搞定

Spring @Scheduled學習

一. Jdk中的定時任務 我們平時在 Spring 項目中會使用 Scheduled 開啟定時任務&#xff1b; jdk 中其實也提供了定時任務線程池 ScheduledThreadPool&#xff0c;我們可以直接通過 Executors 工具類獲取&#xff1b; // 創建了核心線程數為 2 的 ScheduledThreadPool 對象 S…

ROS2 + 科大訊飛 初步實現機器人語音控制

環境配置&#xff1a; 電腦端&#xff1a; ubuntu22.04實體機作為上位機 ROS版本&#xff1a;ros2-humble 實體機器人&#xff1a; STM32 思嵐A1激光雷達 科大訊飛語音SDK 訊飛開放平臺-以語音交互為核心的人工智能開放平臺 實現步驟&#xff1a; 1. 下載和處理科大訊飛語音模…

開發指南048-前端模塊版本

平臺前端框架內置了一個文件version.vue <template> <div> <br> 應用名稱: {{name}} <br> 當前版本&#xff1a;{{version}} <br> 服務網關: {{gateway}} </div> </template> <scrip…

qt 創建一個包含兩按鈕,且安裝和自定義控件間沒有間距

在 Qt 中創建一個包含兩個按鈕且按鈕之間沒有間距的自定義控件&#xff0c;你可以使用 QHBoxLayout 或 QVBoxLayout&#xff08;取決于你希望按鈕是水平排列還是垂直排列&#xff09;&#xff0c;并設置布局的間距為 0。以下是一個簡單的示例&#xff0c;展示了如何創建一個水平…

Dataset for Stable Diffusion

1.Dataset for Stable Diffusion 筆記來源&#xff1a; 1.Flickr8k數據集處理 2.處理Flickr8k數據集 3.Github&#xff1a;pytorch-stable-diffusion 4.Flickr 8k Dataset 5.dataset_flickr8k.json 1.1 Dataset 采用Flicker8k數據集&#xff0c;該數據集有兩個文件&#xff…

Node.js_mongodb用戶名和密碼操作

mongodb用戶名和密碼操作 查看用戶密碼創建管理員用戶和密碼mongodb的目標是實現快速簡單部署,所以存在很多安全問題 默認配置下沒有用戶和密碼,無需身份驗證即可登錄,不像mysql那樣需要登錄才能操作數據庫本身安全問題:升級3.0以上版本查看用戶密碼 密碼是加密存儲的,并且…

前端工程化10-webpack靜態的模塊化打包工具之各種loader處理器

9.1、案例編寫 我們創建一個component.js 通過JavaScript創建了一個元素&#xff0c;并且希望給它設置一些樣式&#xff1b; 我們自己寫的css,要把他加入到Webpack的圖結構當中&#xff0c;這樣才能被webpack檢測到進行打包&#xff0c; style.css–>div_cn.js–>main…

速盾:ddos高防ip哪里好用?

隨著互聯網的飛速發展&#xff0c;DDoS攻擊問題逐漸突出。DDoS攻擊是一種通過在網絡上創建大量請求&#xff0c;使目標網絡或服務器過載而無法正常工作的攻擊方式。為了應對DDoS攻擊&#xff0c;提高網絡的安全性和穩定性&#xff0c;使用高防IP成為了一種常見的解決辦法。 DD…

Flower花所比特幣交易及交易費用科普

在加密貨幣交易中&#xff0c;選擇一個可靠的平臺至關重要。Flower花所通過提供比特幣交易服務脫穎而出。本文將介紹在Flower花所進行比特幣交易的基礎知識及其交易費用。 什么是Flower花所&#xff1f; Flower花所是一家加密貨幣交易平臺&#xff0c;為新手和資深交易者提供…

【C++】開源:drogon-web框架配置使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 這篇文章主要介紹drogon-web框架配置使用。 無專精則不能成&#xff0c;無涉獵則不能通。——梁啟超 歡迎來到我的博客&#xff0c;一起學習&#xff0c;共同進步。 喜歡的朋友可以關注一下&#xff0c;…

Linux系統編程-線程同步詳解

線程同步是指多個線程協調工作&#xff0c;以便在共享資源的訪問和操作過程中保持數據一致性和正確性。在多線程環境中&#xff0c;線程是并發執行的&#xff0c;因此如果多個線程同時訪問和修改共享資源&#xff0c;可能會導致數據不一致、競態條件&#xff08;race condition…

面試題008-Java-SpringBoot

面試題008-Java-SpringBoot 目錄 面試題008-Java-SpringBoot題目自測題目答案1. Spring 和 Spring Boot有什么區別&#xff1f;2. Spring Boot 的主要優點是什么&#xff1f;3. 什么是Spring Boot Starter&#xff1f;4. 介紹一下SpringBootApplication注解&#xff1f;5. Spri…

【密碼學】消息認證

你發送給朋友一條消息&#xff08;內容&#xff1a;明天下午來我家吃飯&#xff09;&#xff0c;這一過程中你不想讓除你朋友以外的人看到消息的內容&#xff0c;這就叫做消息的機密性&#xff0c;用來保護消息機密性的方式被叫做加密機制。 現在站在朋友的視角&#xff0c;某一…

使用PyQt5實現添加工具欄、增加SwitchButton控件

前言&#xff1a;通過在網上找到的“電池電壓監控界面”&#xff0c;學習PyQt5中添加工具欄、增加SwitchButton控件&#xff0c;在滑塊控件右側增加文本顯示、設置界面背景顏色、修改文本控件字體顏色等。 1. 上位機界面效果展示 網絡上原圖如下&#xff1a; 自己使用PyQt5做…

springboot異常(一):springboot自定義全局異常處理

&#x1f337;1. 自定義一個異常類 自定義一個異常&#xff0c;有兩個變量異常代碼、異常消息&#xff0c;定義了兩個構造方法&#xff0c;一個無參構造方法&#xff0c;一個所有參數構造方法。 在構造方法中要掉用父類的構造方法&#xff0c;主要目的是在日志或控制臺打印異…

【Linux】多線程_3

文章目錄 九、多線程3. C11中的多線程4. 線程的簡單封裝 未完待續 九、多線程 3. C11中的多線程 Linux中是根據多線程庫來實現多線程的&#xff0c;C11也有自己的多線程&#xff0c;那它的多線程又是怎樣的&#xff1f;我們來使用一些C11的多線程。 Makefile&#xff1a; te…