? ? ? 現在假設有兩個數據文件
file1.txt | file2.txt |
---|---|
2018-3-1 a 2018-3-2 b 2018-3-3 c 2018-3-4 d 2018-3-5 a 2018-3-6 b 2018-3-7 c 2018-3-3 c | 2018-3-1 b 2018-3-2 a 2018-3-3 b 2018-3-4 d 2018-3-5 a 2018-3-6 c 2018-3-7 d 2018-3-3 c |
? ? ? 上述文件 file1.txt 本身包含重復數據,并且與 file2.txt 同樣出現重復數據,現要求使用 Hadoop 大數據相關技術對這兩個文件進行去重操作,并最終將結果匯總到一個文件中。
一、MapReduce 的數據去重
二、案例實現
1、Map 階段實現
DedupMapper.java
package com.itcast.dedup;//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class DedupMapper extends Mapper<LongWritable, Text,Text, NullWritable> {//重寫Ctrl+o@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// <0,2018-3-1 a> <11,2018-3-2 b>
// NullWritable.get() 方法設置空值context.write(value, NullWritable.get());}
}
? ? ? 該代碼的作用是為了讀取數據集文件將 TextInputFormat 默認組件解析的類似 <0,2018-3-1 a> 鍵值對修改??<2018-3-1 a,null>
2、Reduce 階段實現
DedupReducer.java
package com.itcast.dedup;//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class DedupReducer extends Reducer<Text, NullWritable,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {//<2018-3-1 a,null> <11,2018-3-2 b,null> <11,2018-3-3 c,null>context.write(key,NullWritable.get());}
}
? ? ? 該代碼的作用僅僅是接受 Map 階段傳遞來的數據,根據 Shuffle 工作原理,鍵值 key 相同的數據就不會被合并,因此輸出數據就不會出現重復數據了。
3、Dtuver 程序主類實現
DedupDriver.java
package com.itcast.dedup;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class DedupDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//通過 Job 來封裝本次 MR 的相關信息Configuration conf = new Configuration();//System.setProperty("HADOOP_USER_NAME","root");//配置 MR 運行模式,使用 local 表示本地模式,可以省略
// conf.set("mapreduce.framework.name","local");Job job = Job.getInstance(conf);//指定 MR Job jar 包運行主類job.setJarByClass(DedupDriver.class);//指定本次 MR 所有的 Mapper Reducer 類job.setMapperClass(DedupMapper.class);job.setReducerClass(DedupReducer.class);//設置業務邏輯 Mapper 類的輸出 key 和 value 的數據類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//設置業務邏輯 Reducer 類的輸出 key 和 value 的數據類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//使用本地模式指定處理的數據所在的位置//{input2\*} 表示讀取該路徑下所有的文件FileInputFormat.setInputPaths(job,"D:\\homework2\\Hadoop\\mr\\{input2\\*}");//使用本地模式指定處理完成之后的結果所保存的位置FileOutputFormat.setOutputPath(job, new Path("D:\\homework2\\Hadoop\\mr\\output"));//提交程序并且監控打印程序執行情況boolean res = job.waitForCompletion(true);//執行成功輸出 0 ,不成功輸出 1System.exit(res ? 0 : 1);}
}
運行結果:
?三、拓展
? ? ? ? 只要日期相同,就判定為相同,最后結果輸出日期即可
只需要修改DedupMapper.java文件
package com.itcast.dedup;//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class DedupMapper extends Mapper<LongWritable, Text,Text, NullWritable> {//重寫Ctrl+o@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//輸出日期// 把 hadoop 類型轉換為 java 類型(接收傳入進來的一行文本,把數據類型轉換為 String 類型)String line = value.toString();// 把字符串拆分為單詞String[] words = line.split(" ");// 輸出前面的內容String wo = words[0];context.write(new Text(wo), NullWritable.get());}
}
運行結果: