mapreduce在編程的時候,基本上一個固化的模式,沒有太多可靈活改變的地方,除了以下幾處:
?
1、輸入數據接口:InputFormat ---> FileInputFormat(文件類型數據讀取的通用抽象類) DBInputFormat (數據庫數據讀取的通用抽象類)
默認使用的實現類是: TextInputFormat job.setInputFormatClass(TextInputFormat.class)
TextInputFormat的功能邏輯是:一次讀一行文本,然后將該行的起始偏移量作為key,行內容作為value返回
2、邏輯處理接口: Mapper
完全需要用戶自己去實現其中 map() setup() clean()
3、map輸出的結果在shuffle階段會被partition以及sort,此處有兩個接口可自定義:
Partitioner
有默認實現 HashPartitioner,邏輯是 根據key和numReduces來返回一個分區號; key.hashCode()&Integer.MAXVALUE % numReduces
通常情況下,用默認的這個HashPartitioner就可以,如果業務上有特別的需求,可以自定義
Comparable
當我們用自定義的對象作為key來輸出時,就必須要實現WritableComparable接口,override其中的compareTo()方法
?
4、reduce端的數據分組比較接口 : Groupingcomparator
reduceTask拿到輸入數據(一個partition的所有數據)后,首先需要對數據進行分組,其分組的默認原則是key相同,然后對每一組kv數據調用一次reduce()方法,并且將這一組kv中的第一個kv的key作為參數傳給reduce的key,將這一組數據的value的迭代器傳給reduce()的values參數
利用上述這個機制,我們可以實現一個高效的分組取最大值的邏輯:
自定義一個bean對象用來封裝我們的數據,然后改寫其compareTo方法產生倒序排序的效果
然后自定義一個Groupingcomparator,將bean對象的分組邏輯改成按照我們的業務分組id來分組(比如訂單號)
這樣,我們要取的最大值就是reduce()方法中傳進來key
?
5、邏輯處理接口:Reducer
完全需要用戶自己去實現其中 reduce() setup() clean()
?
6、輸出數據接口: OutputFormat ---> 有一系列子類 FileOutputformat DBoutputFormat .....
默認實現類是TextOutputFormat,功能邏輯是: 將每一個KV對向目標文本文件中輸出為一行
?