1.圖解MapReduceMapReduce整體流程圖
并行讀取文本中的內容,然后進行MapReduce操作
Map過程:并行讀取三行,對讀取的單詞進行map操作,每個詞都以<key,value>形式生成
reduce操作是對map的結果進行排序,合并,最后得出詞頻。
2.簡單過程:
MergeSort的過程(ps:2012-10-18)Map:
<Hello,1><World,1><Bye,1><World,1><Hello,1><Hadoop,1><Bye,1><Hadoop,1><Bye,1><Hadoop,1><Hello,1><Hadoop,1>
MergeSort:
- <Hello,1><World,1><Bye,1><World,1><Hello,1><Hadoop,1> | <Bye,1><Hadoop,1><Bye,1><Hadoop,1><Hello,1><Hadoop,1>
- <Hello,1><World,1><Bye,1> || <World,1><Hello,1><Hadoop,1> | <Bye,1><Hadoop,1><Bye,1> || <Hadoop,1><Hello,1><Hadoop,1>
- <Hello,1><World,1> ||| <Bye,1> || <World,1><Hello,1> ||| <Hadoop,1> | <Bye,1><Hadoop,1> ||| <Bye,1> || <Hadoop,1><Hello,1> ||| <Hadoop,1>
- MergeArray 結果:<Hello,1><World,1> ||| <Bye,1> || <Hello,1><World,1> ||| <Hadoop,1> | <Bye,1><Hadoop,1> ||| <Bye,1> || <Hadoop,1><Hello,1> ||| <Hadoop,1> 在|||這一層級
- MergeArray 結果:<Bye,1><Hello,1><World,1> || <Hadoop,1><Hello,1><World,1> | <Bye,1><Bye,1><Hadoop,1> || <Hadoop,1><Hadoop,1><Hello,1> 在||這一層級
- MergeArray 結 果:<Bye,1><Hadoop,1><Hello,1><World,1><Hello,1><World,1> | <Bye,1><Bye,1><Hadoop,1><Hadoop,1><Hello,1><Hadoop,1> 在|這一層級
- MergeArray結 果:<Bye,1><Bye,1><Bye,1><Hadoop,1><Hadoop,1><Hadoop,1><Hadoop,1><Hello,1><Hello,1><Hello,1><World,1><World,1> 排序完成
3.代碼實例:
package cn.opensv.hadoop.ch1;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Hello world!
*
*/
public class WordCount1 {
? ? ? ? public static class Map extends? ? ? ? Mapper<LongWritable, Text, Text, LongWritable> {
? ? ? ?
? ? ? ? ? ? ? ? private final static LongWritable one = new LongWritable(1);
? ? ? ? ? ? ? ? private Text word = new Text();
? ? ? ?
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? public void map(LongWritable key, Text value, Context context)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? throws IOException, InterruptedException {
? ? ? ? ? ? ? ? ? ? ? ? String line = value.toString();
? ? ? ? ? ? ? ? ? ? ? ? StringTokenizer tokenizer = new StringTokenizer(line);
? ? ? ? ? ? ? ? ? ? ? ? while (tokenizer.hasMoreTokens()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? word.set(tokenizer.nextToken());
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? context.write(word, one);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? }
? ? ? ? public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? public void reduce(Text key, Iterable<LongWritable> values, Context context)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? throws IOException, InterruptedException {
? ? ? ? ? ? ? ? ? ? ? ? long sum = 0;
? ? ? ? ? ? ? ? ? ? ? ? for (LongWritable val : values)??{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? sum += val.get();
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? context.write(key, new LongWritable(sum));
? ? ? ? ? ? ? ? }
? ? ? ? }
? ? ? ? public static void main(String[] args) throws Exception {
? ? ? ?
? ? ? ? ? ? ? ? Configuration cfg = new Configuration();
? ? ? ? ? ? ? ???Job job = new Job(cfg);? ?? ?? ?
? ? ? ? ? ? ? ???job.setJarByClass(WordCount1.class);? ?
? ? ? ? ? ? ? ? job.setJobName("wordcount1"); // 設置一個用戶定義的job名稱
? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? job.setOutputKeyClass(Text.class); // 為job的輸出數據設置Key類
? ? ? ? ? ? ? ? job.setOutputValueClass(LongWritable.class); // 為job輸出設置value類
? ? ? ? ? ? ? ? job.setMapperClass(Map.class); // 為job設置Mapper類
? ? ? ? ? ? ? ? job.setCombinerClass(Reduce.class); // 為job設置Combiner類
? ? ? ? ? ? ? ? job.setReducerClass(Reduce.class); // 為job設置Reduce類
? ? ? ? ? ? ? ?
? ? ? ?
? ? ? ? ? ? ? ?? ?? ?? ?? ?
? ? ? ? ? ? ? ? FileInputFormat.setInputPaths(job, new Path(args[0]));
? ? ? ? ? ? ? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));
? ? ? ? ? ? ? ? job.waitForCompletion(true);
? ? ? ? }
}