創建包 com.nefu.mapreducer.wordcount,開始編寫 Mapper、Reducer 和 Driver。

用戶編寫的程序分成三個部分: Mapper 、 Reducer 和 Driver 。
( 1 ) Mapper 階段
• 用戶自定義的 Mapper 要繼承自己的父類
• Mapper 的輸入數據是 KV 對的形式(KV 的類型可自定義)
• Mapper 中的業務邏輯寫在 map() 方法中
• Mapper 的輸出數據是 KV 對的形式(KV 的類型可自定義)
• map() 方法(MapTask 進程)對每一個 <K,V> 調用一次
package com.nefu.mapreducer.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {private Text outK=new Text();private IntWritable outV=new IntWritable(1);@Overrideprotected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {String line=value.toString();String[] words=line.split(" ");for(String word:words){//封裝outK.set(word);//寫出context.write(outK,outV);}}
}
( 2 ) Reducer 階段
• 用戶自定義的 Reducer 要繼承自己的父類
• Reducer 的輸入數據類型對應 Mapper 的輸出數據類型,也是 KV
• Reducer 的業務邏輯寫在 reduce() 方法中
• ReduceTask 進程對每一組相同 k 的 <k,v> 組調用一次 reduce() 方法,該組的 value 以迭代器類型(Iterable)傳入
package com.nefu.mapreducer.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordcountReducer extends Reducer<Text,IntWritable,Text, IntWritable> {private IntWritable outV=new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum=0;for(IntWritable value:values){sum=sum+value.get();}outV.set(sum);context.write(key,outV);}
}
( 3 ) Driver 階段
相當於 YARN 集群的客戶端,用於提交我們整個程序到 YARN 集群,提交的是
封裝了 MapReduce 程序相關運行參數的 job 對象。
package com.nefu.mapreducer.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordcountDriver {public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {//獲取jobConfiguration conf=new Configuration();Job job=Job.getInstance(conf);//設置jar包job.setJarByClass(WordcountDriver.class);job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job,new Path("D:\\cluster\\mapreduce.txt"));FileOutputFormat.setOutputPath(job,new Path("D:\\cluster\\partion"));boolean result=job.waitForCompletion(true);System.exit(result?0:1);}
}
?
<!-- Compile the project with Java 1.8 as both source and target level. -->
<build>
    <plugins>
        <plugin>
            <!-- Explicit form of the group Maven assumes when groupId is omitted. -->
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>