目錄
- 描述
- 運行模式
- 1. Windows模式
- 代碼示例
- 2. Local模式
- 3. Standalone模式
- RDD
- 描述
- 特性
- RDD創建
- 代碼示例(并行化創建)
- 代碼示例(讀取外部數據)
- 代碼示例(讀取目錄下的所有文件)
- 算子
- DAG
- SparkSQL
- SparkStreaming
描述
Apache Spark 是用于大規模數據處理的統一分析引擎。它提供 Java、Scala、Python 和 R 中的高級 API,以及支持通用執行圖的優化引擎。它還支持一組豐富的高級工具,包括用于 SQL 和結構化數據處理的Spark SQL 、用于機器學習的MLlib、用于圖形處理的 GraphX 以及用于增量計算和流處理的結構化流。
1. Spark Core
Spark的核心,是Spark運行的基礎。Spark Core以RDD為數據抽象,提供Python、Java、Scala、R語言的API,可以編程進行海量離線數據批處理計算。
2. Spark SQL
Spark SQL是Spark用來操作結構化數據的組件。通過Spark SQL對數據進行處理。
3. Spark Streaming
Spark Streaming是Spark平臺上針對實時數據進行流式計算的組件,提供了豐富的處理數據流的API。
4. Spark MLlib
MLlib是Spark提供的一個機器學習算法庫。MLlib不僅提供了模型評估、數據導入等額外的功能,還提供了一些更底層的機器學習原語。
5. Spark GraphX
GraphX是Spark面向圖計算提供的框架與算法庫。
運行模式
1. Windows模式
多用于本地測試,不需要虛擬機或服務器。
代碼示例
WordCount.scala
package com.wunaiieq//1.導入SparkConf,SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {//2.構建SparkConf對象,并設置本地運行和程序的名稱val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//3.通過SparkConf對象構建SparkContext對象val sc = new SparkContext(conf)//4.讀取文件,并生成RDD對象val fileRdd: RDD[String] = sc.textFile("data/words.txt")//5.將單詞進行切割,得到一個存儲全部單詞的集合對象val wordsRdd: RDD[String] = fileRdd.flatMap(_.split(" "))//6.將單詞轉換為Tuple2對象("hello"->("hello",1))val wordAndOneRdd: RDD[(String, Int)] = wordsRdd.map((_, 1))//7.將元組的value按照key進行分組,并對該組所有的value進行聚合操作val resultRdd: RDD[(String, Int)] = wordAndOneRdd.reduceByKey(_ + _)//8.通過collect方法收集RDD數據val wordCount: Array[(String, Int)] = resultRdd.collect()//9.輸出結果wordCount.foreach(println)}
}
log4j.properties
這個沒什么說的直接復制用即可
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{MM/dd HH:mm:ss} %p %c{1}: %m%n# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver=WARN# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR# For deploying Spark ThriftServer
# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false
2. Local模式
一臺服務器或虛擬機搞定,所謂的Local模式,就是不需要其他任何節點資源就可以在本地執行Spark代碼的環境,一般用于教學,調試,演示等。
# 進入spark根目錄
cd /opt/module/spark/bin
# 運行視頻spark-shell
./spark-shell
webUI
[atguigu@master bin]$ jps
2081 SparkSubmit
2206 Jps
[atguigu@master bin]$ netstat -anp|grep 2081
(Not all processes could be identified, non-owned process infowill not be shown, you would have to be root to see it all.)
tcp6 0 0 192.168.16.100:42050 :::* LISTEN 2081/java
tcp6 0 0 :::4040 :::* LISTEN 2081/java
tcp6 0 0 192.168.16.100:35770 :::* LISTEN 2081/java
unix 2 [ ] STREAM CONNECTED 33071 2081/java
unix 2 [ ] STREAM CONNECTED 36801 2081/java
瀏覽器訪問
http://192.168.16.100:4040/
spark-submit
以下為使用spark提交jar包示例
./spark-submit --master local[2] --class org.apache.spark.examples.SparkPi /opt/module/spark/examples/jars/spark-examples_2.12-3.1.1.jar 100
參數 | 描述 |
---|---|
--class | 要執行程序的主類,可以更換為自己寫的應用程序的主類名稱 |
--master local[2] | 部署模式,默認為本地模式;數字 2 表示分配的虛擬 CPU 核數量 |
spark-examples_2.12-3.2.1.jar | 運行的應用類所在的 jar 包,實際使用時可以設定為自己打的 jar 包 |
20 | 程序的入口參數,根據應用程序的需要,可以是任何有效的輸入值 |
幾種提交方式比較
工具 | 功能 | 特點 | 使用場景 |
---|---|---|---|
bin/spark-submit | 提交 Java/Scala/Python/R 代碼到 Spark 中運行 | 提交代碼用 | 正式場合,正式提交 Spark 程序運行 |
bin/spark-shell | 提供一個 Scala 解釋器環境,用來以 Scala 代碼執行 Spark 程序 | 解釋器環境,寫一行執行一行 | 測試、學習、寫一行執行一行、用來驗證代碼等 |
bin/pyspark | 提供一個 Python 解釋器環境,用來以 Python 代碼執行 Spark 程序 | 解釋器環境,寫一行執行一行 | 測試、學習、寫一行執行一行、用來驗證代碼等 |
3. Standalone模式
Standalone是Spark自帶的一個資源調度框架,它支持完全分布式,也支持HA
- Master角色:管理整個集群的資源,主要負責資源的調度和分配,并進行集群的監控等職責;并托管運行各個任務的Driver。如Yarn的ResourceManager。
- Worker角色:每個從節點分配資源信息給Worker管理,管理單個服務器的資源類,分配對應的資源來運行Executor(Task);資源信息包含內存Memory和CPU
Cores核數。如Yarn的NodeManager。- Driver角色,管理單個Spark任務在運行的時候的工作,如Yarn的ApplicationMaster “
- Executor角色,單個任務運行的時候的一堆工作者,干活的。它是集群中工作節點(Worker)中的一個JVM進程,負責在 Spark 作業中運行具體任務(Task),任務彼此之間相互獨立。Spark 應用啟動時,Executor節點被同時啟動,并且始終伴隨著整個 Spark應用的生命周期而存在。如果有Executor節點發生了故障或崩潰,Spark應用也可以繼續執行,會將出錯節點上的任務調度到其他Executor節點上繼續運行。
Executor有兩個核心功能:
1.負責運行組成Spark應用的任務,并將結果返回給驅動器進程。
2.它們通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的
RDD 提供內存式存儲。RDD 是直接緩存在Executor進程內的,因此任務可以在運行時充分利用緩存數據加速運算。
總結
資源管理維度
集群資源管理者:Master
單機資源管理者:Worker任務計算維度
單任務管理者:Driver
單任務執行者:Executor
注:Executor運行于Worker進程內,由Worker提供資源供給它們運行
擴展:歷史服務器HistoryServer(可選),Spark Application運行完成以后,保存事件日志數據至HDFS,啟動HistoryServer可以查看應用運行相關信息。
4. Yarn模式
Hadoop生態圈里面的一個資源調度框架,Spark也是可以基于Yarn來計算的。
5. 云服務模式(運行在云平臺上)
Kubernetes(K8S)容器模式
Spark中的各個角色運行在Kubernetes的容器內部,并組成Spark集群環境。容器化部署是目前業界很流行的一項技術,基于Docker鏡像運行能夠讓用戶更加方便地對應用進行管理和運維。容器管理工具中最為流行的就是(K8S),而Spark也在新版本中支持了k8s部署模式。
6. Mesos
Mesos是Apache下的開源分布式資源管理框架,它被稱為是分布式系統的內核,在Twitter得到廣泛使用,管理著Twitter超過30,0000臺服務器上的應用部署,但是在國內,依然使用著傳統的Hadoop大數據框架,所以國內使用Mesos框架的并不多。
模式 | Spark安裝機器數 | 需啟動的進程 | 所屬者 | 應用場景 |
---|---|---|---|---|
Local | 1 | 無 | Spark | 測試 |
Standalone | 3 | Master及Worker | Spark | 單獨部署 |
Yarn | 1 | Yarn及HDFS | Hadoop | 混合部署 |
RDD
描述
Spark RDD(Resilient Distributed Dataset,彈性分布式數據集)代表一個不可變、可分區、元素可并行計算的集合,是Spark進行數據處理的基本單元。
- 不可變性:RDD一旦創建,其數據就不可改變。對RDD的所有操作(如map、filter、reduce等)都會生成一個新的RDD,而不會修改原始RDD。這種不可變性使得RDD在分布式計算環境下非常穩定,避免了并發沖突。
- 可分區性:RDD可以分成多個分區(Partition),每個分區就是一個數據集片段。一個RDD的不同分區可以保存到集群中的不同節點上,從而可以在集群中的不同節點上進行并行計算。分區是Spark作業并行計算的基本單位,每個分區都會被一個計算任務處理,分區的數量決定了并行計算的粒度。
- 彈性:RDD具有彈性容錯的特點。當運算中出現異常情況導致分區數據丟失或運算失敗時,可以根據RDD的血統(Lineage)關系對數據進行重建。此外,RDD的數據可以保存在內存中,內存放不下時也可以保存在磁盤中,實現了存儲的彈性。
特性
1. 分區(Partitions) 含義:RDD的數據被劃分為多個分區,每個分區是一個數據塊,分布在集群的不同節點上。 作用:每個分區會被一個計算任務處理,分區的數量決定了并行計算的粒度。用戶可以在創建RDD時指定分區數,如果沒有指定,Spark會根據集群的資源自動設置。
示例:從HDFS文件創建RDD時,默認分區數為文件的Block數。
2. 計算函數(Compute Function) 含義:RDD的計算方法會作用到每個分區上。 作用:當對RDD進行操作(如map、filter等)時,Spark會對每個分區應用這個函數。
示例:在map操作中,計算函數會對每個元素執行指定的轉換邏輯。
3. 依賴關系(Dependencies) 含義:RDD之間存在依賴關系。 作用:在部分分區數據丟失時,Spark可以利用依賴關系重新計算丟失的數據,而不是重新計算整個RDD,提高了容錯能力。
分類:依賴關系分為窄依賴(Narrow Dependency)和寬依賴(Wide
Dependency)。窄依賴指一個父RDD的分區最多被一個子RDD的分區使用;寬依賴指一個父RDD的分區被多個子RDD的分區使用。
4. 分區器(Partitioner,可選,只有kv型RDD才有) 含義:對于鍵值對(Key-Value)類型的RDD,可以指定一個分區器來決定數據的分區方式。
作用:分區器決定了數據在集群中的分布,影響并行計算的性能。
類型:Spark支持多種分區器,如HashPartitioner(基于哈希值分區)和RangePartitioner(基于范圍分區)。
5. 優先位置(Preferred Locations,可選) 含義:RDD分區規劃應當盡量靠近數據所在的服務器 作用:Spark在進行任務調度時,會優先將數據分配到其存儲位置進行計算,減少數據傳輸開銷,提高計算效率。
示例:對于HDFS文件,優先位置通常是文件塊所在的節點。
RDD創建
1. 通過并行化集合創建,將本地集合對象轉分布式RDD
val sc = new SparkContext(conf)
val rdd1:RDD[Int]=sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
rdd1.glom().collect()
makeRdd()創建,本質上也是使用sc.parallelize(…)
def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {parallelize(seq, numSlices)
}
2. 讀取外部數據源 (比如:讀取文件 )
//通過SparkConf對象構建SparkContext對象
val sc = new SparkContext(conf)
//讀取文件
val fileRdd:RDD[String] = sc.textFile("data/words.txt")
程序執行入口:SparkContext對象
Spark RDD 編程的程序入口對象是SparkContext對象(Scala、Python、Java都是如此)
只有構建出SparkContext, 基于它才能執行后續的API調用和計算
本質上, SparkContext對編程來說, 主要功能就是創建第一個RDD出來。
代碼示例(并行化創建)
package com.wunaiieq//1.導入SparkConf類、SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CreateByParallelize {def main(args: Array[String]): Unit = {//2.構建SparkConf對象。并設置本地運行和程序的名稱,*表示使用全部cpu內核,可以指定數量val sparkconf = new SparkConf().setMaster("local[*]").setAppName("CreateRdd1")//3.構建SparkContext對象val sparkContext = new SparkContext(sparkconf)//4.通過并行化創建RDD對象:將本地集合->分布式的RDD對象,如果不指定分區,則根據cpu內核數進行自動分配val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8),3)//5.輸出默認的分區數println("默認分區數:"+rdd.getNumPartitions)//已經指定為3//6.collect方法:將rdd對象中每個分區的數據,都發送到Driver,形成一個Array對象val array1: Array[Int] = rdd.collect()println("rdd.collect()="+array1.mkString(","))//7.顯示出rdd對象中元素被分布到不同分區的數據信息val array2: Array[Array[Int]] = rdd.glom().collect()println("rdd.glom().collect()的內容是:")for(eleArr<- array2){println(eleArr.mkString(","))}}
}
代碼示例(讀取外部數據)
package com.wunaiieq//1.導入SparkConf,SparkContext類
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object CreateByTextFile {def main(args: Array[String]): Unit = {//2.構建SparkConf對象,并設置本地運行和程序名val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFile")//3.通過sparkconf創建SparkContext對象val sparkContext = new SparkContext(sparkConf)//4.通過textFile讀取文件//4.1.讀取hdfs分布式文件系統上的文件
// val hdfsRdd: RDD[String] = sparkContext.textFile("hdfs://192.168.16.100:9820/input/data.txt")
// val hdfsResult: Array[String] = hdfsRdd.collect()
// println("hdfsRdd分區數"+hdfsRdd.getNumPartitions)
// println("hdfsRdd內容"+hdfsResult.mkString(","))//4.2讀取本地文件val localRdd1: RDD[String] = sparkContext.textFile("data/words.txt")println("localRdd1分區數"+localRdd1.getNumPartitions)println("localRdd1內容"+localRdd1.collect().mkString(","))//5.設置最小分區數val localRdd2: RDD[String] = sparkContext.textFile("data/words.txt",3)println("localRdd2分區數"+localRdd2.getNumPartitions)println("localRdd2內容"+localRdd2.collect().mkString(","))//6.最小分區數設置是一個參考值,Spark會有自己的判斷,值太大Spark不會理會val localRdd3: RDD[String] = sparkContext.textFile("data/words.txt", 100)println("localRdd3的分區數"+localRdd3.getNumPartitions)}
}
代碼示例(讀取目錄下的所有文件)
package com.wunaiieq//1.導入類
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object CreateByWholeTextFiles {def main(args: Array[String]): Unit = {//2.構建SparkConf對象,并設置本地運行和程序名稱val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WholeTextFiles")//3.使用sparkconf對象構建SparkContet對象val sparkContext = new SparkContext(sparkConf)//5.讀取指定目錄下的小文件val rdd: RDD[(String, String)] = sparkContext.wholeTextFiles("data")val tuples: Array[(String, String)] = rdd.collect()tuples.foreach(ele=>println(ele._1,ele._2))//6.獲取小文件中的內容val array: Array[String] = rdd.map(_._2).collect()println("---------------------------")println(array.mkString("|"))//4.關閉sparkContext對象sparkContext.stop()}
}
算子
詳見如下專題RDD算子集合
DAG
詳見如下專題DAG專題
SparkSQL
詳見如下專題SparkSQL專題
SparkStreaming
詳見如下專題SparkStreaming專題