MapReduce二次排序

2019獨角獸企業重金招聘Python工程師標準>>> hot3.png

?默認情況下,Map輸出的結果會對Key進行默認的排序,但是有時候需要對Key排序的同時還需要對Value進行排序,這時候就要用到二次排序了。下面我們來說說二次排序

1、二次排序原理

? 我們把二次排序分為以下幾個階段

??Map起始階段

? ? 在Map階段,使用job.setInputFormatClass()定義的InputFormat,將輸入的數據集分割成小數據塊split,同時InputFormat提供一個RecordReader的實現。在這里我們使用的是TextInputFormat,它提供的RecordReader會將文本的行號作為Key,這一行的文本作為Value。這就是自定 Mapper的輸入是<LongWritable,Text> 的原因。然后調用自定義Mapper的map方法,將一個個<LongWritable,Text>鍵值對輸入給Mapper的map方法

??Map最后階段

? ? 在Map階段的最后,會先調用job.setPartitionerClass()對這個Mapper的輸出結果進行分區,每個分區映射到一個Reducer。每個分區內又調用job.setSortComparatorClass()設置的Key比較函數類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass()設置 Key比較函數類,則使用Key實現的compareTo()方法

? Reduce階段

? ? 在Reduce階段,reduce()方法接受所有映射到這個Reduce的map輸出后,也會調用job.setSortComparatorClass()方法設置的Key比較函數類,對所有數據進行排序。然后開始構造一個Key對應的Value迭代器。這時就要用到分組,使用 job.setGroupingComparatorClass()方法設置分組函數類。只要這個比較器比較的兩個Key相同,它們就屬于同一組,它們的 Value放在一個Value迭代器,而這個迭代器的Key使用屬于同一個組的所有Key的第一個Key。最后就是進入Reducer的 reduce()方法,reduce()方法的輸入是所有的Key和它的Value迭代器,同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致

?

? 接下來我們通過示例,可以很直觀的了解二次排序的原理

? 輸入文件 sort.txt 內容為

? ? 40 20

? ? 40 10

? ? 40 30

? ? 40 5

? ? 30 30

? ? 30 20

? ? 30 10

? ? 30 40

? ? 50 20

? ? 50 50

? ? 50 10

? ? 50 60

? 輸出文件的內容(從小到大排序)如下

? ? 30 10

? ? 30 20

? ? 30 30

? ? 30 40

? ? --------

? ? 40 5

? ? 40 10

? ? 40 20

? ? 40 30

? ? --------

? ? 50 10

? ? 50 20

? ? 50 50

? ? 50 60

? 從輸出的結果可以看出Key實現了從小到大的排序,同時相同Key的Value也實現了從小到大的排序,這就是二次排序的結果

2、二次排序的具體流程

? 在本例中要比較兩次。先按照第一字段排序,然后再對第一字段相同的按照第二字段排序。根據這一點,我們可以構造一個復合類IntPair ,它有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。二次排序的流程分為以下幾步。

? 1、自定義 key

? ? 所有自定義的key應該實現接口WritableComparable,因為它是可序列化的并且可比較的。WritableComparable 的內部方法如下所示

?

// 反序列化,從流中的二進制轉換成IntPair
public void readFields(DataInput in) throws IOException// 序列化,將IntPair轉化成使用流傳送的二進制
public void write(DataOutput out)//  key的比較
public int compareTo(IntPair o)//  默認的分區類 HashPartitioner,使用此方法
public int hashCode()//  默認實現
public boolean equals(Object right)

?

? 2、自定義分區

? ? 自定義分區函數類FirstPartitioner,是key的第一次比較,完成對所有key的排序。

public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>

? ? 在job中使用setPartitionerClasss()方法設置Partitioner

job.setPartitionerClasss(FirstPartitioner.Class);

? 3、Key的比較類

? ? 這是Key的第二次比較,對所有的Key進行排序,即同時完成IntPair中的first和second排序。該類是一個比較器,可以通過兩種方式實現。

? ? 1) 繼承WritableComparator。

public static class KeyComparator extends WritableComparator

? ? ? 必須有一個構造函數,并且重載以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

? ? 2) 實現接口 RawComparator。

? ? ? 上面兩種實現方式,在Job中,可以通過setSortComparatorClass()方法來設置Key的比較類。

