我們將繼續進行有關實現MapReduce算法的系列文章,該系列可在使用MapReduce進行數據密集型文本處理中找到。 本系列的其他文章:
- 使用MapReduce進行數據密集型文本處理
- 使用MapReduce進行數據密集型文本處理-本地聚合第二部分
- 使用Hadoop計算共現矩陣
- MapReduce算法–順序反轉
這篇文章介紹了在使用MapReduce進行數據密集型文本處理的第3章中找到的二級排序模式。 雖然Hadoop在將映射器發出的數據自動排序后再發送給reducer,但是如果您還想按值排序怎么辦? 您當然會使用二級排序。 通過稍加操作鍵對象的格式,二級排序使我們能夠在排序階段將值考慮在內。 這里有兩種可能的方法。
第一種方法涉及讓減速器緩沖給定鍵的所有值,并對這些值進行歸約器排序。 由于減速器將接收給定鍵的所有值,因此此方法可能會導致減速器內存不足。
第二種方法涉及通過向自然鍵添加部分或整個值來創建組合鍵,以實現您的排序目標。 這兩種方法之間的權衡是對reducer中的值進行顯式排序,這很可能會更快(存在內存不足的風險),但實現“值到鍵”轉換方法會減輕MapReduce框架的排序工作,這是Hadoop / MapReduce設計要做的核心。 出于本文的目的,我們將考慮“關鍵價值”方法。 我們將需要編寫一個自定義分區程序,以確保所有具有相同鍵(自然鍵不包含帶有值的復合鍵)的數據都發送到相同的reducer和自定義比較器,以便一旦數據被自然鍵分組到達減速機。
值到密鑰轉換
直接創建復合鍵。 我們需要做的是分析在排序過程中要考慮值的哪一部分,并將適當的部分添加到自然鍵中。 然后,我們需要在鍵類或比較器類中使用compareTo方法,以確保對組合鍵進行了說明。 我們將重新訪問天氣數據集,并將溫度作為自然鍵的一部分(自然鍵是年和月連接在一起)的一部分。 結果將是給定月份和年份中最冷的一天的列表。 該示例的靈感來自Hadoop,《權威指南》一書中的二級排序示例。 盡管可能有更好的方法可以實現此目標,但它足以說明次級排序的工作原理。
映射器代碼
我們的映射器代碼已經將年和月連接在一起,但是我們還將把溫度作為鍵的一部分。 由于我們將值包含在鍵本身中,因此映射器將發出NullWritable,在其他情況下,我們將發出溫度。
public class SecondarySortingTemperatureMapper extends Mapper<LongWritable, Text, TemperaturePair, NullWritable> {private TemperaturePair temperaturePair = new TemperaturePair();private NullWritable nullValue = NullWritable.get();private static final int MISSING = 9999;
@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {temperaturePair.setYearMonth(yearMonth);temperaturePair.setTemperature(temp);context.write(temperaturePair, nullValue);}}
}
現在,我們已將溫度添加到密鑰中,我們為啟用輔助排序做好了準備。 剩下要做的就是在必要時編寫考慮溫度的代碼。 在這里,我們有兩個選擇,編寫一個Comparator或在TemperaturePair類上調整compareTo方法(TemperaturePair實現WritableComparable)。 在大多數情況下,我建議您編寫一個單獨的Comparator,但是TemperaturePair類是專門為演示二次排序而編寫的,因此我們將修改TemperaturePair類的compareTo方法。
@Overridepublic int compareTo(TemperaturePair temperaturePair) {int compareValue = this.yearMonth.compareTo(temperaturePair.getYearMonth());if (compareValue == 0) {compareValue = temperature.compareTo(temperaturePair.getTemperature());}return compareValue;}
如果我們想按降序排序,我們可以簡單地將溫度比較的結果乘以-1。
現在,我們已經完成了排序所必需的部分,我們需要編寫一個自定義分區程序。
合伙人代碼
為了確保在確定將哪個縮減程序發送數據時僅考慮自然鍵,我們需要編寫一個自定義分區程序。 該代碼簡單明了,在計算將數據發送到的減速器時,僅考慮TemperaturePair類的yearMonth值。
public class TemperaturePartitioner extends Partitioner<TemperaturePair, NullWritable>{@Overridepublic int getPartition(TemperaturePair temperaturePair, NullWritable nullWritable, int numPartitions) {return temperaturePair.getYearMonth().hashCode() % numPartitions;}
}
盡管自定義分區程序保證了年和月的所有數據都到達同一精簡程序,但我們仍然需要考慮精簡程序將按鍵對記錄進行分組的事實。
分組比較器
數據到達精簡器后,所有數據均按鍵分組。 由于我們有一個復合鍵,因此我們需要確保記錄僅按自然鍵分組。 這是通過編寫自定義GroupPartitioner完成的。 為了將記錄分組在一起,我們只考慮了TemperaturePair類的yearMonth字段的Comparator對象。
public class YearMonthGroupingComparator extends WritableComparator {public YearMonthGroupingComparator() {super(TemperaturePair.class, true);}@Overridepublic int compare(WritableComparable tp1, WritableComparable tp2) {TemperaturePair temperaturePair = (TemperaturePair) tp1;TemperaturePair temperaturePair2 = (TemperaturePair) tp2;return temperaturePair.getYearMonth().compareTo(temperaturePair2.getYearMonth());}
}
結果
這是運行我們的二級排序作業的結果:
new-host-2:sbin bbejeck$ hdfs dfs -cat secondary-sort/part-r-00000
190101 -206
190102 -333
190103 -272
190104 -61
190105 -33
190106 44
190107 72
190108 44
190109 17
190110 -33
190111 -217
190112 -300
結論
雖然按值對數據進行排序可能不是普遍的需求,但是在需要時,這是個不錯的工具。 此外,通過使用自定義分區程序和組分區程序,我們能夠更深入地了解Hadoop的內部工作原理。 感謝您的時間。
資源資源
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼和測試
- Hadoop API
- MRUnit用于單元測試Apache Hadoop映射減少工作
參考: MapReduce算法–來自我們的JCG合作伙伴 Bill Bejeck的“ 二級排序”,來自“ 隨機編碼思考”博客。
翻譯自: https://www.javacodegeeks.com/2013/01/mapreduce-algorithms-secondary-sorting.html