MapReduce分區

目錄

  • 1. MapReduce分區
    • 1.1 哈希分區
    • 1.2 自定義分區
  • 2. 成績分組
    • 2.1 Map
    • 2.2 Partition
    • 2.3 Reduce
  • 3. 代碼和結果
    • 3.1 pom.xml中依賴配置
    • 3.2 工具類util
    • 3.3 GroupScores
    • 3.4 結果
  • 參考

??本文引用的Apache Hadoop源代碼基于Apache許可證 2.0,詳情請參閱 Apache許可證2.0。

1. MapReduce分區

??在默認情況下,MapReduce認為Reduce函數處理的是數據匯總操作,因此其針對的必定是一個Map函數清洗處理后的相對規模較小的數據集,且需要對整個集群中所有Map的中間輸出結果進行統一處理,因此只會啟動一個Reduce計算節點來處理。
??這與某些特殊的應用需求并不相匹配。在某些特定的時刻,開發人員希望啟動更多的Reduce并發節點來優化最終結果統計的性能,減小數據處理的延遲,這通過簡單的設置代碼即可完成;而在更定制化的環境中,開發人員希望符合特定規則的Map中間輸出結果交由特定的Reduce節點處理,這就需要使用MapReduce分區,開發人員還可以提供自定義的分區規則。
??如果有很多個Reduce任務,每個Map任務就會針對輸出進行分區,即為每個Reduce任務建立一個分區。每個分區中有很多鍵,但每個鍵對應的鍵值對記錄都在同一分區中。如果不給定自定義的分區規則,則Hadoop使用默認的哈希函數來分區,效率較高。

1.1 哈希分區

??下面是Apache Hadoop中默認哈希分區的源代碼。在這個分區規則中,選擇Reduce節點的計算方法是(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.mapreduce.lib.partition;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

1.2 自定義分區

??如果希望實現自定義分區,需要繼承Hadoop提供的分區類org.apache.hadoop.mapreduce.Partitioner,下面是該類的聲明。繼承該分區類的自定義分區類需要實現public abstract int getPartition(KEY key, VALUE value, int numPartitions),該函數的作用是設置Map中間處理結果的分區規則,其中numPartitions是總分區的個數。此外,在自定義分區類時,通過函數返回了多少個分區,那么在MapReduce任務調度代碼中需要設置Job.setNumReduceTasks(自定義分區個數)

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.mapreduce;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;/** * Partitions the key space.* * <p><code>Partitioner</code> controls the partitioning of the keys of the * intermediate map-outputs. The key (or a subset of the key) is used to derive* the partition, typically by a hash function. The total number of partitions* is the same as the number of reduce tasks for the job. Hence this controls* which of the <code>m</code> reduce tasks the intermediate key (and hence the * record) is sent for reduction.</p>** <p>Note: A <code>Partitioner</code> is created only when there are multiple* reducers.</p>** <p>Note: If you require your Partitioner class to obtain the Job's* configuration object, implement the {@link Configurable} interface.</p>* * @see Reducer*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {/** * Get the partition number for a given key (hence record) given the total * number of partitions i.e. number of reduce-tasks for the job.*   * <p>Typically a hash function on a all or a subset of the key.</p>** @param key the key to be partioned.* @param value the entry value.* @param numPartitions the total number of partitions.* @return the partition number for the <code>key</code>.*/public abstract int getPartition(KEY key, VALUE value, int numPartitions);}

2. 成績分組

??成績文本如下,第一列為人名,第二列為成績。目標是將成績分為5段,分別為 [ 0 , 20 ) [0, 20) [0,20) [ 20 , 40 ) [20, 40) [20,40) [ 40 , 60 ) [40, 60) [40,60) [ 60 , 80 ) [60, 80) [60,80) [ 80 , 100 ] [80, 100] [80,100]

1 23
2 78
3 45
4 12
5 67
6 34
7 89
8 56
9 9
10 78
11 21
12 54
13 83
14 10
15 65
16 39
17 92
18 47
19 28
20 72

2.1 Map

??假設這個MapReduce作業使用了1個Map,Map的作用是從文本獲取<人名,成績>鍵值對,同時保證成績在有效范圍內。
在這里插入圖片描述

2.2 Partition

??根據成績進行分區,其中1的范圍是 [ 20 , 40 ) [20, 40) [20,40),3的范圍是 [ 60 , 80 ) [60, 80) [60,80),2的范圍是 [ 40 , 60 ) [40, 60) [40,60),4的范圍是 [ 80 , 100 ] [80, 100] [80,100],0的范圍是 [ 0 , 20 ) [0, 20) [0,20)
在這里插入圖片描述

2.3 Reduce

??reduce不進行任何操作,直接將分區結果排序后寫入5個文件中。
在這里插入圖片描述

3. 代碼和結果

3.1 pom.xml中依賴配置

  <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.3.6</version><type>pom</type></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.3.6</version></dependency></dependencies>

