MapReduce綜合學習含Wordcount案例

文章目錄

    • MapReduce簡介
      • MapTask
      • ReduceTask
      • Mapper階段解讀
      • Reducer階段解讀
    • MapReduce適用的問題
    • MapReduce的特點
    • MapReduce基本思想
      • 大數據處理思想:分而治之
      • 構建抽象模型:Map 函數和 Reduce 函數
      • 上升到架構:并行自動化并隱藏底層細節
    • MapReduce計算架構提供的主要功能
    • MapReduce框架中的名詞解釋
    • MapReduce與YARN
    • MapReduce的原理
    • MapReduce進程
    • 常用數據序列化類型
    • MapReduce實際處理流程
    • FileInputFormat切片機制
    • Mapreduce的shuffle機制
    • MapReduce案例(wordcount)

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

MapReduce簡介

MapReduce是一種可用于數據處理的編程框架。MapReduce采用"分而治之"的思想,把對大規模數據集的操作,分發給一個主節點管理下的各個分節點共同完成,然后通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是"任務的分解與結果的匯總"。

在分布式計算中,MapReduce框架負責處理了并行編程中分布式存儲、工作調度、負載均衡、容錯均衡、容錯處理以及網絡通信等復雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解后多任務處理的結果匯總起來。
Map/Reduce是一個用于大規模數據處理的分布式計算編程模型。
MapReduce程序的工作分兩個階段進行:

Map階段(映射)
這個函數單獨地應用在每個單元格上的操作就屬于映射(Map)。
由一個或者多個MapTask組成。每個MapTask處理輸入數據集合中的一片數據(InputSplit),并將產生的若干個數據片段(一個數據文件)寫到本地磁盤上。

Reduce階段
由一個或者多個ReduceTask組成。ReduceTask則從每個MapTask上遠程拷貝相應的數據片段,經分組聚集和歸約后,將結果寫到HDFS上作為最終結果。

使用需要定義map函數和reduce函數
map函數用來處理原始數據(初始鍵值對)以生成一批中間的key/value對
reduce函數將 所有這些中間的有著相同key的values合并起來。
輸入到每一個階段均是鍵 - 值對。

MapTask

執行過程概述:
首先,通過用戶提供的InputFormat將對應的InputSplit解析成一系列key/value,并依次交給用戶編寫的map()函數處理,接著按照指定的Partition對數據分片,以確定每個key/value將交給哪個ReduceTask處理,之后將數據交給用戶定義的Combiner進行一次本地合并(沒有則直接跳過),最后即將處理結果保存到本地磁盤上。
具體步驟:
(1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該階段只要是將解析出的key/value交給用戶編寫的map()函數處理,并產生一系列新的key/value。
(3)Collect階段:在用戶編寫的map()函數中,當數據處理完成后,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的ley/value分片(通過調用Partition),并寫入一個環形內存緩沖區中。
(4)Spill階段:即“溢寫”,當環形緩沖區滿后,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,并在必要時對數據進行合并操作。
(5)Combine階段:當所有數據處理完成后,MapTask對所有臨時文件進行一次合并,以確保最終只會生成一個數據文件。

ReduceTask

執行過程概述:
ReduceTask的輸入數據來自各個MapTask,因此首先通過HTTP請求從各個已經運行完成的MapTask所在TaskTracker機器上拷貝相應的數據分片,待所有數據拷貝完成后,再以key為關鍵字對所有數據進行排序(sort),通過排序,key相同的記錄聚集到一起形成若干分組,然后將分組數據交給用戶編寫的reduce()函數處理,并將數據結果直接寫到HDFS上作為最終輸出結果。
具體步驟:
(1)Shuffle階段:也稱為Copy階段。ReduceTask從各個MapTask所在的TaskTracker上遠程拷貝一片數據,并針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。
(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個后臺線程對內存和磁盤上的文件進行合并,以防止內存使用過多或磁盤上的文件過多,并且可以為后面整體的歸并排序減負,提升排序效率。
(3)Sort階段:按照MapReduce的語義,用戶編寫的reduce()函數輸入數據是按key進行聚集的一組數據。為了將key相同的數據聚集在一起,Hadoop采用了基于排序的策略。由于各個MapTask已經實現了自己的處理結果進行了局部排序,因此,ReduceTask只需要對所有數據進行一次歸并排序即可。
(4)Reduce階段:在該階段中,ReduceTask將每組數據依次交給用戶編寫的reduce()函數處理。
(5)Write階段:reduce()函數將計算結果寫到HDFS上。

在這里插入圖片描述

Mapper階段解讀

Mapper的輸入文件位于HDFS上,先對輸入數據切分,每一個split分塊對應一個Mapper任務,通過RecordReader對象從輸入分塊中讀取并生成鍵值對,然后執行Map函數,輸出的中間鍵值對被partion()函數區分并寫入緩沖區,同時調用sort()進行排序。
在這里插入圖片描述

Reducer階段解讀

Reducer主要有三個階段:Shuffle、Sort、Reduce

1 . Shuffle階段:

Reducer的輸入就是Mapper階段已經排好序的輸出。在這個階段,框架為每個Reducer任務獲得所有Mapper輸出中與之相關的分塊,把Map端的輸出結果傳送到Reduce端,大量操作是數據復制(因此也稱數據復制階段)。

2 . Sort階段:

框架按照key對Reducer的輸入進行分組(Mapper階段時每一個Map任務對于它本身的輸出結果會有一個排序分組,而不同Map任務的輸出中可能會有相同的key,因此要再一次分組)。Shuffle和Sort是同時進行的,Map的輸出也是一邊被取回一邊被合并。排序是基于內存和磁盤的混合模式進行,經過多次Merge才能完成排序。(PS:如果兩次排序分組規則需要不同,可以指定一個Comparator比較器來控制分組規則)。

3 . Reduce階段:

通過Shuffle和Sort操作后得到的<key, (list of values)>被送到Reducer的reduce()函數中執行,針對每一個<key, (list of values)>會調用一次reduce()函數。
在這里插入圖片描述

MapReduce適用的問題

用MapReduce來處理的數據集(或任務)必須具備這樣的特點:待處理的數據集可以分解成許多小的數據集,而且每一個小數據集都可以完全并行地進行處理。

MapReduce的特點

1)MapReduce 易于編程 。它簡單的實現一些接口,就可以完成一個分布式程序

2)良好的 擴展性 。當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。

