MapReduce實現單詞計數:WordCount
單詞計數的文本信息(hello.txt):
hello can i help you
i have a dream
maybe you can help me
? 實現過程:
? Map過程:并行讀取文本,對讀取的單詞進行Map操作,每個詞將會形成。
? 第一行將會形成:
? 第二行生成
? 第三行生成
? Map會根據key值,自動按照ASCII值進行排序,所以,相同key值得K/V值,將會被放到一起
? Reduce過程:對map的結果進行合并,計算,
? Map將key相同得值放到了一起,所以當Reduce拿到從Map傳過來得數據,可以理解為如下
? 而Reduce進行最后得合并,就會得到如下得結果
? 最后輸出
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-g8x0hrTQ-1578386385884)(C:\Users\Lenovo\AppData\Roaming\Typora\typora-user-images\image-20200107162331181.png)]
Java Api 編寫
public class WordCountApp {
//靜態類,用于繼承Mapper類,用于繼承Mapper方法
public static class MyMapper extends Mapper{//mapper的四種參數類型(源碼使用了泛型,具體可以查看源碼)
//IntWritable與Text是Hadoop中的兩個參數,可以簡單的理解為是
//在hadoop中的int與text格式
private final static IntWritable one =new IntWritable(1);
private Text word =new Text();
//map操作
public void map(Object key ,Text value ,Context context)throws IOException,InterruptedException{
//StringTokenizer是Java.util中的類,用于將字符串切割成
//一個字符串數組(用" "把各個字符分隔開)。
StringTokenizer itr =new StringTokenizer(value.toString());
//遍歷文本中一行的所有字符串
while (itr.hasMoreTokens()){
word.set(itr.nextToken());
//寫入到map的輸出文件中,這個context用于Reduce的接收
context.write(word,one);
}
}
}
//靜態類,用于繼承Reduce類,編寫Reduce方法
public static class MyReduce extends Reducer{
//統計計數,即記錄單詞的出現次數,最后輸出的輔助類
private final static IntWritable result =new IntWritable();
//reduce操作
public void reduce(Text key , Iterablevalues, Context context)throws IOException,InterruptedException{
//記錄單詞的出現個數,因為在Reduce中沒有int類型,所以要
//用IntWritable進行包裝
int sum=0;
//上面講到,Reduce拿到Map的值的形式,這個Reduce會執行在每
//一個塊中,用于計數,所以每一個單詞都是從0開始計數
for (IntWritable val : values){
sum +=val.get();
}
//int sum包裝進 IntWritable
result.set(sum);
//寫入到最終會輸出的地方
context.write(key,result);
}
}
//主方法
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
//本此編碼是在Windowx編程,所以要加Hdfs的路徑
String INPUT_PATH="hdfs://10.30.60.3:8020/a";
String OUTPUT_PATH="hdfs://10.30.60.3:8020/b";
Configuration configuration =new Configuration();
final FileSystem fileSystem=FileSystem.get(new URI(INPUT_PATH),configuration);
//確保INPUT的路徑存在,而OutPut得路徑會在最后生成結果時生成、
//所以確保OUTPUT得文件不存在
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
//設置job
Job job =Job.getInstance(configuration,"WordCountApp");
//設置主類
job.setJarByClass(WordCountApp.class);
//設置mapper
job.setMapperClass(MyMapper.class);
//設置Mapper的輸出格式
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設置Reduce
job.setReducerClass(MyReduce.class);
//設置Reduce格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設置輸入格式
job.setInputFormatClass(TextInputFormat.class);
Path inputPath=new Path(INPUT_PATH);
FileInputFormat.addInputPath(job,inputPath);
//設置輸出格式
job.setOutputValueClass(TextOutputFormat.class);
Path outputPath=new Path(OUTPUT_PATH);
FileOutputFormat.setOutputPath(job,outputPath);
//提交
System.exit(job.waitForCompletion(true)? 0:1);
}
}