RDD 專項練習
現有分數信息文件 scores.txt
班級ID 姓名 年齡 性別 科目 成績
12 張三 25 男 chinese 50
12 張三 25 男 math 60
12 張三 25 男 english 70
12 李四 20 男 chinese 50
12 李四 20 男 math 50
12 李四 20 男 english 50
12 王芳 19 女 chinese 70
12 王芳 19 女 math 70
12 王芳 19 女 english 70
13 張大三 25 男 chinese 60
13 張大三 25 男 math 60
13 張大三 25 男 english 70
13 李大四 20 男 chinese 50
13 李大四 20 男 math 60
13 李大四 20 男 english 50
13 王小芳 19 女 chinese 70
13 王小芳 19 女 math 80
13 王小芳 19 女 english 70
需求如下: |
---|
1、一共有多少人參加考試? |
2、一共有多少個大于、小于、等于20歲的人參加考試? |
3、分別有多個男生和女生參加考試? |
4、各個班有多少人參加考試? |
5、語文和數學科目的平均成績是多少? |
6、單個人平均成績是多少? |
7、各班平均成績是多少? |
8、各班男女生平均總成績是多少? |
9、全校語文成績最高分是多少? |
10、各班各個科目最高和最低成績是多少? |
11、總成績大于 150 分的 12 班的女生有幾個? |
一、預處理
定義一個內部case類用于存儲分數數據
創建SparkContext對象
讀取數據文件,跳過第一行(標題行),并映射為Score對象(附加年齡類型)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} object SparkRDD { // 定義一個內部case類用于存儲分數數據 private case class Score(classId: Int, name: String, age: Int, gender: String, subject: String, score: Int, _type: String) def main(args: Array[String]): Unit = { // 創建Spark配置對象 val conf = new SparkConf() .setAppName("spark_rdd") // 設置應用名稱 .setMaster("local[4]") // 設置運行模式為本地模式,并分配4個核心 // 獲取或創建SparkContext對象 val sc = SparkContext.getOrCreate(conf) // 指定數據文件路徑 val path = "file:///D:\\myOwnProject\\spark_first\\data\\scores.txt" // 讀取數據文件,跳過第一行(標題行),并映射為Score對象 val scores: RDD[Score] = sc.textFile(path, 4) // 讀取文件,分區數為4 .mapPartitionsWithIndex { // 對每個分區應用索引和迭代器 case (index, iterator) => if (index == 0) iterator.drop(1) else iterator // 如果是第一個分區,則跳過第一行 } .mapPartitions( // 對每個分區應用映射操作 _.map(line => { // 對分區中的每一行進行處理 val a = line.split("\\s+") // 按空白字符分割每行 val age = a(2).toInt // 將年齡字段轉換為整數 val _type = age match { // 根據年齡設置類型 case age if age > 20 => "GT20" // 年齡大于20 case age if age == 20 => "EQ20" // 年齡等于20 case age if age < 20 => "LT20" // 年齡小于20 } // 構造Score對象 Score(a(0).toInt, a(1), a(2).toInt, a(3), a(4), a(5).toInt, _type) }) ).cache() // 將RDD緩存到內存中,優化:RDD結果被不斷使用// 打印所有處理后的Score對象 scores.foreach(println) //程序結束時停止SparkContext sc.stop() }
}
二、處理需求
1、一共有多少人參加考試?
val count1 = scores.mapPartitions(_.map(_.name)) // scores如上預處理,下同.distinct().count()println(s"${count1} 人參加考試") //6人參加考試
2、一共有多少個大于、小于、等于20歲的人參加考試?
val map = Map(("GT20", "20歲以上"), ("EQ20", "20歲"), ("LT20", "20歲以下"))
scores.mapPartitions(_.map(s => ((s.name, s._type), 1))).groupByKey().mapPartitions(_.map(s => (s._1._2, 1))).reduceByKey(_ + _).foreach(s => println(s"${map.get(s._1).get}的人數為${s._2}"))
/*
20歲以上的人數為2
20歲以下的人數為2
20歲的人數為2
*/
3、分別有多個男生和女生參加考試?
scores.mapPartitions(_.map(s => ((s.name, s.gender), 1))).groupByKey().mapPartitions(_.map(s => (s._1._2, 1))).reduceByKey(_ + _).foreach(s => println(s"${s._1}生參加考試的人數為${s._2}"))
/*
男生參加考試的人數為4
女生參加考試的人數為2
*/
4、各個班有多少人參加考試?
注意同班,同名去重
方法一:groupByKey 去重
scores.mapPartitions(_.map(s => ((s.name, s.classId), 1))).groupByKey().mapPartitions(_.map(s => (s._1._2, 1))).reduceByKey(_ + _).foreach(s => println(s"${s._1}班參加考試的人數為${s._2}"))
/*
12班參加考試的人數為3
13班參加考試的人數為3
*/
方法二:distinct去重(不推薦)
scores.mapPartitions(_.map(t=>(t.classId, t.name))).distinct().map(s => (s._1, 1)).reduceByKey(_+_).foreach(s => println(s"${s._1}班參加考試的人數為${s._2}"))
5、語文數學科目的平均成績是多少?
scores.mapPartitions(_.collect({ // 存在非語數外科目,如何過濾case s if s.subject.matches("chinese|math") => (s.subject, s.score)})).groupByKey().map(t => (t._1, t._2.sum * 1.0f / t._2.size)).foreach(s => println(s"${s._1}平均分:${s._2}"))
6、單個人平均成績是多少?
scores.mapPartitions(_.map(t=>(t.name,t.score))).groupByKey().map(t=>(t._1,t._2.sum*1.0f/t._2.size)).foreach(println)
7、各班平均成績是多少?
scores.mapPartitions(_.map(t=>(t.classId,t.score))).groupByKey().map(t=>(t._1,t._2.sum*1.0f/t._2.size)).foreach(println)
8、各班男女生平均總成績是多少?
scores.mapPartitions(_.map(t=>((t.classId,t.gender),t.score))).groupByKey().map(t=>(t._1,t._2.sum*1.0f/t._2.size)).foreach(println)
9、全校語文成績最高分是多少?
val chineseMax = scores.mapPartitions(_.filter(_.subject.equals("chinese")).map(_.score)).max()
println(s"最高語文成績為${chineseMax}")
10、各班各個科目最高和最低成績是多少?
scores.mapPartitions(_.map(s => ((s.subject, s.classId), (s.score, s.score)))
)
.reduceByKey((s1, s2) => (if (s1._1 > s2._1) s1._1 else s2._1, (if (s1._2 < s2._2) s1._2 else s2._2))
)
.foreach(s => println(s"${s._1._2}班${s._1._1}科目的最大成績為${s._2._1},最小成績為${s._2._2}"))
結果
13班chinese科目的最大成績為70,最小成績為50
12班english科目的最大成績為70,最小成績為50
12班chinese科目的最大成績為70,最小成績為50
13班english科目的最大成績為70,最小成績為50
12班math科目的最大成績為70,最小成績為50
13班math科目的最大成績為80,最小成績為60
11、總成績大于 150 分的 12 班的女生有幾個?
val count2 = scores.mapPartitions(_.filter(s => s.gender == "女" && s.classId == 12).map(s => (s.name, s.score))).reduceByKey(_ + _).filter(s => s._2 > 150).count()
println(s"總成績大于 150 分的 12 班的女生有${count2}個")