SparkStreaming之04:調優

SparkStreaming調優

一 、要點

4.1 SparkStreaming運行原理

在這里插入圖片描述

深入理解

在這里插入圖片描述

4.2 調優策略

4.2.1 調整BlockReceiver的數量

在這里插入圖片描述

案例演示:

object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("NetworkWordCount")val sc = new SparkContext(sparkConf)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sc, Seconds(5))//創建多個接收器(ReceiverInputDStream),這個接收器接收一臺機器上的某個端口通過socket發送過來的數據并處理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)val lines = lines1.union(lines2)lines.repartition(100)//處理的邏輯,就是簡單的進行word countval words = lines.repartition(100).flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))//將結果輸出到控制臺wordCounts.print()//啟動Streaming處理流ssc.start()//等待Streaming程序終止ssc.awaitTermination()ssc.stop(false)}
}
??4.2.2 調整Block的數量

batchInterval : 觸發批處理的時間間隔
blockInterval :將接收到的數據生成Block的時間間隔,spark.streaming.blockInterval(默認是200ms),那么,BlockRDD的分區數 = batchInterval / blockInterval,即一個Block就是RDD的一個分區,就是一個task
比如,batchInterval是2秒,而blockInterval是200ms,那么task數為10,如果task的數量太少,比一個executor的core數還少的話,那么可以減少blockInterval,blockInterval最好不要小于50ms,太小的話導致task數太多,那么launch task的時間久多了

4.2.3 調整Receiver的接受速率

pps:permits per second 每秒允許接受的數據量(QPS -> queries per second)
Spark Streaming默認的PPS是沒有限制的,可以通過參數spark.streaming.receiver.maxRate來控制,默認是Long.Maxvalue

??4.2.3 調整數據處理的并行度

BlockRDD的分區數

a. 通過Receiver接受數據的特點決定

b. 也可以自己通過repartition設置

ShuffleRDD的分區數

a. 默認的分區數為spark.default.parallelism(core的大小)

b. 通過我們自己設置決定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 數據的序列化

SparkStreaming兩種需要序列化的數據:
a. 輸入的數據:默認是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存儲在executor上的內存中
b. 緩存的數據:默認是以StorageLevel.MEMORY_ONLY_SER的形式存儲的內存中
使用Kryo序列化機制,比Java序列化機制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 內存調優
(1)需要內存大小

和transformation的類型有關,如果使用的是updateStateByKey,Window這樣的算子,那么內存就要設置得偏大

(2)數據存儲級別

如果把接收到的數據設置的存儲級別是MEMORY_DISK這種級別,也就是說如果內存不夠可以把數據存儲到磁盤上,其實性能還是不好的,性能最好的就是所有的數據都在內存里面,所以如果在資源允許的情況下,把內存調大一點,讓所有的數據都存在內存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

(2)Kafka(0.8版本)

雖然現在的Kafka的版本已經到2.x版本了,但是很多公司因為歷史遺留的原因,公司里面還是會有0.8x的Kafka。比如本人公司里面有兩個Kafka集群,一個是0.8x的kafka,一個是1.x的Kafka。開發的時候有時候需要我們使用SparkStreaming做實時的ETL,然后再把數據打回Kafka,0.8版本的kafka默認是沒有批量提交的功能的。本人公司里面一個真實的案例,一位同學寫的SparkStreaming程序將數據處理完了以后通過ForeachRDD把數據寫回到0.8Kafka。但是數據處理得很慢,經常會收到延時告警。最終發現他把數據寫到Kafka的時候是一條數據一條數據提交的性能很差。最終手動實現了批量提交的功能。從此再也沒有收到過告警。

4.2.7 Backpressure(壓力反饋)

在這里插入圖片描述
在這里插入圖片描述

Feedback Loop : 動態使得Streaming app從unstable狀態回到stable狀態

在這里插入圖片描述

從Spark1.5版本開始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(資源動態分配)

動態分配資源:

批處理動態的決定這個application中需要多少個Executors:

  1. 當一個Executor空閑的時候,將這個Executor殺掉
  2. 當task太多的時候,動態的啟動Executors

Streaming分配Executor的原則是比對 process time / batchInterval 的比率

在這里插入圖片描述

如果延遲了,那么就自動增加資源

在這里插入圖片描述