3)高容錯性 。比如其中一臺機器掛了,它可以把上面的計算任務轉移到另外一個節點上面上運行,不至于這個任務運行失敗,而且這個過程不需要人工參與,而完全是由Hadoop 內部完成的。

4)適合 PB 級以上海量數據的離線處理 。比如像毫秒級別的返回一個結果,MapReduce 很難做到。MapReduce 雖然具有很多的優勢,但是它也有不擅長的地方。這里的不擅長不代表它不能做,而是在有些場景下實現的效果差,并不適合 MapReduce 來處理,主要表現在以下幾個方面。

1.實時計算。
2.流式計算。流式計算的輸入數據時動態的,而 MapReduce 的輸入數據集是靜態的,不能動態變化。這是因為 MapReduce 自身的設計特點決定了數據源必須是靜態的。

3.DAG(有向圖)計算。多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce 并不是不能做,而是使用后,每個MapReduce 作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。

MapReduce基本思想

大數據處理思想:分而治之

并行計算的第一個重要問題是如何劃分計算任務或者計算數據以便對劃分的子任務或數據塊同時進行計算。但是,一些計算問題的前后數據項之間存在很強的依賴關系,無法進行劃分,只能串行計算。

對于不可拆分的計算任務或相互間有依賴關系的數據無法進行并行計算。一個大數據若可以分為具有同樣計算過程的數據塊,并且這些數據塊之間不存在數據依賴關系,則提高處理速度的最好辦法就是并行計算。

構建抽象模型:Map 函數和 Reduce 函數

Map 函數和 Reduce 函數都是以 <key,value>作為輸入的,按一定的映射規則轉換成另一個或一批 <key,value> 進行輸出。
在這里插入圖片描述

1) Map:<k1,v1>List(<K2,V2>)

輸入:鍵值對<k1,v1>表示的數據。
處理:數據記錄將以“鍵值對”形式傳入 Map 函數;Map 函數將處理這些鍵值對,并以另一種鍵值對形式輸出中間結果 List(<K2,V2>)。
輸出:鍵值對List(<K2,V2>)示的一組中間數據。

2) Reduce:<K2,List(V2)>→List(<K3,V3>)

輸入:由 Map 輸出的一組鍵值對 List(<K2,V2>)將被進行合并處理,同樣主鍵下的不同數值會合并到一個列表List(V2)中,故 Reduce 的輸入為<K2,List(V2)>。

處理:對傳入的中間結果列表數據進行某種整理或進一步的處理,并產生最終的輸出結果List(<K3,V3>)。
輸出:最終輸出結果List(<K3,V3>)。

基于 MapReduce 的并行計算模型如圖 3 所示。各個 Map 函數對所劃分的數據并行處理,從不同的輸入數據產生不同的中間結果。

各個 Reduce 函數也各自并行計算,負責處理不同的中間結果。進行 Reduce 函數處理之前,必須等到所有的 Map 函數完成。

因此,在進入 Reduce 函數前需要有一個同步屏障;這個階段也負責對 Map 函數的中間結果數據進行收集整理處理,以便 Reduce 函數能更有效地計算最終結果,最終匯總所有 Reduce 函數的輸出結果即可獲得最終結果。

						基于MapReduce的并行計算模型

