Map-Reduce入門

1、Map-Reduce的邏輯過程

假設我們需要處理一批有關天氣的數據,其格式如下:

  • 按照ASCII碼存儲,每行一條記錄
  • 每一行字符從0開始計數,第15個到第18個字符為年
  • 第25個到第29個字符為溫度,其中第25位是符號+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

現在需要統計出每年的最高溫度。

Map-Reduce主要包括兩個步驟:Map和Reduce

每一步都有key-value對作為輸入和輸出:

  • map階段的key-value對的格式是由輸入的格式所決定的,如果是默認的TextInputFormat,則每行作為一個記錄進程處理,其中key為此行的開頭相對于文件的起始位置,value就是此行的字符文本
  • map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對于上面的例子,在map過程,輸入的key-value對如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

在map過程中,通過對每一行字符串的解析,得到年-溫度的key-value對作為輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce過程,將map過程中的輸出,按照相同的key將value放到同一個列表中作為reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其邏輯過程可用如下圖表示:

image

2、編寫Map-Reduce程序

編寫Map-Reduce程序,一般需要實現兩個函數:mapper中的map函數和reducer中的reduce函數。

一般遵循以下格式:

  • map: (K1, V1)? ->? list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

? void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)

? throws IOException;

}

  • reduce: (K2, list(V))? ->? list(K3, V3)?

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

? void reduce(K2 key, Iterator<V2> values,

????????????? OutputCollector<K3, V3> output, Reporter reporter)

??? throws IOException;

}

?

對于上面的例子,則實現的mapper如下:

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

??? @Override

??? public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

??????? String line = value.toString();

??????? String year = line.substring(15, 19);

??????? int airTemperature;

??????? if (line.charAt(25) == '+') {

??????????? airTemperature = Integer.parseInt(line.substring(26, 30));

??????? } else {

??????????? airTemperature = Integer.parseInt(line.substring(25, 30));

??????? }

??????? output.collect(new Text(year), new IntWritable(airTemperature));

??? }

}

實現的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

??? public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

??????? int maxValue = Integer.MIN_VALUE;

??????? while (values.hasNext()) {

??????????? maxValue = Math.max(maxValue, values.next().get());

??????? }

??????? output.collect(key, new IntWritable(maxValue));

??? }

}

?

欲運行上面實現的Mapper和Reduce,則需要生成一個Map-Reduce得任務(Job),其基本包括以下三部分:

  • 輸入的數據,也即需要處理的數據
  • Map-Reduce程序,也即上面實現的Mapper和Reducer
  • 此任務的配置項JobConf

欲配置JobConf,需要大致了解Hadoop運行job的基本原理:

  • Hadoop將Job分成task進行處理,共兩種task:map task和reduce task
  • Hadoop有兩類的節點控制job的運行:JobTracker和TaskTracker
    • JobTracker協調整個job的運行,將task分配到不同的TaskTracker上
    • TaskTracker負責運行task,并將結果返回給JobTracker
  • Hadoop將輸入數據分成固定大小的塊,我們稱之input split
  • Hadoop為每一個input split創建一個task,在此task中依次處理此split中的一個個記錄(record)
  • Hadoop會盡量讓輸入數據塊所在的DataNode和task所執行的DataNode(每個DataNode上都有一個TaskTracker)為同一個,可以提高運行效率,所以input split的大小也一般是HDFS的block的大小。
  • Reduce task的輸入一般為Map Task的輸出,Reduce Task的輸出為整個job的輸出,保存在HDFS上。
  • 在reduce中,相同key的所有的記錄一定會到同一個TaskTracker上面運行,然而不同的key可以在不同的TaskTracker上面運行,我們稱之為partition
    • partition的規則為:(K2, V2) –> Integer, 也即根據K2,生成一個partition的id,具有相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。

public interface Partitioner<K2, V2> extends JobConfigurable {

? int getPartition(K2 key, V2 value, int numPartitions);

}

下圖大概描述了Map-Reduce的Job運行的基本原理:

image

?

下面我們討論JobConf,其有很多的項可以進行配置:

  • setInputFormat:設置map的輸入格式,默認為TextInputFormat,key為LongWritable, value為Text
  • setNumMapTasks:設置map任務的個數,此設置通常不起作用,map任務的個數取決于輸入的數據所能分成的input split的個數
  • setMapperClass:設置Mapper,默認為IdentityMapper
  • setMapRunnerClass:設置MapRunner, map task是由MapRunner運行的,默認為MapRunnable,其功能為讀取input split的一個個record,依次調用Mapper的map函數
  • setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式
  • setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式
  • setPartitionerClass和setNumReduceTasks:設置Partitioner,默認為HashPartitioner,其根據key的hash值來決定進入哪個partition,每個partition被一個reduce task處理,所以partition的個數等于reduce task的個數
  • setReducerClass:設置Reducer,默認為IdentityReducer
  • setOutputFormat:設置任務的輸出格式,默認為TextOutputFormat
  • FileInputFormat.addInputPath:設置輸入文件的路徑,可以使一個文件,一個路徑,一個通配符。可以被調用多次添加多個路徑
  • FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不應該存在

