05 MapReduce Application Examples 01

1. Word Count

Word counting reflects, to some extent, the original motivation behind MapReduce: analyzing log files. The mapper turns every word of a line into a (word, 1) pair, and the reducer sums the counts for each word.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Called once for every line read from the file split: the key is the byte offset
    // of the line within the file, and the value is the line's content.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Called once per group; within a group all values share the same key,
    // and there may be more than one value.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum = sum + i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://node1:8020");
        // config.set("yarn.resourcemanager.hostname", "node1");
        // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/wc");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
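
One optional line can be added to the driver above (a hedged addition, not part of the original listing): because summing counts is associative and commutative, the same WordCountReducer class can also be registered as a combiner, pre-aggregating counts on the map side so that less data crosses the network during the shuffle.

// Optional: reuse the reducer as a combiner to pre-aggregate (word, 1) pairs on the map side.
job.setCombinerClass(WordCountReducer.class);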


2. Data Deduplication

The goal is that any record appearing more than once in the raw data appears exactly once in the output file.

The natural approach is to route all occurrences of the same record to the same reduce task; no matter how many times a record appears, it only has to be written once in the final result.

The word count program only needs a small change: the mapper emits the whole line (the value) as the key with a NullWritable value, and the reducer writes each distinct key exactly once.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the line itself as the key; identical lines end up in the same reduce group.
        context.write(value, NullWritable.get());
    }
}
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each distinct key is written exactly once, no matter how many values it has.
        context.write(key, NullWritable.get());
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://node1:8020");
        // config.set("yarn.resourcemanager.hostname", "node1");
        config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("dedup");
            job.setMapperClass(DedupMapper.class);
            job.setReducerClass(DedupReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/dedup");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}



3. Sorting

Sort the contents of the input files.

Each line of the input files contains a single number, i.e., one data item.

Each line of the output must contain two numbers separated by whitespace: the second is an original value and the first is that value's rank.

Sample input:

file1:

2

32

654

32

15

765

65223

file2:

5956

22

650

92

file3:

26

54

6

Sample output:

1 2

2 6

3 15

4 22

5 26

6 32

7 32

8 54

9 92

10 650

11 654

12 765

13 5956

14 65223


Design approach:

We can rely on the sorting that MapReduce performs by default during the shuffle, instead of implementing the sort ourselves.

Key points:

1. The data to be sorted is used as the map output key;

2. A custom Partitioner is needed to make the result globally ordered. Divide the maximum input value by the number of partitions to get a bucket width; the split points are then 1×, 2×, ..., (numPartitions−1)× that width. For example, with a maximum of 65223 and 5 partitions the width is 65223 / 5 = 13044, so keys up to 13044 go to partition 0, keys up to 26088 go to partition 1, and keys above 4 × 13044 = 52176 go to the last partition. Concatenating the reducer outputs in partition order then yields a globally sorted result.

3. The reducer receives <key, value-list> and writes the key once for each element of the value-list, so repeated values (such as the two 32s) each receive their own rank.

package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Sort {

    public static class SortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {
        private NullWritable nw = NullWritable.get();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // The number itself becomes the map output key, so the shuffle sorts it for us.
            context.write(new IntWritable(Integer.parseInt(value.toString().trim())), nw);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        private IntWritable counter = new IntWritable(1);

        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Write the key once per occurrence, numbering each output line.
            // Note: the counter is local to each reduce task, so ranks are continuous only
            // within one output file; with a single reduce task they are global.
            for (NullWritable nw : values) {
                context.write(counter, key);
                counter = new IntWritable(counter.get() + 1);
            }
        }
    }

    public static class SortPartitioner extends Partitioner<IntWritable, NullWritable> {
        // numPartitions equals the number of reduce tasks
        @Override
        public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
            int maxNumber = 65223;  // hardcoded maximum of the sample data
            int bound = maxNumber / numPartitions;
            int keyNumber = key.get();
            for (int i = 0; i < numPartitions; ++i) {
                if (keyNumber <= (i + 1) * bound) {
                    return i;
                }
            }
            // Keys larger than numPartitions * bound (e.g. the maximum itself, because of
            // integer division) go to the last partition so global order is preserved.
            return numPartitions - 1;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Sort.class);
        job.setJobName("sort");
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(5);
        job.setPartitionerClass(SortPartitioner.class);

        String inputFile = "/home/jinzhao/dataset/input";
        String outputFile = "/home/jinzhao/dataset/output";
        FileInputFormat.setInputPaths(job, new Path(inputFile));
        Path output = new Path(outputFile);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        job.waitForCompletion(true);
    }
}
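
The SortPartitioner above hardcodes 65223, the maximum of the sample data. As a hedged sketch (not from the original article), the maximum could instead be passed in through the job configuration; the property name "sort.max.value" below is made up for illustration. Hadoop injects the job configuration into a partitioner that implements Configurable, so the driver would call conf.setInt("sort.max.value", 65223) before creating the job. This class would replace SortPartitioner inside Sort (it also needs imports for org.apache.hadoop.conf.Configurable and Configuration).

// Sketch: read the maximum value from the job configuration instead of hardcoding it.
public static class ConfiguredSortPartitioner
        extends Partitioner<IntWritable, NullWritable> implements Configurable {
    private Configuration conf;
    private int maxNumber = Integer.MAX_VALUE;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        // "sort.max.value" is a made-up property name for this sketch.
        this.maxNumber = conf.getInt("sort.max.value", Integer.MAX_VALUE);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
        long bound = maxNumber / numPartitions + 1;  // +1 so the maximum value itself fits
        return (int) Math.min(key.get() / bound, numPartitions - 1);
    }
}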


4. Single-Table Join

Given a child-parent table, produce the grandchild-grandparent table by joining the table with itself. The mapper emits each record twice: once keyed by the child (tagged "parent:" plus the parent) and once keyed by the parent (tagged "child:" plus the child). The values grouped under one person therefore contain that person's children and that person's parents, and every child-parent combination within the group is a grandchild-grandparent pair.

Sample input:

file:

child parent

Tom Lucy

Tom Jack

Jone Lucy

Jone Jack

Lucy Mary

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma

Sample output:

file:

grandchild grandparent

Tom Alice

Tom Jesse

Jone Alice

Jone Jesse

Tom Mary

Tom Ben

Jone Mary

Jone Ben

Philip Alice

Philip Jesse

Mark Alice

Mark Jesse


package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class stlink {

    private static boolean flag = true;

    public static class stlinkMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line is "child<TAB>parent"; skip the header line.
            String[] names = value.toString().trim().split("\t");
            if (names[0].compareTo("child") != 0) {
                // Emit the record twice: once keyed by the child, once keyed by the parent.
                context.write(new Text(names[0]), new Text("parent:" + names[1]));
                context.write(new Text(names[1]), new Text("child:" + names[0]));
            }
        }
    }

    public static class stlinkReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Write the header once per reduce task, using a static flag.
            if (flag) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                flag = false;
            }
            // Split the grouped values into this person's children and this person's parents.
            List<String> children = new ArrayList<String>();
            List<String> parents = new ArrayList<String>();
            for (Text t : values) {
                String[] kv = t.toString().split(":");
                if (kv[0].compareTo("child") == 0) {
                    children.add(kv[1]);
                } else {
                    parents.add(kv[1]);
                }
            }
            // Every (child, parent) combination is a grandchild-grandparent pair.
            for (String c : children) {
                for (String p : parents) {
                    context.write(new Text(c), new Text(p));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job stlinkJob = Job.getInstance(conf);
        stlinkJob.setJarByClass(stlink.class);
        stlinkJob.setJobName("single table link");
        stlinkJob.setMapperClass(stlinkMapper.class);
        stlinkJob.setReducerClass(stlinkReducer.class);
        stlinkJob.setOutputKeyClass(Text.class);
        stlinkJob.setOutputValueClass(Text.class);
        stlinkJob.setMapOutputKeyClass(Text.class);
        stlinkJob.setMapOutputValueClass(Text.class);

        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(stlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(stlinkJob, output);
        stlinkJob.waitForCompletion(true);
    }
}
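
A hedged side note (not from the original article): the static flag is reset in every reduce task's JVM, so with several reduce tasks each output file would start with its own header line. A more conventional place for once-per-task output is the reducer's setup() method, which Hadoop calls once before the first reduce() call; the effect is the same, but the intent is explicit and no class-level state is needed.

// Inside stlinkReducer, an alternative to the static flag: emit the header in setup().
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    context.write(new Text("grandchild"), new Text("grandparent"));
}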


5. Multi-Table Join

Join a factory table (factory name, address id) with an address table (address id, address name) to produce (factory name, address name) pairs. The mapper keys every record by the address id and tags it with the table it came from; the reducer then pairs each factory in a group with that group's address name.

Sample input:

factory:

factoryname addressed

Beijing Red Star 1

Shenzhen Thunder 3

Guangzhou Honda 2

Beijing Rising 1

Guangzhou Development Bank 2

Tencent 3

Bank of Beijing 1

address:

1 Beijing

2 Guangzhou

3 Shenzhen

4 Xian

package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mtlink {

    private static boolean flag = true;

    public static class mtlinkMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String str = value.toString();
            // Skip the header lines of both tables.
            if (str.contains("factoryname") || str.contains("addressname")) {
                return;
            }
            String[] infos = str.trim().split(" ");
            if (infos[0].charAt(0) >= '0' && infos[0].charAt(0) <= '9') {
                // Address record "addressID addressname": key by the address id, tag "right".
                context.write(new Text(infos[0]), new Text("right:" + strCombine(infos, "right")));
            } else {
                // Factory record "factoryname ... addressID": key by the address id, tag "left".
                context.write(new Text(infos[infos.length - 1]), new Text("left:" + strCombine(infos, "left")));
            }
        }

        // Rejoin the multi-word name, dropping the address id field.
        private String strCombine(String[] strs, String direction) {
            StringBuilder sb = new StringBuilder();
            if (direction.compareTo("right") == 0) {
                for (int i = 1; i < strs.length; ++i) {
                    sb.append(strs[i] + " ");
                }
            } else {
                for (int i = 0; i < strs.length - 1; ++i) {
                    sb.append(strs[i] + " ");
                }
            }
            return sb.toString().trim();
        }
    }

    public static class mtlinkReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Write the header once (same static-flag trick as in the previous example).
            if (flag) {
                context.write(new Text("factoryname"), new Text("addressname"));
                flag = false;
            }
            List<String> companies = new ArrayList<String>();
            String place = "huoxing";  // placeholder used if no address record exists for this id
            for (Text t : values) {
                String[] kv = t.toString().trim().split(":");
                if (kv[0].compareTo("right") == 0) {
                    place = kv[1];
                } else {
                    companies.add(kv[1]);
                }
            }
            for (String s : companies) {
                context.write(new Text(s), new Text(place));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job mtlinkJob = Job.getInstance(conf);
        mtlinkJob.setJarByClass(mtlink.class);
        mtlinkJob.setJobName("multiple tables link");
        mtlinkJob.setMapperClass(mtlinkMapper.class);
        mtlinkJob.setReducerClass(mtlinkReducer.class);
        mtlinkJob.setOutputKeyClass(Text.class);
        mtlinkJob.setOutputValueClass(Text.class);

        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(mtlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(mtlinkJob, output);
        mtlinkJob.waitForCompletion(true);
    }
}
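
Distinguishing the two tables by checking whether a record's first character is a digit is fragile. A hedged alternative sketch (not how the original code works): register a separate mapper per table with MultipleInputs, so the left/right tag is decided by the input path rather than by the record contents. The paths and the FactoryMapper/AddressMapper class names below are hypothetical.

// Driver-side fragment replacing FileInputFormat.setInputPaths(...); requires
// org.apache.hadoop.mapreduce.lib.input.MultipleInputs and TextInputFormat.
// FactoryMapper would emit (addressID, "left:" + factoryname);
// AddressMapper would emit (addressID, "right:" + addressname).
MultipleInputs.addInputPath(mtlinkJob, new Path("/home/jinzhao/dataset/factory"),
        TextInputFormat.class, FactoryMapper.class);
MultipleInputs.addInputPath(mtlinkJob, new Path("/home/jinzhao/dataset/address"),
        TextInputFormat.class, AddressMapper.class);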