在這里插入圖片描述

Map 函數的輸入數據來自于 HDFS的文件塊,這些文件塊的格式是任意類型的,可以是文檔,可以是數字,也可以是二進制。文件塊是一系列元素組成的集合,這些元素也可以是任意類型的。

Map 函數首先將輸入的數據塊轉換成 <key,Value> 形式的鍵值對,鍵和值的類型也是任意的。

Map 函數的作用就是把每一個輸入的鍵值對映射成一個或一批新的鍵值對。輸出鍵值對里的鍵與輸入鍵值對里的鍵可以是不同的。

需要注意的是,Map 函數的輸出格式與 Reduce 函數的輸入格式并不相同,前者是 List(<K2,V2>) 格式,后者是<K2,List(V2)> 的格式。所以,Map 函數的輸出并不能直接作為 Reduce 函數的輸入。

MapReduce 框架會把 Map 函數的輸出按照鍵進行歸類,把具有相同鍵的鍵值對進行合并,合并成 <K2,List(V2)>
的格式,其中,List(V2) 是一批屬于同一個 K2 的 value。

Reduce 函數的任務是將輸入的一系列具有相同鍵的值以某種方式組合起來,然后輸出處理后的鍵值對,輸出結果一般會合并成一個文件。

為了提高 Reduce 的處理效率,用戶也可以指定 Reduce 任務的個數,也就是說,可以有多個 Reduce 并發來完成規約操作。

MapReduce 框架會根據設定的規則把每個鍵值對輸入到相應的 Reduce 任務進行處理。這種情況下,MapReduce將會輸出多個文件。
一般情況下,并不需要把這些輸出文件進行合并,因為這些文件也許會作為下一個 MapRedue 任務的輸入。

上升到架構:并行自動化并隱藏底層細節

MapReduce 提供了一個統一的計算框架,來完成計算任務的劃分和調度,數據的分布存儲和劃分,處理數據與計算任務的同步,結果數據的收集整理,系統通信、負載平衡、計算性能優化、系統結點出錯檢測和失效恢復處理等。

MapReduce 通過抽象模型和計算框架把需要做什么與具體怎么做分開了,為程序員提供了一個抽象和高層的編程接口和框架,程序員僅需要關心其應用層的具體計算問題,僅需編寫少量的處理應用本身計算問題的程序代碼。

與具體完成并行計算任務相關的諸多系統層細節被隱藏起來,交給計算框架去處理:從分布代碼的執行,到大到數千個,小到單個的結點集群的自動調度使用。

MapReduce計算架構提供的主要功能

1)任務調度

提交的一個計算作業(Job)將被劃分為很多個計算任務(Tasks)。

任務調度功能主要負責為這些劃分后的計算任務分配和調度計算結點(Map 結點或 Reduce 結點),同時負責監控這些結點的執行狀態,以及 Map 結點執行的同步控制,也負責進行一些計算性能優化處理。例如,對最慢的計算任務采用多備份執行,選最快完成者作為結果。

2)數據/程序互定位

為了減少數據通信量,一個基本原則是本地化數據處理,即一個計算結點盡可能處理其本地磁盤上分布存儲的數據,這實現了代碼向數據的遷移。

當無法進行這種本地化數據處理時,再尋找其他可用結點并將數據從網絡上傳送給該結點(數據向代碼遷移),但將盡可能從數據所在的本地機架上尋找可用結點以減少通信延遲。

3)出錯處理

在以低端商用服務器構成的大規模 MapReduce 計算集群中,結點硬件(主機、茲盤、內存等)出錯和軟件有缺陷是常態。因此,MapReduce 架構需要能檢測并隔離出錯結點,并調度分配新的結點接管出錯結點的計算任務。

4)分布式數據存儲與文件管理

海量數據處理需要一個良好的分布數據存儲和文件管理系統作為支撐,該系統能夠把海量數據分布存儲在各個結點的本地磁盤上,但保持整個數據在邏輯上成為一個完整的數據文件。

為了提供數據存儲容錯機制,該系統還要提供數據塊的多備份存儲管理能力。

5)Combiner 和 Partitioner

為了減少數據通信開銷,中間結果數據進入 Reduce 結點前需要進行合并(Combine)處理,即把具有同樣主鍵的數據合并到一起避免重復傳送。

一個 Reduce 結點所處理的數據可能會來自多個 Map 結點,因此,Map 結點輸出的中間結果需使用一定的策略進行適當的劃分(Partition)處理,保證相關數據發送到同一個 Reduce 結點上。

MapReduce框架中的名詞解釋

