Contents
- 1. MapReduce Partitioning
- 1.1 Hash Partitioning
- 1.2 Custom Partitioning
- 2. Grouping Scores
- 2.1 Map
- 2.2 Partition
- 2.3 Reduce
- 3. Code and Results
- 3.1 Dependencies in pom.xml
- 3.2 Utility class util
- 3.3 GroupScores
- 3.4 Results
- References
The Apache Hadoop source code quoted in this article is distributed under the Apache License 2.0; see the Apache License 2.0 for details.
1. MapReduce Partitioning
By default, MapReduce treats the Reduce function as a data-summarization step: it is assumed to work on the relatively small data set left after the Map functions have cleaned the input, and it has to process the intermediate output of every Map in the cluster as a whole, so only a single Reduce task is launched.

This does not fit every application. Sometimes developers want to launch more Reduce tasks in parallel to speed up the final aggregation and lower processing latency, which takes only a simple configuration call; in more customized scenarios, developers want map outputs that match particular rules to be handled by particular Reduce tasks. That is what MapReduce partitioning provides, and developers can supply their own partitioning rules.

When there are many Reduce tasks, each Map task partitions its output, creating one partition per Reduce task. A partition may contain many keys, but all records for a given key land in the same partition. If no custom partitioning rule is supplied, Hadoop partitions with its default hash function, which is efficient.
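As an illustration, the minimal driver fragment below (the class and job names are hypothetical, not from this article's project) shows that with the default partitioner, requesting more reducers is a one-line change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hedged sketch: without a custom partitioner, raising the reducer count is enough
// to spread map output across several partitions via the default HashPartitioner.
public class MoreReducersSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "more-reducers-demo"); // hypothetical job name
        job.setNumReduceTasks(4); // default is 1; each reducer writes its own part-r-0000x file
        // ... set mapper, reducer, input and output paths as usual, then job.waitForCompletion(true)
    }
}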
1.1 Hash Partitioning
Below is the source code of the default hash partitioner in Apache Hadoop. Under this rule, the Reduce task for a record is chosen by computing (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
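As a quick illustration (not part of Hadoop), the small program below applies the same formula to a few plain String keys. Note that in a real job the key is a Writable such as Text, whose hashCode() is computed differently from String's, so the actual partition numbers in a job may differ:

// Minimal, self-contained sketch that applies HashPartitioner's formula
// to show how keys are mapped onto reduce tasks.
public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 3; // hypothetical number of reducers
        for (String key : new String[] {"alice", "bob", "carol"}) {
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}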
1.2 Custom Partitioning
To implement custom partitioning, extend the partitioner class Hadoop provides, org.apache.hadoop.mapreduce.Partitioner; its declaration is shown below. The subclass must implement public abstract int getPartition(KEY key, VALUE value, int numPartitions), which defines how the map's intermediate output is partitioned; numPartitions is the total number of partitions. In addition, however many distinct partition numbers the custom class can return, the job driver must configure the same number of reducers with Job.setNumReduceTasks(number of partitions). A wiring sketch follows the listing.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;

/** 
 * Partitions the key space.
 * 
 * <p><code>Partitioner</code> controls the partitioning of the keys of the 
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the 
 * record) is sent for reduction.</p>
 *
 * <p>Note: A <code>Partitioner</code> is created only when there are multiple
 * reducers.</p>
 *
 * <p>Note: If you require your Partitioner class to obtain the Job's
 * configuration object, implement the {@link Configurable} interface.</p>
 * 
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {

  /** 
   * Get the partition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   * 
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}
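To show how the pieces connect, here is a skeleton of a custom partitioner together with the driver calls that register it. The class name and the even/odd rule are purely illustrative; a complete, working example follows in section 3.3.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Skeleton of a custom partitioner. The only contract getPartition must honour
// is returning a value in [0, numPartitions).
public class EvenOddPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Hypothetical rule: even values go to partition 0, odd values to partition 1
        // (assumes the driver configures two reducers).
        return (value.get() % 2 == 0) ? 0 : 1;
    }
}

// In the driver:
// job.setPartitionerClass(EvenOddPartitioner.class);
// job.setNumReduceTasks(2);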
2. Grouping Scores
The score file is shown below; the first column is the person's name (an ID here) and the second is the score. The goal is to split the scores into five ranges: [0, 20), [20, 40), [40, 60), [60, 80), and [80, 100].
1 23
2 78
3 45
4 12
5 67
6 34
7 89
8 56
9 9
10 78
11 21
12 54
13 83
14 10
15 65
16 39
17 92
18 47
19 28
20 72
2.1 Map
Assume the job uses a single Map task. The Map extracts <name, score> key-value pairs from the text and makes sure every score falls within the valid range.
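The essence of that per-line work is sketched below with a hard-coded sample record (the full Mapper implementation is in section 3.3):

// Minimal sketch of the mapper's per-line logic.
public class MapLogicSketch {
    public static void main(String[] args) {
        String line = "7 89";                  // sample record: name, then score, space-separated
        String[] fields = line.split(" ");
        int score = Integer.parseInt(fields[1]);
        if (score >= 0 && score <= 100) {      // records with out-of-range scores are dropped
            // In the real mapper this would be context.write(new Text(fields[0]), new IntWritable(score)).
            System.out.println(fields[0] + "\t" + score);
        }
    }
}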
2.2 Partition
Records are partitioned by score. Matching the partitioner in section 3.3, partition 0 covers [80, 100], partition 1 covers [60, 80), partition 2 covers [40, 60), partition 3 covers [20, 40), and partition 4 covers [0, 20).
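Section 3.3 implements this with an if/else chain; as a side note, an equivalent arithmetic form is sketched below, assuming the mapper has already confined scores to [0, 100]:

// Equivalent arithmetic form of the five-way split (an alternative to the if/else chain).
public class PartitionFormulaSketch {
    public static void main(String[] args) {
        for (int score : new int[] {9, 23, 45, 67, 83, 100}) {
            int partition = 4 - Math.min(score / 20, 4); // [80,100] -> 0, [60,80) -> 1, ..., [0,20) -> 4
            System.out.println(score + " -> partition " + partition);
        }
    }
}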
2.3 Reduce
The reducer performs no aggregation; it simply writes each partition's records, already sorted by key, into its own output file, producing five files in total.
3. Code and Results
3.1 Dependencies in pom.xml
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.6</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.6</version>
    <type>pom</type>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>3.3.6</version>
  </dependency>
</dependencies>
3.2 Utility class util
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class util {
    // Get a FileSystem handle for the given URI (e.g. an HDFS address).
    public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {
        URI add = new URI(uri);
        return FileSystem.get(add, conf);
    }

    // Recursively delete the given path if it exists, so the job starts with a clean output folder.
    public static void removeALL(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        if (fs.exists(new Path(path))) {
            boolean isDeleted = fs.delete(new Path(path), true);
            System.out.println("Delete Output Folder? " + isDeleted);
        }
    }

    // Print the contents of every part-r-* file under the output path.
    public static void showResult(String uri, Configuration conf, String path) throws Exception {
        FileSystem fs = getFileSystem(uri, conf);
        String regex = "part-r-";
        Pattern pattern = Pattern.compile(regex);
        if (fs.exists(new Path(path))) {
            FileStatus[] files = fs.listStatus(new Path(path));
            for (FileStatus file : files) {
                Matcher matcher = pattern.matcher(file.getPath().toString());
                if (matcher.find()) {
                    System.out.println(file.getPath() + ":");
                    FSDataInputStream openStream = fs.open(file.getPath());
                    IOUtils.copyBytes(openStream, System.out, 1024);
                    openStream.close();
                }
            }
        }
    }
}
3.3 GroupScores
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App {
    // Mapper: emit <name, score> pairs, dropping scores outside [0, 100].
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splitStr = value.toString().split(" ");
            Text keyOut = new Text(splitStr[0]);
            int grade = Integer.parseInt(splitStr[1]);
            if (grade >= 0 && grade <= 100) {
                IntWritable valueOut = new IntWritable(grade);
                context.write(keyOut, valueOut);
            }
        }
    }

    // Partitioner: route each record to one of five reducers by score range.
    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            if (value.get() >= 80) {
                return 0;
            } else if (value.get() >= 60) {
                return 1;
            } else if (value.get() >= 40) {
                return 2;
            } else if (value.get() >= 20) {
                return 3;
            } else {
                return 4;
            }
        }
    }

    // Reducer: no aggregation, just write every <name, score> pair back out.
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] myArgs = {
            "file:///home/developer/CodeArtsProjects/mapreduce-partitioner/values.txt",
            "hdfs://localhost:9000/user/developer/GroupScores/output"
        };
        util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
        Job job = Job.getInstance(conf, "GroupScores");
        job.setMapperClass(MyMapper.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(5); // must match the number of partitions MyPartitioner returns
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < myArgs.length - 1; i++) {
            FileInputFormat.setInputPaths(job, new Path(myArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));
        int res = job.waitForCompletion(true) ? 0 : 1;
        if (res == 0) {
            System.out.println("GroupScores results:");
            util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);
        }
        System.exit(res);
    }
}
3.4 Results
References
吳章勇, 楊強. 大數據Hadoop3.X分布式處理實戰.