起初在B站看3分鐘的速成視頻,感覺很多細節沒聽懂。
具體例子解析(文件內容去重)
對于兩個輸入文件,即文件A 和文件B,請編寫MapReduce 程序,對兩個文件進行合并,并剔除
其中重復的內容,得到一個新的輸出文件C。
📂 一、輸入數據文件
文件 A:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
文件 B:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
🧠 二、MapReduce 執行流程和中間結果
MapReduce 分為三個主要階段:
-
Map 階段
-
Shuffle(分組 & 排序)階段
-
Reduce 階段
🔹2.1 Map 階段(映射階段)
我們先來看下 Mapper 的代碼邏輯:
public static class Map extends Mapper<Object, Text, Text, Text> {private static Text text = new Text();public void map(Object key, Text value, Context context) {text = value;context.write(text, new Text(""));}
}
🔍 Mapper 做了什么?
-
每行文本被視為一個輸入記錄(
value
),key
是字節偏移量(無關緊要)。 -
該
Mapper
不對數據做任何處理,直接原樣輸出value
作為key
,并給定空字符串作為value
。 -
這樣,相同行的數據(A、B 中相同的行)會生成相同的 key,從而可以在 Shuffle 階段合并。
🔢 Map 輸出結果(中間鍵值對)
我們對 A、B 兩個文件的所有行執行一次 map()
操作,得到如下中間結果(<key, value>
形式):
來源 | key(Text) | value(Text) |
---|---|---|
A | 20150101 x | "" |
A | 20150102 y | "" |
A | 20150103 x | "" |
A | 20150104 y | "" |
A | 20150105 z | "" |
A | 20150106 x | "" |
B | 20150101 y | "" |
B | 20150102 y | "" |
B | 20150103 x | "" |
B | 20150104 z | "" |
B | 20150105 y | "" |
🔹2.2 Shuffle 階段(分組 & 排序)
MapReduce 框架自動完成以下操作:
-
將所有 Mapper 輸出結果根據 key 進行哈希分區、排序、去重分組。
-
每一個唯一的 key 會被送入一次 Reducer。
🎯 分組結果(Reducer 接收到的 key 和 values):
key(唯一行) | values("" 的列表) |
---|---|
20150101 x | ["",] |
20150101 y | ["",] |
20150102 y | ["", ""] |
20150103 x | ["", ""] |
20150104 y | ["",] |
20150104 z | ["",] |
20150105 y | ["",] |
20150105 z | ["",] |
20150106 x | ["",] |
?? 注意:
-
20150102 y
和20150103 x
都在兩個文件中出現了,所以它們的values
有兩個空字符串。 -
但 Reducer 并不關心這些
values
,它只輸出唯一的key
。
🔹2.3 Reduce 階段(歸約階段)
看一下 Reducer 的代碼:
public static class Reduce extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context) {context.write(key, new Text(""));}
}
🔍 Reducer 做了什么?
-
對于每一個唯一的
key
,Reducer 被調用一次。 -
它忽略 values,直接輸出
key
和空的Text("")
。 -
實際效果是:只輸出不重復的唯一行內容。
? 最終輸出文件 C 的內容:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
? 總結
步驟 | 說明 |
---|---|
Map | 對輸入的每一行輸出 <Text(該行), Text("")> |
Shuffle | 根據行內容去重、分組、排序 |
Reduce | 忽略 values,只輸出唯一的 key(行內容) |
輸出文件 | 文件 A 和 B 合并去重后的內容 |
問題一:Reduce端是如何輸出文件的?
? Reduce中context.write(key, value)
的行為
在 Hadoop MapReduce 中:
context.write(new Text("s"), new Text("a"));
的輸出行為是:
-
每一行輸出格式為:
key \t value
即,key 和 value 之間用一個制表符(Tab 字符
\t
)分隔。
🔍 所以你舉的例子
context.write(new Text("s"), new Text("a"));
最終輸出文件中的一行會是:
s a
不是 sa
,而是 s
和 a
之間有一個 Tab 分隔符。
🔧 那么在你的代碼中:
context.write(key, new Text(""));
由于 value
是空字符串,所以每一行就是:
key
即沒有顯示的 value,只輸出 key 的內容,所以:
20150101 x
這行實際上是 key 的原樣內容,不是 key + "" 的拼接結果,而是 key 后面雖然有個空字符串作為 value,但由于 value 是空的,輸出就是 key 自身。
?? 補充:可以自定義 key 和 value 的分隔符
在 Hadoop 中默認的 key/value 分隔符是 Tab(\t
),但你可以通過設置:
-D mapreduce.output.textoutputformat.separator=","
來更改分隔符,比如改成逗號,則輸出會變成:
s,a
? 是不是有幾個 Reducer 就會產生幾個輸出文件?
是的,完全正確。
在 Hadoop MapReduce 中:
-
如果你設置了 N 個 Reducer 任務(比如
job.setNumReduceTasks(N)
), -
那么就會產生 N 個輸出文件。
這些輸出文件的名稱通常是:
part-r-00000
part-r-00001
...
part-r-00(N-1)
每個文件由一個 Reducer 任務寫出。
? 那這些輸出文件的內容一致嗎?
不一致!每個文件的內容不同!
? 原因:
-
MapReduce 框架會按照 key 的 hash 值把數據**分區(Partition)**給不同的 Reducer。
-
每個 Reducer 只處理自己分到的 key 分區。
-
所以:
-
每個輸出文件包含的是不同部分的 key-value 對。
-
輸出文件之間是不重合的,也就是說每個 key 只會出現在一個 Reducer 的輸出文件中。
-
🧠 舉個例子(比如有 2 個 Reducer):
假設你有以下中間 key:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
Hadoop 會通過 partitioner(比如默認的 HashPartitioner)決定:
-
Reducer 0 處理:
20150101 x 20150103 x 20150105 z 20150106 x
-
Reducer 1 處理:
20150101 y 20150102 y 20150104 y 20150104 z 20150105 y
然后產生:
-
part-r-00000
← Reducer 0 寫入 -
part-r-00001
← Reducer 1 寫入
兩個文件中的內容互不重復,但合起來是完整的結果。
? 1. 默認是不是只有一個 Reducer?
是的,默認情況下 Hadoop MapReduce 只使用 1 個 Reducer。
也就是說,如果你沒有顯式調用:
job.setNumReduceTasks(N);
則默認 N = 1
,最終只會生成一個輸出文件:part-r-00000
。
? 2. 可以設置多個 Reducer 嗎?
當然可以,而且非常常見。
你可以在驅動代碼中顯式設置 Reducer 個數,例如設置為 3:
job.setNumReduceTasks(3);
這樣 Hadoop 會啟用 3 個 Reducer 并行處理數據,輸出三個文件:
part-r-00000
part-r-00001
part-r-00002
問題二:Shuffle過程的輸出結果與Combiner函數本質是?
? 一、Shuffle 輸出是啥?
默認情況下:
Map 階段的輸出會經過 Shuffle(排序 + 分區 + 組裝) 后變成:
key1 → [v1, v2, v3, ...]
key2 → [v1, v2, ...]
...
這些最終被送入 Reducer。
? 問題:為什么會有重復的 value?
因為 同一個 key 可能在同一個 Mapper 中出現多次,比如:
hello → 1
hello → 1
hello → 1
這些數據在傳輸前就可以局部聚合,先加一加再傳過去,不用浪費網絡帶寬。
? 二、Combiner 是什么?
Combiner 就是一個 “局部 Reduce”,在 Mapper 端執行,用來提前聚合。
它的作用是:
-
在 Mapper 本地 就先對 key 進行累加(或合并),
-
減少大量重復的
<key, 1>
傳給 Reducer, -
降低網絡傳輸壓力,提升性能。
? 三、怎么寫一個 Combiner?
👉 很簡單,其實你可以直接 復用 Reducer 邏輯,只要滿足:聚合操作是可交換和結合的(比如加法)。
? 1. 定義 Combiner 類(和 Reducer 一樣):
public static class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result); // 輸出 key sum}
}
? 2. 在 Driver 中設置:
job.setCombinerClass(IntSumCombiner.class);
?? 注意:你也可以直接寫成:
job.setCombinerClass(IntSumReducer.class);
因為本質一樣(統計加法是符合條件的操作)。
? 四、添加了 Combiner 后的數據流是什么樣?
假設有兩個 Map 輸出如下:
Mapper1 輸出:
hello → 1
hello → 1
world → 1
經過 Combiner:
hello → 2
world → 1
Mapper2 輸出:
hello → 1
world → 1
經過 Combiner:
hello → 1
world → 1
最終 Shuffle 輸出給 Reducer:
hello → [2, 1]
world → [1, 1]
Reducer 再聚合:
hello → 3
world → 2
? 五、什么時候不要用 Combiner?
雖然 Combiner 很有用,但它不是 always-safe 的,只有在滿足可交換、可結合的前提下才可用。
操作類型 | 適合使用 Combiner? | 示例 |
---|---|---|
加法、計數、最大最小值 | ? 可以用 | WordCount、MaxTemperature |
求平均、TopN、排序 | ? 不建議 | 平均值不能分區計算后再平均 |
? 所以完整流程是:
Map → Combiner → Shuffle(聚合 + 分區)→ Reduce