split:
分片是指MapReduce框架將數據源根據一定的規則將源數據分成若干個小數據的過程;其中,一個小數據集,也被稱為一個分片。

Map:
Map有兩層含義:

其一、是指MapReduce框架中的Map過程,即將一個分片根據用戶定義的Map邏輯處理后,經由MapReduce框架處理,形成輸出結果,供后續Reduce過程使用;
其二,是指用戶定義Java程序實現Mapper類的map接口的用戶自定義邏輯,此時通常被稱為mapper。
Reduce:
Reduce也有兩層含義:

其一,是指MapReduce框架中的Reduce過程,即將Map的結果作為輸入,根據用戶定義的Reduce邏輯,將結果處理并匯總,輸出最后的結果;
其二,是指用戶定義Java程序實現Reducer類的reduce接口的用戶自定義邏輯,此時通常被稱為reducer。
Combine:
Combine是一個可由用戶自定的過程,類似于Map和Reduce,MapReduce框架會在Map和Reduce過程中間調用Combine邏輯(會在下面章節中仔細講解),通常Combine和reduce的用戶代碼是一樣的(也可被稱為本地的reduce過程),但是請注意并不是所有用MapReduce框架實現的算法都適合增加Combine過程(比如求平均值)。

Partition:
在MapReduce框架中一個split對應一個map,一個partiton對應一個reduce(無partition指定時,由用戶配置項指定,默認為1個)。 reduce的個數決定了輸出文件的個數。比如,在需求中,數據是從對每個省匯總而成,要求計算結果按照省來存放,則需要根據源數據中的表明省的字段分區,用戶自定義partition類,進行分區。

MapReduce與YARN

YARN概述

Yarn是一個資源調度平臺,負責為運算程序提供服務器運算資源,相當于一個分布式的操作系統平臺,而mapreduce等運算程序則相當于運行于操作系統之上的應用程序。

YARN中的重要概念
  1) yarn并不清楚用戶提交的程序的運行機制
  2) yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
  3) yarn中的主管角色叫ResourceManager
  4) yarn中具體提供運算資源的角色叫NodeManager
  5) 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味著yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez等等 。
  6) 所以,spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規范的資源請求機制即可
  7) Yarn就成為一個通用的資源調度平臺,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享

MapReduce的原理

在這里插入圖片描述

map以(key, value)的形式輸入數據并根據編寫的map()處理數據,輸出為(key,
value)的形式,map的輸出經過中間階段(叫做shuffle)的處理,再以(key,value)的形式傳入reduce()內進行處理,最后以(key, value)的形式輸出最終結果。

一個MapReduce作業(Job)是客戶端要執行的一個工作單元:它包括輸入數據,MapReduce程序與配置信息.Hadoop將作業分成若干個任務來執行,它包括兩類任務:map任務與reduce任務.這些任務分布在集群的不同節點上,由YARN負責調度.如果一個任務失敗,它將在另一個不同的節點上重新調度運行.

Hadoop將MapReduce的輸入數據劃分成等長的小數據塊,稱為輸入分片(input split)或簡稱"分片".Hadoop為每個分片創建一個map任務,并由該任務來運行用戶自己定義的map函數從而處理分片中的每條記錄.

map任務與reduce任務之間存在一個shuffle,這是MapReduce中最為消耗時間的過程,因為它對數據進行了多次處理,其中包括排序,分區,溢寫,combiner等過程.combiner就是一個map端的reduce,可以讓數據更加緊湊,所以一般都指定為reduce()所在的類(注意,有些任務中不適用combiner).這一切的處理都是為了減少map任務與reduce任務之間的網絡傳輸,畢竟集群中最為稀缺的資源就是網絡帶寬,應該想盡辦法節省。

分布式的運算程序往往需要分成至少2個階段:

第一個階段的MapTask并發實例,完全并行運行,互不相干。
第二個階段的ReduceTask并發實例互不相干,但是他們的數據依賴于上一個階段的所有MapTask并發實例的輸出。
MapReduce編程模型只能包含 一個Map階段 和 一個Reduce階段,如果用戶的業務邏輯非常復雜,那就只能 多個MapReduce程序,串行運行。

在這里插入圖片描述

MapReduce進程

一個完整的MapReduce程序在分布式運行時有三類實例進程:
(1)MrAppMaster:負責整個程序的 過程調度 及 狀態協調。
(2)MapTask:負責 Map階段的 整個數據處理流程。
(3)ReduceTask:負責 Reduce階段的 整個數據處理流程。

常用數據序列化類型

在這里插入圖片描述

MapReduce實際處理流程

mapreduce 其實是分治算法的一種現,所謂分治算法就是“就是分而治之 ,將大的問題分解為相同類型的子問題(最好具有相同的規模),對子問題進行求解,然后合并成大問題的解。

