目錄
- 1. 執行過程
- 1.1 分割
- 1.2 Map
- 1.3 Combine
- 1.4 Reduce
- 2. 代碼和結果
- 2.1 pom.xml中依賴配置
- 2.2 工具類util
- 2.3 WordCount
- 2.4 結果
- 參考
1. 執行過程
??假設WordCount的兩個輸入文本text1.txt和text2.txt如下。
Hello World
Bye World
Hello Hadoop
Bye Hadoop
1.1 分割
??將每個文件拆分成split分片,由于測試文件比較小,所以每個文件為一個split,并將文件按行分割形成<key,value>對,如下圖所示。這一步由MapReduce自動完成,其中key值為偏移量,由MapReduce自動計算出來,包括回車所占的字符數。
1.2 Map
??將分割好的<key,value>對交給用戶定義的Map方法處理,生成新的<key,value>對。處理流程為先對每一行文字按空格拆分為多個單詞,每個單詞出現次數設初值為1,key為某個單詞,value為1,如下圖所示。
1.3 Combine
??得到Map方法輸出的<key,value>對后,Mapper將它們按照key值進行升序排列,并執行Combine合并過程,將key值相同的value值累加,得到Mapper的最終輸出結果,并寫入磁盤,如下圖所示。
1.4 Reduce
??Reducer先對從Mapper接受的數據進行排序,并將key值相同的value值合并到一個list列表中,再交由用戶自定義的Reduce方法進行匯總處理,得到新的<key,value>對,并作為WordCount的輸出結果,存入HDFS,如下圖所示。
2. 代碼和結果
2.1 pom.xml中依賴配置
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.3.6</version><type>pom</type></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.3.6</version></dependency></dependencies>
2.2 工具類util
??util.removeALL的功能是刪除hdfs上的指定輸出路徑(如果存在的話),而util.showResult的功能是打印wordcount的結果。
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class util {public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {URI add = new URI(uri);return FileSystem.get(add, conf);}public static void removeALL(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println("Delete Output Folder? " + isDeleted);}}public static void showResult(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);String regex = "part-r-";Pattern pattern = Pattern.compile(regex);if (fs.exists(new Path(path))) {FileStatus[] files = fs.listStatus(new Path(path));for (FileStatus file : files) {Matcher matcher = pattern.matcher(file.getPath().toString());if (matcher.find()) {FSDataInputStream openStream = fs.open(file.getPath());IOUtils.copyBytes(openStream, System.out, 1024);openStream.close();}}}}
}
2.3 WordCount
??正常來說,MapReduce編程都是要把代碼打包成jar文件,然后用hadoop jar jar文件名 主類名稱 輸入路徑 輸出路徑
。下面代碼中直接給出了輸入和輸出路徑,可以直接運行。
import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class App {public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {System.out.println(key + " " + value);Text keyOut;IntWritable valueOut = new IntWritable(1);StringTokenizer token = new StringTokenizer(value.toString());while (token.hasMoreTokens()) {keyOut = new Text(token.nextToken());context.write(keyOut, valueOut);}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new IntWritable(sum));} }public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] myArgs = {"file:///home/developer/CodeArtsProjects/WordCount/text1.txt", "file:///home/developer/CodeArtsProjects/WordCount/text2.txt", "hdfs://localhost:9000/user/developer/wordcount/output"};util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);Job job = Job.getInstance(conf, "wordcount");job.setJarByClass(App.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setCombinerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < myArgs.length - 1; i++) {FileInputFormat.addInputPath(job, new Path(myArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));int res = job.waitForCompletion(true) ? 0 : 1;if (res == 0) {System.out.println("WordCount結果:");util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);}System.exit(res);}
}
2.4 結果
參考
吳章勇 楊強著 大數據Hadoop3.X分布式處理實戰