spark 架構
by Jayvardhan Reddy
通過杰伊瓦爾丹·雷迪(Jayvardhan Reddy)
深入研究Spark內部和架構 (Deep-dive into Spark internals and architecture)
Apache Spark is an open-source distributed general-purpose cluster-computing framework. A spark application is a JVM process that’s running a user code using the spark as a 3rd party library.
Apache Spark是一個開源的分布式通用集群計算框架。 Spark應用程序是一個JVM進程,正在使用Spark作為第三方庫來運行用戶代碼。
As part of this blog, I will be showing the way Spark works on Yarn architecture with an example and the various underlying background processes that are involved such as:
作為此博客的一部分,我將通過示例和涉及的各種基礎后臺過程來演示Spark在Yarn體系結構上的工作方式,例如:
- Spark Context 火花上下文
- Yarn Resource Manager, Application Master & launching of executors (containers). 紗線資源經理,應用程序主管和執行程序(容器)的啟動。
- Setting up environment variables, job resources. 設置環境變量,作業資源。
- CoarseGrainedExecutorBackend & Netty-based RPC. CoarseGrainedExecutor后端和基于Netty的RPC。
- SparkListeners. SparkListeners。
- Execution of a job (Logical plan, Physical plan). 執行工作(邏輯計劃,物理計劃)。
- Spark-WebUI. Spark-WebUI。
火花上下文 (Spark Context)
Spark context is the first level of entry point and the heart of any spark application. Spark-shell is nothing but a Scala-based REPL with spark binaries which will create an object sc called spark context.
Spark上下文是入口點的第一級,是任何Spark應用程序的核心。 Spark-shell只是一個基于Scala的具有火花二進制文件的REPL,它將創建一個稱為火花上下文的對象sc。
We can launch the spark shell as shown below:
我們可以如下所示啟動spark shell:
spark-shell --master yarn \ --conf spark.ui.port=12345 \ --num-executors 3 \ --executor-cores 2 \ --executor-memory 500M
As part of the spark-shell, we have mentioned the num executors. They indicate the number of worker nodes to be used and the number of cores for each of these worker nodes to execute tasks in parallel.
作為“火花殼”的一部分,我們提到了num執行程序。 它們指示要使用的工作程序節點的數量以及這些工作程序節點中的每個并行執行任務的內核的數量。
Or you can launch spark shell using the default configuration.
或者,您可以使用默認配置啟動Spark Shell。
spark-shell --master yarn
The configurations are present as part of spark-env.sh
該配置作為spark-env.sh的一部分存在
Our Driver program is executed on the Gateway node which is nothing but a spark-shell. It will create a spark context and launch an application.
我們的驅動程序在網關節點上執行,不過這只是一個火花。 它將創建一個火花上下文并啟動一個應用程序。
The spark context object can be accessed using sc.
可以使用sc訪問spark上下文對象。
After the Spark context is created it waits for the resources. Once the resources are available, Spark context sets up internal services and establishes a connection to a Spark execution environment.
創建Spark上下文后,它將等待資源。 一旦資源可用,Spark上下文將建立內部服務并建立與Spark執行環境的連接。
紗線資源經理,應用程序主管和執行程序(容器)的啟動。 (Yarn Resource Manager, Application Master & launching of executors (containers).)
Once the Spark context is created it will check with the Cluster Manager and launch the Application Master i.e, launches a container and registers signal handlers.
創建Spark上下文后,它將與集群管理器進行檢查并啟動Application Master,即啟動容器并注冊信號處理程序。
Once the Application Master is started it establishes a connection with the Driver.
一旦啟動應用程序主控器,它將與驅動程序建立連接。
Next, the ApplicationMasterEndPoint triggers a proxy application to connect to the resource manager.
接下來,ApplicationMasterEndPoint觸發代理應用程序以連接到資源管理器。
Now, the Yarn Container will perform the below operations as shown in the diagram.
現在,Yarn容器將執行以下操作,如圖所示。
ii) YarnRMClient will register with the Application Master.
ii)YarnRMClient將向Application Master注冊。
iii) YarnAllocator: Will request 3 executor containers, each with 2 cores and 884 MB memory including 384 MB overhead
iii)YarnAllocator:將請求3個執行器容器,每個執行器容器具有2個內核和884 MB內存,包括384 MB的開銷
iv) AM starts the Reporter Thread
iv)AM啟動Reporter線程
Now the Yarn Allocator receives tokens from Driver to launch the Executor nodes and start the containers.
現在,Yarn分配器從Driver接收令牌,以啟動Executor節點并啟動容器。
設置環境變量,作業資源和啟動容器。 (Setting up environment variables, job resources & launching containers.)
Every time a container is launched it does the following 3 things in each of these.
每次啟動容器時,它都會分別執行以下三項操作。
- Setting up env variables 設置環境變量
Spark Runtime Environment (SparkEnv) is the runtime environment with Spark’s services that are used to interact with each other in order to establish a distributed computing platform for a Spark application.
Spark運行時環境(SparkEnv)是帶有Spark服務的運行時環境,這些服務相互交互以為Spark應用程序建立分布式計算平臺。
- Setting up job resources 設置工作資源
- Launching container 發射容器
YARN executor launch context assigns each executor with an executor id to identify the corresponding executor (via Spark WebUI) and starts a CoarseGrainedExecutorBackend.
YARN執行程序啟動上下文為每個執行程序分配一個執行程序ID,以標識相應的執行程序(通過Spark WebUI),并啟動CoarseGrainedExecutorBackend。
CoarseGrainedExecutor后端和基于Netty的RPC。 (CoarseGrainedExecutorBackend & Netty-based RPC.)
After obtaining resources from Resource Manager, we will see the executor starting up
從資源管理器獲取資源后,我們將看到執行程序正在啟動
CoarseGrainedExecutorBackend is an ExecutorBackend that controls the lifecycle of a single executor. It sends the executor’s status to the driver.
CoarseGrainedExecutorBackend是一個ExecutorBackend,用于控制單個執行程序的生命周期。 它將執行者的狀態發送給驅動程序。
When ExecutorRunnable is started, CoarseGrainedExecutorBackend registers the Executor RPC endpoint and signal handlers to communicate with the driver (i.e. with CoarseGrainedScheduler RPC endpoint) and to inform that it is ready to launch tasks.
啟動ExecutorRunnable時,CoarseGrainedExecutorBackend將注冊Executor RPC端點和信號處理程序以與驅動程序進行通信(即與CoarseGrainedScheduler RPC端點)進行通信,并通知其已準備好啟動任務。
Netty-based RPC - It is used to communicate between worker nodes, spark context, executors.
基于Netty的RPC-用于在工作節點,Spark上下文,執行程序之間進行通信。
NettyRPCEndPoint is used to track the result status of the worker node.
NettyRPCEndPoint用于跟蹤工作程序節點的結果狀態。
RpcEndpointAddress is the logical address for an endpoint registered to an RPC Environment, with RpcAddress and name.
RpcEndpointAddress是使用RpcAddress和名稱注冊到RPC環境的端點的邏輯地址。
It is in the format as shown below:
其格式如下所示:
This is the first moment when CoarseGrainedExecutorBackend initiates communication with the driver available at driverUrl through RpcEnv.
這是CoarseGrainedExecutorBackend通過RpcEnv啟動與driverUrl上可用的驅動程序通信的第一時間。
SparkListeners (SparkListeners)
SparkListener (Scheduler listener) is a class that listens to execution events from Spark’s DAGScheduler and logs all the event information of an application such as the executor, driver allocation details along with jobs, stages, and tasks and other environment properties changes.
SparkListener(調度程序偵聽器)是一個類,用于偵聽Spark的DAGScheduler中的執行事件,并記錄應用程序的所有事件信息,例如執行程序,驅動程序分配詳細信息以及作業,階段和任務以及其他環境屬性更改。
SparkContext starts the LiveListenerBus that resides inside the driver. It registers JobProgressListener with LiveListenerBus which collects all the data to show the statistics in spark UI.
SparkContext啟動駐留在驅動程序內部的LiveListenerBus。 它向LiveListenerBus注冊JobProgressListener,LiveListenerBus收集所有數據以在spark UI中顯示統計信息。
By default, only the listener for WebUI would be enabled but if we want to add any other listeners then we can use spark.extraListeners.
默認情況下,僅啟用WebUI的偵聽器,但如果要添加其他偵聽器,則可以使用spark.extraListeners。
Spark comes with two listeners that showcase most of the activities
Spark附帶了兩個聽眾,展示了大多數活動
i) StatsReportListener
i)StatsReportListener
ii) EventLoggingListener
ii)EventLoggingListener
EventLoggingListener: If you want to analyze further the performance of your applications beyond what is available as part of the Spark history server then you can process the event log data. Spark Event Log records info on processed jobs/stages/tasks. It can be enabled as shown below...
EventLoggingListener: 如果您想進一步分析應用程序的性能,而不是Spark歷史記錄服務器提供的性能,則可以處理事件日志數據。 Spark事件日志記錄有關已處理作業/階段/任務的信息。 可以如下所示啟用它...
The event log file can be read as shown below
可以如下所示讀取事件日志文件
- The Spark driver logs into job workload/perf metrics in the spark.evenLog.dir directory as JSON files. Spark驅動程序以JSON文件身份登錄spark.evenLog.dir目錄中的作業工作量/性能指標。
- There is one file per application, the file names contain the application id (therefore including a timestamp) application_1540458187951_38909. 每個應用程序只有一個文件,文件名包含應用程序ID(因此包含時間戳)application_1540458187951_38909。
It shows the type of events and the number of entries for each.
它顯示事件的類型以及每個事件的條目數。
Now, let’s add StatsReportListener to the spark.extraListeners and check the status of the job.
現在,讓我們將StatsReportListener添加到spark.extraListeners 并檢查作業狀態。
Enable INFO logging level for org.apache.spark.scheduler.StatsReportListener logger to see Spark events.
為org.apache.spark.scheduler.StatsReportListener記錄器啟用INFO記錄級別以查看Spark事件。
To enable the listener, you register it to SparkContext. It can be done in two ways.
要啟用偵聽器,請將其注冊到SparkContext。 它可以通過兩種方式完成。
i) Using SparkContext.addSparkListener(listener: SparkListener) method inside your Spark application.
i)在Spark應用程序中使用SparkContext.addSparkListener(listener:SparkListener)方法。
Click on the link to implement custom listeners - CustomListener
單擊鏈接以實現自定義偵聽器-CustomListener
ii) Using the conf command-line option
ii)使用conf命令行選項
Let’s read a sample file and perform a count operation to see the StatsReportListener.
讓我們閱讀一個示例文件并執行計數操作以查看StatsReportListener。
執行工作(邏輯計劃,物理計劃)。 (Execution of a job (Logical plan, Physical plan).)
In Spark, RDD (resilient distributed dataset) is the first level of the abstraction layer. It is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs can be created in 2 ways.
在Spark中,RDD( 彈性分布式數據集 )是抽象層的第一層。 它是跨集群節點劃分的元素的集合,可以并行操作。 可以通過兩種方式創建RDD。
i) Parallelizing an existing collection in your driver program
在你的驅動程序我)p arallelizing現有的集合
ii) Referencing a dataset in an external storage system
ii)引用外部存儲系統中的數據集
RDDs are created either by using a file in the Hadoop file system, or an existing Scala collection in the driver program, and transforming it.
通過使用Hadoop文件系統中的文件或驅動程序中現有的Scala集合創建RDD,然后對其進行轉換。
Let’s take a sample snippet as shown below
讓我們來看一個示例片段,如下所示
The execution of the above snippet takes place in 2 phases.
以上代碼段的執行分為兩個階段。
6.1 Logical Plan: In this phase, an RDD is created using a set of transformations, It keeps track of those transformations in the driver program by building a computing chain (a series of RDD)as a Graph of transformations to produce one RDD called a Lineage Graph.
6.1邏輯計劃:在此階段,使用一組轉換來創建RDD,它通過構建計算鏈(一系列RDD)作為轉換圖來生成驅動程序中的那些轉換,以產生一個稱為ADD的RDD。 沿襲圖 。
Transformations can further be divided into 2 types
轉換可以進一步分為2種類型
Narrow transformation: A pipeline of operations that can be executed as one stage and does not require the data to be shuffled across the partitions — for example, Map, filter, etc..
窄轉換:可以作為一個階段執行的操作流水線,不需要在分區之間對數據進行混洗(例如Map,filter等)。
Now the data will be read into the driver using the broadcast variable.
現在,將使用廣播變量將數據讀取到驅動程序中。
Wide transformation: Here each operation requires the data to be shuffled, henceforth for each wide transformation a new stage will be created — for example, reduceByKey, etc..
廣泛轉換:這里的每個操作都需要對數據進行混洗,從此以后,對于每次廣泛轉換,都會創建一個新階段-例如reduceByKey等。
We can view the lineage graph by using toDebugString
我們可以使用toDebugString查看沿襲圖
6.2 Physical Plan: In this phase, once we trigger an action on the RDD, The DAG Scheduler looks at RDD lineage and comes up with the best execution plan with stages and tasks together with TaskSchedulerImpl and execute the job into a set of tasks parallelly.
6.2物理計劃: 在此階段,一旦我們在RDD上觸發了動作, DAG Scheduler就會查看RDD沿襲,并提出最佳的執行計劃以及階段和任務,以及TaskSchedulerImpl,并并行執行一組任務。
Once we perform an action operation, the SparkContext triggers a job and registers the RDD until the first stage (i.e, before any wide transformations) as part of the DAGScheduler.
一旦我們執行動作操作,SparkContext將觸發作業并將RDD注冊到DAGScheduler的第一階段(即,在進行任何寬轉換之前)。
Now before moving onto the next stage (Wide transformations), it will check if there are any partition data that is to be shuffled and if it has any missing parent operation results on which it depends, if any such stage is missing then it re-executes that part of the operation by making use of the DAG( Directed Acyclic Graph) which makes it Fault tolerant.
現在,在進入下一個階段(寬轉換)之前,它將檢查是否有任何將要改組的分區數據,以及是否有依賴于它的任何父操作結果丟失,如果缺少任何這樣的階段,它將重新進行-通過使用DAG(有向無環圖)來執行該部分操作,從而使其具有容錯能力。
In the case of missing tasks, it assigns tasks to executors.
在缺少任務的情況下,它將任務分配給執行者。
Each task is assigned to CoarseGrainedExecutorBackend of the executor.
每個任務都分配給執行者的CoarseGrainedExecutorBackend。
It gets the block info from the Namenode.
它從Namenode獲取塊信息。
now, it performs the computation and returns the result.
現在,它執行計算并返回結果。
Next, the DAGScheduler looks for the newly runnable stages and triggers the next stage (reduceByKey) operation.
接下來,DAGScheduler查找新可運行的階段并觸發下一階段(reduceByKey)操作。
The ShuffleBlockFetcherIterator gets the blocks to be shuffled.
ShuffleBlockFetcherIterator獲取要重排的塊。
Now the reduce operation is divided into 2 tasks and executed.
現在,reduce操作分為兩個任務并執行。
On completion of each task, the executor returns the result back to the driver.
完成每個任務后,執行程序將結果返回給驅動程序。
Once the Job is finished the result is displayed.
作業完成后,將顯示結果。
Spark-WebUI (Spark-WebUI)
Spark-UI helps in understanding the code execution flow and the time taken to complete a particular job. The visualization helps in finding out any underlying problems that take place during the execution and optimizing the spark application further.
Spark-UI有助于理解代碼執行流程以及完成特定作業所花費的時間。 可視化有助于發現執行過程中發生的任何潛在問題,并進一步優化spark應用程序。
We will see the Spark-UI visualization as part of the previous step 6.
我們將在前面的步驟6中看到Spark-UI可視化。
Once the job is completed you can see the job details such as the number of stages, the number of tasks that were scheduled during the job execution of a Job.
作業完成后,您可以看到作業詳細信息,例如階段數,在作業執行過程中計劃的任務數。
On clicking the completed jobs we can view the DAG visualization i.e, the different wide and narrow transformations as part of it.
單擊完成的作業后,我們可以查看DAG可視化,即其中不同的寬窄轉換。
You can see the execution time taken by each stage.
您可以看到每個階段花費的執行時間。
On clicking on a Particular stage as part of the job, it will show the complete details as to where the data blocks are residing, data size, the executor used, memory utilized and the time taken to complete a particular task. It also shows the number of shuffles that take place.
單擊特定階段作為工作的一部分時,它將顯示有關數據塊的位置,數據大小,使用的執行程序,已利用的內存以及完成特定任務所花費的時間的完整詳細信息。 它還顯示發生的洗牌數量。
Further, we can click on the Executors tab to view the Executor and driver used.
此外,我們可以單擊Executors選項卡以查看所使用的Executor和驅動程序。
Now that we have seen how Spark works internally, you can determine the flow of execution by making use of Spark UI, logs and tweaking the Spark EventListeners to determine optimal solution on the submission of a Spark job.
現在我們已經了解了Spark在內部的工作方式,您可以通過使用Spark UI,日志和調整Spark EventListeners來確定執行流程,從而確定提交Spark作業時的最佳解決方案。
Note: The commands that were executed related to this post are added as part of my GIT account.
注意:與該帖子相關的已執行命令被添加為我的GIT帳戶的一部分。
Similarly, you can also read more here:
同樣,您也可以在此處內容:
Sqoop Architecture in Depth with code.
深入的Sqoop體系結構與代碼。
HDFS Architecture in Depth with code.
具有代碼 深度的HDFS體系結構 。
Hive Architecture in Depth with code.
具有代碼 深度的Hive架構 。
If you would like too, you can connect with me on LinkedIn — Jayvardhan Reddy.
如果您愿意,也可以通過LinkedIn- Jayvardhan Reddy與我聯系 。
If you enjoyed reading it, you can click the clap and let others know about it. If you would like me to add anything else, please feel free to leave a response ?
如果您喜歡閱讀它,可以單擊拍手并告知其他人。 如果您希望我添加其他任何內容,請隨時回復。
翻譯自: https://www.freecodecamp.org/news/deep-dive-into-spark-internals-and-architecture-f6e32045393b/
spark 架構