在這里插入圖片描述

從Spark2.0有這個功能:: spark.streaming.dynamicAllocation.enabled = true

??4.2.8 數據傾斜調優(重要)

因為SparkStreaming的底層就是RDD,之前SparkCore的所有的數據傾斜的調優策略(見Spark之數據傾斜調優)都適合于SparkStreaming,需要靈活掌握,在實際開發的工作當中用得頻率較高。

二 、總結

面試問題:你在工作當中有SparkStreaming調優過項目嗎?怎么調優的?效果怎么樣?

  1. 比如舉foreachRDD的例子
  2. 比如舉個數據傾斜的例子
  3. 用Xmind整理調優的策略

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/71601.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/71601.shtml
英文地址,請注明出處:http://en.pswp.cn/web/71601.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

軟考初級程序員知識點匯總

以下是計算機技術與軟件專業技術資格(水平)考試(簡稱“軟考”)中 程序員(初級) 考試的核心知識點匯總,涵蓋考試大綱的主要方向,幫助你系統復習: 一、計算機基礎 計算機組…

Gauss數據庫omm用戶無法連接處理

確保gauss數據庫服務已經打開 重啟gauss服務 gs_om -t restart 連接gauss gsql -d postgres -p 26000 -r 結果發現 查看數據庫運行情況 gs_om -t status --detail 我們可以看到 cluster_state 的值是 Unavailable 不可用 那么問題大概率是出現在了這里 然后我們再查看一…

36-Openwrt wifi命令工具iwconfig、iwinfo、iwpriv、iwlist

增對wifi的調試命令有很多,這邊列出我們常用的命令提供參考,方便查看信息定位問題。 1、iwconfig 查看當前 WIFI 的工作信道以及工作帶寬模式: root@openwrt:/# iwconfig ra0 ra0 mt7603e ESSID:"openwrt" Mode:Managed Channel:8 Access Point: DC:4B…

Android 低功率藍牙之BluetoothGattDescriptor詳解

BluetoothGattDescriptor 詳解 BluetoothGattDescriptor 是 Android 中用于表示藍牙低功耗(BLE)設備中 GATT(Generic Attribute Profile)描述符 的類。描述符是 GATT 架構中的一種屬性,用于提供關于 特征值&#xff0…

計算機畢業設計Python+DeepSeek-R1大模型醫療問答系統 知識圖譜健康膳食推薦系統 食譜推薦系統 醫療大數據(源碼+LW文檔+PPT+講解)

溫馨提示:文末有 CSDN 平臺官方提供的學長聯系方式的名片! 溫馨提示:文末有 CSDN 平臺官方提供的學長聯系方式的名片! 溫馨提示:文末有 CSDN 平臺官方提供的學長聯系方式的名片! 作者簡介:Java領…

數字體驗推薦TOP8提升用戶參與

數字內容體驗推薦核心優勢 在數字化競爭日益激烈的市場環境中,數字內容體驗的差異化優勢已成為企業突圍的關鍵。通過智能算法驅動的個性化推薦系統,能夠精準捕捉用戶行為軌跡與興趣偏好,實現內容與受眾的動態匹配。這種技術不僅顯著提升頁面…

【每日學點HarmonyOS Next知識】動圖循環播放、監聽tab切換、富文本上下滾動、tab默認居中、a標簽喚起撥號

1、image加載網絡動圖播放一遍后不再播放,有什么方法可以 設置循環播放 目前ArkUI不支持gif圖片設置輪播次數,可通過三方庫ohos-gif-drawable設置輪播次數,在播放一次結束后的回調方法getLoopFinish()中更新播放次數,達到指定次數后設置播放…

redis數據遷移教程(使用RedisShake實現不停機遷移十分便捷)

1.我的場景 需要把本地的redis數據上傳到阿里云服務器上面,服務器上redis并沒有開aof持久化,但是將rdb文件上傳至服務器后每次重啟redis,rdb文件會被覆蓋導致無法同同步數據,最終決定使用RedisShake 2.RedisShake介紹 什么是 RedisShake? RedisShake 是一個用于處理和遷移…

C語言_數據結構總結4:不帶頭結點的單鏈表