mapreduce就是分治法的一種,將輸入進行分片,然后交給不同的task進行處理,然后合并成最終的解。
mapreduce實際的處理過程可以理解為Input->Map->Sort->Combine->Partition->Reduce->Output。

1)Input階段

數據以一定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以設置,也可以自定義分片函數。

2)map階段

對輸入的(key,value)進行處理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass進行設置。

3)Sort階段

對于Mapper的輸出進行排序,使用Job.setOutputKeyComparatorClass進行設置,然后定義排序規則。

4)Combine階段

這個階段對于Sort之后又相同key的結果進行合并,使用Job.setCombinerClass進行設置,也可以自定義Combine Class類。

5)Partition階段

將Mapper的中間結果按照key的范圍劃分為R份(Reduce作業的個數),默認使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定義劃分的函數。

使用Job.setPartitionClass設置。

6)Reduce階段

對于Mapper階段的結果進行進一步處理,Job.setReducerClass進行設置自定義的Reduce類。

7)Output階段
 Reducer輸出數據的格式。

FileInputFormat切片機制

1)FileInputFormat切片機制切片定義在InputFormat類中的getSplit()方法
2)FileInputFormat中默認的切片機制:
    簡單地按照文件的內容長度進行切片
    切片大小,默認等于block大小
    切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片 。比如待處理數據有兩個文件:
    file1.txt 320M
    file2.txt 10M

經過FileInputFormat的切片機制運算后,形成的切片信息如下:

file1.txt.split1-- 0~128
    file1.txt.split2-- 128~256
    file1.txt.split3-- 256~320
    file2.txt.split1-- 0~10M
    
3)FileInputFormat中切片的大小的參數配置
    通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定
minsize:默認值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:默認值:Long.MAXValue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默認情況下,切片大小=blocksize
maxsize(切片最大值):
  參數如果調得比blocksize小,則會讓切片變小,而且就等于配置的這個參數的值
minsize (切片最小值):
  參數調的比blockSize大,則可以讓切片變得比blocksize還大

選擇并發數的影響因素:
    運算節點的硬件配置
    運算任務的類型:CPU密集型還是IO密集型
    運算任務的數據量

Mapreduce的shuffle機制

MapReduce計算模型主要由三個階段構成:Map、Shuffle、Reduce。
(1)Map是映射,負責數據的過濾分類,將原始數據轉化為鍵值對;
(2)Reduce是合并,將具有相同key值的value進行處理后再輸出新的鍵值對作為最終結果;
(3)為了讓Reduce可以并行處理Map的結果,必須對Map的輸出進行一定的排序與分割,然后再交給對應的Reduce,這個過程就是Shuffle。Shuffle過程包含Map Shuffle和Reduce Shuffle。
1)概述

mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle。
shuffle: 洗牌、發牌——(核心機制:數據分區,排序,緩存)。
具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,并在分發的過程中,對數據按key進行了分區和排序。
分區partition(確定哪個數據進入哪個reduce)
   Sort根據key排序
   Combiner進行局部value的合并

2)詳細流程  
    1、 maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
    2、 從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
    3、 多個溢出文件會被合并成大的溢出文件
    4、 在溢出過程中,及合并的過程中,都要調用partitoner進行分組和針對key進行排序
    5、 reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
    6、 reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)
    7、 合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
    
Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
緩沖區的大小可以通過參數調整, 參數:io.sort.mb 默認100M

MapReduce案例(wordcount)

wordcount是最簡單也是最能體現MapReduce思想的程序之一,可以稱為MapReduce版"Hello World",單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數,即簡單如下圖所示:
在這里插入圖片描述
在這里插入圖片描述

WordcountMapper.java

package  wordcount;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*KEYIN, VALUEIN, KEYOUT, VALUEOUT
* 四個泛型解釋:
* KEYIN:K1的類型
* VALUEIN:V1的類型
*
* KEYOUT:K2的類型
* VALUEOUT:V2的類型
* */
public class WordCountMapper  extends Mapper<LongWritable,Text,Text,LongWritable> {//map方法就是將K1和V1轉化為K2和V2/*參數:key: K1行偏移量value:V1 每一行的文本數據context:表示上下文對象*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text = new Text();LongWritable longWritable = new LongWritable();//將一行的文本數據進行拆分String[] split = value.toString().split(",");//遍歷數組,進行組裝K2和v2for (String word:split){//將K2和V2寫入上文text.set(word);longWritable.set(1);context.write(text,longWritable);}// 將K2和v2寫入上下文中}
}

dataReduce.java

package wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*
* 四個泛型解釋:
* KEYIN:K2類型
* VALUEIN:V2類型
* KEYOUT:K3類型
* VALUEOUT:V3類型
*
* */
public class dataReduce  extends Reducer<Text, LongWritable,Text,LongWritable> {//把新的K2和V2轉為K3和V3  將K3和V3寫入上下文中/*參數:key:新K2values:集合 新V2context:表示上下文對象** K2   v2* hello  <1,1>*K3    v3hello 2*** */@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count=0;//1.遍歷結合,將集合中數字相加,得到v3for (LongWritable longWritable:values){count+=longWritable.get();}//2.將K3和V3寫入上下文中context.write(key,new LongWritable(count));}
}

TaskMain.java

package wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;public class TaskMain extends Configured implements Tool {//該方法用于指定一個Job任務public int run(String[] strings) throws Exception {//創建一個Job任務對象Job job = Job.getInstance(super.getConf(), "wordcount");job.setJarByClass(TaskMain.class);//2.獲得job對象(八個步驟)//第一步:指定文件的讀取方式和讀取路徑job.setInputFormatClass(TextInputFormat.class);
//        TextInputFormat.addInputPath(job,new Path("file:///D:\\mapreduce_data"));TextInputFormat.addInputPath(job,new Path("hdfs://Master:9000/wordcount"));//第二步:指定map階段的處理方式和數據類型job.setMapperClass(WordCountMapper.class);//設置map階段K2的類型job.setMapOutputKeyClass(Text.class);//設置map階段V2的類型job.setMapOutputValueClass(LongWritable.class);//第三、四、五、六 采用默認方式//第七步:指定Reduc階段的處理方式和數據類型job.setReducerClass(dataReduce.class);//設置K3的類型 v3的類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//第八步:設置輸出類型,設置輸出的路徑job.setOutputFormatClass(TextOutputFormat.class);
//        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\test\\output2"));TextOutputFormat.setOutputPath(job,new Path("hdfs://Master:9000/wordcount_results2"));//等待任務結束boolean flag = job.waitForCompletion(true);return  flag?0:1;}public static void main(String[] args) throws Exception {Configuration configuration=new Configuration();int run=ToolRunner.run(configuration,new TaskMain(),args);   //啟動JOb任務System.exit(run);}
}

提示:

Mapper類的對象 一行一行地讀取 原始數據的中內容
每讀一行 就調用一次map方法 切割第一行的單詞 生成鍵值對
K是單詞 V是單詞的數量
然后把所有鍵值對寫入到臨時文件
最后對臨時文件排序 類似sql的groupby
隨后shuffle,隨后Reducer讀數據 每次讀一組

輸入拆分:

輸入到MapReduce工作被劃分成固定大小的塊叫做 input splits ,輸入折分是由單個映射消費輸入塊。

映射 - Mapping

這是在 map-reduce 程序執行的第一個階段。在這個階段中的每個分割的數據被傳遞給映射函數來產生輸出值。在我們的例子中,映射階段的任務是計算輸入分割出現每個單詞的數量(更多詳細信息有關輸入分割在下面給出)并編制以某一形式列表<單詞,出現頻率>

重排

這個階段消耗映射階段的輸出。它的任務是合并映射階段輸出的相關記錄。在我們的例子,同樣的詞匯以及它們各自出現頻率。

Reducing
在這一階段,從重排階段輸出值匯總。這個階段結合來自重排階段值,并返回一個輸出值。總之,這一階段匯總了完整的數據集。
在我們的例子中,這個階段匯總來自重排階段的值,計算每個單詞出現次數的總和。

wordcount項目在MapReduce計算框架下的處理流程:

首先,通過job.waitForCompletion(true)開啟了WordCount這個MapReduce作業,后續通過InputFormat的實現類FileInputFormat將輸入數據,即輸入文件,分片從而得到Map方法,即Map用戶定義的方法的輸入,即圖中所示,FileInputFormat將文件按照行分割,并組織成為的形式,成為用戶Map方法的輸入,其中Key是字符的偏移量,value即一行的內容。

數據被輸入到用戶定義的map方法中,map方法以文件中的每行數據作為輸入,將每行按照空格分詞,并將每個詞組織為K-V對,輸出;Map的輸出交予了MapReduce框架來進行處理,簡單來說MapReduce框架將這些K-V對依照key的字典順序由小到大排列,并對相同的key的value進行合并為數組list,輸出給combine過程;

將map方法的輸出結果根據Key排序完成之后,如果有combine過程被定義這時候MapReduce框架就會調用Combine過程。Combine過程是由用戶指定的,必須的過程,一般Combine過程在邏輯上就是Reduce過程,map的輸出結果需要通過網絡傳遞給reduce,其作用是減少Map的輸出的結果集的大小,從而降低網絡的開銷。

用戶通過job.setCombinerClass(IntSumReducer.class)指定Combine的實現類;Combine其實就是在Map端先執行一次用戶的reduce方法,先在中間進行一次計算,從而將結果集減少;但是需要注意的是,并不是所有的算法都適用進行多次reduce計算,請謹慎選擇;

