實驗4 ?大數據去重
1.實驗目的
通過Hadoop數據去重實驗,學生可以掌握準備數據、偽分布式文件系統配置方法,以及在集成開發環境Eclipse中實現Hadoop數據去重方法。
2.實驗要求
了解基于Hadoop處理平臺的大數據去重過程,理解其主要功能,并能夠在Hadoop環境下獨立完成。
(1)制訂實驗計劃。
(2)準備數據。
(3)偽分布式文件系統配置。
(4)在集成開發環境Eclipse中實現Hadoop數據去重。
3.實驗內容
(1)制訂實驗計劃。
(2)進入“/usr/local/hadoop”目錄。
(3)準備數據。
(4)修改“/usr/local/hadoop/etc/hadoop/”目錄下的Hadoop配置文件。
(5)NameNode格式化。
(6)在集成開發環境Eclipse中實現Hadoop數據去重。
4.實驗總結
通過本實驗,使學生了解Hadoop數據去重的特點和過程、理解MapReduce程序的執行過程,掌握NameNode的格式化方法、Hadoop的配置文件的修改和Eclipse開發環境下實現Hadoop數據去重的方法。
5.思考拓展
(1)為什么需要NameNode格式化?說明NameNode格式化方法。
1.清空dfs.name.dir和dfs.name.edits.dir兩個目錄下的所有文件
2.在目錄dfs.name.dir下創建文件:
[plain] view plaincopy
{dfs.name.dir}/current/fsimage ?
{dfs.name.dir}/current/fstime ?
{dfs.name.dir}/current/VERSION ?
{dfs.name.dir}/image/fsimage ?
- 1
- 2
- 3
- 4
- 5
3.在目錄dfs.name.edits.dir下創建文件:
[plain] view plaincopy
{dfs.name.edits.dir}/current/edits ?
{dfs.name.edits.dir}/current/fstime ?
????????{dfs.name.edits.dir}/current/VERSION ?
????????{dfs.name.edits.dir}/image/fsimage
(2)為什么需要數據去重?說明Hadoop數據去重的主要優勢。
與傳統的數據倉庫相比,Hadoop 的分布式架構,實現了既能夠處理關系型數據庫當中的結構化數據,也能夠處理例如視頻、音頻圖片等非結構化數據,并且還能根據數據任務的規模和復雜程度,實現輕松的擴展。
所以 Hadoop能處理哪些類型數據?概括點來說,就是傳統的結構化數據,文字圖片等,以及非結構化的數據,視頻、音頻等,都能基于Hadoop框架技術得到合理的處理
Hadoop處理大數據,主要通過分布式技術來解決各種類型的數據問題一-
并行化問題:處理數據的應用程序要改造成適合并行的方式;資源分配管理問題:如何有效的管理提交任務的資源,內存、網絡、磁盤等;
容錯問題:隨著機器數量的增加,可靠性如何保證,例如部分機器硬件出錯導致不可用,最終結果的完整性和正確性如何保證。
(3)結合MapReduce程序執行過程,說明Hadoop數據去重是離線處理還是在線處理。
1. MapReduce 定義
Hadoop中的 MapReduce是一個使用簡單的軟件框架,基于它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,并以一種可靠容錯式并行處理TB級別的數據集
2. MapReduce 特點
MapReduce?之所以如此受歡迎,它主要有以下幾個特點。:
- MapReduce 易于編程。它簡單的實現一些接口,就可以完成一個分布式程序,這個分布式程序可以分布到大量廉價的 PC 機器運行。也就是說你寫一個分布式程序,跟寫一個簡單的串行程序是一模一樣的。 就是因為這個特點使得 MapReduce 編程變得非常流行。
- 良好的擴展性。當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。
**- 高容錯性。**MapReduce 設計的初衷就是使程序能夠部署在廉價的 PC 機器上,這就要求它具有很高的容錯性。比如其中一臺機器掛了,它可以把上面的計算任務轉移到另外一個節點上面上運行,不至于這個任務運行失敗,而且這個過程不需要人工參與,而完全是由 hadoop 內部完成的。
- 適合 PB 級以上海量數據的離線處理。這里加紅字體離線處理,說明它適合離線處理而不適合在線處理。比如像毫秒級別的返回一個結果,MapReduce 很難做到。
MapReduce 雖然具有很多的優勢,但是它也有不擅長的地方。這里的不擅長不代表它不能做,而是在有些場景下實現的效果差,并不適合 MapReduce 來處理,主要表現在以下幾個方面。
- 實時計算。MapReduce 無法像 MySQL 一樣,在毫秒或者秒級內返回結果。
- 流式計算。流式計算的輸入數據時動態的,而 MapReduce 的輸入數據集是靜態的,不能動態變化。這是因為 MapReduce 自身的設計特點決定了數據源必須是靜態的。
- DAG(有向圖)計算。多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce 并不是不能做,而是使用后,每個MapReduce 作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。
3. MapReduce的架構
目前存在兩種 MapReduce 實現,分別是
? 可獨立運行的 MapReduce
它由兩類服務組成,分別是 JobTracker 和 TaskTraker,其中 JobTracker 存在單點故障問題,本文提到的單點故障實際上是第一種實現中JobTracker的單點故障。
? MapReduce On YARN
在這種實現中,每個作業獨立使用一個作業跟蹤器(ApplicationMaster),彼此之間不再相互影響,不存在單點故障問題。
(4)說明在集成開發環境Eclipse中實現Hadoop數據去重的主要過程。
一、MapReduce 模型簡介
MapReduce?將復雜的、運行于大規模集群上的并行計算過程高度地抽象到了兩個函數:Map?和?Reduce?。它采用?“?分而治之?”?策略,一個存儲在分布式文件系統中的大規模數據集,會被切分成許多獨立的分片(split?),這些分片可以被多個 Map?任務并行處理。
1.Map 和 Reduce 函數
2.MapReduce 體系結構
MapReduce?體系結構主要由四個部分組成,分別是:?Client?、?JobTracker、?TaskTracker 以及?Task
1)Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端 用戶可通過Client提供的一些接口查看作業運行狀態
2)JobTracker
JobTracker負責資源監控和作業調度 JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點 JobTracker 會跟蹤任務的執行進度、資源使用量等信息,并將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,選擇合適的任務去使用這些資源
3)TaskTracker
TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等) TaskTracker 使用“slot”等量劃分本節點上的資源量(CPU、內存等)。一個Task 獲取到一個slot 后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce?slot 兩種,分別供MapTask 和Reduce Task 使用
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
3.MapReduce 工作流程
1) 工作流程概述
?
- 不同的Map任務之間不會進行通信
- 不同的Reduce任務之間也不會發生任何信息交換
- 用戶不能顯式地從一臺機器向另一臺機器發送消息
- 所有的數據交換都是通過MapReduce框架自身去實現的
2) MapReduce各個執行階段
?4.MapReduce 應用程序執行過程
?
?二、MapReduce 實戰
1.數據去重
"數據去重"主要是為了掌握和利用并行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日志中計算訪問地等這些看似龐雜的任務都會涉及數據去重。
1.1實例描述
對數據文件中的數據進行去重。數據文件中的每行都是一個數據。樣例輸入如下所示:
1)file1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2)file2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
樣例輸出如下所示:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
1.2 解題思路
map階段:將每一行的文本作為鍵值對的key
?reduce階段:將每一個公用的鍵組輸出
1.3 代碼展示
package?datadeduplicate.pers.xls.datadeduplicate;
?
import?java.io.IOException;
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import?org.apache.log4j.BasicConfigurator;
?
public?class?Deduplication?{
????public?static?void?main(String[] args)?throws?Exception {
???? BasicConfigurator.configure(); //自動快速地使用缺省Log4j環境
???? //必須要傳遞的是自定的mapper和reducer的類,輸入輸出的路徑必須指定,輸出的類型<k3,v3>必須指定
???? //1首先寫job,知道需要conf和jobname在去創建即可
????????????????Configuration conf=new?Configuration();
????????????????String jobName=Deduplication.class.getSimpleName();
????????????????Job?job?= Job.getInstance(conf, jobName);
????????????????//2將自定義的MyMapper和MyReducer組裝在一起
????????????????//3讀取HDFS內容:FileInputFormat在mapreduce.lib包下
????????????????FileInputFormat.setInputPaths(job, new?Path(args[0]));
????????????????//4指定解析<k1,v1>的類(誰來解析鍵值對)
????????????????//*指定解析的類可以省略不寫,因為設置解析類默認的就是TextInputFormat.class
????????????????job.setInputFormatClass(TextInputFormat.class);
????????????????//5指定自定義mapper類
????????????????job.setMapperClass(MyMapper.class);
????????????????//6指定map輸出的key2的類型和value2的類型 ?<k2,v2>
????????????????//*下面兩步可以省略,當<k3,v3>和<k2,v2>類型一致的時候,<k2,v2>類型可以不指定
????????????????job.setMapOutputKeyClass(Text.class);
????????????????job.setMapOutputValueClass(Text.class);
????????????????//7分區(默認1個),排序,分組,規約 采用 默認
????????????????job.setCombinerClass(MyReducer.class);
????????????????//接下來采用reduce步驟
????????????????//8指定自定義的reduce類
????????????????job.setReducerClass(MyReducer.class);
????????????????//9指定輸出的<k3,v3>類型
????????????????job.setOutputKeyClass(Text.class);
????????????????job.setOutputValueClass(Text.class);
????????????????//10指定輸出<K3,V3>的類
?????????????????//*下面這一步可以省
????????????????job.setOutputFormatClass(TextOutputFormat.class);
????????????????//11指定輸出路徑
????????????????FileOutputFormat.setOutputPath(job, new?Path(args[1]));
????????????????//12寫的mapreduce程序要交給resource manager運行
????????????????job.waitForCompletion(true);
????????????????//*13最后,如果要打包運行改程序,則需要調用如下行
????????????????job.setJarByClass(Deduplication.class);
????}
????private?static?class?MyMapper?extends?Mapper<Object, Text, Text, Text>{
????????private?static?Text line=new?Text();
????????@Override
????????protected?void?map(Object k1, Text v1,Mapper<Object, Text, Text, Text>.Context context)?throws?IOException, InterruptedException {
????????????line=v1;//v1為每行數據,賦值給line
????????????context.write(line, new?Text(""));
?????????}
????}
????private?static?class?MyReducer?extends?Reducer<Text, Text, Text, Text>
????{
????????@Override
????????protected?void?reduce(Text k2, Iterable<Text> v2s,Reducer<Text, Text, Text, Text>.Context context)?throws?IOException, InterruptedException {
?????????????context.write(k2, new?Text(""));
?????????}
????}
}
1.4 運行結果展示
打包項目成可運行的jar包,上傳的hdfs文件系統:
?
?在linux系統下終端輸入hadoop命令,在建立的hadoop節點上運行jar包:
?查看eclipse中hdfs文件系統下out文件夾,發現生成了先前指定的deduplication文件夾,其中part-r-00000為運行的輸出。
?2.數據排序
package?dararank.pers.xls.datarank;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
import?org.apache.log4j.BasicConfigurator;
import?java.io.IOException;
?
public?class?DataRank?{
????/**
?????* 使用Mapper將數據文件中的數據本身作為Mapper輸出的key直接輸出
?????*/
????public?static?class?forSortedMapper?extends?Mapper<Object, Text, IntWritable, IntWritable> {
????????private?IntWritable?mapperValue?= new?IntWritable(); //存放key的值
????????public?void?map(Object key, Text value, Context context)
????????????????throws?IOException, InterruptedException {
????????????String?line?= value.toString(); //獲取讀取的值,轉化為String
????????????mapperValue.set(Integer.parseInt(line)); //將String轉化為Int類型
????????????context.write(mapperValue,new?IntWritable(1)); //將每一條記錄標記為(key,value) key--數字 value--出現的次數
??????????//每出現一次就標記為(number,1)
????????}
????}
?
????/**
?????* 使用Reducer將輸入的key本身作為key直接輸出
?????*/
?public?static?class?forSortedReducer?extends?Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
????????private?IntWritable?postion?= new?IntWritable(1); //存放名次
????????@Override
????????protected?void?reduce(IntWritable key, Iterable<IntWritable> values, Context context)?throws?IOException, InterruptedException {
????????????for?(IntWritable item :values){ //同一個數字可能出多次,就要多次并列排序
????????????????context.write(postion,key); //寫入名次和具體數字
????????????????System.out.println(postion + "\t"+ key);
????????????????postion = new?IntWritable(postion.get()+1); //名次加1
????????????}
????????}
????}
?
?
????public?static?void?main(String[] args)?throws?Exception {
?
???? BasicConfigurator.configure(); //自動快速地使用缺省Log4j環境
????????
???? Configuration?conf?= new?Configuration(); //設置MapReduce的配置
????????String[] otherArgs = new?GenericOptionsParser(conf,args).getRemainingArgs();
????????if(otherArgs.length < 2){
????????????System.out.println("Usage: datarank <in> [<in>...] <out>");
????????????System.exit(2);
????????}
????????//設置作業
????????//Job job = new Job(conf);
????????Job?job?= Job.getInstance(conf);
????????job.setJarByClass(DataRank.class);
????????job.setJobName("DataRank");
????????//設置處理map,reduce的類
????????job.setMapperClass(forSortedMapper.class);
????????job.setReducerClass(forSortedReducer.class);
????????//設置輸入輸出格式的處理
????????job.setOutputKeyClass(IntWritable.class);
????????job.setOutputValueClass(IntWritable.class);
????????//設定輸入輸出路徑
????????for?(int?i?= 0; i < otherArgs.length-1;++i){
????????????FileInputFormat.addInputPath(job,new?Path(otherArgs[i]));
????????}
????????FileOutputFormat.setOutputPath(job, new?Path(otherArgs[otherArgs.length-1]));
????????System.exit(job.waitForCompletion(true)?0:1);
????}
?
}
3.平均成績
package?averagescoreapp.pers.xls.averagescoreapp;
?
import?java.io.IOException;
import?java.util.StringTokenizer;
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.DoubleWritable;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
import?org.apache.log4j.BasicConfigurator;
?
/**
?* 求平均成績
?*
?*/
public?class?AverageScoreApp?{
?
public?static?class?Map?extends?Mapper<Object, Text, Text, IntWritable>{
@Override
protected?void?map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)?throws?IOException, InterruptedException {
//成績的結構是:
// 張三 80
// 李四 82
// 王五 86
StringTokenizer?tokenizer?= new?StringTokenizer(value.toString(), "\n");
while(tokenizer.hasMoreElements()) {
StringTokenizer?lineTokenizer?= new?StringTokenizer(tokenizer.nextToken());
String?name?= lineTokenizer.nextToken(); //姓名
String?score?= lineTokenizer.nextToken();//成績
context.write(new?Text(name), new?IntWritable(Integer.parseInt(score)));
}
}
}
public?static?class?Reduce?extends?Reducer<Text, IntWritable, Text, DoubleWritable>{
@Override
protected?void?reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
throws?IOException, InterruptedException {
//reduce這里輸入的數據結構是:
// 張三 <80,85,90>
// 李四 <82,88,94>
// 王五 <86,80,92>
int?sum?= 0;//所有課程成績總分
double?average?= 0;//平均成績
int?courseNum?= 0; //課程數目
for(IntWritable score:values) {
sum += score.get();
courseNum++;
}
average = sum/courseNum;
context.write(new?Text(key), new?DoubleWritable(average));
}
}
public?static?void?main(String[] args)?throws?Exception{
BasicConfigurator.configure(); //自動快速地使用缺省Log4j環境
Configuration?conf?= new?Configuration();
String[] otherArgs = new?GenericOptionsParser(conf,args).getRemainingArgs();
????????if(otherArgs.length < 2){
????????????System.out.println("Usage: AverageScoreRank <in> [<in>...] <out>");
????????????System.exit(2);
????????}
Job?job?= Job.getInstance(conf);
job.setJarByClass(AverageScoreApp.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
?//設定輸入輸出路徑
????????for?(int?i?= 0; i < otherArgs.length-1;++i){
????????????FileInputFormat.addInputPath(job,new?Path(otherArgs[i]));
????????}
????????FileOutputFormat.setOutputPath(job, new?Path(otherArgs[otherArgs.length-1]));
System.exit(job.waitForCompletion(true)?0:1);
}
?
}
?4.單表關聯
package?singletabblerelation.pers.xls.singletablerelation;
?
import?java.io.IOException;
import?java.util.ArrayList;
import?java.util.List;
import?java.util.StringTokenizer;
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.LongWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
import?org.apache.log4j.BasicConfigurator;
?
public?class?SingleTableRelation?{
????public?static?int?time?= 0;
????public?static?class?Map?extends?Mapper<LongWritable, Text, Text, Text> {
????protected?void?map(LongWritable key, Text value, Context context)throws?java.io.IOException, InterruptedException {
???????? // 左右表的標識
????????????int?relation;
????????????StringTokenizer?tokenizer?= new?StringTokenizer(value.toString());
????????????String?child?= tokenizer.nextToken();
????????????String?parent?= tokenizer.nextToken();
????????????if?(child.compareTo("child") != 0) {
????????????????// 左表
????????????????relation = 1;
????????????????context.write(new?Text(parent), new?Text(relation + "+"?+ child));
????????????????// 右表
????????????????relation = 2;
????????????????context.write(new?Text(child), new?Text(relation + "+"?+ parent));
????????????}
????????};
?
????}
?
????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {
????????protected?void?reduce(Text key, Iterable<Text> values,
????????????????Reducer<Text, Text, Text, Text>.Context output)
????????????????throws?java.io.IOException, InterruptedException {
????????????int?grandchildnum?= 0;
????????????int?grandparentnum?= 0;
????????????List<String> grandchilds = new?ArrayList<>();
????????????List<String> grandparents = new?ArrayList<>();
????????????/** 輸出表頭 */
????????????if?(time == 0) {
????????????????output.write(new?Text("grandchild"), new?Text("grandparent"));
????????????????time++;
????????????}
????????????for?(Text val : values) {
????????????????String?record?= val.toString();
????????????????char?relation?= record.charAt(0);
????????????????// 取出此時key所對應的child
????????????????if?(relation == '1') {
????????????????????String?child?= record.substring(2);
????????????????????grandchilds.add(child);
????????????????????grandchildnum++;
????????????????}
????????????????// 取出此時key所對應的parent
????????????????else?{
????????????????????String?parent?= record.substring(2);
????????????????????grandparents.add(parent);
????????????????????grandparentnum++;
????????????????}
????????????}
????????????if?(grandchildnum != 0?&& grandparentnum != 0) {
????????????????for?(int?i?= 0; i < grandchildnum; i++)
????????????????????for?(int?j?= 0; j < grandparentnum; j++)
????????????????????????output.write(new?Text(grandchilds.get(i)), new?Text(
????????????????????????????????grandparents.get(j)));
????????????}
?
????????}
????}
?
????public?static?void?main(String[] args)?throws?IOException, ClassNotFoundException, InterruptedException {
BasicConfigurator.configure(); //自動快速地使用缺省Log4j環境
//必須要傳遞的是自定的mapper和reducer的類,輸入輸出的路徑必須指定,輸出的類型<k3,v3>必須指定
????????????//2將自定義的MyMapper和MyReducer組裝在一起
????????????Configuration conf=new?Configuration();
????????????String[] otherArgs = new?GenericOptionsParser(conf,args).getRemainingArgs();
????????????if(otherArgs.length < 2){
????????????????System.out.println("Usage: SingleTableRelation <in> [<in>...] <out>");
????????????????System.exit(2);
????????????}
????????????String jobName=SingleTableRelation.class.getSimpleName();
????????????//1首先寫job,知道需要conf和jobname在去創建即可
?????????????Job?job?= Job.getInstance(conf, jobName);
????????job.setJarByClass(SingleTableRelation.class);
????????job.setMapperClass(Map.class);
????????job.setReducerClass(Reduce.class);
????????job.setOutputKeyClass(Text.class);
????????job.setOutputValueClass(Text.class);
????????//設定輸入輸出路徑
????????for?(int?i?= 0; i < otherArgs.length-1;++i){
????????????FileInputFormat.addInputPath(job,new?Path(otherArgs[i]));
????????}
????????FileOutputFormat.setOutputPath(job, new?Path(otherArgs[otherArgs.length-1])); ??????
????????System.exit((job.waitForCompletion(true) ? 0?: 1));
????}
}
?三、總結
hadoop?是一個分布式的基礎架構,利用分布式實現高效的計算與儲存,最核心的設計在于 HDFS?與?MapReduce?。
HDFS?在集群上實現了分布式文件系統,?MapReduce?則在集群上實現了分布式計算和任務處理。HDFS?在?MapReduce?任務處理過程中提供了對文件操作和存儲等的支持。而MapReduce在?HDFS?的基礎上實現任務的分發、跟蹤和執行等工作,并收集結果,兩種相互作用,完成了 Hadoop?分布式集群的主要任務。
通過這四個實戰的題目我進一步掌握了?Hadoop?架構在現實生活中的應用。