Hadoop學習:深入解析MapReduce的大數據魔力(三)
- 3.5 MapReduce 內核源碼解析
- 3.5.1 MapTask 工作機制
- 3.5.2 ReduceTask 工作機制
- 3.5.3 ReduceTask 并行度決定機制
- 3.6 數據清洗(ETL)
- 1)需求
- 2)需求分析
- 3)實現代碼
- 3.7 MapReduce 開發總結
- 1)輸入數據接口:InputFormat
- 2)邏輯處理接口:Mapper
- 3)Partitioner 分區
- 4)Comparable 排序
- 5)Combiner 合并
- 6)邏輯處理接口:Reducer
- 7)輸出數據接口:OutputFormat
3.5 MapReduce 內核源碼解析
3.5.1 MapTask 工作機制
(1)Read階段:MapTask通過InputFormat獲得的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,并產生一系列新的key/value。
(3)Collect 收集階段:在用戶編寫 map()函數中,當數據處理完成后,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的 key/value 分區(調用Partitioner),并寫入一個環形內存緩沖區中。
(4)Spill 階段:即“溢寫”,當環形緩沖區滿后,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,并在必要時對數據進行合并、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號Partition 進行排序,然后按照key進行排序。這樣,經過排序后,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。
步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N 表示當前溢寫次數)中。如果用戶設置了 Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。
(5)Merge 階段:當所有數據處理完成后,MapTask 對所有臨時文件進行一次合并,以確保最終只會生成一個數據文件。 當所有數據處理完后,MapTask 會將所有臨時文件合并成一個大文件,并保存到文件output/file.out 中,同時生成相應的索引文件output/file.out.index。 在進行文件合并過程中,MapTask以分區為單位進行合并。對于某個分區,它將采用多輪遞歸合并的方式。每輪合并mapreduce.task.io.sort.factor(默認 10)個文件,并將產生的文件重新加入待合并列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。
讓每個 MapTask 最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。
3.5.2 ReduceTask 工作機制
(1)Copy 階段:ReduceTask 從各個 MapTask 上遠程拷貝一片數據,并針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。
(2)Sort 階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個后臺線程對內存和磁盤上的文件進行合并,以防止內存使用過多或磁盤上文件過多。按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行聚集的一組數據。為了將key相同的數據聚在一
起,Hadoop采用了基于排序的策略。由于各個MapTask已經實現對自己的處理結果進行了局部排序,因此,ReduceTask只需對所有數據進行一次歸并排序即可。
(3)Reduce 階段:reduce()函數將計算結果寫到HDFS上。
3.5.3 ReduceTask 并行度決定機制
回顧:MapTask并行度由切片個數決定,切片個數由輸入文件和切片規則決定。
思考:ReduceTask并行度由誰決定?
1)設置ReduceTask并行度(個數)
ReduceTask 的并行度同樣影響整個 Job 的執行并發度和執行效率,但與MapTask的并發數由切片數決定不同,ReduceTask數量的決定是可以直接手動設置:
// 默認值是1,手動設置為4
job.setNumReduceTasks(4);
2)實驗:測試ReduceTask多少合適
(1)實驗環境:1個Master節點,16個Slave節點:CPU:8GHZ,內存: 2G
(2)實驗結論:
3)注意事項
(1)ReduceTask=0,表示沒有Reduce階段,輸出文件個數和Map個數一致。
(2)ReduceTask默認值就是1,所以輸出文件個數為一個。
(3)如果數據分布不均勻,就有可能在Reduce階段產生數據傾斜
(4)ReduceTask數量并不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個ReduceTask。
(5)具體多少個ReduceTask,需要根據集群性能而定。
(6)如果分區數不是1,但是ReduceTask為1,是否執行分區過程。答案是:不執行分區過程。因為在MapTask的源碼中,執行分區的前提是先判斷ReduceNum個數是否大于1。不大于1肯定不執行。
3.6 數據清洗(ETL)
“ETL,是英文 Extract-Transform-Load 的縮寫,用來描述將數據從來源端經過抽取
(Extract)、轉換(Transform)、加載(Load)至目的端的過程。ETL一詞較常用在數據倉庫,但其對象并不限于數據倉庫
在運行核心業務MapReduce 程序之前,往往要先對數據進行清洗,清理掉不符合用戶要求的數據。==清理的過程往往只需要運行Mapper程序,不需要運行Reduce程序。 ==
1)需求
去除日志中字段個數小于等于11的日志。
(1)輸入數據
(2)期望輸出數據
每行字段長度都大于11。
2)需求分析
需要在Map階段對輸入的數據根據規則進行過濾清洗。
3)實現代碼
(1)編寫WebLogMapper類
package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; public class WebLogMapper extends Mapper<LongWritable, Text, Text,
NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException { // 1 獲取1行數據 String line = value.toString(); // 2 解析日志 boolean result = parseLog(line,context); // 3 日志不合法退出 if (!result) { return; } // 4 日志合法就直接寫出 context.write(value, NullWritable.get()); } // 2 封裝解析日志的方法 private boolean parseLog(String line, Context context) { // 1 截取 String[] fields = line.split(" "); // 2 日志長度大于11的為合法 if (fields.length > 11) { return true; }else { return false; } }
}
(2)編寫WebLogDriver類
package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WebLogDriver {
public static void main(String[] args) throws Exception {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置
args = new String[] { "D:/input/inputlog", "D:/output1" };
// 1 獲取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加載jar包
job.setJarByClass(LogDriver.class);
// 3 關聯map
job.setMapperClass(WebLogMapper.class);
// 4 設置最終輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 設置reducetask個數為0
job.setNumReduceTasks(0);
// 5 設置輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
3.7 MapReduce 開發總結
1)輸入數據接口:InputFormat
(1)默認使用的實現類是:TextInputFormat
(2)TextInputFormat 的功能邏輯是:一次讀一行文本,然后將該行的起始偏移量作為key,行內容作為value返回。
(3)CombineTextInputFormat 可以把多個小文件合并成一個切片處理,提高處理效率。
2)邏輯處理接口:Mapper
用戶根據業務需求實現其中三個方法:map() setup() cleanup ()
3)Partitioner 分區
(1)有默認實現 HashPartitioner,邏輯是根據key的哈希值和numReduces來返回一個分區號;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果業務上有特別的需求,可以自定義分區。
4)Comparable 排序
(1)當我們用自定義的對象作為key來輸出時,就必須要實現WritableComparable 接口,重寫其中的compareTo()方法。
(2)部分排序:對最終輸出的每一個文件進行內部排序。
(3)全排序:對所有數據進行排序,通常只有一個Reduce。
(4)二次排序:排序的條件有兩個。
5)Combiner 合并
Combiner 合并可以提高程序執行效率,減少IO傳輸。但是使用時必須不能影響原有的
業務處理結果。
6)邏輯處理接口:Reducer
用戶根據業務需求實現其中三個方法:reduce() setup() cleanup ()
7)輸出數據接口:OutputFormat
(1)默認實現類是TextOutputFormat,功能邏輯是:將每一個KV對,向目標文本文件
輸出一行。
(2)用戶還可以自定義OutputFormat。