05 MapReduce Application Examples 02

6. Find the three highest temperatures in each month.

Input format: date (yyyy-MM-dd), a space, time (HH:mm:ss), a TAB, then the temperature.

inputfile:

1949-10-01 14:21:02	34c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c
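
For reference, here are the top three temperatures per month worked out by hand from this input — what the job below should print once the off-by-one on the day field in WReducer is fixed (the lines end up spread over three output files, one per year):

1949	10	2	36.0
1949	10	1	34.0
1950	1	1	32.0
1950	10	2	41.0
1950	10	1	37.0
1950	10	3	27.0
1951	7	3	47.0
1951	7	2	46.0
1951	7	1	45.0
1951	12	1	23.0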

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WRunner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("weather");
        job.setJarByClass(WRunner.class);
        job.setMapperClass(WMapper.class);
        job.setReducerClass(WReducer.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySort.class);
        job.setGroupingComparatorClass(MyGroup.class);
        // KeyValueTextInputFormat splits each line at the first TAB:
        // key = "1949-10-01 14:21:02", value = "34c"
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setNumReduceTasks(3); // one reducer per year, see MyPartitioner

        Path in = new Path("/home/jinzhao/mrtest/input");
        FileInputFormat.setInputPaths(job, in);
        Path out = new Path("/home/jinzhao/mrtest/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out))
            fs.delete(out, true);
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }

    static class WMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                Date date = sdf.parse(key.toString());
                Calendar c = Calendar.getInstance();
                c.setTime(date);
                int year = c.get(Calendar.YEAR);
                int month = c.get(Calendar.MONTH);      // 0-based; +1 when printing
                int day = c.get(Calendar.DAY_OF_MONTH); // already 1-based
                String h = value.toString().trim();
                // strip the trailing 'c' unit, e.g. "34c" -> 34.0
                double hot = Double.parseDouble(h.substring(0, h.length() - 1));
                context.write(new MyKey(year, month, day, hot), new DoubleWritable(hot));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    static class WReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable> {
        @Override
        protected void reduce(MyKey key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // One call covers a whole (year, month) group thanks to MyGroup, and
            // MySort has already put the temperatures in descending order, so the
            // first three values are the answer. Hadoop reuses the key object and
            // refreshes its fields for every value, so key.getDay() tracks v.
            int i = 0;
            for (DoubleWritable v : values) {
                ++i;
                // getDay() is already 1-based (unlike the month), so no +1 here.
                String msg = key.getYear() + "\t" + (key.getMonth() + 1) + "\t"
                        + key.getDay() + "\t" + v.get();
                context.write(new Text(msg), NullWritable.get());
                if (i == 3)
                    break;
            }
        }
    }
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * The composite key that gets serialized and shuffled between map and reduce.
 */
public class MyKey implements WritableComparable<MyKey> {
    private int year;
    private int month;
    private int day;
    private double hot;

    public MyKey() {
        super();
    }

    public MyKey(int year, int month, int day, double hot) {
        this.year = year;
        this.month = month;
        this.day = day;
        this.hot = hot;
    }

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public double getHot() { return hot; }
    public void setHot(double hot) { this.hot = hot; }

    // write() and readFields() must use the same field order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
        out.writeInt(day);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
        this.day = in.readInt();
    }

    /**
     * Natural ordering when the object is used as a key; at runtime the job
     * overrides it with MySort (ordering) and MyGroup (grouping).
     */
    @Override
    public int compareTo(MyKey o) {
        int r1 = Integer.compare(this.year, o.getYear());
        if (r1 != 0)
            return r1;
        int r2 = Integer.compare(this.month, o.getMonth());
        if (r2 != 0)
            return r2;
        return Double.compare(this.hot, o.getHot());
    }
}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: MyKey objects with the same year and month form one
 * group, i.e. they are handed to a single reduce() call.
 */
public class MyGroup extends WritableComparator {
    public MyGroup() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 != 0)
            return r1;
        return Integer.compare(k1.getMonth(), k2.getMonth());
    }
}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator: order by year and month first, then by temperature in
 * descending order within the month (in the shuffle, sorting runs before
 * grouping).
 */
public class MySort extends WritableComparator {
    public MySort() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 != 0)
            return r1;
        int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
        if (r2 != 0)
            return r2;
        return -Double.compare(k1.getHot(), k2.getHot()); // hottest first
    }
}

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner: each partition is handled by one reduce task. The sample data
 * covers 1949-1951, so with three reduce tasks every year lands in its own
 * partition.
 */
public class MyPartitioner extends Partitioner<MyKey, DoubleWritable> {
    @Override
    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        return (key.getYear() - 1949) % numReduceTasks;
    }
}

7. Friend recommendation for a social network.

Input format: user, a TAB, then the user's friends separated by spaces.

inputfile:

小明	老王 如花 林志玲
老王	小明 鳳姐
如花	小明 李剛 鳳姐
林志玲	小明 李剛 鳳姐 郭美美
李剛	如花 鳳姐 林志玲
郭美美	鳳姐 林志玲
鳳姐	如花 老王 林志玲 郭美美

Output of the first job:

Format: user1, a space, user2, a TAB, then the number of common friends.

Output of the second job:

Format: user, a TAB, then the recommendations separated by spaces.
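
Worked out by hand from the sample input (my addition, for checking the code below; within a line the two usernames appear in the canonical order produced by Fof, and lines follow the byte order of the pair strings), the first job should produce:

如花 林志玲	3
如花 老王	2
如花 郭美美	1
小明 李剛	2
小明 郭美美	1
小明 鳳姐	3
李剛 郭美美	1
林志玲 老王	2
老王 郭美美	1

and the second job should then produce (the order of candidates with equal counts is not deterministic):

如花	林志玲,3 老王,2 郭美美,1
小明	鳳姐,3 李剛,2 郭美美,1
李剛	小明,2 郭美美,1
林志玲	如花,3 老王,2
老王	如花,2 林志玲,2 郭美美,1
郭美美	小明,1 如花,1 老王,1 李剛,1
鳳姐	小明,3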

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Friends {

    static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String user = key.toString();
            String[] friends = value.toString().split(" ");
            for (int i = 0; i < friends.length; ++i) {
                // 0 marks a direct friendship so the reducer can filter it out.
                context.write(new Fof(user, friends[i]), new IntWritable(0));
                // Every pair among the user's friends shares this user as a
                // common friend: a friend-of-friend candidate worth one vote.
                for (int j = i + 1; j < friends.length; ++j)
                    context.write(new Fof(friends[i], friends[j]), new IntWritable(1));
            }
        }
    }

    static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable> {
        @Override
        protected void reduce(Fof key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            boolean flag = true;
            for (IntWritable i : values) {
                if (i.get() == 0) {
                    // The pair is already directly connected; never recommend it.
                    flag = false;
                    break;
                } else {
                    sum += i.get();
                }
            }
            if (flag)
                context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            // Job I: count common friends for every pair not yet connected.
            Job job = Job.getInstance(conf);
            job.setJarByClass(Friends.class);
            job.setJobName("friend-I");
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReducer.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            Path in = new Path("/home/jinzhao/mrtest/input");
            FileInputFormat.setInputPaths(job, in);
            Path out = new Path("/home/jinzhao/mrtest/output");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(out))
                fs.delete(out, true);
            FileOutputFormat.setOutputPath(job, out);

            if (job.waitForCompletion(true)) {
                // Job II: regroup the pairs by user, sort candidates by count.
                Job job2 = Job.getInstance(conf);
                job2.setJarByClass(Friends.class);
                job2.setJobName("friend-II");
                job2.setMapperClass(SortMapper.class);
                job2.setReducerClass(SortReducer.class);
                job2.setMapOutputKeyClass(User.class);
                job2.setMapOutputValueClass(User.class);
                job2.setInputFormatClass(KeyValueTextInputFormat.class);
                job2.setSortComparatorClass(FSort.class);
                job2.setGroupingComparatorClass(FGroup.class);
                Path in2 = new Path("/home/jinzhao/mrtest/output");
                FileInputFormat.setInputPaths(job2, in2);
                Path out2 = new Path("/home/jinzhao/mrtest/output2");
                if (fs.exists(out2))
                    fs.delete(out2, true);
                FileOutputFormat.setOutputPath(job2, out2);
                job2.waitForCompletion(true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class SortMapper extends Mapper<Text, Text, User, User> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "user1 user2<TAB>count" from job I.
            String[] friends = key.toString().split(" ");
            int count = Integer.parseInt(value.toString());
            // Emit both directions so each user collects all of his candidates.
            context.write(new User(friends[0], count), new User(friends[1], count));
            context.write(new User(friends[1], count), new User(friends[0], count));
        }
    }

    static class SortReducer extends Reducer<User, User, Text, Text> {
        @Override
        protected void reduce(User key, Iterable<User> values, Context context)
                throws IOException, InterruptedException {
            // FGroup groups by username and FSort orders the candidates by
            // count descending, so the list arrives in recommendation order.
            StringBuilder sb = new StringBuilder();
            for (User i : values)
                sb.append(i.getUsername() + "," + i.getCount() + " ");
            context.write(new Text(key.getUsername()), new Text(sb.toString().trim()));
        }
    }
}
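
To make the 0/1 marker concrete, this is what FofMapper emits for the single input line of 老王 (friends: 小明 鳳姐), with pairs shown in the canonical order produced by Fof:

小明 老王	0
老王 鳳姐	0
小明 鳳姐	1

The 0 records flag direct friendships. No line ever marks 小明 鳳姐 with a 0, so FofReducer keeps that pair, and the 1-votes contributed by the lines of 老王, 如花 and 林志玲 add up to the count of 3 shown above.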

import org.apache.hadoop.io.Text;

/**
 * A friend pair stored in canonical (lexicographically sorted) order, so that
 * the pair (a, b) and the pair (b, a) become one and the same key.
 */
public class Fof extends Text {
    public Fof() {
        super();
    }

    public Fof(String a, String b) {
        super(getFof(a, b));
    }

    public static String getFof(String a, String b) {
        int r = a.compareTo(b);
        if (r < 0)
            return a + " " + b;
        else
            return b + " " + a;
    }
}
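
A quick sanity check (a standalone sketch of mine, not from the original post): both argument orders yield the same canonical key, which is what lets FofReducer aggregate a pair no matter which side emitted it.

public class FofDemo {
    public static void main(String[] args) {
        System.out.println(new Fof("老王", "小明")); // 小明 老王
        System.out.println(new Fof("小明", "老王")); // 小明 老王 -- same key
    }
}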

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class User implements WritableComparable<User> {
    private String username;
    private int count;

    public User() {
    }

    public User(String username, int count) {
        this.username = username;
        this.count = count;
    }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(username);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.username = in.readUTF();
        this.count = in.readInt();
    }

    @Override
    public int compareTo(User o) {
        int c1 = this.username.compareTo(o.username);
        if (c1 != 0)
            return c1;
        // Integer.compare avoids the overflow risk of plain subtraction.
        return Integer.compare(this.count, o.getCount());
    }
}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: all User keys with the same username are handed to a
 * single reduce() call, regardless of their counts.
 */
public class FGroup extends WritableComparator {
    public FGroup() {
        super(User.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        User u1 = (User) a;
        User u2 = (User) b;
        return u1.getUsername().compareTo(u2.getUsername());
    }
}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator: order by username first, then by common-friend count in
 * descending order, so the strongest recommendations come out first.
 */
public class FSort extends WritableComparator {
    public FSort() {
        super(User.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        User u1 = (User) a;
        User u2 = (User) b;
        int c1 = u1.getUsername().compareTo(u2.getUsername());
        if (c1 != 0)
            return c1;
        return Integer.compare(u2.getCount(), u1.getCount()); // descending
    }
}




