MapReduce的API理解
Mapper
- 如果是單詞計數:hello:1, hello:1, world:1
public void map(Object key, // 首字符偏移量Text value, // 文件的一行內容Context context) // Mapper端的上下文,與 OutputCollector 和 Reporter 的功能類似throws IOException, InterruptedException {
Reduce
- 注意拿到的是value的集合
- 如果是單詞計數:hello:【1,1】,world:【1】
public void reduce(Text key, // Map端 輸出的 key 值Iterable<IntWritable> values, // Map端 輸出的 Value 集合(相同key的集合)Context context) // Reduce 端的上下文,與 OutputCollector 和 Reporter 的功能類似throws IOException, InterruptedException{
整體架構
- NameNode: 負責存儲文件的元信息(如文件的大小、塊信息、存儲位置等)。通常會部署一個 Secondary NameNode 來周期性合并 NameNode 的 edit log 以減少恢復時間,但它并不是熱備,而是輔助管理。避免單點故障
- JobTracker:負責 Hadoop MapReduce 任務的調度和資源管理,接收作業請求并將任務分配給不同的 TaskTracker(工作節點)
- DataNode:實際存儲數據的節點,存有多個數據塊(Block),128MB
- TaskTracker:實際進行mapper和reduce工作的節點,可以看到圖中TaskTracker接近DataNode,這是“移動計算比移動數據更便宜” 的設計理念,目的是減少數據傳輸,提高計算效率
輸入階段
- 文件存儲到 HDFS(Hadoop Distributed File System):
- 當文件上傳到 HDFS 時,文件會先到達
NameNode
,NameNode
負責存儲文件的元信息(如文件的大小、塊信息、存儲位置等)。 - 隨后,文件會被分割成固定大小的數據塊(Block),默認每塊大小是 128 MB(可以通過配置調整)。
- 這些數據塊會被分布式地存儲到集群中的不同
DataNode
節點上,通常每個塊有多個副本(默認是 3 個),以保證數據的可靠性,同步到指定數量的副本后,才向NameNode確認寫操作成功,保證一致性。TiDB的底層TiKV的存儲也類似這個結構,根據鍵值對分為多個region,且不同的節點保存奇數個副本,并有raft協議保證一致性。
Mapper階段
- TaskTracker 和 Mapper 的運行:
- 當執行 MapReduce 作業時,
JobTracker
會負責任務的調度。 - 根據
NameNode
提供的塊位置信息,JobTracker
會在包含該塊數據的DataNode
上啟動Mapper
,這是數據本地化優化的核心。 - 每個
Mapper
會處理一個或多個數據塊。 Mapper
會將每一行的文件處理成鍵值對形式,也可以進行數據預處理(過濾、清洗):
- 當執行 MapReduce 作業時,
// input.txt:
apple 10
banana 20
apple 5// after mapper:
(apple, 5), (apple, 10), (banana, 20)
Mapper
的輸出結果存儲在本地磁盤的緩存文件中或者磁盤中,并分為多個分區,每個分區對應一個Reducer
。
Shuffle 階段
什么是 Shuffle?
Shuffle 是將 Mapper
輸出的中間數據(鍵值對)分發給 Reducer
的過程。
其主要任務包括:
- 對 Mapper 輸出的數據進行分區。
- 將分區數據從 Mapper 節點移動到 Reducer 節點。
- 對分區數據進行排序和合并。
Shuffle 的執行角色
-
Mapper 節點執行分區:
- 在 Mapper 階段結束后,輸出結果會被分區(默認使用
HashPartitioner
)。 - 每個分區對應一個 Reducer。Mapper 的數據會按照分區規則存儲到本地磁盤的多個文件中。
- 在 Mapper 階段結束后,輸出結果會被分區(默認使用
-
JobTracker(或 Yarn)負責協調:
JobTracker
會通知各 Reducer 節點**從相應的 Mapper 節點拉取(pull)屬于自己的分區數據。
-
Reducer 節點的數據遷移:
- 每個 Reducer 從多個 Mapper 節點拉取自己的數據。
- 拉取的數據會臨時存儲在 Reducer 節點上,并在內存中進行排序和合并,以便 Reducer 處理。
Shuffle 階段會涉及到數據從 Mapper 節點到 Reducer 節點的遷移,這也是整個 MapReduce 流程中最耗時的一部分。
舉個例子,更好理解如何分區以及數據傳輸:
Shuffle例子
輸入文件內容:
File1: Hello Hadoop Hello
File2: Hadoop MapReduce Hadoop
分塊:
- Block1(File1)
- Block2(File2)
Mapper 輸出:
假設有 2 個 Mapper
和 2 個 Reducer
,并用 key.hashCode() % 2
作為分區規則。
Mapper | Key | Partition (Reducer) | Output |
---|---|---|---|
Mapper1 | Hello | 0 | {Hello: 1} |
Mapper1 | Hadoop | 1 | {Hadoop: 1} |
Mapper1 | Hello | 0 | {Hello: 1} |
Mapper2 | Hadoop | 1 | {Hadoop: 1} |
Mapper2 | MapReduce | 0 | {MapReduce: 1} |
Mapper2 | Hadoop | 1 | {Hadoop: 1} |
Shuffle 階段:
在 Shuffle 階段,每個 Reducer 會從多個 Mapper 拉取數據:
Reducer | Data Source | Received Data |
---|---|---|
Reducer0 | Mapper1 + Mapper2 | {Hello: [1, 1], MapReduce: [1]} |
Reducer1 | Mapper1 + Mapper2 | {Hadoop: [1, 1, 1]} |
Reducer 聚合:
最終,Reducer 聚合數據,輸出結果:
Reducer0: {Hello: 2, MapReduce: 1}
Reducer1: {Hadoop: 3}
Reduce階段
前面提到,reduce時會向map的節點獲取數據,它是如何直到那個mapper節點的呢?
具體是這樣的:map任務成功完成后,它們會使用心跳機制通知它們JobTracker。因此,對于指定作業JobTracker 知道 map輸出和主機位置之間的映射關系。reducer 中的一個線程定期詢問 JobTracker 以便獲取 map輸出主機的位置,直到獲得所有輸出位置。
Reduce 任務執行完畢后,輸出結果會直接存儲到 HDFS,但是Reduce 節點不會主動通知 NameNode 數據位置,而是 HDFS 負責數據存儲的元數據管理,Reduce 任務會通過 HDFS 客戶端 API 將數據寫入 HDFS
寫數據到 HDFS 的過程(詳)
-
客戶端請求寫入文件:
客戶端(例如 Reducer 或用戶程序)向 HDFS 的 NameNode 發起寫入請求。
客戶端需要告訴 NameNode 文件的元信息(如文件名、大小等)
NameNode 分配數據塊NameNode 根據文件大小、HDFS 的配置(如塊大小和副本數量),分配該文件需要的 數據塊(Block)
對于每個塊,NameNode 會選擇多個 DataNode(通常是 3 個)作為存儲目標,并將這些位置信息返回給客戶端。
-
分配數據塊:
通常會優先選擇離客戶端最近的節點,或者是同一個機架的節點,來減少網絡延遲。副本的存儲節點也會盡量分布在不同的機架上,提高數據可靠性
客戶端直接寫入 DataNode:客戶端根據 NameNode 返回的塊位置信息,開始向第一組目標 DataNode 寫入數據。寫入過程是 流式傳輸,數據被切分為塊后,直接發送給第一個 DataNode
DataNode 進行副本復制:第一臺 DataNode 在接收到數據后,會立即將該數據塊傳輸到下一臺 DataNode,依此類推,直到完成所有副本的寫入(鏈式復制)
-
DataNode 匯報塊信息:
每個 DataNode 在數據寫入完成后,會向 NameNode 匯報存儲的塊信息(如塊 ID、塊大小、存儲位置)
-
寫入完成:
當所有數據塊都寫入成功,并且所有副本都存儲完成后,HDFS 客戶端通知 NameNode 文件寫入完成,保證數據的一致性NameNode 將文件的元數據標記為 “完成狀態”
MapReduce框架的Java實現
這里手寫一個簡易的java實現的框架,方便大家理解
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;// 定義 Mapper 接口
interface Mapper {List<Pair<String, Integer>> map(String input);
}// 定義 Reducer 接口
interface Reducer {Pair<String, Integer> reduce(String key, List<Integer> values);
}// 定義 Pair 類,用于存儲鍵值對
class Pair<K, V> {public final K key;public final V value;public Pair(K key, V value) {this.key = key;this.value = value;}@Overridepublic String toString() {return key + ": " + value;}
}// 實現支持多個 Mapper 和 Reducer 的 MapReduce 框架
class ParallelMapReduceFramework {private List<Mapper> mappers;private List<Reducer> reducers;private int reducerCount;public ParallelMapReduceFramework(List<Mapper> mappers, List<Reducer> reducers) {this.mappers = mappers;this.reducers = reducers;this.reducerCount = reducers.size();}public Map<String, Integer> execute(List<String> inputs) throws InterruptedException, ExecutionException {ExecutorService executor = Executors.newFixedThreadPool(mappers.size());// 1. Map 階段:將輸入數據分給多個 Mapper 并行處理List<Future<List<Pair<String, Integer>>>> mapResults = new ArrayList<>();int chunkSize = inputs.size() / mappers.size();for (int i = 0; i < mappers.size(); i++) {int start = i * chunkSize;int end = (i == mappers.size() - 1) ? inputs.size() : (i + 1) * chunkSize;List<String> chunk = inputs.subList(start, end);Mapper mapper = mappers.get(i);mapResults.add(executor.submit(() -> {List<Pair<String, Integer>> results = new ArrayList<>();for (String input : chunk) {results.addAll(mapper.map(input));}return results;}));}// 收集所有 Mapper 生成的鍵值對List<Pair<String, Integer>> allMappedData = new ArrayList<>();for (Future<List<Pair<String, Integer>>> future : mapResults) {allMappedData.addAll(future.get());}// 2. Shuffle 階段:將鍵值對分片,分配給不同的 ReducerMap<Integer, List<Pair<String, Integer>>> reducerInput = new HashMap<>();for (int i = 0; i < reducerCount; i++) {reducerInput.put(i, new ArrayList<>());}for (Pair<String, Integer> pair : allMappedData) {int reducerIndex = Math.abs(pair.key.hashCode() % reducerCount);reducerInput.get(reducerIndex).add(pair);}// 3. Reduce 階段:每個 Reducer 處理一個分片數據List<Future<Map<String, Integer>>> reduceResults = new ArrayList<>();for (int i = 0; i < reducers.size(); i++) {int index = i;Reducer reducer = reducers.get(i);List<Pair<String, Integer>> inputForReducer = reducerInput.get(index);reduceResults.add(executor.submit(() -> {// 按鍵分組Map<String, List<Integer>> groupedData = new HashMap<>();for (Pair<String, Integer> pair : inputForReducer) {groupedData.computeIfAbsent(pair.key, k -> new ArrayList<>()).add(pair.value);}// Reduce 操作Map<String, Integer> result = new HashMap<>();for (Map.Entry<String, List<Integer>> entry : groupedData.entrySet()) {result.put(entry.getKey(), reducer.reduce(entry.getKey(), entry.getValue()).value);}return result;}));}// 收集所有 Reducer 的結果Map<String, Integer> finalResult = new HashMap<>();for (Future<Map<String, Integer>> future : reduceResults) {finalResult.putAll(future.get());}executor.shutdown();return finalResult;}
}// 實現單詞統計的 Mapper 和 Reducer
class WordCountMapper implements Mapper {@Overridepublic List<Pair<String, Integer>> map(String input) {String[] words = input.split("\\s+");List<Pair<String, Integer>> result = new ArrayList<>();for (String word : words) {result.add(new Pair<>(word.toLowerCase(), 1));}return result;}
}class WordCountReducer implements Reducer {@Overridepublic Pair<String, Integer> reduce(String key, List<Integer> values) {int sum = values.stream().mapToInt(Integer::intValue).sum();return new Pair<>(key, sum);}
}// 測試并行 MapReduce 框架
public class ParallelWordCountExample {public static void main(String[] args) throws InterruptedException, ExecutionException {// 輸入數據List<String> inputs = Arrays.asList("Hello world hello","MapReduce is powerful","Hello MapReduce world","Java is great","Hello from the other side");// 創建多個 Mapper 和 Reducer 實例List<Mapper> mappers = Arrays.asList(new WordCountMapper(), new WordCountMapper());List<Reducer> reducers = Arrays.asList(new WordCountReducer(), new WordCountReducer());// 執行 MapReduceParallelMapReduceFramework framework = new ParallelMapReduceFramework(mappers, reducers);Map<String, Integer> wordCounts = framework.execute(inputs);// 輸出結果wordCounts.forEach((word, count) -> System.out.println(word + ": " + count));}
}
Yarn(MapReduce2)
MapReducer1的問題:
前面提到的MapReducer的模型的問題:
- JobTracker的負載(可擴展性):MapReduce 1 中,,obtracker 同時負責作業調度(將任務與 tasktracker 匹配)和任務進度監控(跟蹤任務、重啟失敗或遲緩的任務;記錄任務流水,如維護計數器的計數)。當任務實例過多時,會導致系統無法再擴展
- JobTracker的可用性:由于JobTracker管理所有應用的狀態,為了實現其可用性,就要創建副本并同步內存中的數據,強一致性意味著性能的損耗,弱一致性意味著故障恢復時的數據的差異。
- 節點的利用率:MapReduce 1中,每個tasktracker都配置有若干固定長度和類型的slot,這些slot是靜態分配的,在配置的時候就被劃分為map slot和reduce slot。一個map slot僅能用于運行一個map任務,一個reduce slot僅能用于運行一個reduce任務,所以分配不當就會導致系統性能低下
針對以上幾個問題,Yarn將jobTracker的工作拆分,分為資源管理器(負責作業調度),以及application Master(負責任務進度監控,一個MapperReducer應用對應一個application Master),通過合理的資源分配提高節點的利用率,每個應用交由一個master管理,可以無限擴展資源避免單點的負載過大,還可以通過zookeeper等機制分別實現資源管理器和application master的高可用(如失敗后再次申請資源)。
還有一個優點就是實現了多租戶,對于資源的抽象和分配機制,可以在Yarn上構建不同的應用,如Spark等
MapReducer2的工作流程
申明幾個概念:
- Yarn的資源管理器,申請的資源即為一個container,可以指定其計算機的資源數量(內存和CPU),可以理解為之前版本的DataNode拆分成了多個容器
- Map,reduce的執行是容器中的進程,而前面提到的Application Master實際上也是容器中的進程,只是功能較為特殊
- 一個MapReduce應用對應一個Application Master
- 一個節點對應一個Node Manager,負責管理該節點上的所有容器和心跳
- 1-5的步驟較為簡單,就是創建一個container用于生成application master。具體是資源管理器收到調用它的submitApplication()消息后,便將請求傳遞給 YARN調度器(scheduler)。調度器分配一個容器,然后資源管理器在節點管理器的管理下在容器中啟動application master的進程
- 接下來,它接受來自共享文件系統的、在客戶端計算的輸入分片(步驟7)。然后對每一個分片創建一個 map任務對象以及由mapreduce.job.reduces 屬性(通過作業的 setNumReduceTasks()方法設置),根據配置確定多個reduce任務對象。
- application master就會為該作業中的所有map任務和reduce任務向資源管理器請求執行具體任務的容器
- 一旦資源管理器的調度器為任務分配了一個特定節點上的容器,application master就通過與節點管理器通信來啟動容器(步驟9a和9b)。該任務由主類為YarnChild的一個 Java 應用程序執行。在它運行任務之前,首先將任務需要的資源本地化,包括作業的配置、JAR 文件和所有來自分布式緩存的文件(步驟 10)。最后,運行map任務或reduce任務(步驟11)。
Spark
適用場景
Spark 最突出的表現在于它能將作業與作業之間產生的大規模的工作數據集存儲在內存中。MapReduce的數據集始終需要從磁盤上加載。從Spark 處理模型中獲益最大的兩種應用類型分別為迭代算法(即對一個數據集重復應用某個函數,直至滿足退出條件)和交互式分析(用戶向數據集發出一系列專用的探索性查詢,比如查出數據后,根據數據在進行多次篩選分析)。
相關概念
RDD
RDD(Resilient Distributed Dataset)是 Spark 的核心抽象,用于表示分布式、不變、容錯的數據集。
RDD的分區(Partition) 是 Spark 對數據的基本并行處理單元。RDD會被分割成多個分區,并在多個節點上并行處理,以實現高效的分布式計算。分區的大小就是HDFS 文件默認塊大小(HDFS block size),通常為 128MB。可以理解為從HDFS中讀取block到內存中,并且對這個block進行計算的抽象
舉個例子,我們在 Spark 中使用 textFile(“hdfs://path/to/my_data.txt”) 讀取時,RDD 會被劃分為2 個分區,分別對應:
Partition 1 在 Node A 處理 Block 1 數據
Partition 2 在 Node B 處理 Block 2 數據
RDD的操作
RDD的生成,一個是讀取文件,在不同的block中生成分區,還有一個就是對現有的RDD進行轉換
JavaRDD<String> rdd = sc.textFile("hdfs://path/to/my_data.txt");
JavaRDD<String> filteredRDD = rdd.filter(line -> line.contains("error"));
JavaPairRDD<String, Integer> counts = filteredRDD.mapToPair(line -> new Tuple2<>(line, 1)).reduceByKey((a, b) -> a + b);
// 觸發RDD開始轉換,foreach函數也可以觸發
counts.saveAsTextFile("hdfs://path/to/output");
以上的代碼中的filter,mapToPair,reduceByKey就是一系列動作,類似于響應式編程中的訂閱發布,當實際訂閱(也就是這里的saveAsTextFile執行時),才會觸發發布(RDD的開始轉換),即惰性轉換
RDD的持久化
spark的特性就是能夠保存中間數據在內存,默認RDD不會保存到內存中,當我們需要某部分數據時,可以手動將其保存到內存中,方便下一次計算,以下調用cache緩存
當執行RDD轉換時,提示已經保存:
當下一次對該RDD重新進行不同的轉換時,提示用到了緩存:
DAG和Stage
多個RDD的轉換形成一個有向無環圖,當一些可以基于本地的RDD的操作進行的轉換的執行鏈,即每個分區的數據只依賴于上游 RDD (在本地)的一個分區的話(如 map(), filter()),我們當然可以在同一個節點中進行這個轉換操作,這稱為窄依賴
如果當前 RDD 的分區需要依賴多個上游 RDD 分區(如 reduceByKey(), groupBy()),那么會發生 Shuffle,相當于觸發了一次reduce操作,這成為寬依賴
而這個DAG會因為出現寬依賴而進行stage的劃分,將執行鏈拆分成不同的stage部分,每一個stage交給一個節點運行
這個很好理解,相當于上游RDD的reduceByKey需要進行一個類似于mapreducer中的shuffle操作,下游RDD的reduceByKey需要進行一個類似于mapreducer中的reduce操作,而reduce的數據非本地的,且對應的所需要的reduce的任務數量也不等同于map階段的任務數,所以重新分配