本文由@從流域到海域翻譯,發表于騰訊云+社區
map()和reduce()是在集群式設備上用來做大規模數據處理的方法,用戶定義一個特定的映射,函數將使用該映射對一系列鍵值對進行處理,直接產生出一系列鍵值對。
Map Reduce和流處理
Hadoop的Map / Reduce模型在并行處理大量數據方面非常出色。它提供了一個通用的分區機制(基于數據的關鍵)來分配不同機器上的聚合式工作負載。基本上, map / reduce的算法設計都是關于如何在處理過程中的不同階段為記錄值選擇正確的key。
然而,“時間維度”與數據的其他維度屬性相比具有非常不同的特征,特別是在涉及實時數據處理時。它對面向批處理的Map/Reduce模型提出了一系列不同的挑戰。
- 實時處理需要非常低的響應延遲,這意味著沒有太多的數據能夠在“時間”維度上進行處理。
- 從多個數據源收集到的數據可能沒有全部到達匯總點。
- 在Map/Reduce的標準模型中,reduce階段在map階段完成之前無法啟動。而且在下載到reducer之前,所有處理過程的中間數據都保存在磁盤中。所有這些都顯著增加了處理的延遲。
盡管Hadoop Map/Reduce是針對批處理的工作負載而設計的,但某些應用程序(如欺詐檢測,廣告顯示,網絡監控需要實時響應以處理大量數據),現在已開始考慮各種調整Hadoop的方法以使其適合更實時的處理環境。在本篇文章中,我嘗試了一些基于Map/Reduce模型的執行低延遲并行處理的技術。
常用流處理模型
在這個模型中,數據是在各種各樣的OLTP系統中生成的,這些系統更新了事務數據存儲,并異步發送其他數據用于分析處理。分析處理過程將輸出寫入到決策模型,該決策模型會將信息反饋給OLTP系統來進行實時決策。
注意與OLTP系統分離的分析處理的“異步性質”,在該方式下OLTP系統不會放慢速度等待分析處理完成。無論如何,我們仍然需要盡快進行分析處理,否則決策模型將不能反映當前世界的真實場景,它將不會很有用處。什么程度的延遲可容忍的是應用程序指定的。
在Map/Reduce中進行微批處理
一種方法是根據時間窗(例如每小時)將數據分成小批量,并將每批中收集的數據提交給Map/Reduce作業。這需要分段機制,以便OLTP應用程序可以繼續獨立于分析處理。而作業調度程序用于規范生產者和消費者,基于此它們每個生產者或消費者都可以獨立進行。(生產者和消費者是在操作系統理論中對產生數據和處理數據的程序的稱呼,譯者注)
連續性Map/Reduce
這里讓我們想象一下有關Map/Reduce執行模型的一些可能的修改,以使其適應實時流處理。我并不擔心Hadoop在線原型(HOP)所采用的方法的向后兼容性 。
長時間運行
第一種修改方法是使mapper和reducer長時間運行。因此,我們不能等待map階段結束之后才開始reduce階段,因為map階段永遠不會結束。這意味著mapper在完成處理后會將數據推送到reducer,并讓reducer對數據進行排序。這種方法的缺點是它沒有機會去運行地圖側的combine()函數以降低帶寬使用率。它還將更多的工作量轉移到正需要進行分類的reducer。
注意在延遲和優化之間需要有一個折衷。優化需要更多的數據在源頭(即Mapper)就進行累積,如此即可以執行本地合并(即:結合在一起)。不幸的是,低延遲需要盡快發送數據,因此沒有太多時間使大量累積操作可以完成。
HOP提出了一種自適應流控制機制,在該方式下數據會被盡快推送到Reducer,直到Reducer被重載并退回(使用某種流量控制協議)。然后mapper將緩沖處理后的消息并在發送給reducer之前執行combine()函數。這種方法將會自動地來回移動Reducer和Mapper之間的聚合工作負載。
時間窗口:切片和范圍
這是一個“時間片(time slice)”概念和一個“時間范圍(time range)”的概念。“切片(Slice)”定義了執行reduce處理之前所累計結果的時間窗口。這也是mapper在發送到reducer之前應積累的最小數據量。
“范圍(Range)”定義了結果所匯總的時間窗口。它可以是一個具有明確起點定義的界標窗口或者是跳躍窗口的(考慮移動的界標場景)。它也可以是一個滑動窗口,其中從當前時間開始聚合的固定大小的窗口。
在從每個mapper接收到特定時間片后,reducer可以啟動聚合處理并將結果與之前的聚合結果進行合并。切片(大小)可以根據mapper發送的數據量來進行動態調整。
增量處理
請注意,reducer需要在收到所有mapper中相同時間片的所有記錄后計算聚合片值。之后,它會調用用戶定義的merge()函數將切片值與范圍值合并。如果范圍需要刷新(例如達到跳轉窗口邊界),將調用init()函數來獲取刷新的范圍值。如果范圍值需要更新(當某個切片值超出滑動范圍時),則會調用unmerge()函數。
以下是我們如何在每小時更新(即:一小時大小切片)的情況下,在24小時滑動窗口內跟蹤平均命中率(即:每小時總命中數)的示例。
# Call at each hit record
map(k1, hitRecord) {site = hitRecord.site# lookup the slice of the particular keyslice = lookupSlice(site)if (slice.time - now > 60.minutes) {# Notify reducer whole slice of site is sentadvance(site, slice)slice = lookupSlice(site)}emitIntermediate(site, slice, 1)
}combine(site, slice, countList) {hitCount = 0for count in countList {hitCount += count}# Send the message to the downstream nodeemitIntermediate(site, slice, hitCount)
}
復制代碼
# Called when reducer receive full slice from all mappers
reduce(site, slice, countList) {hitCount = 0for count in countList {hitCount += count}sv = SliceValue.newsv.hitCount = hitCountreturn sv
}# Called at each jumping window boundary
init(slice) {rangeValue = RangeValue.newrangeValue.hitCount = 0return rangeValue
}# Called after each reduce()
merge(rangeValue, slice, sliceValue) {rangeValue.hitCount += sliceValue.hitCount
}# Called when a slice fall out the sliding window
unmerge(rangeValue, slice, sliceValue) {rangeValue.hitCount -= sliceValue.hitCount
}
復制代碼
問答
比較好的MapReduce例子有哪些?
相關閱讀
MapReduce極簡教程
大數據運算模型 MapReduce 原理
如何為Hadoop選擇最佳彈性MapReduce框架
此文已由作者授權騰訊云+社區發布,原文鏈接:https://cloud.tencent.com/developer/article/1122471?fromSource=waitui