MapReduce被廣泛應用于日志分析、海量數據排序、在海量數據中查找特定模式等場景中。
MapReduceJob
在Hadoop中,每個MapReduce任務都被初始化為一個Job。
每個Job又可以分為兩個階段:Map階段和Reduce階段。這兩個階段分別用Map函數和Reduce函數來表示。
Map函數接收一個<key,value>形式的輸入,然后產生另一種<key,value>的中間輸出,Hadoop負責將所有具有相同中間key值的value集合到一起傳遞給Reduce函數;Reduce函數接收一個如<key,(list of values)>形式的輸入,然后對這個value集合進行處理并輸出結果,Reduce的輸出也是<key,value>形式的。
InputFormat()和InputSplit
InputSplit是Hadoop中用來把輸入數據傳送給每個單獨的Map,InputSplit存儲的并非數據本身,而是起始位置、分片長度和一個記錄數據所在主機的數組。生成InputSplit的方法可以通過InputFormat()來設置。當數據傳送給Map時,Map會將輸入分片傳送到InputFormat()上,InputFormat調用getRecordReader()方法生成RecordReader,RecordReader在通過creatKey()、createValue()方法將InputSplit創建成可供Map處理的<key,value>對。即,InputFormat()方法是用來生成可供Map處理的<key,value>對的。
InputFormat
? ? BaileyBorweinPlouffe.BbpInputFormat
? ? ComposableInputFormat
? ? CompositeInputFormat
? ? DBInputFormat
? ? DistSum.Machine.AbstractInputFormat
? ? FileInputFormat
? ? ? ? CombineFileInputFormat
? ? ? ? KeyValueTextInputFormat
? ? ? ? NLineInputFormat
? ? ? ? SequenceFileInputFormat
? ? ? ? TeraInputFormat
? ? ? ? TextInputFormat
TextInputFormat是Hadoop默認的輸入方式。在TextInputFormat中,每個文件(或其一部分)都會單獨作為Map的輸入,而這是繼承自FileInputFormat的。之后,每行數據都會生成一條記錄,每條記錄則表示成<key,value>形式。
key值是每個數據記錄在數據分片中的字節偏移量,數據類型是LongWritable;
value值是每行的內容,數據類型是Text。
如:
file1:
0 hello world bye world
file2:
0 hello hadoop bye hadoop
兩個文件都會被單獨輸入到一個Map中,因此它們的值都是0。
OutputFormat()
默認的輸出格式是TextOutputFormat,每條記錄以一行的形式存入文本文件,鍵和值是任意形式的,程序內部調用toString()方法將鍵和值轉換為String類型再輸出。
Map函數和Reduce函數:
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] words = StringUtils.split(value.toString(), ' ');for(String w :words){context.write(new Text(w), new IntWritable(1));}}
}
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {int sum =0;for(IntWritable i: values){sum=sum+i.get();}arg2.write(key, new IntWritable(sum));}
}
public class RunJob {public static void main(String[] args) {Configuration config =new Configuration();
// config.set("fs.defaultFS", "hdfs://node1:8020");
// config.set("yarn.resourcemanager.hostname", "node1");config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");try {FileSystem fs =FileSystem.get(config);Job job =Job.getInstance(config);job.setJarByClass(RunJob.class);job.setJobName("wc");job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/usr/input/"));Path outpath =new Path("/usr/output/wc");if(fs.exists(outpath)){fs.delete(outpath, true);}FileOutputFormat.setOutputPath(job, outpath);boolean f= job.waitForCompletion(true);if(f){System.out.println("job completion");}} catch (Exception e) {e.printStackTrace();}}
}
注意兩種情況:
1、Reduce Task的數量可以由程序指定,當存在多個Reduce Task時,每個Reduce會搜集一個或多個key值。當存在多個Reduce Task時,每個Reduce Task都會生成一個輸出文件;
2、沒有Reduce任務的時候,系統會直接將Map的輸出結果作為最終結果,有多少個Map就有多少個輸出。
MapReduce任務的優化
如何完成這個任務,怎么能讓程序運行的更快。
MapReduce計算模型的優化主要集中在兩個方面:計算性能方面;IO操作方面。
1、任務調度;
計算方面:優先將任務分配給空閑機器;
IO方面:盡量將Map任務分配給InputSplit所在的機器。
2、數據預處理與InputSplit的大小
MapReduce擅長處理少量的大數據,在處理大量的小數據時性能會很遜色。
因此在提交MapReduce任務前可以先對數據進行一次預處理,將數據合并以提高MapReduce任務的執行效率。
另一方面是參考Map任務的運行時間,當一個Map任務只需要運行幾秒就可以結束時,就需要考慮是否應該給它分配更多的數據。通常而言,一個Map任務的運行時間在一分鐘左右比較合適。
在FileInputFormat中(除了CombineFileInputFormat),Hadoop會在處理每個Block后將其作為一個InputSplit,因此合理地設置block塊大小是很重要的調節方式。
3、Map和Reduce任務的數量
Map/Reduce任務槽:集群能夠同時運行的Map/Reduce任務的最大數量。
如100臺機器,每臺最多同時運行10個Map和5個Reduce,則Map任務槽為1000,Reduce任務槽為500。
設置Map任務的數量主要參考的是Map的運行時間,設置Reduce任務的數量主要參考的是Reduce槽的數量。
Reduce任務槽的0.95倍,如果一個Reduce任務失敗,可以很快找到一個空閑的機器重新執行;
Reduce任務槽的1.75倍,執行快的機器可以獲得更多的Reduce任務,因此可以使負載更加均衡,以提高任務的處理速度。
4、Combine函數
用于本地合并數據,以減少網絡IO操作的消耗。
合理的設計combine函數會有效減少網絡傳輸的數據量,提高MapReduce的效率。
job.setCombinerClass(combine.class);
在WordCount中,可以指定Reduce類為combine函數:
job.setCombinerClass(Reduce.class);
5、壓縮
可以選擇對Map的輸出和最終的輸出結果進行不同壓縮方式的壓縮。
在一些情況下,Map的中間輸出可能會很大,對其進行壓縮可以有效地減少網絡上的數據傳輸量。
6、自定義comparator
自定義Hadoop數據類型時,推薦自定義comparator來實現數據的比較,這樣可以省去數據序列化和反序列化的時間,提高程序的運行效率。