《大數據技術原理與應用》實驗報告七 熟悉 Spark 初級編程實踐

目? 錄

一、實驗目的

二、實驗環境

三、實驗內容與完成情況

3.1 Spark讀取文件系統的數據。

3.2 編寫獨立應用程序實現數據去重。

3.3 編寫獨立應用程序實現求平局值問題。

四、問題和解決方法

五、心得體會


一、實驗目的

????????1. 掌握使用 Spark 訪問本地文件和 HDFS 文件的方法。

????????2. 掌握 Spark 應用程序的編寫、編譯和運行方法。


二、實驗環境

????????1. 硬件要求:筆記本電腦一臺

????????2. 軟件要求:VMWare虛擬機、Ubuntu 18.04 64、JDK1.8、Hadoop-3.1.3、Hive-3.1.2、Windows11操作系統、Eclipse、Flink-1.9.1、IntelliJ IDEA、Spark-2.4.0


三、實驗內容與完成情況

3.1 Spark讀取文件系統的數據。

????????在 Linux 系統中安裝 IntelliJ IDEA,然后使用 IntelliJ IDEA 工具開發 WordCount 程序,并打 包成JAR 文件,提交到 Flink 中運行。

????????(1)在 spark-shell 中讀取 Linux 系統本地文件“/home/hadoop/test.txt”,然后統計出文件的行數。

????????①下載spark壓縮包并使用如下語句將對應的壓縮包解壓到/usr/local的文件目錄下:

sudo tar -zxf ./spark-2.4.0-bin-without-hadoop.tgz -C /usr/local

????????②使用如下語句將解壓后的文件夾“spark-2.4.0-bin-without-hadoop”重命名為“spark”:

sudo mv ./spark-2.4.0-bin-without-hadoop/ ./spark

????????③使用如下語句修改spark的配置文件信息:

vim /etc/profile

????????④使用如下語句讓剛修改完的配置信息立即生效:

source /etc/profile

????????⑤使用如下語句復制"spark-env.sh.template"文件的內容并將其存入到"spark-env.sh"文件中:

cp spark-env.sh.template spark-env.sh

????????⑥使用如下語句啟動spark進行實驗操作:

./sbin/start-all.sh

????????⑦使用如下語句進行進程信息查看:

jps

????????⑧使用如下語句完成spark-shell的啟動工作:

./bin/spark-shell

????????⑨使用如下語句讀取 Linux 系統本地文件“/home/hadoop/test.txt”:

cd /home/hadoop/test.txt
val textFile=sc.textFile("file:///home/hadoop/test.txt")

????????⑩使用如下語句統計文件“/home/hadoop/test.txt”的行數:

textFile.count()

????????(2)在 spark-shell 中讀取 HDFS 系統文件“/user/hadoop/test.txt”(如果該文件不存在, 請先創建),然后統計出文件的行數。

????????①使用如下語句在HDFS上傳文件"1.txt",上傳完成后并進行查看是否成功完成上傳:

cd /usr/local/hadoop
hadoop fs -ls
hadoop fs -put 1.txt
hadoop fs -ls

????????②使用如下語句讀取 Linux 系統HDFS系統文件“/user/hadoop/test.txt”:

val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")

????????③使用如下語句統計HDFS系統文件“/user/hadoop/test.txt”的行數:

textFile.count()

????????(3)編寫獨立應用程序(推薦使用 Scala 語言),讀取 HDFS 系統文件“/user/hadoop/test.txt”(如果該文件不存在,請先創建),然后統計出文件的行數;通過 sbt 工具將整個應用程 序編譯打包成 JAR 包,并將生成的 JAR 包通過 spark-submit 提交到 Spark 中運行命令。

????????①使用hadoop用戶名登錄Linux系統,打開一個終端在Linux終端中執行如下命令創建一個文件夾sparkapp作為應用程序根目錄:

cd ~
mkdir ./sparkapp
mkdir -p ./sparkapp/src/main/scala

