A job can be executed either locally or on a cluster. Here, the Hadoop cluster is deployed on remote CentOS machines, and the classic WordCount code is used as the example.
1. Local execution mode (the local environment is macOS): the remote Hadoop cluster does not need to be started; the local job is submitted to the local executor LocalJobRunner.
1) Input and output data stored under a local path:
First, the MapReduce code is as follows:
- Mapper
package com.nasuf.hadoop.mr;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line into words and emit (word, 1) for each word
        String line = value.toString();
        String[] words = StringUtils.split(line, " ");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
- Reducer
package com.nasuf.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for each word and emit (word, total)
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
- Runner
package com.nasuf.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCRunner {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Specify the jar that contains the classes used by this job
        job.setJarByClass(WCRunner.class);

        // The Mapper and Reducer classes used by this job
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // Output key/value types of the reducer (if the mapper output types are
        // not set separately below, these apply to both the mapper and the reducer)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output key/value types of the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Location of the input data
        FileInputFormat.setInputPaths(job, new Path("/Users/nasuf/Desktop/wc/srcdata"));

        // Location where the output data will be written
        FileOutputFormat.setOutputPath(job, new Path("/Users/nasuf/Desktop/wc/output"));

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}
In local mode, the test data can be placed under /Users/nasuf/Desktop/wc/srcdata. Note that the output path must not already exist; otherwise a FileAlreadyExistsException is thrown.
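Because of that restriction, repeated test runs fail unless the previous output is removed first. A minimal sketch of an optional cleanup step, assuming it is placed inside WCRunner.main() before the job is submitted (this step is not part of the original code):

import org.apache.hadoop.fs.FileSystem;

// Inside WCRunner.main(), before job.waitForCompletion(true):
Path output = new Path("/Users/nasuf/Desktop/wc/output");
FileSystem fs = FileSystem.get(conf);   // the local file system in this mode
if (fs.exists(output)) {
    fs.delete(output, true);            // true = delete recursively
}
FileOutputFormat.setOutputPath(job, output);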
2) Input and output data stored in HDFS; the remote HDFS must be started (YARN is not needed).
Modify the Runner code as follows:
// Location of the input data
FileInputFormat.setInputPaths(job, new Path("hdfs://hdcluster01:9000/wc/srcdata"));
// Location where the output data will be written
FileOutputFormat.setOutputPath(job, new Path("hdfs://hdcluster01:9000/wc/output1"));
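Before running in this mode, the input data must already exist in HDFS. One way to upload it (equivalent to hadoop fs -put) is the FileSystem API; a minimal standalone sketch, where the class name and the file name words.txt are illustrative assumptions:

package com.nasuf.hadoop.mr;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative one-off uploader for the test data
public class UploadTestData {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://hdcluster01:9000"), conf);
        fs.mkdirs(new Path("/wc/srcdata"));
        fs.copyFromLocalFile(new Path("/Users/nasuf/Desktop/wc/srcdata/words.txt"),
                new Path("/wc/srcdata/words.txt"));
        fs.close();
    }
}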
If the following error occurs:
org.apache.hadoop.security.AccessControlException: Permission denied: user=nasuf, access=WRITE, inode="/wc":parallels:supergroup:drwxr-xr-x
This is clearly a permission problem: the HDFS directory /wc is owned by user parallels with permissions rwxr-xr-x, while the job is running locally as user nasuf, who has no write access. The fix is to add the following option to the JVM startup parameters:
-DHADOOP_USER_NAME=parallels
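Alternatively, since the -D flag above is just a JVM system property, the same user can be set in code. A minimal sketch, assuming it is placed at the very top of WCRunner.main(), before the Configuration is created:

// Equivalent to -DHADOOP_USER_NAME=parallels: the Hadoop client reads this
// system property when determining the effective user, so it must be set
// before any Configuration/FileSystem objects are created.
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "parallels");
    Configuration conf = new Configuration();
    // ... remaining job setup unchanged
}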
2. Cluster execution mode (YARN must be started first; the job is submitted to the YARN framework for execution. Job status can be checked at http://hdcluster01:8088.)
1) Run the jar directly from the command line:
hadoop jar wc.jar com.nasuf.hadoop.mr.WCRunner
Then check the job status at http://hdcluster01:8088.
2) Submit the job to the YARN cluster directly from a local main() method
Copy $HADOOP_HOME/etc/hadoop/mapred-site.xml and yarn-site.xml into the project classpath, then run the code above as-is to submit the job to the YARN cluster.
Alternatively, set the following parameters directly in the code, which serves the same purpose as copying the two configuration files (a consolidated sketch appears at the end of this section):
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hdcluster01");
conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");
If the following error message appears:
2018-08-26 10:25:37,544 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_1535213323614_0010 failed with state FAILED due to: Application application_1535213323614_0010 failed 2 times due to AM Container for appattempt_1535213323614_0010_000002 exited with exitCode: -1000 due to: File file:/tmp/hadoop-yarn/staging/nasuf/.staging/job_1535213323614_0010/job.jar does not exist
.Failing this attempt.. Failing the application.
copy core-site.xml into the classpath as well, or likewise set the following parameter in code:
conf.set("hadoop.tmp.dir", "/home/parallels/app/hadoop-2.4.1/data/");
This resolves the problem.
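Putting the pieces together, a consolidated sketch of WCRunner.main() for submitting from the local machine to the YARN cluster might look as follows. Note that all conf.set(...) calls must come before Job.getInstance(conf), because the Job copies the configuration when it is created:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Equivalent to having mapred-site.xml / yarn-site.xml / core-site.xml
    // on the classpath; must be set before Job.getInstance(conf)
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.hostname", "hdcluster01");
    conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");
    conf.set("hadoop.tmp.dir", "/home/parallels/app/hadoop-2.4.1/data/");

    Job job = Job.getInstance(conf);
    job.setJarByClass(WCRunner.class);
    job.setMapperClass(WCMapper.class);
    job.setReducerClass(WCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://hdcluster01:9000/wc/srcdata"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://hdcluster01:9000/wc/output1"));
    job.waitForCompletion(true);
}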