統計一個文件里各單詞的個數,假設這個文件很大。
原理如下圖:
編寫代碼:
WCMapper.java
package zengmg.hadoop.mr.wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/** public class WCMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>{* 參數是4個泛型* KEYIN, VALUEIN,是mapper輸入數據的類型* KEYOUT, VALUEOUT,是mapper輸出數據的類型* map和reduce的數據輸入輸出都是以 key-value對的形式封裝的* 默認情況下,框架傳遞給mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容作為value* mapper的輸出類型要結合業務,這里輸出 key是文本(String),value是個數(Long)* ps:* 輸入和輸出數據都要在網絡上傳遞,這些數據都要序列化,java自帶的序列化有太多的冗余內容,* 于是hadoop自定義了一套序列化。* 于是次數的java-Lang類型要用hadoop的LongWritable* java-String改為hadoop的Text*/
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{/** mapper拿到一行數據,就調一次這個方法*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws IOException, InterruptedException {//具體業務邏輯就寫在這個方法里//需要處理的業務邏輯數據,框架已經傳遞進來了,就是方法中的key和value//key是要處理的文本中一行的起始偏移量,這一行的內容作為value//業務邏輯://1、按空格分隔一行文本里的單詞//2、輸出 k 單詞,v 1String line=value.toString();String[] words=line.split(" ");for(String word:words){context.write(new Text(word), new LongWritable(1));}}}
WCReducer.java
package zengmg.hadoop.mr.wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {// 框架在map處理完成之后,將map輸送過來的kv對緩存起來,進行分組,然后傳遞一個組<key,values{}>,//每個key一組,每組調用一次reduce方法進行統計// 舉例最后的kv對:<hello,{1,1,1,1,1,......}>,循環統計values就可以了@Overrideprotected void reduce(Text key, Iterable<LongWritable> values,Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {long count=0;for(LongWritable value:values){count+=value.get();}context.write(key, new LongWritable(count));}
}
WCRunner.java
package zengmg.hadoop.mr.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** 用來描述一個特定的作業* 在hadoop集群里有很多作業job,需要指定這個業務邏輯作業使用哪個類作為邏輯處理的map,哪個作為reduce* 還可以指定該作業要處理的數據所在的路徑* 還可以指定該作業輸出的結果放到哪個路徑* .....* 該程序會打包成jar,放到hadoop的集群里跑*/
public class WCRunner {public static void main(String[] args) throws Exception {Configuration conf=new Configuration();Job wcJob= Job.getInstance(conf);//設置這個job所用的mapper類和reducer類所在的jar包//必須指定,在hadoop里會有很多的jar包作業wcJob.setJarByClass(WCRunner.class);//設置job使用的mapper類和reducer類wcJob.setMapperClass(WCMapper.class);wcJob.setReducerClass(WCReducer.class);/** 設置reducer輸出數據KV類型* 先設置reducer的原因是:沒有設置reducer輸出數據kv的類型的api,* setOutputKeyClass和setOutputValueClass會把reducer和mapper都設置了。* 如果reducer和mapper的輸出類型一樣,就用這條就夠了,無需setMapOutput...*/wcJob.setOutputKeyClass(Text.class);wcJob.setOutputValueClass(LongWritable.class);//設置mapper輸出數據kv類型//wcJob.setMapOutputKeyClass(Text.class);//wcJob.setMapOutputValueClass(LongWritable.class);//指定要處理的輸入數據存放路徑FileInputFormat.setInputPaths(wcJob, new Path("hdfs://hello110:9000/wc/srcdata/"));//指定處理結果的輸出數據存放路徑FileOutputFormat.setOutputPath(wcJob, new Path("hdfs://hello110:9000/wc/output/"));//將job提交給集群運行,true:打印運行日志wcJob.waitForCompletion(true);}
}
導出jar包,放到hadoop集群下運行。
1、啟動hadoop,start-all.sh
2、創建輸出文件
hadoop fs -mkdir /wc/srcdata/
3、編輯要統計的文件
vi word.log
4、上傳word.log到 hdfs的 /wc/srcdata ?文件夾下。
hadoop fs -put word.log /wc/srcdata
5、運行jar包
hadoop jar wcount.jar zengmg.hadoop.mr.wordcount.WCRunner
提示 successful,表明成功
6、進入輸出文件查看
?hadoop fs -ls /wc/output
?hadoop fs -cat /wc/output/part-r-00000
結果:
boy ? ? 1
girl ? ?1
hello ? 4
mimi ? ?1
world ? 1
按abc字母順序表排序的