然后,多個map的結果,匯集到reduce,由于WordCount就開啟了一個reduce,故只有一個reduce接收所有map端的輸出;在輸入到用戶定義的reduce方法之前,MapReduce框架還會進行一步排序操作,這步操作類似于在map端進行的排序,將相同key的value合并為list,不同的是排序的輸入,是來自于多個Map的輸出,是根據key排序的K-V對數據;

經過排序后的K-ValueList對,被輸入到的Reduce方法,在WordCount的reduce方法中,它對每個key對應的value的list進行求和,從而獲得每個單詞的總的出現次數。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/530962.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/530962.shtml
英文地址,請注明出處:http://en.pswp.cn/news/530962.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于Spring boot+Vue的在線考試系統

文章目錄spring boot 分層圖解安裝idea配置阿里云鏡像項目啟動前端項目結構項目前端中index.htmlApp.vuemain.jsrouter整個頁面渲染過程關于矢量圖圖標的使用引入JQuery依賴github-markdown-css樣式文件-一般用作文章正文的樣式美化spring boot 分層圖解 安裝idea 安裝參考 id…

Java基礎總結之(面試)

文章目錄Java標識符Java修飾符訪問權限修飾符訪問控制和繼承非訪問權限修飾符局部變量修飾符接口接口中方法修飾符運算符算術運算符一元運算符二元運算符算術賦值運算符賦值運算符邏輯運算符&#xff08;&&、||和!&#xff09;關系運算符自增和自減運算符&#xff08;和…

Javaweb練手項目

文章目錄學生管理系統音樂網站鋒芒博客中醫藥管理系統博客天梯CMS系統鋒芒社團官網學生管理系統 實現技術&#xff1a;ServletMVC&#xff08;模式&#xff09;Filter(過濾器&#xff09;html 主要功能&#xff1a;學生信息的增刪查改&#xff0c;文件&#xff08;圖片&#x…

Spark之scala學習(基礎篇)待更新

文章目錄引言大數據介紹大數據與云計算區別大數據和人工智能的區別大數據和傳統的分析&#xff08;excel&#xff09;的區別scala的特性面向對象特性函數式編程函數式編程的特點&#xff1a;函數式編程的優勢靜態類型擴展性并發性為什么要學scalascala安裝簡單測試了解ScalaSca…

Jupyter Notebook的安裝及問題解決方案

文章目錄下載并安裝Anaconda3更改主界面路徑但是如果沒有jupyter_notebook_config.py文件怎么辦&#xff1f;如果更改過路徑后&#xff0c;不生效怎么辦&#xff1f;使用參考pycharm導入pyspark下載并安裝Anaconda3 官網下載個人版 Anaconda3安裝參考 點擊&#xff0c;然后進…

airodump-ng wlan0mon掃描不到網絡_MySQL ProxySql 由于漏洞掃描導致的 PROXYSQL CPU 超高...

ProxySQL 本身是一款非常棒的MYSQL 中間件的開源產品, 在公司運行了一段時間后,突然一天報警,所在機器的CPU 出奇的高,之前在測試系統, 預生產, 以及生產系統均沒有出現問題. 開始未來緊急解決問題,重新啟動了proxysql服務,并查看錯誤日志.PROXYSQL 的系統版本的2.012 MYSQL 的…

網絡安全之SQL注入

文章目錄SQL注入的定義SQL注入為什么會成功&#xff1f;為什么發生SQL注入&#xff1f;SQL注入的分析SQL的注入流程判斷SQL注入點判斷注入類型SQL注入的通常方法防止SQL注入SQL注入的定義 SQL注入是通過把SQL命令插入到Web表單遞交或輸入域名或頁面請求的查詢字符串&#xff0c…

4個空格和一個tab有什么區別_火花塞為什么一換就是4個?只換一個不行嗎?

火花塞不是一個經常被提及的配件&#xff0c;但如果火花塞老化&#xff0c;車輛的整體性能將受到影響&#xff0c;更換火花塞其實也是日常保養的一部分&#xff0c;就像換機油和三濾一樣。不知道大家是否注意到&#xff0c;在做完保養之后&#xff0c;維修師傅會幫你檢查一下火…

小型云臺用的是什么電機_直流電機的工作原理是什么?未來的電動車都會用直流電機嗎?...

說起直流電機&#xff0c;其實我們每個人&#xff0c;每天都在用。是嗎&#xff1f;別驚訝&#xff0c;是的。手機&#xff0c;我們每天都在用&#xff0c;有消息或者有電話時&#xff0c;手機就開始振動。這個振動就是用直流電機來實現的。當然&#xff0c;直流電機在其他領域…

