但是,如果您想要平均水平呢? 然后,由于計算平均值的平均值不等于原始數字集的平均值,因此相同的方法將行不通。 盡管有了一點見識,我們仍然可以使用本地聚合。 對于這些示例,我們將使用Hadoop最終指南書中使用的NCDC天氣數據集的示例。 我們將計算1901年每個月的平均溫度。可以在MapReduce的數據密集型處理的第3.1.3章中找到組合器和映射器內組合選項的平均值算法。
一種尺寸并不適合所有人
上一次,我們介紹了兩種用于在MapReduce作業中減少數據的方法:Hadoop組合器和映射器內組合方法。 Hadoop框架將組合器視為一種優化,并且無法保證調用組合器的次數(如果有的話)。 結果,映射器必須以減速器期望的形式發出數據,因此,如果不涉及組合器,則最終結果不會更改。 要針對計算平均值進行調整,我們需要返回到映射器并更改其輸出。
映射器更改
在單詞計數示例中,未優化的映射器僅發出單詞和1的計數。合并器和映射器內組合映射器通過將每個單詞保留為哈希映射中的鍵(總計數為n)來優化此輸出。值。 每次看到一個單詞,計數都將增加1。在這種設置下,如果未調用組合器,則縮減器將接收到該單詞作為鍵,并將一長串的1?s加在一起,從而得到相同的輸出(當然,使用映射器內組合映射器可以避免此問題,因為可以保證合并結果是映射器代碼的一部分)。 為了計算平均值,我們將使基本映射器發出一個字符串鍵(將天氣觀測的年和月連接在一起)和一個自定義可寫對象,稱為TemperatureAveragingPair。 TemperatureAveragingPair對象將包含兩個數字(IntWritables),獲取的溫度和一個計數。 我們將從Hadoop:權威指南中獲取MaximumTemperatureMapper,并以此為靈感來創建AverageTemperatureMapper:
public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private Text outText = new Text();private TemperatureAveragingPair pair = new TemperatureAveragingPair();private static final int MISSING = 9999;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {outText.set(yearMonth);pair.set(temp, 1);context.write(outText, pair);}}
}
通過使映射器輸出鍵和TemperatureAveragingPair對象,無論調用組合器如何,我們的MapReduce程序都可以保證具有正確的結果。
合路器
我們需要減少發送的數據量,因此我們將對溫度求和,對計數求和并分別存儲。 這樣,我們將減少發送的數據,但保留計算正確平均值所需的格式。 如果/在調用組合器時,它將采用所有傳入的TemperatureAveragingPair對象,并為同一鍵發出單個TemperatureAveragingPair對象,其中包含溫度和計數值的總和。 這是合并器的代碼:
public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {private TemperatureAveragingPair pair = new TemperatureAveragingPair();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair value : values) {temp += value.getTemp().get();count += value.getCount().get();}pair.set(temp,count);context.write(key,pair);}
}
但是我們非常有興趣確保我們減少了發送到reducer的數據量,因此我們將看看下一步如何實現。
在Mapper合并平均值中
類似于單詞計數示例,為了計算平均值,映射器內組合映射器將使用哈希圖,將連接的年+月作為鍵,將TemperatureAveragingPair作為值。 每次獲得相同的年+月組合時,我們都會將對對象從地圖中取出,添加溫度并將計數增加一個。 調用cleanup方法后,我們將發出所有對及其各自的鍵:
public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private static final int MISSING = 9999;private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {TemperatureAveragingPair pair = pairMap.get(yearMonth);if(pair == null){pair = new TemperatureAveragingPair();pairMap.put(yearMonth,pair);}int temps = pair.getTemp().get() + temp;int count = pair.getCount().get() + 1;pair.set(temps,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {Set<String> keys = pairMap.keySet();Text keyText = new Text();for (String key : keys) {keyText.set(key);context.write(keyText,pairMap.get(key));}}
}
通過遵循在映射調用之間跟蹤數據的相同模式,我們可以通過實現映射器內合并策略來實現可靠的數據減少。 同樣的注意事項適用于在對映射器的所有調用中保持狀態,但是考慮使用這種方法可以提高處理效率,這是值得考慮的。
減速器
在這一點上,編寫我們的reducer很容易,為每個鍵列出一對配對,將所有溫度和計數求和,然后將溫度的總和除以計數的總和。
public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {private IntWritable average = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair pair : values) {temp += pair.getTemp().get();count += pair.getCount().get();}average.set(temp / count);context.write(key, average);}
}
結果
使用合并器和映射器內合并映射器選項可以預測結果,從而顯著減少數據輸出。
未優化的映射器選項:
12/10/10 23:05:28 INFO mapred.JobClient: Reduce input groups=12
12/10/10 23:05:28 INFO mapred.JobClient: Combine output records=0
12/10/10 23:05:28 INFO mapred.JobClient: Map input records=6565
12/10/10 23:05:28 INFO mapred.JobClient: Reduce shuffle bytes=111594
12/10/10 23:05:28 INFO mapred.JobClient: Reduce output records=12
12/10/10 23:05:28 INFO mapred.JobClient: Spilled Records=13128
12/10/10 23:05:28 INFO mapred.JobClient: Map output bytes=98460
12/10/10 23:05:28 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/10/10 23:05:28 INFO mapred.JobClient: Combine input records=0
12/10/10 23:05:28 INFO mapred.JobClient: Map output records=6564
12/10/10 23:05:28 INFO mapred.JobClient: SPLIT_RAW_BYTES=108
12/10/10 23:05:28 INFO mapred.JobClient: Reduce input records=6564
組合器選項:
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input groups=12
12/10/10 23:07:19 INFO mapred.JobClient: Combine output records=12
12/10/10 23:07:19 INFO mapred.JobClient: Map input records=6565
12/10/10 23:07:19 INFO mapred.JobClient: Reduce shuffle bytes=210
12/10/10 23:07:19 INFO mapred.JobClient: Reduce output records=12
12/10/10 23:07:19 INFO mapred.JobClient: Spilled Records=24
12/10/10 23:07:19 INFO mapred.JobClient: Map output bytes=98460
12/10/10 23:07:19 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/10/10 23:07:19 INFO mapred.JobClient: Combine input records=6564
12/10/10 23:07:19 INFO mapred.JobClient: Map output records=6564
12/10/10 23:07:19 INFO mapred.JobClient: SPLIT_RAW_BYTES=108
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input records=12
映射器內合并選項:
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input groups=12
12/10/10 23:09:09 INFO mapred.JobClient: Combine output records=0
12/10/10 23:09:09 INFO mapred.JobClient: Map input records=6565
12/10/10 23:09:09 INFO mapred.JobClient: Reduce shuffle bytes=210
12/10/10 23:09:09 INFO mapred.JobClient: Reduce output records=12
12/10/10 23:09:09 INFO mapred.JobClient: Spilled Records=24
12/10/10 23:09:09 INFO mapred.JobClient: Map output bytes=180
12/10/10 23:09:09 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/10/10 23:09:09 INFO mapred.JobClient: Combine input records=0
12/10/10 23:09:09 INFO mapred.JobClient: Map output records=12
12/10/10 23:09:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=108
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input records=12
計算結果:
(注意:示例文件中的溫度以攝氏度* 10為單位)
未優化 | 合路器 | 映射器內合并器映射器 |
190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 | 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 | 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 |
結論
對于簡單的情況(可以將reducer重用為組合器)和更復雜的情況(對于如何構造數據同時仍能從本地聚集數據以提高處理效率)有所了解,我們已經介紹了本地聚集。
進一步閱讀
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼
- Hadoop API
- MRUnit用于單元測試Apache Hadoop映射減少工作
- Gutenberg項目提供了大量純文本格式的書籍,非常適合在本地測試Hadoop作業。
參考: 使用MapReduce進行數據密集型文本處理-本地聚合第二部分,來自我們的JCG合作伙伴 Bill Bejeck,來自“ 隨機編碼思考”博客。
翻譯自: https://www.javacodegeeks.com/2012/10/mapreduce-working-through-data-2.html