(一)ETL介紹
“ETL,是英文Extract-Transform-Load的縮寫,用來描述將數據從來源端經過抽取(Extract)、轉換(Transform)、加載(Load)至目的端的過程。ETL一詞較常用在數據倉庫,但其對象并不限于數據倉庫。
在Transform的過程中,我們經常會做數據清洗這個操作。它是指對采集到的原始數據進行預處理,以去除錯誤、重復、不完整或不一致的數據,使數據符合分析要求的過程。它在整個數據分析和數據處理流程中處于非常重要的位置,因為數據質量的好壞直接影響到后續分析結果的準確性和可靠性。
清理的過程往往只需要運行Mapper程序,不需要運行Reduce程序。
(二)需求分析
我們有去除日志中字段個數小于等于11的日志。
(1)輸入數據
(2)期望輸出數據:每行字段長度都大于11。
需要在Map階段對輸入的數據根據規則進行過濾清洗,并不需要進行匯總。
(三)思路分析
map階段:按行讀入內容,對內容進行檢查,如果字段的個數少于等于11,就刪除這條日志(不保留)去除日志中字段個數小于等于11的日志內容。
對于map函數來說,它的輸入參數是:<偏移量,第一行的內容>
<偏移量,每一行的內容> → <刷選后的沒一行的內容,null>
對于reduce函數來說,它的輸入參數是:<刷選后的每一行的內容,[null,null,...]>,對于我們的需求來說,并不需要這個階段。
(三)實現代碼
在之前的項目的基礎之上,重寫去寫一個包,并創建兩個類:WebLogMapper和WebLogDriver類。
(1)編寫WebLogMapper類
package com.root.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
????// 1. 獲取一行數據,使用空格進行拆分,判斷是否有8個字段
????String[] fields = value.toString().split(" ");
????if (fields.length > 7) {
????????// 這條數據是有意義的,保留
????????System.out.println(fields[0]);
????????context.write(value, NullWritable.get());
????} else {
????????// 這條數據是無意義的,不保留
????????return;
????}
}
}
- 編寫WebLogDriver類
package com.root.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WebLogDriver {
public static void main(String[] args) throws Exception { ???????
// 1 獲取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加載jar包
job.setJarByClass(LogDriver.class);
// 3 關聯map
job.setMapperClass(WebLogMapper.class);
// 4 設置最終輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 設置reducetask個數為0
job.setNumReduceTasks(0);
// 5 設置輸入和輸出路徑
?????????FileInputFormat.setInputPaths(job, new Path("E:\\vm\\web.log"));
?????????FileOutputFormat.setOutputPath(job, new Path("E:\\vm\\ouput2"));
// 6 提交
?????????boolean b = job.waitForCompletion(true);
?????????System.exit(b ? 0 : 1);
}
}
說明:reduceTask為0,表示沒有reduce階段,那么最終輸出的文件個數與mapperTask的數量一致。