(一)需求說明
準備十條符合包含用戶信息的文本文件,每行格式為 姓名,年齡,性別,需要清洗掉年齡為空或者非數字的行。
例如:
張三,25,男
李四,,女
王五,30,男
趙六,a,女
孫七,35,男
周八,40,女
吳九,abc,男
鄭十,45,女
王十,50,男
李二,55,女
(二)思路分析
- 讀入文件
- 對每一行數據進行分析
- 字段拆分,拆分出年齡這個字段
- 判斷
- 如果它不是數字或者缺失,則忽略這條數據
- 否則保存
(三) 代碼展示
import org.apache.spark.{SparkConf, SparkContext}
object DataCleaning {
??def main(args: Array[String]): Unit = {
????// 創建 SparkConf 對象
????val conf = new SparkConf().setAppName("DataCleaning").setMaster("local[*]")
????// 創建 SparkContext 對象
????val sc = new SparkContext(conf)
?
????// 讀取文本文件,創建 RDD
????val inputFile = "input/file.txt"
????val lines = sc.textFile(inputFile)
?
????// 數據清洗操作
????val cleanedLines = lines.filter(line => {?// 使用filter算子
??????val fields = line.split(",")
??????if (fields.length == 3) {
????????val age = fields(1).trim
????????age.matches("\\d+")
??????} else {
????????false
??????}
????})
??????// 輸出清洗后的數據
???????cleanedLines.collect().foreach(println)
?
????// 停止 SparkContext
????sc.stop()
??}
}
拓展:如何把清洗之后的數據保存到一個文件中。
可以使用coalesce(1)這個方法可以讓結果全部保存在一個文件中。
代碼如下:
val singlePartitionRDD = cleanedLines.coalesce(1)
????// 保存清洗后的數據到文件
????val outputPath = "path/to/your/output/file.txt"
????singlePartitionRDD.saveAsTextFile(outputPath)
????// 停止 SparkContext
????sc.stop()