????????②將對應的應用程序代碼存放在應用程序根目錄下的“src/main/scala”目錄下,使用vim編輯器在“~/sparkapp/src/main/scala”下建立一個名為 SimpleApp.scala的Scala代碼文件:

import org.apache.spark.{SparkConf, SparkContext}object SimpleApp {def main(args: Array[String]) {// 正確的路徑:去掉路徑字符串中的多余空格val logFile = "hdfs://localhost:9000/user/hadoop/test.txt"// 設置SparkConfval conf = new SparkConf().setAppName("Simple Application")// 初始化SparkContextval sc = new SparkContext(conf)// 讀取文件并進行計算val logData = sc.textFile(logFile, 2)// 計算并輸出文件的行數val num = logData.count()printf("The num of this file is %d\n", num)// 關閉SparkContextsc.stop()}
}

????????③SimpleApp.scala程序依賴于Spark API,因此需要通過sbt進行編譯打包以后才能運行。 首先需要使用vim編輯器在“~/sparkapp”目錄下新建文件simple.sbt:

cd ~
vim ./sparkapp/simple.sbt

????????④simple.sbt文件用于聲明該獨立應用程序的信息以及與 Spark的依賴關系,需要在simple.sbt文件中輸入以下內容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

????????⑤為了保證sbt能夠正常運行,先執行如下命令檢查整個應用程序的文件結構:

cd sparkapp/
find .

????????⑥通過如下代碼將整個應用程序打包成 JAR:

/usr/local/sbt/sbt package

????????⑦生成的JAR包的位置為“~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar”,使用sbt打包得到的應用程序JAR包可以通過 spark-submit 提交到 Spark 中運行:

/usr/local/spark/bin/spark-submit --class "SimpleApp"
~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar

3.2 編寫獨立應用程序實現數據去重。

????????對于兩個輸入文件 A 和 B,編寫 Spark 獨立應用程序(推薦使用 Scala 語言),對兩個 文件進行合并,并剔除其中重復的內容,得到一個新文件 C。下面是輸入文件和輸出文件的 一個樣例供參考。

????????輸入文件 A 的樣例如下:

20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z

????????輸入文件 B 的樣例如下:

20170101 y
20170102 y
20170103 x
20170104 z
20170105 y

????????根據輸入的文件 A 和 B 合并得到的輸出文件 C 的樣例如下:

20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

????????(1)在目錄/usr/local/spark/mycode/remdup/src/main/scala下新建一個remdup.scala,然后輸入如下代碼:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitionerobject RemDup {def main(args: Array[String]) {// 初始化SparkConf和SparkContextval conf = new SparkConf().setAppName("RemDup")val sc = new SparkContext(conf)// 輸入文件路徑val dataFile = "file:///home/charles/data"// 讀取文件并創建RDD,設置為2個分區val data = sc.textFile(dataFile, 2)// 處理數據:過濾空行,去除多余空格并進行分區和排序val res = data.filter(_.trim.length > 0)  // 過濾空行.map(line => (line.trim, "")) // 每行生成鍵值對(鍵為去除空格的行內容,值為空字符串).partitionBy(new HashPartitioner(1))  // 使用HashPartitioner進行分區.groupByKey()  // 根據鍵進行分組.sortByKey()  // 按照鍵進行排序.keys  // 只保留鍵(去除重復行)// 將結果保存到文件res.saveAsTextFile("result")// 停止SparkContextsc.stop()}
}

????????(2)在目錄/usr/local/spark/mycode/remdup目錄下新建simple.sbt,然后輸入如下代碼:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

????????(3)在目錄/usr/local/spark/mycode/remdup下執行如下命令打包程序:

sudo /usr/local/sbt/sbt package

????????(4)最后在目錄/usr/local/spark/mycode/remdup下執行如下命令提交程序,在目錄/usr/local/spark/mycode/remdup/result下即可得到結果文件:

/usr/local/spark/bin/spark-submit --class "RemDup"
/usr/local/spark/mycode/remdup/target/scala-2.11/simple-project_2.11-1.0.jar

3.3 編寫獨立應用程序實現求平局值問題。

????????每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生 名字,第二個是學生的成績;編寫 Spark 獨立應用程序求出所有學生的平均成績,并輸出到 一個新文件中。下面是輸入文件和輸出文件的一個樣例供參考。

????????Algorithm 成績:

小明 92
小紅 87
小新 82
小麗 90

????????Database 成績:

小明 95
小紅 81
小新 89
小麗 85

????????Python 成績:

小明 82
小紅 83
小新 94
小麗 91

????????平均成績如下:

(小紅,83.67)
(小新,88.33)
(小明,89.67)
(小麗,88.67)

????????(1)在目錄/usr/local/spark/mycode/avgscore/src/main/scala下新建一個avgscore.scala,然后輸入如下代碼:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitionerobject AvgScore {def main(args: Array[String]) {// 初始化SparkConf和SparkContextval conf = new SparkConf().setAppName("AvgScore")val sc = new SparkContext(conf)// 輸入文件路徑val dataFile = "file:///home/hadoop/data"// 讀取文件并創建RDD,設置為3個分區val data = sc.textFile(dataFile, 3)// 處理數據:過濾空行,拆分數據并轉為鍵值對val res = data.filter(_.trim.length > 0)  // 過濾空行.map(line => {val parts = line.split(" ")(parts(0).trim, parts(1).trim.toInt) // 鍵為學生名字,值為成績}).partitionBy(new HashPartitioner(1))  // 使用HashPartitioner進行分區.groupByKey()  // 根據鍵(學生名字)進行分組.map(x => {var n = 0var sum = 0.0// 計算每個學生的總成績和數量for (i <- x._2) {sum += in += 1}// 計算平均成績val avg = sum / n// 格式化平均成績到兩位小數val format = f"$avg%1.2f"(x._1, format)  // 返回學生名字和格式化后的平均成績})// 將結果保存到文件res.saveAsTextFile("result")// 停止SparkContextsc.stop()}
}

????????(2)在目錄/usr/local/spark/mycode/avgscore目錄下新建simple.sbt,然后輸入如下代碼:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

????????(3)在目錄/usr/local/spark/mycode/avgscore下執行如下命令打包程序:

sudo /usr/local/sbt/sbt package

????????(4)最后在目錄/usr/local/spark/mycode/avgscore下執行下面命令提交程序,在目錄/usr/local/spark/mycode/avgscore/result下即可得到結果文件:

/usr/local/spark/bin/spark-submit --class "AvgScore"
/usr/local/spark/mycode/avgscore/target/scala-2.11/simple-project_2.11-1.0.jar


四、問題和解決方法

????????1. 實驗問題:Spark的配置可能變得很復雜,尤其是當涉及到不同的集群管理器、存儲系統和應用需求時。

解決方法:仔細閱讀Spark官方文檔,并理解各種配置選項含義,根據應用需求和集群環境,選擇正確配置選項并進行適當調整。

????????2. 實驗問題:在Spark實驗中,數據清洗、轉換和聚合較為繁瑣。

解決方法:使用Spark提供的豐富數據處理功能,例如DataFrame API和Transformations、Actions操作,通過使用這些功能,可以輕松地處理數據并對其進行各種操作。

????????3. 實驗問題:用spark讀取本地文件并統計文件內容行數時報出IllegalArgumentException(非法參數異常)。

解決方法:由于file:///少打了一個/,spark讀取本地文件的url路徑有問題,正確的應該是valtextFile=sc.textFile("file:///usr/local/hadoop/1.txt")。

????????4. 實驗問題:用spark讀取hdfs文件系統中的文件并統計文件內容行數時報出InvalidInput Exception(無效輸入異常),并提示輸入路徑不存在。

解決方法:讀取hdfs文件的url路徑有問題,hdfs文件系統的根目錄不是~,而應該是/user/hadoop,其中hadoop是用戶名。

????????5. 實驗問題:編寫spark獨立應用程序并實現數據去重時,程序運行報出異常:java.net.URI SyntaxException: Illegal character in scheme name at index 0。

解決方法:代碼中獲取hdfs文件的地址URL前面有空格,就會報錯java.net.URISyntaxException:Illegal character in scheme name at index 0,刪掉空格就好了。

????????6. 實驗問題:隨著Spark版本更新,一些功能和API會發生變化,導致與舊版本不兼容。

解決方法:了解Spark版本之間差異并適應新API和功能通過仔細閱讀官方文檔并參考示例代碼,可以更好地適應新版本Spark并利用其提供的新功能。

????????7. 實驗問題:在Spark應用程序運行過程中,對其進行監控和調試是一項重要任務會面臨日志分析困難、任務跟蹤和性能分析等。

解決方法:使用Spark提供的監控工具和API,例如Spark UI、Event Tracker和Profiler等,通過這些工具和API,可以實時監控Spark應用程序狀態、跟蹤任務執行過程并分析性能瓶頸等。


五、心得體會

????????1、在這次實驗中我深刻體會到了數據去重的重要性,在處理和分析大規模數據時,數據去重能夠避免重復數據的干擾,提高數據的質量和準確性。例如在分析用戶行為或銷售數據時,如果存在大量的重復數據,將會對結果產生誤導。通過使用Spark的distinct()函數可以輕松地實現數據去重,并且在處理大規模數據時,去重操作能夠顯著提高計算效率和結果質量。此外數據去重還可以應用于數據清洗和預處理階段,在數據預處理時需要刪除無效、重復和不完整的數據,以便進行后續的數據分析和挖掘。通過使用Spark的distinct()函數可以快速地去除重復數據,并且對數據進行清洗和預處理,以便更好地探索和分析數據的規律和信息。

????????2、Spark是一個開源的大規模數據處理工具,具有高效、可擴展和易用的特點,通過使用Spark可以快速地讀取和處理大量數據,并且使用豐富的轉換和操作來分析和挖掘數據中的規律和信息。DataFrame提供了一種以表格形式組織數據的結構,可以方便地處理各種類型的數據。在處理和分析數據時,Spark提供了豐富的轉換和操作,例如可以使用map()函數對數據進行轉換,使用filter()函數對數據進行篩選,使用reduceByKey()函數對數據進行聚合操作。

????????3、使用Spark的read()函數來讀取不同格式的數據,使用DataFrame API提供的各種函數對數據進行處理和分析,使用select()函數選擇指定的列進行輸出,使用where()函數對數據進行篩選,使用groupBy()函數對數據進行分組和聚合操作,還可以使用DataFrame API提供的統計函數來計算數據的統計信息,如平均值、標準差、計數等。

????????4、合理地設置Spark的配置參數,以提高計算效率和減少資源浪費,選擇合適的轉換和操作來處理和分析數據以確保結果的準確性和可靠性。在分布式計算中,某些鍵值對的出現頻率可能遠高于其他鍵值對,導致數據分布不均衡,這會引發集群中的負載傾斜問題,影響計算性能和結果質量。為了避免數據傾斜對計算性能的影響,可以對數據進行預處理或使用應對數據傾斜的算法來解決問題。

????????5、DataFrame API提供了豐富的統計和分析函數,可以方便地對數據進行分組、聚合、過濾和排序等操作,通過使用DataFrame API可以快速地獲得數據的統計信息和分布情況,并且對數據進行更深入的挖掘和分析。可以使用groupBy()函數對數據進行分組操作,然后使用agg()函數對分組后的數據進行聚合操作。使用window()函數對數據進行窗口分析,以便更好地探索數據的趨勢和規律,窗口函數可以用于計算滑動平均值、累計和等動態變化的數據指標。

????????6、使用Spark進行數據處理具有高效處理大規模數據、簡單易用的API、靈活的數據處理方式、高效的資源利用率、穩定的計算性能等優點,這些優點使得Spark成為目前大規模數據處理領域中備受關注和常用的工具之一。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/915193.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/915193.shtml
英文地址,請注明出處:http://en.pswp.cn/news/915193.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

機器學習漫畫小抄 - 彩圖版

斯坦福機器學習漫畫小抄&#xff0c;中文版來啦&#xff01; 下載地址&#xff1a; 通過網盤分享的文件&#xff1a;機器學習知識點彩圖版.pdf 鏈接: https://pan.baidu.com/s/1-fH9OpC_u_OrTqWy6gVUCA 提取碼: 246r

1.初始化

業務模塊核心技術棧業務&#xff08;亮點&#xff09;解決方案課程安排01 認識Vue3為什么需要學Vue3?Vue3組合式API體驗Vue3更多的優勢2 使用create-vue搭建Vue3項目認識 create-vue使用create-vue創建項目3 熟悉項目目錄和關鍵文件項目目錄和關鍵文件4 組合式API - setup選項…

Milvus分布式數據庫工作職責

主導騰訊云Milvus服務化項目&#xff0c;設計多租戶隔離方案&#xff0c;支撐日均10億向量請求&#xff0c;延遲降低40%。優化IVF_PQ索引構建流程&#xff0c;通過量化編碼壓縮使內存占用減少60%&#xff0c;QPS提升35%。開發基于Kubernetes的Milvus Operator&#xff0c;實現自…

FMEA-CP-PFD三位一體數字化閉環:汽車部件質量管控的速效引擎

FMEA-CP-PFD三位一體數字化閉環&#xff1a;汽車部件質量管控的速效引擎 全星FMEA軟件系統通過??FMEA&#xff08;失效模式分析&#xff09;、CP&#xff08;控制計劃&#xff09;、PFD&#xff08;過程流程圖&#xff09;三大工具的一體化協同管理??&#xff0c;為汽車部件…

VUE2 學習筆記1

目錄 VUE特點 文檔tips 開發者工具 從一個Hello world開始 hello world Demo 容器和實例的對應關系 差值語法{{}} VUE特點 構建用戶界面&#xff1a;可以用來把數據構建成用戶界面。 漸進式&#xff1a;自底向上&#xff0c;可以先從一個非常輕量級的框架開始&#xf…

嵌入式學習系統編程(四)進程

目錄 一、進程 1.程序和進程 2.進程的八種狀態 3. 幾個狀態 4.關于進程常用命令 二、關于進程的函數 1.fork 2.面問 3.孤兒進程 后臺進程 2. exec函數族 (只保留父子關系&#xff0c;做新的事情) strtok函數 三、進程的結束 1.分類 exit和_exit的區別 wait函數…

Linux中添加重定向(Redirection)功能到minishell

前言&#xff1a;在談論添加minishell之前&#xff0c;我再重談一下重定向的具體實現等大概思想&#xff01;&#xff01;&#xff01;方便自己回顧&#xff01;&#xff01;&#xff01; 目錄 一、重定向&#xff08;Redirection&#xff09;原理詳解 1、文件描述符基礎 2、…

Django由于數據庫版本原因導致數據庫遷移失敗解決辦法

在django開發中&#xff0c;一般我們初始化一個項目之后&#xff0c;創建應用一般就會生成如下的目錄&#xff1a;django-admin startproject myproject python manage.py startapp blogmyproject/ ├── manage.py └── myproject/ | ├── __init__.py | ├── se…

C++STL系列之vector

前言 vector是變長數組&#xff0c;有點像數據結構中的順序表&#xff0c;它和list也是經常被拿出作對比的&#xff0c; vector使用動態分配數組來存儲它的元素。當新元素插入時候&#xff0c;這個數組需要被重新分配大小&#xff0c;如果擴容&#xff0c;因為要開一個新數組把…

Functional C++ for Fun Profit

Lambda Conf上有人講C函數式編程。在Functional Conf 2019上&#xff0c;就有主題為“Lambdas: The Functional Programming Companion of Modern C”的演講。演講者介紹了現代C中函數式編程相關內容&#xff0c;講解了如何使用Lambda表達式編寫符合函數式編程原則的C代碼&…

Python基礎理論與實踐:從零到爬蟲實戰

引言Python如輕舟&#xff0c;載你探尋數據寶藏&#xff01;本文從基礎理論&#xff08;變量、循環、函數、模塊&#xff09;啟航&#xff0c;結合requests和BeautifulSoup實戰爬取Quotes to Scrape&#xff0c;適合零基礎到進階者。文章聚焦Python基礎&#xff08;變量、循環、…

ThingJS開發從入門到精通:構建三維物聯網可視化應用的完整指南

文章目錄第一部分&#xff1a;ThingJS基礎入門第一章 ThingJS概述與技術架構1.1 ThingJS平臺簡介1.2 技術架構解析1.3 開發環境配置第二章 基礎概念與核心API2.1 核心對象模型2.2 場景創建與管理2.3 對象操作基礎第三章 基礎開發實戰3.1 第一個ThingJS應用3.2 事件系統詳解3.3 …

關于list

1、什么是listlist是一個帶頭結點的雙向循環鏈表模版容器&#xff0c;可以存放任意類型&#xff0c;需要顯式定義2、list的使用有了前面學習string和vector的基礎&#xff0c;學習和使用list會方便很多&#xff0c;因為大部分的內容依然是高度重合的。與順序表不同&#xff0c;…

Mysql 查看當前事務鎖

在 MySQL 中查看事務鎖&#xff08;鎖等待、鎖持有等&#xff09;&#xff0c;可以使用以下方法&#xff1a; 一、查看當前鎖等待情況&#xff08;推薦&#xff09; SELECTr.trx_id AS waiting_trx_id,r.trx_mysql_thread_id AS waiting_thread,r.trx_query AS waiting_query,b…

【Keil5-map文件】

Keil5-map文件■ map文件■ map文件

k8s 基本架構

基于Kubernetes(K8s)的核心設計&#xff0c;以下是其關鍵基本概念的詳細解析。這些概念構成了K8s容器編排系統的基石&#xff0c;用于自動化部署、擴展和管理容器化應用。### 一、K8s核心概念概覽 K8s的核心對象圍繞容器生命周期管理、資源調度和服務發現展開&#xff0c;主要包…

Bell不等式賦能機器學習:微算法科技MLGO一種基于量子糾纏的監督量子分類器訓練算法技術

近年來&#xff0c;量子計算&#xff08;Quantum Computing&#xff09; 和 機器學習&#xff08;Machine Learning&#xff09; 的融合成為人工智能和計算科學領域的重要研究方向。隨著經典計算機在某些復雜任務上接近計算極限&#xff0c;研究人員開始探索量子計算的獨特優勢…

Edge瀏覽器設置網頁自動翻譯

一.瀏覽網頁自動翻譯設置->擴展->獲取Microsoft Edge擴展->搜索“沉浸式翻譯”->獲取 。提示&#xff1a;如果采用其他的翻譯擴展沒找自動翻譯功能&#xff0c;所以這里選擇“沉浸式翻譯”二.基于Java WebElement時自動翻譯Java關鍵代碼&#xff1a;提示&#xff1…

TCP/UDP協議深度解析(四):TCP的粘包問題以及異常情況處理

&#x1f50d; 開發者資源導航 &#x1f50d;&#x1f3f7;? 博客主頁&#xff1a; 個人主頁&#x1f4da; 專欄訂閱&#xff1a; JavaEE全棧專欄 本系列往期內容~ TCP/UDP協議深度解析&#xff08;一&#xff09;&#xff1a;UDP特性與TCP確認應答以及重傳機制 TCP/UDP協議深…

R 基礎語法

R 基礎語法 R 語言是一種針對統計計算和圖形表示而設計的編程語言&#xff0c;廣泛應用于數據分析、統計學習、生物信息學等領域。本文將為您介紹 R 語言的基礎語法&#xff0c;幫助您快速入門。 1. R 語言環境搭建 在開始學習 R 語言之前&#xff0c;您需要安裝并配置 R 語言環…