一、Maptask并行度與決定機制
1.一個job任務的map階段的并行度默認是由該任務的大小決定的;
2.一個split切分分配一個maprask來并行處理;
3.默認情況下,split切分的大小等于blocksize大小;
4.切片不是mapper類中對單詞的切片,而是對每一個處理文件的單獨切片。
eg.? 默認情況下,一個maptask處理的文件大小為128M,比如一個400M的數據文件,就需要4個maptask并行來處理,而500M的數據文件也是需要4個maptask。
?
二、Maptask運行機制
1.讀數據文件:執行類Driver通過InputFormat類讀取文件中的數據;
2.mapper階段:通過文件的大小決定了maptask的數量,然后mapper進行邏輯運行(讀數據、切分、封裝);
3.OutputCollector階段:mapper方法通過OutputCollector接口將KV對寫入到環形緩沖區中(這個過程不需要我們處理我們);
4.溢寫階段:環形緩沖區默認的大小為100M,當環形緩沖區中數據量到達閾值的80%的時候發生溢寫,溢寫的過程中會保證數據KV對使用默認的分區和排序(HashPartitioner分區、字典排序,而環形緩沖區大小和閾值的大小都是可以通過配置來修改的);
5.歸并排序:將溢寫的數據進行合并排序。
?
三、MR的小文件優化案例
當許多個小文件上傳到HDFS集群上時,每個小文件都將會占用一個blocksize的大小(128M),而且在對它們進行MR計算時,一個文件就會開啟一個maptask,這樣會浪費很多的資源,下面有兩種解決方案:
1.在文件上傳到HDFS集群前,先將文件進行合并成一個大的文件,再上傳到HDFS集群進行存儲和計算;
2.若文件已經上傳到HDFS集群,需要直接進行計算時,
可以再Driver類中設置輸入流之前設置InputFormatClass屬性為CombinerTextInputFormat(它的默認為TextInputFormat),
原理是:CombineTextInputFormat類可以將多個小文件交給一個split切片,然后交給一個maptask來處理,即再Driver類中設置輸入流FileInputFormat前加入代碼:
job.setInputFormatClass(CombinerTextInputFormat.class);
CombinerTextInputFormat.setMaxInputSplitSize(job,4194304); //設置切片最大值為4M
CombinerTextInputFormat.setMinInputSplitSize(job,3145725); //設置切片最大值為3M
表示大小在3M~4M的文件會被方法一個切片中,那么如果有無數的小文件,一個maptask中大概會有28~42個小文件一起處理。
?
四、自定義分區Partitioner
在MR程序中,默認分區為HashPartitioner,以下為源碼:
public class HashPartitioner<K, V> extends Partitioner<K, V> {public HashPartitioner() {}public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & 2147483647) % numReduceTasks;}
}
HashPartitioner繼承了父類Partitioner,其中getPartition方法返回int值0(注釋:分區數量決定了reducetask的數量,不分區reducetask值為1,所以一直返回int值0,也就只會產生一個結果文件!!!)
而如果我們想要進行自定義分區,就要重新定義一個分區類繼承Partitioner類:
public class FlowPartitioner extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int i) {//獲取用來分區的電話號碼前三位String phoneNum = key.toString().substring(0, 3);//設置分區邏輯int partitionNum = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partitionNum;}
}
我在流量統計案例中也寫了該分區類,然后再Driver類中的InputFormat類之前加入設置的自定義分區代碼:
?
job.setPartitionClass(PhoneNumPartitioner.class);
job.setNumReduceTasks(5); (注意:輸出文件數量要大于partitioner分區的數量)
?
總結:MR程序運算過程中,決定maptask個數的有塊大小(blocksize)、數據文件大小、文件輸入方式(小文件優化);而決定reducetask個數的是分區(無分區時reducetask個數為1,生成一個結果文件)。
?