1、Map-Reduce的邏輯過程
假設我們需要處理一批有關天氣的數據,其格式如下:
- 按照ASCII碼存儲,每行一條記錄
- 每一行字符從0開始計數,第15個到第18個字符為年
- 第25個到第29個字符為溫度,其中第25位是符號+/-
0067011990999991950051507+0000+ 0043011990999991950051512+0022+ 0043011990999991950051518-0011+ 0043012650999991949032412+0111+ 0043012650999991949032418+0078+ 0067011990999991937051507+0001+ 0043011990999991937051512-0002+ 0043011990999991945051518+0001+ 0043012650999991945032412+0002+ 0043012650999991945032418+0078+ |
現在需要統計出每年的最高溫度。
Map-Reduce主要包括兩個步驟:Map和Reduce
每一步都有key-value對作為輸入和輸出:
- map階段的key-value對的格式是由輸入的格式所決定的,如果是默認的TextInputFormat,則每行作為一個記錄進程處理,其中key為此行的開頭相對于文件的起始位置,value就是此行的字符文本
- map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應
對于上面的例子,在map過程,輸入的key-value對如下:
(0, 0067011990999991950051507+0000+) (33, 0043011990999991950051512+0022+) (66, 0043011990999991950051518-0011+) (99, 0043012650999991949032412+0111+) (132, 0043012650999991949032418+0078+) (165, 0067011990999991937051507+0001+) (198, 0043011990999991937051512-0002+) (231, 0043011990999991945051518+0001+) (264, 0043012650999991945032412+0002+) (297, 0043012650999991945032418+0078+) |
在map過程中,通過對每一行字符串的解析,得到年-溫度的key-value對作為輸出:
(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78) |
在reduce過程,將map過程中的輸出,按照相同的key將value放到同一個列表中作為reduce的輸入
(1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78]) |
在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:
(1950, 22) (1949, 111) (1937, 1) (1945, 78) |
其邏輯過程可用如下圖表示:
2、編寫Map-Reduce程序
編寫Map-Reduce程序,一般需要實現兩個函數:mapper中的map函數和reducer中的reduce函數。
一般遵循以下格式:
- map: (K1, V1)? ->? list(K2, V2)
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable { ? void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) ? throws IOException; } |
- reduce: (K2, list(V))? ->? list(K3, V3)?
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable { ? void reduce(K2 key, Iterator<V2> values, ????????????? OutputCollector<K3, V3> output, Reporter reporter) ??? throws IOException; } |
?
對于上面的例子,則實現的mapper如下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { ??? @Override ??? public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { ??????? String line = value.toString(); ??????? String year = line.substring(15, 19); ??????? int airTemperature; ??????? if (line.charAt(25) == '+') { ??????????? airTemperature = Integer.parseInt(line.substring(26, 30)); ??????? } else { ??????????? airTemperature = Integer.parseInt(line.substring(25, 30)); ??????? } ??????? output.collect(new Text(year), new IntWritable(airTemperature)); ??? } } |
實現的reducer如下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { ??? public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { ??????? int maxValue = Integer.MIN_VALUE; ??????? while (values.hasNext()) { ??????????? maxValue = Math.max(maxValue, values.next().get()); ??????? } ??????? output.collect(key, new IntWritable(maxValue)); ??? } } |
?
欲運行上面實現的Mapper和Reduce,則需要生成一個Map-Reduce得任務(Job),其基本包括以下三部分:
- 輸入的數據,也即需要處理的數據
- Map-Reduce程序,也即上面實現的Mapper和Reducer
- 此任務的配置項JobConf
欲配置JobConf,需要大致了解Hadoop運行job的基本原理:
- Hadoop將Job分成task進行處理,共兩種task:map task和reduce task
- Hadoop有兩類的節點控制job的運行:JobTracker和TaskTracker
- JobTracker協調整個job的運行,將task分配到不同的TaskTracker上
- TaskTracker負責運行task,并將結果返回給JobTracker
- Hadoop將輸入數據分成固定大小的塊,我們稱之input split
- Hadoop為每一個input split創建一個task,在此task中依次處理此split中的一個個記錄(record)
- Hadoop會盡量讓輸入數據塊所在的DataNode和task所執行的DataNode(每個DataNode上都有一個TaskTracker)為同一個,可以提高運行效率,所以input split的大小也一般是HDFS的block的大小。
- Reduce task的輸入一般為Map Task的輸出,Reduce Task的輸出為整個job的輸出,保存在HDFS上。
- 在reduce中,相同key的所有的記錄一定會到同一個TaskTracker上面運行,然而不同的key可以在不同的TaskTracker上面運行,我們稱之為partition
- partition的規則為:(K2, V2) –> Integer, 也即根據K2,生成一個partition的id,具有相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。
public interface Partitioner<K2, V2> extends JobConfigurable { ? int getPartition(K2 key, V2 value, int numPartitions); } |
下圖大概描述了Map-Reduce的Job運行的基本原理:
?
下面我們討論JobConf,其有很多的項可以進行配置:
- setInputFormat:設置map的輸入格式,默認為TextInputFormat,key為LongWritable, value為Text
- setNumMapTasks:設置map任務的個數,此設置通常不起作用,map任務的個數取決于輸入的數據所能分成的input split的個數
- setMapperClass:設置Mapper,默認為IdentityMapper
- setMapRunnerClass:設置MapRunner, map task是由MapRunner運行的,默認為MapRunnable,其功能為讀取input split的一個個record,依次調用Mapper的map函數
- setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式
- setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式
- setPartitionerClass和setNumReduceTasks:設置Partitioner,默認為HashPartitioner,其根據key的hash值來決定進入哪個partition,每個partition被一個reduce task處理,所以partition的個數等于reduce task的個數
- setReducerClass:設置Reducer,默認為IdentityReducer
- setOutputFormat:設置任務的輸出格式,默認為TextOutputFormat
- FileInputFormat.addInputPath:設置輸入文件的路徑,可以使一個文件,一個路徑,一個通配符。可以被調用多次添加多個路徑
- FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不應該存在
當然不用所有的都設置,由上面的例子,可以編寫Map-Reduce程序如下:
public class MaxTemperature { ??? public static void main(String[] args) throws IOException { ??????? if (args.length != 2) { ??????????? System.err.println("Usage: MaxTemperature <input path> <output path>"); ??????????? System.exit(-1); ??????? } ??????? JobConf conf = new JobConf(MaxTemperature.class); ??????? conf.setJobName("Max temperature"); ??????? FileInputFormat.addInputPath(conf, new Path(args[0])); ??????? FileOutputFormat.setOutputPath(conf, new Path(args[1])); ??????? conf.setMapperClass(MaxTemperatureMapper.class); ??????? conf.setReducerClass(MaxTemperatureReducer.class); ??????? conf.setOutputKeyClass(Text.class); ??????? conf.setOutputValueClass(IntWritable.class); ??????? JobClient.runJob(conf); ??? } } |
3、Map-Reduce數據流(data flow)
Map-Reduce的處理過程主要涉及以下四個部分:
- 客戶端Client:用于提交Map-reduce任務job
- JobTracker:協調整個job的運行,其為一個Java進程,其main class為JobTracker
- TaskTracker:運行此job的task,處理input split,其為一個Java進程,其main class為TaskTracker
- HDFS:hadoop分布式文件系統,用于在各個進程間共享Job相關的文件
3.1、任務提交
JobClient.runJob()創建一個新的JobClient實例,調用其submitJob()函數。
- 向JobTracker請求一個新的job ID
- 檢測此job的output配置
- 計算此job的input splits
- 將Job運行所需的資源拷貝到JobTracker的文件系統中的文件夾中,包括job jar文件,job.xml配置文件,input splits
- 通知JobTracker此Job已經可以運行了
提交任務后,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。
?
3.2、任務初始化
?
當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務并初始化任務。
初始化首先創建一個對象來封裝job運行的tasks, status以及progress。
在創建task之前,job調度器首先從共享文件系統中獲得JobClient計算出的input splits。
其為每個input split創建一個map task。
每個task被分配一個ID。
?
3.3、任務分配
?
TaskTracker周期性的向JobTracker發送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。
在JobTracker為TaskTracker選擇一個task之前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。
TaskTracker有固定數量的位置來運行map task或者reduce task。
默認的調度器對待map task優先于reduce task
當選擇reduce task的時候,JobTracker并不在多個task之間進行選擇,而是直接取下一個,因為reduce task沒有數據本地化的概念。
?
3.4、任務執行
?
TaskTracker被分配了一個task,下面便要運行此task。
首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。
TaskTracker從distributed cache中將job運行所需要的文件拷貝到本地磁盤。
其次,其為每個task創建一個本地的工作目錄,將jar解壓縮到文件目錄中。
其三,其創建一個TaskRunner來運行task。
TaskRunner創建一個新的JVM來運行task。
被創建的child JVM和TaskTracker通信來報告運行進度。
?
3.4.1、Map的過程
MapRunnable從input split中讀取一個個的record,然后依次調用Mapper的map函數,將結果輸出。
map的輸出并不是直接寫入硬盤,而是將其寫入緩存memory buffer。
當buffer中數據的到達一定的大小,一個背景線程將數據開始寫入硬盤。
在寫入硬盤之前,內存中的數據通過partitioner分成多個partition。
在同一個partition中,背景線程會將數據按照key在內存中排序。
每次從內存向硬盤flush數據,都生成一個新的spill文件。
當此task結束之前,所有的spill文件被合并為一個整的被partition的而且排好序的文件。
reducer可以通過http協議請求map的輸出文件,tracker.http.threads可以設置http服務線程數。
3.4.2、Reduce的過程
當map task結束后,其通知TaskTracker,TaskTracker通知JobTracker。
對于一個job,JobTracker知道TaskTracer和map輸出的對應關系。
reducer中一個線程周期性的向JobTracker請求map輸出的位置,直到其取得了所有的map輸出。
reduce task需要其對應的partition的所有的map輸出。
reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的map task完成時間不同。
reduce task中有多個copy線程,可以并行拷貝map輸出。
當很多map輸出拷貝到reduce task后,一個背景線程將其合并為一個大的排好序的文件。
當所有的map輸出都拷貝到reduce task后,進入sort過程,將所有的map輸出合并為大的排好序的文件。
最后進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每個key,最后的結果寫入HDFS。
?
?
3.5、任務結束
?
當JobTracker獲得最后一個task的運行成功的報告后,將job得狀態改為成功。
當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。