純C語言代碼,不涉及C 0. 結點結構 typedef int ElemType; typedef struct LNode { ElemType data; //數據域 struct LNode* next; //指針域 }LNode, * LinkList; 1. 初始化 不帶頭結點的初始化,即只需將頭指針初始化為NULL即可 void Init…

78.StringBuilder簡單示例 C#例子 WPF例子

利用 StringBuilder 提升字符串操作性能 在 C# 中,字符串是不可變的,這意味著每次修改字符串時都會創建一個新的對象。這種特性雖然保證了安全性,但在頻繁修改字符串的場景中會導致性能問題。StringBuilder 正是為解決這一問題而設計的。 什…

【數據集】社區天氣資訊網絡CoWIN-香港小時尺度氣象數據(含MATLAB處理代碼)

社區天氣資訊網絡CoWIN-香港小時尺度氣象數據 數據概述氣象變量說明數據提取(MATLAB全代碼)輸出WRF所需站點氣溫數據參考數據概述 官網-Community Weather Information Network (CoWIN) data policy CoWIN 提供 2010 - 2024 年 的數據下載,每年數據均可單獨下載。下載數據…

【JAVA架構師成長之路】【Redis】第14集:Redis緩存穿透原理、規避、解決方案

30分鐘自學教程:Redis緩存穿透原理與解決方案 目標 理解緩存穿透的成因及危害。掌握布隆過濾器、空值緩存等核心防御技術。能夠通過代碼實現請求攔截與緩存保護。學會限流降級、異步加載等應急方案。 教程內容 0~2分鐘:緩存穿透的定義與核心原因 定義…

尚硅谷爬蟲note15

一、當當網 1. 保存數據 數據交給pipelines保存 items中的類名: DemoNddwItem class DemoNddwItem(scrapy.Item): 變量名 類名() book DemoNddwItem(src src, name name, price price)導入: from 項目名.items import 類…

LVGL直接解碼png圖片的方法

通過把png文件解碼為.C文件,再放到工程中的供使用,這種方式隨時速度快(應為已經解碼,代碼中只要直接加載圖片數據顯示出來即可),但是不夠靈活,適用于哪些簡單又不經常需要更換UI的場景下使用。如…

【計算機網絡】Socket

Socket 是網絡通信的核心技術之一,充當應用程序與網絡協議棧之間的接口。 1. Socket 定義 Socket(套接字)是操作系統提供的 網絡通信抽象層,允許應用程序通過標準接口(如 TCP/IP 或 UDP)進行數據傳輸。它…

Apache XTable:在數據湖倉一體中推進數據互作性

Apache XTable 通過以多種開放表格式提供對數據的訪問,在增強互作性方面邁出了一大步。移動數據很困難,在過去,這意味著在為數據湖倉一體選擇開放表格式時,您被鎖定在該選擇中。一個令人興奮的項目當在數據堆棧的這一層引入互作性…

anolis8.9-k8s1.32-node-二進制部署

一、系統 # cat /etc/anolis-release Anolis OS release 8.9 # uname -r 5.10.134-18.an8.x86_64 二、從master上拷貝dockers及cri-docker相關文件 # groupadd docker # mkdir /etc/docker# scp -P 4033 root192.168.7.201:/etc/systemd/system/containerd.service /etc/s…

《AJAX:前端異步交互的魔法指南》

什么是AJAX AJAX(Asynchronous JavaScript and XML,異步 JavaScript 和 XML) 是一種用于創建異步網頁應用的技術,允許網頁在不重新加載整個頁面的情況下,與服務器交換數據并局部更新頁面內容。盡管名稱中包含 XML&…

Python 性能優化:從入門到精通的實用指南

Langchain系列文章目錄 01-玩轉LangChain:從模型調用到Prompt模板與輸出解析的完整指南 02-玩轉 LangChain Memory 模塊:四種記憶類型詳解及應用場景全覆蓋 03-全面掌握 LangChain:從核心鏈條構建到動態任務分配的實戰指南 04-玩轉 LangChai…

利用 requestrepo 工具驗證 XML外部實體注入漏洞

1. 前言 在數字化浪潮席卷的當下,網絡安全的重要性愈發凸顯。應用程序在便捷生活與工作的同時,也可能暗藏安全風險。XXE(XML外部實體)漏洞作為其中的典型代表,攻擊者一旦利用它,便能竊取敏感信息、掌控服務…