- 目錄
- 前言:
- 1、MapReduce原理
- 2、mapreduce實踐(WordCount實例)
目錄
今天先總體說下MapReduce的相關知識,后續將會詳細說明對應的shuffle、mr與yarn的聯系、以及mr的join操作的等知識。以下內容全是個人學習后的見解,如有遺漏或不足請大家多多指教。
前言:
為什么要MAPREDUCE
(1)海量數據在單機上處理因為硬件資源限制,無法勝任
(2)而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度
(3)引入mapreduce框架后,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜性交由框架來處理。
設想一個海量數據場景下的wordcount需求:
單機版:內存受限,磁盤受限,運算能力受限分布式:
1、文件分布式存儲(HDFS)
2、運算邏輯需要至少分成2個階段(一個階段獨立并發,一個階段匯聚)
3、運算程序如何分發
4、程序如何分配運算任務(切片)
5、兩階段的程序如何啟動?如何協調?
6、整個程序運行過程中的監控?容錯?重試?
可見在程序由單機版擴成分布式時,會引入大量的復雜工作。為了提高開發效率,可以將分布式程序中的公共功能封裝成框架,讓開發人員可以將精力集中于業務邏輯。
而mapreduce就是這樣一個分布式程序的通用框架,其應對以上問題的整體結構如下:
1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask
1、MapReduce原理
Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基于hadoop的數據分析應用”的核心框架;
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個hadoop集群上;
Mapreduce框架結構及核心運行機制
1.1、結構
一個完整的mapreduce程序在分布式運行時有三類實例進程 :
1、MRAppMaster:負責整個程序的過程調度及狀態協調
2、mapTask:負責map階段的整個數據處理流程
3、ReduceTask:負責reduce階段的整個數據處理流程
1.2、mapreduce框架的設計思想
這里面有兩個任務的分配過程:1、總的任務切割分配給各個mapTask,不同的mapTask再將得到的hashmap按照首字母劃分,分配給各個reduceTask。
1.3、mapreduce程序運行的整體流程(wordcount運行過程的解析)
流程解析
(job.split:負責任務的切分,形成一個任務切片規劃文件。
wc.jar:要運行的jar包,包含mapper、reducer、Driver等java類。
job.xml:job的其他配置信息:如指定map是哪個類,reduce是那個類,以及輸入數據的路徑在哪,輸出數據的路徑在哪等配置信息。)
前提:客戶端提交任務給yarn后(提交前會進行任務的規劃),yarn利用ResouceManager去找到mrAppmaster.
1、 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程
2、 maptask進程啟動之后,根據給定的數據切片范圍進行數據處理,主體流程為:
a) 利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對(框架干的事)
b) 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,并將map()方法輸出的KV對收集到緩存
c) 將緩存中的KV對按照K分區排序后不斷溢寫到磁盤文件
3、 MRAppMaster監控到所有maptask進程任務完成之后,會根據客戶指定的參數啟動相應數量的reducetask進程,并告知reducetask進程要處理的數據范圍(數據分區)
4、 Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,并在本地進行重新歸并排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,并收集運算輸出的結果KV,然后調用客戶指定的outputformat將結果數據輸出到外部存儲(對應的就是context.write方法)
2、mapreduce實踐(WordCount實例)
編程規范:
(1)用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)
(3)Mapper的輸出數據是KV對的形式(KV的類型可自定義)
(4)Mapper中的業務邏輯寫在map()方法中
(5)map()方法(maptask進程)對每一個<K,V>調用一次
(6)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
(7)Reducer的業務邏輯寫在reduce()方法中
(8)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
(9)用戶自定義的Mapper和Reducer都要繼承各自的父類
(10)整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
WordCount程序
mapper類
package bigdata.mr.wcdemo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
//map方法的生命周期: 框架每傳一行數據就被調用一次* KEYIN: 默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long,* 但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable* * VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String,同上,用Text* * KEYOUT:是用戶自定義邏輯處理完成之后輸出數據中的key,在此處是單詞,String,同上,用Text* VALUEOUT:是用戶自定義邏輯處理完成之后輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{/*** map階段的業務邏輯就寫在自定義的map()方法中* maptask會對每一行輸入數據調用一次我們自定義的map()方法*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將maptask傳給我們的文本內容先轉換成StringString line = value.toString();//根據空格將這一行切分成單詞String[] words = line.split(" "); //將單詞輸出為<單詞,1>for(String word:words){//將單詞作為key,將次數1作為value,以便于后續的數據分發,可以根據單詞分發,以便于相同單詞會到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}
}
reducer類
package mr_test;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
//生命周期:框架每傳遞進來一個k相同的value 組,reduce方法就被調用一次* KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT類型對應* KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型* KEYOUT是單詞* VLAUEOUT是總次數*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /*** <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入參key,是一組相同單詞kv對的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count=0;for(IntWritable value:values){count+=value.get(); }context.write(key, new IntWritable(count));}
}
Driver類 用來描述job并提交job
package mr_test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*** 相當于一個yarn集群的客戶端* 需要在此封裝我們的mr程序的相關運行參數,指定jar包* 最后提交給yarn*/
public class WordcountDriver {public static void main(String[] args) throws IOException, Exception, InterruptedException {Configuration cf = new Configuration();
// 把這個程序打包成一個Job來運行Job job = Job.getInstance(); //指定本程序的jar包所在的本地路徑job.setJarByClass(WordcountDriver.class); //指定本業務job要使用的mapper/Reducer業務類job.setMapperClass(WorldcountMapper.class);job.setReducerClass(WordcountReducer.class); //指定mapper輸出數據的kv類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); //指定最終輸出的數據的kv類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); //指定job的輸入原始文件所在目錄FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的輸出結果所在目錄FileOutputFormat.setOutputPath(job, new Path(args[1])); //將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行boolean res = job.waitForCompletion(true);System.exit(res?0:1); }
}