C語言、C++學習路線

文章目錄C語言 C語言大綱 C語言知識點總結圖 C語言視頻推薦基礎篇進階篇速成篇基礎入門之游戲實戰篇C語言核心鏈表文檔教程視頻教程文件C語言實例C C大綱崗位分析 C與C 編程基礎四大件數據結構與算法計算機網絡操作系統設計模式C視頻推薦C語言 C語言大綱 C語言知識點總結圖 C語…

2020年日歷_2020年《故宮日歷》發布:濃縮紫禁城600年滄桑

2020年《故宮日歷》。中國網記者 劉維佳/攝中國網8月26日訊(記者 劉維佳) 2020年《故宮日歷》發布會今日在故宮博物院建福宮舉行。2020年為庚子鼠年&#xff0c;亦正值紫禁城建成六百周年&#xff0c;因此&#xff0c;2020年《故宮日歷》是為紀念紫禁城六百周年而特別呈現。故宮…

計算機二級(C語言)備考

文章目錄考試詳情一點建議公共基礎知識&#xff08;10分&#xff09;著重點資源視頻教程文檔練習題考試詳情 一、選擇題 40分 二、程序填空題 18分 三、程序修改題 18分 四、程序設計題 24分 考試時間&#xff1a;120分鐘 一點建議 多練習題目&#xff0c;多思考&#xff0c…

jav簡單的個人博客網站代碼_每個人都可以擁有的個人博客網站

題記------去過的地方越多&#xff0c;越知道自己想回到什么地方去&#xff01;雨又下了一夜&#xff0c;曾經多少次覺得下雨天是最適合睡覺的天氣。而最近的雨&#xff0c;總感覺有些嘈雜&#xff0c;總怕吵醒遠方睡夢中的星&#xff0c;晨。以至于翻來覆去睡不著。但是&#…

實用的編程網站—良好的開端

文章目錄在線編程網站編程資源網編程源碼網編程學習網在線編程網站 nyist_acm 領扣 力扣 洛谷 計蒜客 牛客網 藍橋杯ACM在線 HUSTOJ LiberOJ EduCoder PIPIOJ Codeabbey C語言網 hihocoder 賽碼網 編程資源網 搜云盤 IT視頻學習網 798資源網 Java知識分享網…

泰禾光電機器人研發_機器之眼 | 3D相機能讓機器人看見什么?

“中國制造2025”&#xff0c;其核心環節之一就是機器人智能化。視覺技術代表了機器的眼睛和大腦&#xff0c;機器視覺將使得機器人智能化變成現實。為了使機器人能夠勝任更復雜的工作&#xff0c;機器人不但要有更好的控制系統&#xff0c;還需要更多地感知環境的變化。機器人…

云計算學習路線

文章目錄說明云計算學習路線云計算技術支撐Linux 基礎環境搭建文檔教程Linux學習視頻推薦云計算基礎知識docker容器KVM&#xff08;推薦一本書《深度實踐KVM》&#xff09;OpenStackK8S&#xff08;Kubernetes&#xff09;說明 云計算和大數據未來十年必然趨勢成為社會的生產資…

c語言將數組初始化為1_c語言之數組初始化

c語言之數組初始化在c語言中&#xff0c;我們經常會有兩種初始化的方式(一維數組)&#xff1a;方式一int arr[20];這種方式是在c語言編譯階段對數組分配了固定的內存空間&#xff0c;但沒有為c語言賦值&#xff0c;此時&#xff0c;對該數組進行打印輸出的話&#xff0c;則會輸…

算法學習路線圖

關于算法的討論 如何系統地學習數據結構與算法&#xff1f;| 自學數據結構與算法最全路線 編程指北 動態規劃-背包問題 最小生成樹(MST)—prim和kruskal算法 用回溯法(backtracking algorithm)求解N皇后問題(N-Queens puzzle) n皇后問題[分支限界法] 克魯斯卡爾算法(Krus…

python中以表示語塊_scikitlearn:將數據擬合成塊與將其全部擬合到on

這取決于您使用的矢量器。在CountVectorizer統計文檔中單詞的出現次數。它為每個文檔輸出一個(n_words, 1)向量&#xff0c;其中包含每個單詞在文檔中出現的次數。n_words是文檔中的單詞總數(也就是詞匯表的大小)。它也適合詞匯表&#xff0c;這樣您就可以反省模型(看看哪個詞是…

大數據學習路線

文章目錄學習教程&#xff08;不全&#xff09;文檔教程大數據實戰項目項目源碼廣義的定義 &#xff1a;是指物理世界到數字世界的映射和提煉。通過發現其中的數據特征&#xff0c;從而做出提升效率的決策行為。 狹義的定義&#xff1a;通過獲取存儲、分析&#xff0c;從大容量…