3.2 工具類util

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class util {public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {URI add = new URI(uri);return FileSystem.get(add, conf);}public static void removeALL(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println("Delete Output Folder? " + isDeleted);}}public static void  showResult(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);String regex = "part-r-";Pattern pattern = Pattern.compile(regex);if (fs.exists(new Path(path))) {FileStatus[] files = fs.listStatus(new Path(path));for (FileStatus file : files) {Matcher matcher = pattern.matcher(file.getPath().toString());if (matcher.find()) {System.out.println(file.getPath() + ":");FSDataInputStream openStream = fs.open(file.getPath());IOUtils.copyBytes(openStream, System.out, 1024);openStream.close();}}}}
}

3.3 GroupScores

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class App {public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] splitStr = value.toString().split(" ");Text keyOut = new Text(splitStr[0]);int grade = Integer.parseInt(splitStr[1]);if (grade >= 0 && grade <= 100) {IntWritable valueOut = new IntWritable(grade);context.write(keyOut, valueOut);}}}public static class MyPartitioner extends Partitioner<Text, IntWritable> {@Overridepublic int getPartition(Text key, IntWritable value, int numPartitions) {if (value.get() >= 80) {return 0;} else if (value.get() >= 60) {return 1;} else if (value.get() >= 40) {return 2;} else if (value.get() >= 20) {return 3;} else {return 4;}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {for (IntWritable value : values) {context.write(key, value);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] myArgs = {"file:///home/developer/CodeArtsProjects/mapreduce-partitioner/values.txt","hdfs://localhost:9000/user/developer/GroupScores/output"};util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);Job job = Job.getInstance(conf, "GroupScores");job.setMapperClass(MyMapper.class);job.setPartitionerClass(MyPartitioner.class);job.setReducerClass(MyReducer.class);job.setNumReduceTasks(5);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < myArgs.length - 1; i++) {FileInputFormat.setInputPaths(job, new Path(myArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));int res = job.waitForCompletion(true) ? 0 : 1;if (res == 0) {System.out.println("GroupScores結果為:");util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);}System.exit(res);}
}

3.4 結果

在這里插入圖片描述

參考

吳章勇 楊強著 大數據Hadoop3.X分布式處理實戰

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/67929.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/67929.shtml
英文地址,請注明出處:http://en.pswp.cn/web/67929.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

kamailio-ACC_JSON模塊詳解【后端語言go】

要確認 ACC_JSON 模塊是否已經成功將計費信息推送到消息隊列&#xff08;MQueue&#xff09;&#xff0c;以及如何從隊列中取值&#xff0c;可以按照以下步驟進行操作&#xff1a; 1. 確認 ACC_JSON 已推送到隊列 1.1 配置 ACC_JSON 確保 ACC_JSON 模塊已正確配置并啟用。以下…

網件r7000刷回原廠固件合集測評

《網件R7000路由器刷回原廠固件詳解》 網件R7000是一款備受贊譽的高性能無線路由器&#xff0c;其強大的性能和可定制性吸引了許多高級用戶。然而&#xff0c;有時候用戶可能會嘗試第三方固件以提升功能或優化網絡性能&#xff0c;但這也可能導致一些問題&#xff0c;如系統不…

【C++STL標準模板庫】二、STL三大組件

文章目錄 1、容器2、算法3、迭代器 二、STL三大組件 1、容器 容器&#xff0c;置物之所也。 研究數據的特定排列方式&#xff0c;以利于搜索或排序或其他特殊目的&#xff0c;這一門學科我們稱為數據結構。大學信息類相關專業里面&#xff0c;與編程最有直接關系的學科&…

基于 Java 開發的 MongoDB 企業級應用全解析

基于Java的MongoDB企業級應用開發實戰 目錄 背景與歷史MongoDB的核心功能與特性企業級業務場景分析MongoDB的優缺點剖析開發環境搭建 5.1 JDK安裝與配置5.2 MongoDB安裝與集群配置5.3 開發工具選型 Java與MongoDB集成實戰 6.1 項目依賴與驅動選擇6.2 連接池與客戶端配置6.3…

需求分析應該從哪些方面來著手做?

需求分析一般可從以下幾個方面著手&#xff1a; 業務需求方面 - 與相關方溝通&#xff1a;與業務部門、客戶等進行深入交流&#xff0c;通過訪談、問卷調查、會議討論等方式&#xff0c;明確他們對項目的期望、目標和整體業務需求&#xff0c;了解項目要解決的業務問題及達成的…

算法題(57):找出字符串中第一個匹配項的下標

審題: 需要我們根據原串與模式串相比較并找到完全匹配時子串的第一個元素索引&#xff0c;若沒有則返回-1 思路&#xff1a; 方法一&#xff1a;BF暴力算法 思路很簡單&#xff0c;我們用p1表示原串的索引&#xff0c;p2表示模式串索引。遍歷原串&#xff0c;每次遍歷都匹配一次…

求組合數(遞推法、乘法逆元、盧卡斯定理、分解質因數)

文章目錄 遞推法 10^4代碼 乘法逆元 10^6代碼 盧卡斯定理 1 0 18 m o d 1 0 6 10^{18}mod 10^6 1018mod106代碼 分解質因數 常規的解法就不多加贅述了&#xff0c;如&#xff08;分子/分母&#xff0c;邊乘邊除&#xff09;&#xff0c;本文講述以下方法&#xff1a; 遞推法 了…