job.setSortComparatorClass(KeyComparator.Class);

? ? ? 注意:如果沒有使用自定義的SortComparator類,則默認使用Key中compareTo()方法對Key排序。

? 4、定義分組類函數

? ? 在Reduce階段,構造一個與 Key 相對應的 Value 迭代器的時候,只要first相同就屬于同一個組,放在一個Value迭代器。定義這個比較器,可以有兩種方式。

? ? 1) 繼承 WritableComparator。

public static class GroupingComparator extends WritableComparator

? ? ? 必須有一個構造函數,并且重載以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

? ? 2) 實現接口 RawComparator。

? ? ? 上面兩種實現方式,在 Job 中,可以通過 setGroupingComparatorClass()方法來設置分組類。

job.setGroupingComparatorClass(GroupingComparator.Class);

? ? ? 另外注意的是,如果reduce的輸入與輸出不是同一種類型,則 Combiner和Reducer 不能共用 Reducer 類,因為 Combiner 的輸出是 reduce 的輸入。除非重新定義一個Combiner。

3、代碼實現

? Hadoop的example包中自帶了一個MapReduce的二次排序算法,下面對 example包中的二次排序進行改進

?

package com.buaa;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** 
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName IntPair
* @Description 將示例數據中的key/value封裝成一個整體作為Key,同時實現 WritableComparable接口并重寫其方法
* @Author 劉吉超
* @Date 2016-06-07 22:31:53
*/
public class IntPair implements WritableComparable<IntPair>{private int first;private int second;public IntPair(){}public IntPair(int left, int right){set(left, right);}public void set(int left, int right){first = left;second = right;}@Overridepublic void readFields(DataInput in) throws IOException{first = in.readInt();second = in.readInt();}@Overridepublic void write(DataOutput out) throws IOException{out.writeInt(first);out.writeInt(second);}@Overridepublic int compareTo(IntPair o){if (first != o.first){return first < o.first ? -1 : 1;}else if (second != o.second){return second < o.second ? -1 : 1;}else{return 0;}}@Overridepublic int hashCode(){return first * 157 + second;}@Overridepublic boolean equals(Object right){if (right == null)return false;if (this == right)return true;if (right instanceof IntPair){IntPair r = (IntPair) right;return r.first == first && r.second == second;}else{return false;}}public int getFirst(){return first;}public int getSecond(){return second;}
}

?

package com.buaa;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/** 
* @ProjectName SecondarySort
* @PackageName com.buaa
* @ClassName SecondarySort
* @Description TODO
* @Author 劉吉超
* @Date 2016-06-07 22:40:37
*/
@SuppressWarnings("deprecation")
public class SecondarySort {public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);int left = 0;int right = 0;if (tokenizer.hasMoreTokens()) {left = Integer.parseInt(tokenizer.nextToken());if (tokenizer.hasMoreTokens())right = Integer.parseInt(tokenizer.nextToken());context.write(new IntPair(left, right), new IntWritable(right));}}}/** 自定義分區函數類FirstPartitioner,根據 IntPair中的first實現分區*/public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{@Overridepublic int getPartition(IntPair key, IntWritable value,int numPartitions){return Math.abs(key.getFirst() * 127) % numPartitions;}}/** 自定義GroupingComparator類,實現分區內的數據分組*/@SuppressWarnings("rawtypes")public static class GroupingComparator extends WritableComparator{protected GroupingComparator(){super(IntPair.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2){IntPair ip1 = (IntPair) w1;IntPair ip2 = (IntPair) w2;int l = ip1.getFirst();int r = ip2.getFirst();return l == r ? 0 : (l < r ? -1 : 1);}}public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(new Text(Integer.toString(key.getFirst())), val);}}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 讀取配置文件Configuration conf = new Configuration();// 判斷路徑是否存在,如果存在,則刪除    Path mypath = new Path(args[1]);  FileSystem hdfs = mypath.getFileSystem(conf);  if (hdfs.isDirectory(mypath)) {  hdfs.delete(mypath, true);  } Job job = new Job(conf, "secondarysort");// 設置主類job.setJarByClass(SecondarySort.class);// 輸入路徑FileInputFormat.setInputPaths(job, new Path(args[0]));// 輸出路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));// Mapperjob.setMapperClass(Map.class);// Reducerjob.setReducerClass(Reduce.class);// 分區函數job.setPartitionerClass(FirstPartitioner.class);// 本示例并沒有自定義SortComparator,而是使用IntPair中compareTo方法進行排序 job.setSortComparatorClass();// 分組函數job.setGroupingComparatorClass(GroupingComparator.class);// map輸出key類型job.setMapOutputKeyClass(IntPair.class);// map輸出value類型job.setMapOutputValueClass(IntWritable.class);// reduce輸出key類型job.setOutputKeyClass(Text.class);// reduce輸出value類型job.setOutputValueClass(IntWritable.class);// 輸入格式job.setInputFormatClass(TextInputFormat.class);// 輸出格式job.setOutputFormatClass(TextOutputFormat.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

