Hadoop,mapreduce?介紹
59888745@qq.com
大數據工程師是在Linux系統下搭建Hadoop生態系統(cloudera是最大的輸出者類似于Linux的紅帽),
把用戶的交易或行為信息通過HDFS(分布式文件系統)等存儲用戶數據文件,然后通過Hbase(類似于NoSQL)等存儲數據,再通過Mapreduce(并行計算框架)等計算數據,然后通過hiv或pig(數據分析平臺)等分析數據,最后按照用戶需要重現出數據.
Hadoop是一個由Apache基金會所開發的開源分布式系統基礎架構
Hadoop,最基礎的也就是HDFS和Mapreduce了,
HDFS是一個分布式存儲文件系統
Mapreduce是一個分布式計算的框架,兩者結合起來,就可以很容易做一些分布式處理任務了
大綱:
一、MapReduce 基本原理
二、MapReduce 入門示例 - WordCount 單詞統計
三、MapReduce 執行過程分析
實例1 - 自定義對象序列化
實例2 - 自定義分區
實例3 - 計算出每組訂單中金額最大的記錄
實例4 - 合并多個小文件
實例5 - 分組輸出到多個文件
四、MapReduce 核心流程梳理
實例6 - join 操作
實例7 - 計算出用戶間的共同好友
五、下載方式
一、MapReduce基本原理
MapReduce是一種編程模型,用于大規模數據集的分布式運算。
1、MapReduce通俗解釋
圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。
張同學統計 書架1
王同學統計 書架2
劉同學統計 書架3
……
過了一會兒,10個同學陸續到管理員這匯報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。
這個過程就可以理解為MapReduce的工作過程。
2、MapReduce中有兩個核心操作
(1)map
管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。
(2)reduce
每個同學的結果進行匯總,這個過程是reduce。
3、MapReduce工作過程拆解
下面通過一個景點案例(單詞統計)看MapReduce是如何工作的。
有一個文本文件,被分成了4份,分別放到了4臺服務器中存儲
Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather
現在要統計出每個單詞的出現次數。
處理過程
(1)拆分單詞
map節點1
輸入:“the weather is good”
輸出:(the,1),(weather,1),(is,1),(good,1)
map節點2
輸入:“today is good”
輸出:(today,1),(is,1),(good,1)
map節點3
輸入:“good weather is good”
輸出:(good,1),(weather,1),(is,1),(good,1)
map節點4
輸入:“today has good weather”
輸出:(today,1),(has,1),(good,1),(weather,1)
(2)排序
map節點1
map節點2
map節點3
map節點4
(3)合并
map節點1
map節點2
map節點3
map節點4
(4)匯總統計
每個map節點都完成以后,就要進入reduce階段了。
例如使用了3個reduce節點,需要對上面4個map節點的結果進行重新組合,比如按照26個字母分成3段,分配給3個reduce節點。
Reduce節點進行統計,計算出最終結果。
這就是最基本的MapReduce處理流程。
4、MapReduce編程思路
了解了MapReduce的工作過程,我們思考一下用代碼實現時需要做哪些工作?
在4個服務器中啟動4個map任務
每個map任務讀取目標文件,每讀一行就拆分一下單詞,并記下來次單詞出現了一次
目標文件的每一行都處理完成后,需要把單詞進行排序
在3個服務器上啟動reduce任務
每個reduce獲取一部分map的處理結果
reduce任務進行匯總統計,輸出最終的結果數據
但不用擔心,MapReduce是一個非常優秀的編程模型,已經把絕大多數的工作做完了,我們只需要關心2個部分:
map處理邏輯——對傳進來的一行數據如何處理?輸出什么信息?
reduce處理邏輯——對傳進來的map處理結果如何處理?輸出什么信息?
編寫好這兩個核心業務邏輯之后,只需要幾行簡單的代碼把map和reduce裝配成一個job,然后提交給Hadoop集群就可以了。
至于其它的復雜細節,例如如何啟動map任務和reduce任務、如何讀取文件、如對map結果排序、如何把map結果數據分配給reduce、reduce如何把最終結果保存到文件等等,MapReduce框架都幫我們做好了,而且還支持很多自定義擴展配置,例如如何讀文件、如何組織map或者reduce的輸出結果等等,后面的示例中會有介紹。
二、MapReduce入門示例:WordCount單詞統計
WordCount是非常好的入門示例,相當于helloword,下面就開發一個WordCount的MapReduce程序,體驗實際開發方式。
example:
#刪除已有文件夾
hadoop fs -rmr /chenshaojun/input/example_1
hadoop fs -rmr /chenshaojun/output/example_1
#創建輸入文件夾
hadoop fs -mkdir /chenshaojun/input/example_1
#放入輸入文件
hadoop fs -put text* /chenshaojun/input/example_1
#查看文件是否放好
hadoop fs -ls /chenshaojun/input/example_1
#本地測試一下map和reduce
head -20 text1.txt | python count_mapper.py | sort | python count_reducer.py
#集群上跑任務
hadoop jar /usr/lib/hadoop-current/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-file count_mapper.py \ ? #提交文件到集群
-mapper count_mapper.py \
-file count_reducer.py \
-reducer count_reducer.py \
-input /chenshaojun/input/example_1 \
-output /chenshaojun/output/example_1 ? # 必須不存在,若存在output會抱錯,不會覆蓋
count_mapper.py
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word.lower(), 1)
count_reducer.py
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)