WPF進階 | WPF 動畫特效揭秘:實現炫酷的界面交互效果

WPF進階 | WPF 動畫特效揭秘&#xff1a;實現炫酷的界面交互效果 前言一、WPF 動畫基礎概念1.1 什么是 WPF 動畫1.2 動畫的基本類型1.3 動畫的核心元素 二、線性動畫詳解2.1 DoubleAnimation 的使用2.2 ColorAnimation 實現顏色漸變 三、關鍵幀動畫深入3.1 DoubleAnimationUsin…

【Numpy核心編程攻略:Python數據處理、分析詳解與科學計算】2.27 NumPy+Pandas:高性能數據處理的黃金組合

2.27 NumPyPandas&#xff1a;高性能數據處理的黃金組合 目錄 #mermaid-svg-x3ndEE4hrhO6WR6H {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-x3ndEE4hrhO6WR6H .error-icon{fill:#552222;}#mermaid-svg-x3ndEE4hr…

swagger使用指引

1.swagger介紹 在前后端分離開發中通常由后端程序員設計接口&#xff0c;完成后需要編寫接口文檔&#xff0c;最后將文檔交給前端工程師&#xff0c;前端工程師參考文檔進行開發。 可以通過一些工具快速生成接口文檔 &#xff0c;本項目通過Swagger生成接口在線文檔 。 什么…

DeepSeek API文檔解讀(對話模塊)

對話&#xff08;Chat&#xff09; 對話補全 報文message對象數組 System message name 一個在線聊天系統&#xff0c;其中涉及多個用戶和一個系統管理員。在這個系統中&#xff0c;每個用戶都可以發送消息&#xff0c;并且系統管理員可以監控和回復這些消息。為了區分不同…

【Numpy核心編程攻略:Python數據處理、分析詳解與科學計算】2.19 線性代數核武器:BLAS/LAPACK深度集成

2.19 線性代數核武器&#xff1a;BLAS/LAPACK深度集成 目錄 #mermaid-svg-yVixkwXWUEZuu02L {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-yVixkwXWUEZuu02L .error-icon{fill:#552222;}#mermaid-svg-yVixkwXWUEZ…

Linux——文件與磁盤

1. 磁盤結構 磁盤在我們的計算機中有著重要的地位&#xff0c;當文件沒有被打開時其數據就存儲在磁盤上&#xff0c;要了解磁盤的工作原理先要了解磁盤的結構。 1.1 磁盤的物理結構 以傳統的存儲設備機械硬盤為例&#xff0c;它通過磁性盤片和磁頭來讀寫數據。磁盤內部有多個旋…

【Envi遙感圖像處理】010:歸一化植被指數NDVI計算方法

文章目錄 一、NDVI簡介二、NDVI計算方法1. NDVI工具2. 波段運算三、注意事項1. 計算結果為一片黑2. 計算結果超出范圍一、NDVI簡介 歸一化植被指數,是反映農作物長勢和營養信息的重要參數之一,應用于遙感影像。NDVI是通過植被在近紅外波段(NIR)和紅光波段(R)的反射率差異…

UE虛幻引擎No Google Play Store Key:No OBB found報錯如何處理

UE虛幻引擎No Google Play Store Key&#xff1a;No OBB found報錯如何處理&#xff1f; 問題描述&#xff1a; UE成功打包APK并安裝過后&#xff0c;啟動應用時提示&#xff1a; No Google Play Store KeyNo OBB found and no store key to try to download. Please setone …

C++并發編程指南04

文章目錄 共享數據的問題3.1.1 條件競爭雙鏈表的例子條件競爭示例惡性條件競爭的特點 3.1.2 避免惡性條件競爭1. 使用互斥量保護共享數據結構2. 無鎖編程3. 軟件事務內存&#xff08;STM&#xff09; 總結互斥量與共享數據保護3.2.1 互斥量使用互斥量保護共享數據示例代碼&…

【Redis】主從模式,哨兵,集群

主從復制 單點問題&#xff1a; 在分布式系統中&#xff0c;如果某個服務器程序&#xff0c;只有一個節點&#xff08;也就是一個物理服務器&#xff09;來部署這個服務器程序的話&#xff0c;那么可能會出現以下問題&#xff1a; 1.可用性問題&#xff1a;如果這個機器掛了…

Vue.js 如何選擇合適的組件庫

Vue.js 如何選擇合適的組件庫 大家在開發 Vue.js 項目的時候&#xff0c;都會面臨一個問題&#xff1a;我該選擇哪個組件庫&#xff1f; 市面上有很多優秀的 Vue 組件庫&#xff0c;比如 Element Plus、Vuetify、Quasar 等&#xff0c;它們各有特點。選擇合適的組件庫&#xf…

寒假(一)

請使用消息隊列實現2個終端之間互相聊天 終端一 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h&g…

java項目驗證碼登錄

1.依賴 導入hutool工具包用于創建驗證碼 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.2</version></dependency> 2.測試 生成一個驗證碼圖片&#xff08;生成的圖片瀏覽器可…