?

轉載于:https://my.oschina.net/xiaoluobutou/blog/807362

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/542048.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/542048.shtml
英文地址,請注明出處:http://en.pswp.cn/news/542048.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據有序_詳解數據庫插入性能優化:合并+事務+有序數據進行INSERT操作

概述對于一些數據量較大的系統&#xff0c;數據庫面臨的問題除了查詢效率低下&#xff0c;還有就是數據入庫時間長。特別像報表系統&#xff0c;每天花費在數據導入上的時間可能會長達幾個小時或十幾個小時之久。因此&#xff0c;優化數據庫插入性能是很有意義的。其實最有效的…

Java ProcessBuilder environment()方法與示例

ProcessBuilder類的environment()方法 (ProcessBuilder Class environment() method) environment() method is available in java.lang package. environment()方法在java.lang包中可用。 environment() method is used to return Map interfaces of this process builder env…

容器內應用日志收集方案

容器化應用日志收集挑戰 應用日志的收集、分析和監控是日常運維工作重要的部分&#xff0c;妥善地處理應用日志收集往往是應用容器化重要的一個課題。 Docker處理日志的方法是通過docker engine捕捉每一個容器進程的STDOUT和STDERR&#xff0c;通過為contrainer制定不同log dri…

python統計行號_利用Python進行數據分析(第三篇上)

上一篇文章我記錄了自己在入門 Python 學習的一些基礎內容以及實際操作代碼時所碰到的一些問題。這篇我將會記錄我在學習和運用 Python 進行數據分析的過程&#xff1a;介紹 Numpy 和 Pandas 兩個包運用 Numpy 和 Pandas 分析一維、二維數據數據分析的基本過程實戰項目【用 Pyt…

lnmp架構搭建—源碼編譯(nginx、mysql、php)

含義及理解&#xff1a; LNMP LinuxNginxMysqlPHP&#xff1a;LNMP是指一組通常一起使用來運行動態網站或者服務器的自由軟件名稱首字母縮寫。L指Linux&#xff0c;N指Nginx&#xff0c;M一般指MySQL&#xff0c;也可以指MariaDB&#xff0c;P一般指PHP&#xff0c;也可以指P…

Java PipedInputStream available()方法與示例

PipedInputStream類的available()方法 (PipedInputStream Class available() method) available() method is available in java.io package. available()方法在java.io包中可用。 available() method is used to return the number of available bytes left that can be read …

解析xml_Mybatis中mapper的xml解析詳解

上一篇文章分析了mapper注解關鍵類MapperAnnotationBuilder&#xff0c;今天來看mapper的項目了解析關鍵類XMLMapperBuilder。基礎介紹回顧下之前是在分析configuration的初始化過程&#xff0c;已經進行到了最后一步mapperElement(root.evalNode("mappers"))&#x…

lnmp—MemCache的作用

含義及理解&#xff1a; 1 . memcache是一個高性能的分布式的內存對象緩存系統&#xff0c;用于動態web應用以減輕數據庫負擔。通過在內存里維護一個統一的巨大的hash表&#xff0c;來存儲經常被讀寫的一些數組與文件&#xff0c;從而極大的提高網站的運行效率。 memcache是一…

Java ListResourceBundle getKeys()方法與示例

ListResourceBundle類的getContents()方法 (ListResourceBundle Class getContents() method) getContents() method is available in java.util package. getContents()方法在java.util包中可用。 getContents() method is used to return an enumeration of all the keys tha…