當然不用所有的都設置,由上面的例子,可以編寫Map-Reduce程序如下:

public class MaxTemperature {

??? public static void main(String[] args) throws IOException {

??????? if (args.length != 2) {

??????????? System.err.println("Usage: MaxTemperature <input path> <output path>");

??????????? System.exit(-1);

??????? }

??????? JobConf conf = new JobConf(MaxTemperature.class);

??????? conf.setJobName("Max temperature");

??????? FileInputFormat.addInputPath(conf, new Path(args[0]));

??????? FileOutputFormat.setOutputPath(conf, new Path(args[1]));

??????? conf.setMapperClass(MaxTemperatureMapper.class);

??????? conf.setReducerClass(MaxTemperatureReducer.class);

??????? conf.setOutputKeyClass(Text.class);

??????? conf.setOutputValueClass(IntWritable.class);

??????? JobClient.runJob(conf);

??? }

}

3、Map-Reduce數據流(data flow)

Map-Reduce的處理過程主要涉及以下四個部分:

  • 客戶端Client:用于提交Map-reduce任務job
  • JobTracker:協調整個job的運行,其為一個Java進程,其main class為JobTracker
  • TaskTracker:運行此job的task,處理input split,其為一個Java進程,其main class為TaskTracker
  • HDFS:hadoop分布式文件系統,用于在各個進程間共享Job相關的文件

image

3.1、任務提交

JobClient.runJob()創建一個新的JobClient實例,調用其submitJob()函數。

  • 向JobTracker請求一個新的job ID
  • 檢測此job的output配置
  • 計算此job的input splits
  • 將Job運行所需的資源拷貝到JobTracker的文件系統中的文件夾中,包括job jar文件,job.xml配置文件,input splits
  • 通知JobTracker此Job已經可以運行了

提交任務后,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。

?

3.2、任務初始化

?

當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務并初始化任務。

初始化首先創建一個對象來封裝job運行的tasks, status以及progress。

在創建task之前,job調度器首先從共享文件系統中獲得JobClient計算出的input splits。

其為每個input split創建一個map task。

每個task被分配一個ID。

?

3.3、任務分配

?

TaskTracker周期性的向JobTracker發送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。

在JobTracker為TaskTracker選擇一個task之前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。

TaskTracker有固定數量的位置來運行map task或者reduce task。

默認的調度器對待map task優先于reduce task

當選擇reduce task的時候,JobTracker并不在多個task之間進行選擇,而是直接取下一個,因為reduce task沒有數據本地化的概念。

?

3.4、任務執行

?

TaskTracker被分配了一個task,下面便要運行此task。

首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。

TaskTracker從distributed cache中將job運行所需要的文件拷貝到本地磁盤。

其次,其為每個task創建一個本地的工作目錄,將jar解壓縮到文件目錄中。

其三,其創建一個TaskRunner來運行task。

TaskRunner創建一個新的JVM來運行task。

被創建的child JVM和TaskTracker通信來報告運行進度。

?

3.4.1、Map的過程

MapRunnable從input split中讀取一個個的record,然后依次調用Mapper的map函數,將結果輸出。

map的輸出并不是直接寫入硬盤,而是將其寫入緩存memory buffer。

當buffer中數據的到達一定的大小,一個背景線程將數據開始寫入硬盤。

在寫入硬盤之前,內存中的數據通過partitioner分成多個partition。

在同一個partition中,背景線程會將數據按照key在內存中排序。

每次從內存向硬盤flush數據,都生成一個新的spill文件。

當此task結束之前,所有的spill文件被合并為一個整的被partition的而且排好序的文件。

reducer可以通過http協議請求map的輸出文件,tracker.http.threads可以設置http服務線程數。

3.4.2、Reduce的過程

當map task結束后,其通知TaskTracker,TaskTracker通知JobTracker。

對于一個job,JobTracker知道TaskTracer和map輸出的對應關系。

reducer中一個線程周期性的向JobTracker請求map輸出的位置,直到其取得了所有的map輸出。

