本文介紹hadoop中的MapReduce技術的應用,使用java API。操作系統:Ubuntu24.04。
MapReduce概述
MapReduce概念
MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個Hadoop集群上。
MapReduce核心思想
分布式的運算程序往往需要分成至少2個階段。
第一個階段的MapTask并發實例,完全并行運行,互不相干。
第二個階段的ReduceTask并發實例互不相干,但是他們的數據依賴于上一個階段的所有MapTask并發實例的輸出。
MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序,串行運行。
MapReduce 進程
MrAppMaster:負責整個程序的過程調度及狀態調度
MapTask:負責 Map 階段的整個數據處理流程
ReduceTask:負責 Reduce 階段的整個數據處理流程
創建軟件包
新建一個MapReduce軟件包
編寫Mapper類
Mapper類將單詞文本進行切割,切割成一個個的單詞,寫入到上下文中
(1)按行讀取,通過split函數進行切割,將切割出來的一個個單詞放到數組words中
(2)遍歷數組words,將存在的單詞數據存儲到word中,然后將word寫入到context上下文(使Redcue程序能訪問到數據)
核心代碼:
package MapReduce;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {// 輸出Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 獲取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 輸出for (String word : words) {k.set(word);context.write(k, v);}}
}
編寫Reducer類
Reducer類
(1)將每個單詞統計次數結果進行求和合并
(2)把統計結果依次寫入到context上下文中
核心代碼:
package MapReduce;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 聲明變量 用于存儲聚合完的結果long count = 0;// 遍歷相同的 key 獲取對應的所有 valuefor (IntWritable value : values) {count += value.get();}// 將聚合完的結果寫到 MapReduce 框架context.write(key, new LongWritable(count));}
}
編寫Driver類
Driver類中,需要進行以下操作:
獲取job 設置jar包路徑
關聯Mapper、Reducer
設置map輸出的k,v類型
最終輸出的k,v類型
設置輸入路徑和輸出路徑
提交job
核心代碼:
package MapReduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 0. 自定義配置對象Configuration conf = new Configuration();// 1. 創建 Job 對象,參數可取消Job job = Job.getInstance(conf);// 2. 給 Job 對象添加 Mapper 類的 Classjob.setMapperClass(WordCountMapper.class);// 3. 給 Job 對象添加 Reduce 類的 Classjob.setReducerClass(WordCountReducer.class);// 4. 給 Job 對象添加 Driver 類的 Classjob.setJarByClass(WordCountDriver.class);// 5. 設置 Mapper 輸出的數據的 key 類型job.setMapOutputKeyClass(Text.class);// 6. 設置 Mapper 輸出的數據的 value 類型job.setMapOutputValueClass(IntWritable.class);// 7. 設置 Reduce 輸出的數據的 key 類型job.setOutputKeyClass(Text.class);// 8. 設置 Reduce 輸出的數據的 value 類型job.setOutputValueClass(LongWritable.class);// 定義uri字符串// String uri = "hdfs://master:9000";// 9. 設置 MapReduce 任務的輸入路徑FileInputFormat.setInputPaths(job, new Path(args[0]));// 10.設置 MapReduce 任務的輸出路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));// 11.提交任務boolean result = job.waitForCompletion(true);// 12.退出返回System.exit(result ? 0 : 1);}
}
打包
在IDEA中,選擇最右邊的“Maven”選項卡,展開旁邊的” Lifecycle → package”,雙擊,在最左邊的Project面板中,找到” src → target”,就能發現生成了一個jar文件,我這里是“Spark-1.0-SNAPSHOT.jar”。
找到這個文件,在文件資源管理器打開,上傳這個文件。可以修改成一個簡單的名字,如“mr.jar”,然后放到一個你容易找到的地方,例如桌面上。利用XShell把這個文件上傳到hadoop集群中
在hadoop集群中執行MapReduce程序
先準備好需要統計詞頻的文件,用瀏覽器打開hadoop的Web UI,輸入地址:
http://hadoop101:9870/
然后選擇“Utilities”菜單下的“Browser the file system”,我創建了一個文件夾“wordcount”
進入“wordcount”文件夾,我繼續創建了一個文件夾“input”。繼續進入“input”文件夾,我上傳了兩個文件“”file01.txt 和“file02.txt”,內容分別為:
Hello MapReduce Bye MapReduce
Hello Hadoop Goodbye Hadoop
進入主機master,打開命令行窗口,輸入下列命令來執行上傳的MapReduce程序:
cd /home/youka
hadoop jar mr.jar mr.WordCountDriver /wordcount/input /wordcount/output
系統執行后,報錯了
Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster:
這里給出的報錯信息非常明確,就是mapreduce配置文件沒有配好,先打開hadoop中mapreduce配置文件:
vim /usr/local/hadoop/etc/hadoop/mapred-site.xml
在configuration中增加一下配置:
<property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
重新執行,成功
在“wordcount”中增加了一個”output”文件夾
打開后多了兩個文件
“part-r-00000”文件顯示了統計結果