orale用戶密碼過期處理

使用具有管理權限的用戶登錄1、查看用戶的proifle是哪個&#xff0c;一般是default&#xff1a;SELECT username,PROFILE FROM dba_users;2、查看指定概要文件&#xff08;如default&#xff09;的密碼有效期設置&#xff1a;sql>SELECT * FROM dba_profiles s WHERE s.prof…

python字典怎么設置_在python中設置字典中的屬性

在python中設置字典中的屬性是否可以在python中從字典創建一個對象&#xff0c;使每個鍵都是該對象的屬性&#xff1f;像這樣的東西&#xff1a;d { name: Oscar, lastName: Reyes, age:32 }e Employee(d)print e.name # Oscarprint e.age 10 # 42我認為這幾乎與這個問題相反…

Java ObjectInputStream readByte()方法與示例

ObjectInputStream類readByte()方法 (ObjectInputStream Class readByte() method) readByte() method is available in java.io package. readByte()方法在java.io包中可用。 readByte() method is used to read a byte (i.e. 8 bit) of data from this ObjectInputStream. re…

openresty—實現緩存前移

含義及理解&#xff1a; OpenResty(又稱&#xff1a;ngx_openresty) 是一個基于 NGINX 的可伸縮的 Web 平臺&#xff0c;由中國人章亦春發起&#xff0c;提供了很多高質量的第三方模塊。 其目標是讓Web服務直接跑在Nginx服務內部&#xff0c;充分利用Nginx的非阻塞I/O模型&am…

Nginx+Keepalived+Tomcat之動靜分離的web集群

NginxKeepalivedTomcat之動靜分離的web集群 博客分類&#xff1a; webserverNginxKeepalivedTomcat之動靜分離的web集群為小公司提供大概一天持續在100萬/日之間訪問的高性能、高可用、高并發訪問及動靜分離的web集群方案NginxKeepalived 高可用、反向代理NginxPHP …

安裝完成后的配置_cent os7 默認安裝后的一般配置

在安裝cent os7后&#xff0c;進入系統會出現一些命令無法執行。這是因為最小化沒有安裝包含的軟件包。這時候先要配置一下基本的IP參數&#xff0c;(包括動態&#xff0c;靜態&#xff0c;或者是雙網卡綁定)。我們在虛擬機中模擬操作一下&#xff0c;配置文件在/etc/sysconfig…

Java Integer類lowerOneBit()方法與示例

整數類lowerOneBit()方法 (Integer class lowestOneBit() method) lowestOneBit() method is available in java.lang package. minimumOneBit()方法在java.lang包中可用。 lowestOneBit() method is used to find at most only single 1’s bit from the rightmost side one b…

lnmp構架——對tomcat詳解

tomcat的安裝部署 安裝jdk和tomcat tar zxf jdk-7u79-linux-x64.tar.gz -C /usr/local/ tar zxf apache-tomcat-7.0.37.tar.gz -C /usr/local/做好軟連接便于訪問 cd /usr/local ln -s jdk1.7.0_79/ java ln -s apache-tomcat-7.0.37/ tomcat配置環境變量 vim /etc/profile…

Linux 查找文件

find 查找目錄 -name "文件名"find / -name "php.ini"locate 文件名locate php.ini 一&#xff1a;locate命令 locate命令用于查找文件&#xff0c;它比find命令的搜索速度快&#xff0c;它需要一個數據庫&#xff0c;這個數據庫由每天的例行工作&#xff…

Java GregorianCalendar hashCode()方法與示例

GregorianCalendar類的hashCode()方法 (GregorianCalendar Class hashCode() method) hashCode() method is available in java.util package. hashCode()方法在java.util包中可用。 hashCode() method is used to returns the hash code for this GregorianCalendar. hashCode…

python元組為什么不可變_為什么python字符串和元組是不可變的?

我不知道為什么字符串和元組是不可變的&#xff1b;使它們不可變的優點和缺點是什么&#xff1f;除了Python解釋器的內部實現&#xff0c;這種設計在編寫程序上是否有很好的意義&#xff1f;(例如&#xff0c;如果元組和字符串是可變的&#xff0c;會更容易嗎&#xff1f;)如果…