reduce task需要其對應的partition的所有的map輸出。

reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的map task完成時間不同。

reduce task中有多個copy線程,可以并行拷貝map輸出。

當很多map輸出拷貝到reduce task后,一個背景線程將其合并為一個大的排好序的文件。

當所有的map輸出都拷貝到reduce task后,進入sort過程,將所有的map輸出合并為大的排好序的文件。

最后進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每個key,最后的結果寫入HDFS。

?

image

?

3.5、任務結束

?

當JobTracker獲得最后一個task的運行成功的報告后,將job得狀態改為成功。

當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。

轉載于:https://www.cnblogs.com/JohnLiang/archive/2011/11/09/2243448.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/276945.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/276945.shtml
英文地址,請注明出處:http://en.pswp.cn/news/276945.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java之泛型T T與T的用法

<T> T表示返回值是一個泛型&#xff0c;傳遞啥&#xff0c;就返回啥類型的數據&#xff0c;而單獨的T就是表示限制你傳遞的參數類型&#xff0c;這個案例中&#xff0c;通過一個泛型的返回方式&#xff0c;獲取每一個集合中的第一個數據&#xff0c; 通過返回值<T>…

UrlReWriter 使用經驗小結

UrlRewriter 是微軟封裝好了的一個URL重寫組件。使用它可以讓我節約很多自已開發的時間。 好了&#xff0c;開始講述我的應用經驗&#xff0c;這只是很菜鳥的經驗&#xff0c;高手就不用看了。 第一步&#xff0c;請從此下載此組件。解壓&#xff0c;把UrlRewriter.dll copy到你…

clickhouse大數據分析技術與實戰_從銷售到經營——大客戶銷售策略與實戰技術...

對于首席客戶代表而言&#xff0c;要走出困局&#xff0c;所需要大客戶銷售策略性的訓練&#xff0c;而不是像基層客戶經理的銷售技巧訓練一樣&#xff1b;新業務的學習固然重要&#xff0c;但更重要的是轉化成實戰績效。從組織變革角度&#xff0c;每次成功的業務轉型背后都意…

Hadoop_NameNode_代碼分析_目錄樹(2)

&#xff08;1&#xff09;NameNode的內存中保存了龐大的目錄樹結構&#xff0c;這個結構用來保存文件目錄結構和文件Block之間的映射&#xff0c;這種結構關系會固化在磁盤上&#xff0c;但是對樹的改動頻繁發生&#xff0c;什么時候將樹寫入磁盤呢&#xff1f;把每次操作應用…

詳解 Visual C# 數據庫編程

詳解 Visual C# 數據庫編程 ******2007-11-05 14:34關于數據庫編程&#xff0c;微軟提供了一個統一的數據對象訪問模型&#xff0c;在Visual Studio6.0中稱為ADO&#xff0c;在.NET中則統一為ADO.NET,掌握ADO.NET就等于掌握了數據庫編程的核心。 針對數據庫編程始終是程序設計語…

swift - 根試圖控制器的手勢返回沖突 - push 新的tabbar控制器手勢沖突

