目錄
1 編程前總分析
1.1 數據源
1.2?需要掌握的知識
1.2.1 Hadoop對比java的數據類型
1.2.2 MapReduce流程簡介
?1.3.3?MapReduce流程細分
2 編碼階段
2.1 導入依賴
2.2?mapper
2.3?reducer
2.4?main
1 編程前總分析
1.1 數據源
英語,李沐,85,男,20
數學,李沐,54,男,20
音樂,李沐,54,男,20
體育,李沐,34,男,20
語文,李媛,81,女,20
音樂,李媛,85,女,20
體育,李媛,89,女,20
語文,馬珂,75,女,19
英語,馬珂,85,女,19
音樂,馬珂,75,女,19
體育,馬珂,65,女,19
語文,潘琴,42,女,20
英語,潘琴,48,女,20
音樂,潘琴,48,女,20
體育,潘琴,78,女,20
英語,秦燦,75,男,19
數學,秦燦,89,男,19
音樂,秦燦,85,男,19
體育,秦燦,99,男,19
語文,王靚,85,女,21
英語,王靚,85,女,21
數學,王靚,48,女,21
音樂,王靚,86,女,21
音樂,王靚,85,女,21
體育,王靚,96,女,21
體育,王靚,87,女,21
英語,吳起,85,男,20
數學,吳起,85,男,20
英語,張翔,96,男,20
數學,張翔,85,男,20
音樂,張翔,85,男,20
體育,張翔,87,男,20
語文,鄭虎,85,男,20
數學,鄭虎,85,男,20
音樂,鄭虎,88,男,20
體育,鄭虎,68,男,20
語文,周偉,76,男,19
英語,周偉,85,男,19
數學,周偉,76,男,19
音樂,周偉,99,男,19
體育,周偉,90,男,19
數學,朱鴻,90,男,21
音樂,朱鴻,80,男,21
體育,朱鴻,81,男,21
1.2?需要掌握的知識
1.2.1 Hadoop對比java的數據類型
java | Hadoop |
boolean | BooleanWritable |
Integer/int | IntWritable |
Long/long | LongWritable |
Float/float | FloatWritable |
Double/double | DoubleWritable |
String | Text |
NullWritable |
1.2.2 MapReduce流程簡介
? ? MapReduce是一種簡化的并行計算編程模型框架,主要解決了分布式計算的相關問題。所謂的分布式計算就是將一個文件里的數據內容,一行行的發送給mapper,mapper接收到一行數據使用split分割后接收,并按key分組后傳給reducer,reducer將接收到的一組數據進行處理后輸出,當所有的組都處理完成結束一個MapReduce。
?1.3.3?MapReduce流程細分
? ? 功能:統計每門課程中相同分數的人有多少及他們的名字
? ? 思考一下,想要統計每門課程中相同分數的人數及名字,我們需要以什么字段為標準對數據進行分組(mapper最主要的功能就是分組)?想要搞明白上面的問題,試著和mysql的分組查詢操作做一下類比,具體sql語句如下:
SELECT 姓名?FROM?表?GROUP?BY 課程名稱,成績?;
? ? 參考sql語句的分組查詢,mapper功能就相當于按課程和成績兩個字段值對數據進行分組并查詢學生姓名。mapper里的最后一句context.write(key,value);里的兩個參數,key等于GROUP?BY后面的字段名-->課程成績和成績的拼接字符串,value等于GROUP?BY前面的字段名-->姓名。mapper就實現了將所有key值相同的分為一組,value放在迭代器中,一組組傳給reducer,reducer使用一個Text類型的key和迭代器value進行接收。
2 編碼階段
????????mapreduce拆分:每個mapreduce程序都可以拆分成三個小部分mapper類、reducer類、main方法類。每個類都有其固定的框架,需要改變的就只有mapper和reducer類中重寫方法的方法體本身,還有main方法里面的各項參數值。
????????如果說,當然我的讀者肯定都是聰明的亞批,我是說如果你朋友的java編程基礎不是很好,我的注釋表示它完全可以很細。????????
2.1 導入依賴
????????MapReduce不需導入的四個依賴(hadoop-client、hadoop-hdfs、hadoop-common、hadoop-mapreduce-client-core)
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.7.3</version></dependency>
</dependencies>
2.2?mapper
package course_score_same;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*
stu[0]:課程名稱
stu[1]:學生姓名
stu[2]:成績
stu[3]:性別
stu[4]:年齡
該功能實現:統計該課程中成績相同的學生姓名*/
//Mapper的泛型依次為輸入文本的第幾行,該行的文本,Mapper的輸出key,Mapper的輸出value
public class CssMapper extends Mapper<LongWritable, Text,Text,Text> {
//重寫方法:在idea的代碼區使用快捷鍵 alt+insert選擇鼠標單擊override methods選擇map方法@Override//map方法的三個參數,前兩個就是輸入文本行號,該行的文本,最后一個Context context固定寫法protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {//將文件的每一行傳遞過來,使用split分割后利用字符數組進行接收String[] stu = value.toString().split(",");//拼接字符串:課程和成績String sc = stu[0]+"\t"+stu[2];//向Reducer傳遞參數-> Key:課程+成績 Value:學生名context.write(new Text(sc),new Text(stu[1]));}
}
2.3?reducer
package course_score_same;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//Reducer的泛型依次為Mapper輸出的key作為Reducer的輸入,Mapper輸出的value作為Reducer的輸入,Reducer的輸出key,Reducer的輸出value
public class CssReducer extends Reducer <Text,Text,Text,Text>{
//重寫方法與Mapper一樣@Override//reduce方法的三個參數:Mapper輸出的key作為Reducer的輸入,Mapper輸出的value作為Reducer的輸入,最后一個Context context固定寫法protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {//創建StringBuffer用來接收該課程中成績相同的學生的姓名StringBuffer sb = new StringBuffer();//num變量用來計數int num = 0;//遍歷values參數,將所有的value拼接進sb,并統計學生數量for(Text value:values){sb.append(value.toString()).append(",");num++;}//如果num=1,則表明該課程的這個成績只有一個學生,否則就輸出if(num>1){String names = "一共有" + num + "名學生,他們的名字是:" +sb.toString();System.out.println("*************************************************");System.out.println(key.toString() + names);context.write(key,new Text(names));}}
}
2.4?main
package course_score_same;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class CssMain {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {//創建job和“統計相同課程相同分數的人數”任務入口Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(CssMain.class);//設置Mapper和Reducer的入口job.setMapperClass(CssMapper.class);job.setReducerClass(CssReducer.class);//設置Mapper的輸入輸出類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//設置Reducer的輸入輸出類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//指定輸入輸出路徑String inputPath = "hdfs://localhost:9000/mapreduce/input/學生成績.csv";String outputPath = "hdfs://localhost:9000/mapreduce/output/該課程中成績相同的學生姓名.txt";FileInputFormat.setInputPaths(job,new Path(inputPath));FileOutputFormat.setOutputPath(job,new Path(outputPath));//輸出路徑存在的話就刪除,不然就只能手動刪除,否則會報該文件已存在的異常FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);if (fileSystem.exists(new Path(outputPath))) {fileSystem.delete(new Path(outputPath), true);}//執行jobjob.waitForCompletion(true);}
}
????????至此,一個完整的MapReduce的編寫就已經完全結束了,如果想要別的功能,只需要修改mapper和reducer類中重寫方法的方法體本身,還有main方法里面的各項參數值即可。
? ? ? ? 為了進一步鍛煉大家MapReduce確定mapper輸出的key和value,下面再找兩個例子練習一下(每個人的想法都不一樣,所以說并沒有標準的答案,合理即可):
- 統計所有學生的信息—>(key:姓名+性別+年齡;value:課程+成績)
- 計算每門成績的最高分、最低分、平均分—>(key:課程名稱;value:成績)
- 統計各性別的人數及他們的姓名—>(key:性別;value:姓名)