by Hari Santanam
How to use Spark clusters for parallel processing Big Data
Use Apache Spark’s Resilient Distributed Dataset (RDD) with Databricks
Due to physical limitations, the individual computer processor has largely reached the upper ceiling for speed with current designs. So, hardware makers added more processors to the motherboard (parallel CPU cores, running at the same speed).
But… most software applications written over the last few decades were not written for parallel processing.
Additionally, data collection has gotten exponentially bigger, due to cheap devices that can collect specific data (such as temperature, sound, speed…).
To process this data in a more efficient way, newer programming methods were needed.
A cluster of computing processes is similar to a group of workers. A team can work better and more efficiently than a single worker. They pool resources. This means they share information, break down the tasks and collect updates and outputs to come up with a single set of results.
Just as farmers went from working a single field to using combines and tractors to efficiently produce food from larger and more numerous farms, and just as agricultural cooperatives made processing easier, a cluster works together to tackle larger and more complex data collection and processing.
Cluster computing and parallel processing were the answers, and today we have the Apache Spark framework. Databricks is a unified analytics platform used to launch Spark cluster computing in a simple and easy way.
What is Spark?
Apache Spark is a lightning-fast unified analytics engine for big data and machine learning. It was originally developed at UC Berkeley.
Spark is fast. It takes advantage of in-memory computing and other optimizations. It currently holds the record for large-scale on-disk sorting.
Spark uses Resilient Distributed Datasets (RDD) to perform parallel processing across a cluster of computer processors.
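To make that concrete, here is a minimal sketch of RDD parallelism. It assumes a Databricks notebook, where the SparkContext is already available as sc, and the numbers and partition count are purely illustrative.

# A minimal sketch of RDD parallelism (assumes the preconfigured SparkContext, sc)
numbers = sc.parallelize(range(1000000), numSlices=8)  # split the data into 8 partitions
squares = numbers.map(lambda n: n * n)                 # transformation, applied to each partition in parallel
print(numbers.getNumPartitions())                      # 8
print(squares.sum())                                   # action: triggers the distributed computation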
It has easy-to-use APIs for operating on large datasets, in various programming languages. It also has APIs for transforming data, and familiar data frame APIs for manipulating semi-structured data.
Basically, Spark uses a cluster manager to coordinate work across a cluster of computers. A cluster is a group of computers that are connected and coordinate with each other to process data and compute.
Spark applications consist of a driver process and executor processes.
Briefly put, the driver process runs the main function, and analyzes and distributes work across the executors. The executors actually do the tasks assigned — executing code and reporting to the driver node.
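Here is a small sketch of that division of labor, again assuming the spark and sc objects that Databricks preconfigures in every notebook; the sample readings are made up for illustration.

# The driver process holds the SparkSession/SparkContext and plans the work
print(spark.sparkContext.appName)  # name of this driver application
print(sc.defaultParallelism)       # default number of tasks the cluster runs in parallel

# Transformations are shipped to the executors; actions return results to the driver
readings = sc.parallelize([21.5, 19.8, 35.2, 18.1, 30.7])
hot = readings.filter(lambda t: t > 30.0)  # filtering happens on the executors
print(hot.count())                         # the action reports the result back to the driver: 2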
In real-world applications in business and emerging AI programming, parallel processing is becoming a necessity for efficiency, speed and complexity.
Great — so what is Databricks?
Databricks is a unified analytics platform, from the creators of Apache Spark. It makes it easy to launch cloud-optimized Spark clusters in minutes.
Think of it as an all-in-one package to write your code. You can use Spark (without worrying about the underlying details) and produce results.
It also includes Jupyter notebooks that can be shared, GitHub integration, connections to many widely used tools, and automated monitoring, scheduling and debugging. See here for more information.
You can sign up for free with the community edition. This will allow you to play around with Spark clusters. Other benefits, depending on plan, include:
- Get clusters up and running in seconds on both AWS and Azure CPU and GPU instances for maximum flexibility.
- Get started quickly with out-of-the-box integration of TensorFlow, Keras, and their dependencies on Databricks clusters.
Let’s get started. If you have already used Databricks before, skip down to the next part. Otherwise, you can sign up here and select ‘community edition’ to try it out for free.
Follow the directions there. They are clear, concise and easy:
- Create a cluster
- Attach a notebook to the cluster and run commands in the notebook on the cluster
- Manipulate the data and create a graph
- Operations on Python DataFrame API; create a DataFrame from a Databricks dataset
- Manipulate the data and display results (a short sketch of the last three steps follows this list)
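For reference, a condensed sketch of those last three steps might look like the following. It assumes one of the sample datasets Databricks ships under /databricks-datasets is available on your cluster; the exact path below is an assumption, so browse /databricks-datasets to see what your workspace actually provides.

# Read a built-in sample dataset into a DataFrame (the path is an assumption;
# run display(dbutils.fs.ls("/databricks-datasets")) to see what is available)
diamonds = (spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"))

# Manipulate the data with the DataFrame API, then display the result;
# the chart icon under the output turns the table into a graph
display(diamonds.groupBy("cut").count().orderBy("count", ascending=False))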
Now that you have created a data program on a cluster, let’s move on to another dataset with more operations, so you have more to work with.
The dataset is the 2017 World Happiness Report by country, based on different factors such as GDP, generosity, trust, family, and others. The fields and their descriptions are listed further down in the article.
I previously downloaded the dataset, then moved it into Databricks’ DBFS (DataBricks Files System) by simply dragging and dropping into the window in Databricks.
Or, you can click on Data in the left navigation pane, click Add Data, then either drag and drop or browse and add.
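Either way, you can confirm the upload from a notebook cell. This is just a quick check using the dbutils utilities that Databricks provides; the folder below is where drag-and-drop uploads normally land.

# List the uploaded files in the Databricks File System (DBFS)
display(dbutils.fs.ls("/FileStore/tables"))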
# File location and type
# This file was dragged and dropped into Databricks from its stored location;
# source: https://www.kaggle.com/unsdsn/world-happiness#2017.csv
file_location = "/FileStore/tables/2017.csv"
file_type = "csv"

# CSV options
# The applied options are for CSV files. For other file types, these will be ignored.
# Schema is inferred; first row is header. I deleted the header row in an editor and
# intentionally left this 'false' to contrast with the later RDD parsing.
# If you don't delete the header row, the columns would read "country", "dystopia", etc.
# instead of _c0, _c1, ...
infer_schema = "true"
first_row_is_header = "false"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
display(df)
Now, let’s load the file into Spark’s Resilient Distributed Dataset (RDD) mentioned earlier. RDD performs parallel processing across a cluster of computer processors and makes data operations faster and more efficient.
# Load the file into Spark's Resilient Distributed Dataset (RDD)
data_file = "/FileStore/tables/2017.csv"
raw_rdd = sc.textFile(data_file).cache()

# Show the top 5 lines of the file
raw_rdd.take(5)
Note the “Spark Jobs” below, just above the output. Click on View to see details, as shown in the inset window on the right.
Databricks and Spark have excellent visualizations of the processes.
In Spark, a job is associated with a chain of RDD dependencies organized in a directed acyclic graph (DAG). In a DAG, branches are directed from one node to another, with no loops back. Tasks are submitted to the scheduler, which executes them using pipelining to optimize the work and collapse it into a minimal number of stages.
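As a rough illustration, you can print the lineage Spark records for an RDD. This is only a sketch using the preconfigured sc, and the tiny word-count example is arbitrary.

# Narrow transformations like map() are pipelined into a single stage;
# a wide transformation like reduceByKey() introduces a shuffle and a new stage
words = sc.parallelize(["happy", "data", "happy", "spark"])
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

print(counts.toDebugString().decode("utf-8"))  # text view of the RDD dependency chain (the DAG)
counts.collect()                               # the action submits the job you can inspect under "Spark Jobs"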
Don’t worry if the above items seem complicated. They are visual snapshots of the processes occurring during the specific stage whose Spark Job view button you pressed. You may or may not need this information; it is there if you do.
RDD entries are separated by commas, which we need to split before parsing and building a dataframe. We will then take specific columns from the dataset to use.
# Split the RDD rows on commas before parsing and building the dataframe
csv_rdd = raw_rdd.map(lambda row: row.split(","))

# Print 2 rows
print(csv_rdd.take(2))

# Print types
print(type(csv_rdd))
print('potential # of columns: ', len(csv_rdd.take(1)[0]))
#use specific columns from dataset
from pyspark.sql import Row
parsed_rdd = csv_rdd.map(lambda r: Row(
    country = r[0],   # country, position 1, type=string
    happiness_rank = r[1],
    happiness_score = r[2],
    gdp_per_capita = r[5],
    family = r[6],
    health = r[7],
    freedom = r[8],
    generosity = r[9],
    trust = r[10],
    dystopia = r[11],
    label = r[-1]
))

parsed_rdd.take(5)
Here are the columns and definitions for the Happiness dataset:
Country — Name of the country.
Region — Region the country belongs to.
Happiness Rank — Rank of the country based on the Happiness Score.
Happiness Score — A metric measured in 2015 by asking the sampled people the question: “How would you rate your happiness on a scale of 0 to 10 where 10 is the happiest.”
Economy (GDP per Capita) — The extent to which GDP (Gross Domestic Product) contributes to the calculation of the Happiness Score.
Family — The extent to which Family contributes to the calculation of the Happiness Score.
Health (Life Expectancy) — The extent to which Life Expectancy contributed to the calculation of the Happiness Score.
Freedom — The extent to which Freedom contributed to the calculation of the Happiness Score.
Trust (Government Corruption) — The extent to which Perception of Corruption contributes to the Happiness Score.
Generosity — The extent to which Generosity contributed to the calculation of the Happiness Score.
Dystopia Residual — The extent to which the Dystopia Residual contributed to the calculation of the Happiness Score (Dystopia = an imagined place or state in which everything is unpleasant or bad, typically a totalitarian or environmentally degraded one; Residual = what’s left or remaining after everything else is accounted for or taken away).
# Create a view or table
temp_table_name = "2017_csv"
df.createOrReplaceTempView(temp_table_name)
# Build a dataframe from the RDD created earlier
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10))

# View the dataframe's schema
df.printSchema()

# Build a temporary table to run SQL commands.
# The table is only alive for the session, is scoped to the cluster, and is highly optimized.
df.registerTempTable("happiness")

# Display happiness_score counts using dataframe syntax
display(df.groupBy('happiness_score')
          .count()
          .orderBy('count', ascending=False)
       )
Now, let’s use SQL to run a query that does the same thing. The purpose is to show you different ways to process data and to compare the methods.
# Use SQL to run a query that does the same thing as the dataframe code above
# (count by happiness_score)
happ_query = sqlContext.sql("""
    SELECT happiness_score, count(*) as freq
    FROM happiness
    GROUP BY happiness_score
    ORDER BY 2 DESC
""")
display(happ_query)
Another SQL query to practice our data processing:
# Another SQL query
happ_stats = sqlContext.sql("""
    SELECT country, happiness_rank, dystopia
    FROM happiness
    WHERE happiness_rank > 20
""")
display(happ_stats)
There! You have done it — created a Spark-powered cluster and completed a dataset query process using that cluster. You can use this with your own datasets to process and output your Big Data projects.
You can also play around with the charts: click on the chart/graph icon at the bottom of any output, specify the values and type of graph, and see what happens. It is fun.
The code is posted in a notebook here on the Databricks public forum and, per Databricks, will be available for about 6 months.
For more information on using Spark with Deep Learning, read this excellent article by Favio Vázquez.
Thanks for reading! I hope you have interesting programs with Databricks and enjoy it as much as I have. Please clap if you found it interesting or useful.
For a complete list of my articles, see here.
Translated from: https://www.freecodecamp.org/news/how-to-use-spark-clusters-for-parallel-processing-big-data-86a22e7f8b50/