系統參數配置
Configuration類由源來設置,每個源包含以XML形式出現的一系列屬性/值對。如:
configuration-default.xml
configuration-site.xml
Configuration conf = new Configuration();
conf.addResource("configuraition-default.xml");
conf.addResource("|configuration-site.xml");
后添加進來的屬性取值覆蓋前面所添加資源中的屬性取值,除非前面的屬性值被標記為final。
Hadoop默認使用兩個源進行配置,順序加載core-default.xml和core-site.xml。
前者定義系統默認屬性,后者定義在特定的地方重寫。
性能調優
在正確完成功能的基礎上,使執行的時間盡量短,占用的空間盡量小。
輸入采用大文件
1000個2.3M的文件運行33分鐘;合并為1個2.2G的文件后運行3分鐘。
也可借用Hadoop中的CombineFileInputFormat,它將多個文件打包到一個輸入單元中,從而每次執行Map操作就會處理更多的數據。
壓縮文件
對Map的輸出進行壓縮,好處:減少存儲文件的空間;加快在網絡上的傳輸速度;減少數據在內存和磁盤間交換的時間。
mapred.compress.map.output設置為true來對Map的輸出數據進行壓縮;
mapred.map.output.compression.codec設置壓縮格式
修改作業屬性
在conf目錄下修改屬性
mapred.tasktracker.map.tasks.maximum
mapred.tasktracker.reduce.tasks.maximum
設置Map/Reduce任務槽數,默認均為2。
MapReduce工作流
如果處理過程變得復雜了,可以通過更加復雜、完善的Map和Reduce函數,甚至更多的MapReduce工作來體現。
復雜的Map和Reduce函數
基本的MapReduce作業僅僅集成并覆蓋了基類Mapper和Reducer中的核心函數Map或Reduce。
下面介紹基類中的其他函數,使大家能夠編寫功能更加復雜、控制更加完備的Map和Reduce函數。
setup函數
源碼如下
/**
* Called once at the start of the task
*/
protected void setup( Context context) throws IOException, InterruptedException {//NOTHING
}
此函數在task啟動開始時調用一次。
每個task以Map類或Reduce類為處理方法主體,輸入分片為處理方法的輸入,自己的分片處理完之后task也就銷毀了。
setup函數在task啟動之后數據處理之前只調用一次,而覆蓋的Map函數或Reduce函數會針對輸入分片中的每個key調用一次。
可以將Map或Reduce函數中的重復處理放置到setup函數中;
可以將Map或Reduce函數處理過程中可能使用到的全局變量進行初始化;
可以從作業信息中獲取全局變量;
可以監控task的啟動。
cleanup函數
/**
* Called noce at the end of the task
*/
protected void cleanup(Context context) throws IOException, InterruptedException {//NOTHING
}
和setup相似,不同之處在于cleanup函數是在task銷毀之前執行的。
run函數
/**
* Expert users can override this method for more complete control over the execution of the Mapper.
*@param context
*@throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {setup (context);while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}cleanup(context);
}
此函數是map函數或Reduce函數的啟動方法。
如果想更完備地控制Map或者Reduce,可以覆蓋此函數。
MapReduce中全局共享數據方法
1、讀寫HDFS文件
利用Hadoop的Java API來實現。
需要注意:多個Map或Reduce的寫操作會產生沖突,覆蓋原有數據。
優點:能夠實現讀寫,比較直觀;
缺點:要貢獻一些很小的全局數據也需要使用IO,這將占用系統資源,增加作業完成的資源消耗。
2、配置Job屬性
在任務啟動之初利用Configuration類中的set(String name, String value)將一些簡單的全局數據封裝到作業的配置屬性中;
然后在task中利用Configuration類中的get(String name)獲取配置到屬性中的全局數據。
優點:簡單,資源消耗小;
缺點:對量比較大的共享數據顯得比較無力。
3、使用DistributedCache
為應用提供緩存文件的只讀工具,可以緩存文本文件、壓縮文件、jar文件。
在使用時,用戶可以在作業配置時使用本地或HDFS文件的UCRL來將其設置成共享緩存文件。
在作業啟動之后和task啟動之前,MapReduce框架會將可能需要的緩存文件復制到執行任務結點的本地。
優點:每個Job共享文件只會在啟動之后復制一次,適用于大量的共享數據;
缺點:只讀。
//配置
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("/myapp/lookup"), conf);
//在Map函數中使用:
public static class Map extends Mapper<...>{private Path[] localArchives;private Paht[] localFiles;public void setup (Context context) throws IOException, InterruptedException{Configuration conf = context.getConfiguration();localArchives = DistributedCache.getLocalCacheArchives(conf);localFiles = DistributedCache.getLocalCacheFiles(conf);}public void map(K key, V value, Context context) throws IOException {//使用從緩存文件中獲取的數據context.collect(k, v);}
}
鏈接MapReduce Job
如果問題不是一個MapReduce作業就能解決,就需要在工作流中安排多個MapReduce作業,讓它們配合起來自動完成一些復雜的任務,而不需要用戶手動啟動每一個作業。
1、線性MapReduce Job流
最簡單的辦法是設置多個有一定順序的Job,每個Job以前一個Job的輸入作為輸入,經過處理,將數據再輸入到下一個Job中。
這種辦法的實現非常簡單,將每個Job的啟動代碼設置成只有上一個Job結束之后才執行,然后將Job的輸入設置成上一個Job的輸出路徑。
2、復雜MapReduce Job流
第一種方法在某些復雜任務下仍然不能滿足需求。
如Job3需要將Job1和Job2的輸出結果組合起來進行處理。這種情況下Job3的啟動依賴于Job1和Job2的完成,但Job1和Job2之間沒有關系。
針對這種復雜情況,MapReduce框架提供了讓用戶將Job組織成復雜Job流的API--ControlledJob類和JobControl類。這兩個類屬于org.apache.hadoop.mapreduce.lib.jobcontrol包。
具體做法:
先按照正常情況配置各個Job;
配置完成后再將各個Job封裝到對應的ControlledJob對象中;
然后使用ControlledJob的addDependingJob()設置依賴關系;
接著再實例化一個JobControl對象,并使用addJob()方法將所有的Job注入JobControl對象中;
最后使用JobControl對象的run方法啟動Job流。
3、Job設置預處理和后處理過程
org.apache.hadoop.mapred.lib包下的ChainMapper和ChainReducer兩個靜態類來實現。
The ChainMapper class allows to use multiple Mapper classes within a single Map task.
The Mapper classes are invoked in a chained (or piped) fashion, the output of the first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.
The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.
For each record output by the Reducer, the Mapper classes are invoked in a chained (or piped) fashion, the output of the first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output. ?
The key functionality of this feature is that the Mappers in the chain do not need to be aware that they are executed in a chain. This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.
Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain. It is assumed all Mappers and the Reduce in the chain use maching output and input key and value classes as no conversion is done by the chaining code.
Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like[MAP+ / REDUCE MAP*]
. And immediate benefit of this pattern is a dramatic reduction in disk IO.
IMPORTANT: There is no need to specify the output key/value classes for the ChainMapper, this is done by the addMapper for the last mapper in the chain.
ChainMapper usage pattern:
...conf.setJobName("chain");conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);JobConf mapAConf = new JobConf(false);...ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,Text.class, Text.class, true, mapAConf);
JobConf mapBConf = new JobConf(false);...ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,LongWritable.class, Text.class, false, mapBConf);
JobConf reduceConf = new JobConf(false);...ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,Text.class, Text.class, true, reduceConf);
ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,LongWritable.class, Text.class, false, null);
ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,LongWritable.class, LongWritable.class, true, null);
FileInputFormat.setInputPaths(conf, inDir);FileOutputFormat.setOutputPath(conf, outDir);...
JobClient jc = new JobClient(conf);RunningJob job = jc.submitJob(conf);...