章節內容
上一節我們完成了:
- MapReduce的介紹
- Hadoop序列化介紹
- Mapper編寫規範
- Reducer編寫規範
- Driver編寫規範
- WordCount功能開發
- WordCount本地測試
背景介紹
這裡使用三臺公網雲服務器,每臺 2C4G,搭建一個 Hadoop 的學習環境,供我學習。
之前已經在 VM 虛擬機上搭建過一次,但是沒留下筆記,這次趁著前幾天薅羊毛的3臺機器,趕緊嘗試在公網上搭建體驗一下。
注意,如果你和我一樣,打算用公網部署,那一定要做好防火牆策略,避免不必要的麻煩!!!
請大家都以學習為目的,也請不要對我的服務進行嗅探或者攻擊!!!
但是其中一臺公網服務器上我還運行著別的服務,比如前幾天發的 autodl-keeper(自己寫的小工具,用來防止 AutoDL 機器過期),還跑著別的 Web 服務,所以這臺只能擠出 2C2G 的資源。因此我的配置如下:
- 2C4G 編號 h121
- 2C4G 編號 h122
- 2C2G 編號 h123
業務需求
平常在業務中,很多時候表都是分開存儲的,通過一些 id 或者 code 來進行關聯。
在大數據的情況下,也有很多這種情況,我們需要進行聯表操作。
表1
項目編碼projectCode 項目名projectName
表2
項目編碼projectCode 項目類型projectType 項目分類projectFrom
在 SQL 中,可以通過 LEFT JOIN 來實現字段補齊。大數據場景下也需要進行這樣的操作,這時我們需要借助 MapReduce。
表1測試數據(對應下文 Mapper 中的 layout_project 文件,字段之間以 Tab 分隔)
"8aea9ba2-435c-48bd-9751-1cbd4c344d4e" "社區項目1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8" "社區項目2"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb" "智慧社區"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393" "公交站點"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c" "街道布建"
"2e556d83-bb56-45b1-8d6e-00510902c464" "街道公交站點"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9" "未命名項目1"
"5a5982d7-7257-422f-822a-a0c2f31c28d1" "未命名項目2"
表2測試數據(對應下文 Mapper 中的 project_info 文件,字段之間以 Tab 分隔)
"8aea9ba2-435c-48bd-9751-1cbd4c344d4e" "重要類型" "種類1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8" "重要類型" "種類1"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb" "重要類型" "種類1"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393" "普通類型" "種類1"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c" "普通類型" "種類2"
"2e556d83-bb56-45b1-8d6e-00510902c464" "普通類型" "種類2"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9" "一般類型" "種類2"
"5a5982d7-7257-422f-822a-a0c2f31c28d1" "一般類型" "種類2"
SQL連表
假設我們使用SQL的方式聯表:
SELECT *
FROM t_project
LEFT JOIN t_project_info
ON t_project.projectCode = t_project_info.projectCode
Reduce JOIN
有時候,表可能過大,無法支持我們使用 SQL 進行連表查詢。
這里我們編寫一個程序來完成操作。
ProjectBean
這裡是最終的 Bean 類,裡邊包含兩個表字段補齊後的結果(另外用 flag 字段標記數據來自哪張表),一會兒我們將使用這個類進行表的連接。
package icu.wzk.demo03;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class ProjectBean implements Writable {private String projectCode;private String projectName;private String projectType;private String projectFrom;private String flag;@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(projectCode);dataOutput.writeUTF(projectName);dataOutput.writeUTF(projectType);dataOutput.writeUTF(projectFrom);dataOutput.writeUTF(flag);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.projectCode = dataInput.readUTF();this.projectName = dataInput.readUTF();this.projectType = dataInput.readUTF();this.projectFrom = dataInput.readUTF();this.flag = dataInput.readUTF();}public ProjectBean(String projectCode, String projectName, String projectType, String projectFrom, String flag) {this.projectCode = projectCode;this.projectName = projectName;this.projectType = projectType;this.projectFrom = projectFrom;this.flag = flag;}public ProjectBean() {}@Overridepublic String toString() {return "ProjectBean{" +"projectCode='" + projectCode + '\'' +", projectName='" + projectName + '\'' +", projectType='" + projectType + '\'' +", projectFrom='" + projectFrom + '\'' +", flag=" + flag + '\'' +'}';}public String getProjectCode() {return projectCode;}public void setProjectCode(String projectCode) {this.projectCode = projectCode;}public String getProjectName() {return projectName;}public void setProjectName(String projectName) {this.projectName = projectName;}public String getProjectType() {return projectType;}public void setProjectType(String projectType) {this.projectType = projectType;}public String getProjectFrom() {return projectFrom;}public void setProjectFrom(String projectFrom) {this.projectFrom = projectFrom;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}
}
Reduce Driver
package icu.wzk.demo03;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReducerJoinDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// String inputPath = args[0];// String outputPath = args[1];// === 測試環境 ===String inputPath = "project_test";String outputPath = "project_test_output";// === ===Configuration configuration = new Configuration();Job job = Job.getInstance(configuration, "ReducerJoinDriver");job.setJarByClass(ReducerJoinDriver.class);job.setMapperClass(ReducerJoinMapper.class);job.setReducerClass(ReducerJoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(ProjectBean.class);job.setOutputKeyClass(ProjectBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}
ReduceMapper
package icu.wzk.demo03;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {String name;ProjectBean projectBean = new ProjectBean();Text k = new Text();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {// 獲取路徑信息name = context.getInputSplit().toString();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {String line = value.toString();if (name.contains("layout_project")) {// layout_projectString[] fields = line.split("\t");projectBean.setProjectCode(fields[0]);projectBean.setProjectName(fields[1]);projectBean.setProjectType("");projectBean.setProjectFrom("");projectBean.setFlag("layout_project");// projectCode 關聯k.set(fields[0]);} else {// project_infoString[] fields = line.split("\t");projectBean.setProjectCode(fields[0]);projectBean.setProjectName("");projectBean.setProjectType(fields[1]);projectBean.setProjectFrom(fields[2]);projectBean.setFlag("project_info");// projectCode 關聯k.set(fields[0]);}context.write(k, projectBean);}
}
ReduceReducer
package icu.wzk.demo03;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<ProjectBean> values, Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context) throws IOException, InterruptedException {List<ProjectBean> dataList = new ArrayList<>();ProjectBean deviceProjectBean = new ProjectBean();for (ProjectBean pb : values) {if ("layout_project".equals(pb.getFlag())) {// layout_projectProjectBean projectProjectBean = new ProjectBean(pb.getProjectCode(),pb.getProjectName(),pb.getProjectType(),pb.getProjectFrom(),pb.getFlag());dataList.add(projectProjectBean);} else {// project_infodeviceProjectBean = new ProjectBean(pb.getProjectCode(),pb.getProjectName(),pb.getProjectType(),pb.getProjectFrom(),pb.getFlag());}}for (ProjectBean pb : dataList) {pb.setProjectType(deviceProjectBean.getProjectType());pb.setProjectFrom(deviceProjectBean.getProjectFrom());context.write(pb, NullWritable.get());}}
}
運行結果
ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"社區項目2"', projectType='"重要類型"', projectFrom='"種類1"', flag=layout_project'}
ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"智慧社區"', projectType='"重要類型"', projectFrom='"種類1"', flag=layout_project'}
ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"街道公交站點"', projectType='"普通類型"', projectFrom='"種類2"', flag=layout_project'}
ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"未命名項目1"', projectType='"一般類型"', projectFrom='"種類2"', flag=layout_project'}
ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"未命名項目2"', projectType='"一般類型"', projectFrom='"種類2"', flag=layout_project'}
ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"社區項目1"', projectType='"重要類型"', projectFrom='"種類1"', flag=layout_project'}
ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"公交站點"', projectType='"普通類型"', projectFrom='"種類1"', flag=layout_project'}
ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"街道布建"', projectType='"普通類型"', projectFrom='"種類2"', flag=layout_project'}
方案缺點
JOIN 操作是在 reduce 階段完成的,reduce 端處理壓力過大,而 map 節點的運算負載很低,資源利用率不高。