1. 禁用手勢 和開啟手勢extension JYRTSShopListController: UIGestureRecognizerDelegate {/// 禁止使用手勢返回func forbidhenSideBack() {self.isCanSideBack falseif (self.navigationController?.responds(to:#selector(getter: self.navigationController?.interacti…

Acer 4750 安裝黑蘋果_黑蘋果系統安裝通用教程圖文版

在開始之前&#xff0c;不管你要安裝的是臺式組裝機&#xff0c;臺式品牌機&#xff0c;一體機&#xff0c;還是筆記本&#xff0c;都要大概了解一下硬件信息。因為黑蘋果的安裝確實比安裝Windows的系統要復雜的多。不管是前期準備工作&#xff0c;安裝&#xff0c;還是安裝之后…

IIS7中使用集成模式時出現HttpException

癥狀:在iis7在使用集成模式的Pool可能出現HttpException,而程序在經典模式下能正常運行. 解決方法:http://mvolo.com/blogs/serverside/archive/2007/11/10/Integrated-mode-Request-is-not-available-in-this-context-in-Application_5F00_Start.aspx 轉載于:https://www.cnbl…

教你學會七種維護服務器安全最佳技巧

導讀&#xff1a; 你的計算機上是否存在有至關重要的數據,并且不希望它們落入惡人之手呢?當然,它們完全有這種可能 。而且,近些年來,服務器遭受的風險也比以前更大了.越來越多的病毒,心懷不軌的黑客,以及那些商業間諜都將服務器作為了自己的目標.很顯然,服務器的安全問題是不容…

mysql 快速生成百萬條測試數據

轉自&#xff1a;http://www.cnblogs.com/jiangxiaobo/p/6101072.html 1、生成思路 利用mysql內存表插入速度快的特點&#xff0c;先利用函數和存儲過程在內存表中生成數據&#xff0c;然后再從內存表插入普通表中2、創建內存表及普通表 CREATE TABLE vote_record_memory (id I…

自動化專業學python有用嗎-馬哥教育官網-專業Linux培訓班,Python培訓機構

今天小編要來說一下Python自動化的學習思路&#xff0c; 對于剛剛進入的測試行業的人來說&#xff0c;未來該怎么樣朝著自動化方向發展&#xff0c;即使接觸到了自動化測試&#xff0c;又該從何下手去學呢&#xff1f; 簡單的說&#xff0c; 做測試做的好&#xff0c;會了接口&…

java JVM

每一個Java虛擬機都由一個類加載器子系統&#xff08;class loader subsystem&#xff09;&#xff0c;負責加載程序中的類型&#xff08;類和接口&#xff09;&#xff0c;并賦予唯一的名字。每一個Java虛擬機都有一個執行引擎&#xff08;execution engine&#xff09;負責執…

馬化騰聯手10余位科學家發起科學探索獎,騰訊基金投入10億元啟動資金

11月9日消息&#xff0c;據騰訊科技報道&#xff0c;騰訊基金會于騰訊公司成立20周年之際宣布&#xff0c;騰訊公司董事會主席兼首席執行官&#xff0c;騰訊基金會發起人馬化騰&#xff0c;與北京大學教授饒毅&#xff0c;攜手楊振寧、毛淑德、何華武、鄔賀銓、李培根、陳十一、…

給Domino系統管理員的十二項建議

Domino系統管理員的日常工作就是維護Domino系統的正常運行。以下簡要說明了管理員所必做的一些工作。對于系統管理員&#xff0c;特別是新建系統的管理員來說&#xff0c;這些建議能幫助他們完成基本的維護工作。 根據許多資深的Domino管理員和咨詢人員的經驗&#xff0c;我們對…

delphi 軟件在線人數統計_8款值得學習的科研論文作圖軟件

寫在前面科研繪圖在國外已經非常流行&#xff0c;且被高度重視&#xff0c;國內科研人員也越來越重視科研方面的繪圖。不少科研工作者&#xff0c;包括在讀的博士生、研究生等可能都有這樣的體會&#xff1a;千辛萬苦得來的實驗結果&#xff0c;不知道該如何展現給別人?曾經有…

技術管理—管理書籍推薦

技術出身&#xff0c;考慮接觸下管理方面的知識。也許管理真的適合你&#xff0c;角色認知?角色實踐?角色勝任&#xff01;最后愛上它&#xff01; 我最喜歡的一本書--高效能人士的七個習慣 作者&#xff1a;史蒂芬柯維&#xff08;Stephen Richards Covey&#xff09; 該…

JS 幾種數據類型及其轉換

ECMAScript 標準定義了 7 種數據類型: Number&#xff1b;String&#xff1b;Boolean&#xff1b;Symbol&#xff1b;Null&#xff1b;Undefined&#xff1b;Object 。通常&#xff0c;數值、字符串、布爾值、undefined和null這五種類型&#xff0c;合稱為簡單類型的值&#xf…

網絡虛擬化有幾種實現方式_停車場管理系統的防砸車功能有幾種方式?如何實現?...

原標題&#xff1a;停車場管理系統的防砸車功能有幾種方式&#xff1f;如何實現&#xff1f;前言0101正文一、壓力波防砸裝置也叫遇阻防砸&#xff0c;主要是安裝遇阻返回裝置&#xff0c;當道閘桿下落過程中接觸到車輛或者行人(接觸力度是可以調節的)&#xff0c;裝置道閘桿底…

Socket 死連接詳解

當使用 Socket 進行通信時&#xff0c;由于各種不同的因素&#xff0c;都有可能導致死連接停留在服務器端&#xff0c;假如服務端需要處理的連接較多&#xff0c;就有可能造成服務器資源嚴重浪費&#xff0c;對此&#xff0c;本文將闡述其原理以及解決方法。 在寫 Socket 進行通…

[Swift]LeetCode1146. 快照數組 | Snapshot Array

★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★?微信公眾號&#xff1a;山青詠芝&#xff08;shanqingyongzhi&#xff09;?博客園地址&#xff1a;山青詠芝&#xff08;https://www.cnblogs.com/strengthen/&#xff09;?GitHub地址&a…