本地聚集
在很高的級別上,當Mappers發出數據時,中間結果將寫入磁盤,然后通過網絡發送到Reducers以進行最終處理。 在處理MapReduce作業中,寫入磁盤然后通過網絡傳輸數據的延遲是一項昂貴的操作。 因此,有理由認為,只要有可能,減少從映射器發送的數據量將提高MapReduce作業的速度。 本地聚合是一種用于減少數據量并提高MapReduce工作效率的技術。 本地聚合不能代替reducers,因為我們需要一種方法來使用來自不同映射器的相同鍵來收集結果。 我們將考慮實現本地聚合的3種方法:
- 使用Hadoop Combiner函數。
- 文本處理和MapReduce書中介紹了兩種“映射器”組合方法。
當然,任何優化都需要權衡取舍,我們也會對此進行討論。
為了演示本地聚合,我們將使用hadoop-0.20.2-cdh3u3在MacBookPro上安裝的偽分布式群集上,在Charles Dickens的純文本版本的A Christmas Carol (從Project Gutenberg下載)上運行無處不在的字數統計作業。從Cloudera發行。 我計劃在以后的文章中在具有更實際大小的數據的EC2集群上運行相同的實驗。
合路器
組合器函數是擴展Reducer類的對象。 實際上,對于此處的示例,我們將重復使用單詞計數作業中使用的相同的reduce。 設置MapReduce作業時會指定組合器功能,如下所示:
job.setReducerClass(TokenCountReducer.class);
這是化簡器代碼:
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value : values) {count+= value.get();}context.write(key,new IntWritable(count));}
}
組合器的工作就是按照名稱中的含義進行操作,聚合數據的最終結果是網絡上的數據減少,從而使效率得到提高。 如前所述,請記住,仍然需要簡化器將結果與來自不同映射器的相同鍵組合在一起。 由于組合器功能是一種優化,因此Hadoop框架無法保證組合器將被調用多少次(如果有的話)。
在Mapper組合選項1中
使用Combiners的第一種選擇(圖3.2,第41頁)非常簡單,對我們原始的字數映射器進行了一些修改:
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {IntWritable writableCount = new IntWritable();Text text = new Text();Map<String,Integer> tokenMap = new HashMap<String, Integer>();StringTokenizer tokenizer = new StringTokenizer(value.toString());while(tokenizer.hasMoreElements()){String token = tokenizer.nextToken();Integer count = tokenMap.get(token);if(count == null) count = new Integer(0);count+=1;tokenMap.put(token,count);}Set<String> keys = tokenMap.keySet();for (String s : keys) {text.set(s);writableCount.set(tokenMap.get(s));context.write(text,writableCount);}}
}
正如我們在這里看到的,對于遇到的每個單詞,我們不會發出計數為1的單詞,而是使用映射來跟蹤已處理的每個單詞。 然后,在處理完所有標記后,我們遍歷該映射并發出該行中遇到的每個單詞的總數。
在Mapper組合選項2中
映射器合并的第二個選項(圖3.3,第41頁)與上述示例非常相似,但有兩個區別-創建哈希映射時和發出映射中包含的結果時。 在上面的示例中,創建了一個映射,并在每次調用map方法時將其內容通過電線轉儲。 在此示例中,我們將使地圖成為實例變量,并將地圖的實例化移動到我們的映射器中的setUp方法。 同樣,在完成對mapper的所有調用并調用cleanUp方法之前,不會將映射的內容發送到reducers。
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {private Map<String,Integer> tokenMap;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {tokenMap = new HashMap<String, Integer>();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());while(tokenizer.hasMoreElements()){String token = tokenizer.nextToken();Integer count = tokenMap.get(token);if(count == null) count = new Integer(0);count+=1;tokenMap.put(token,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {IntWritable writableCount = new IntWritable();Text text = new Text();Set<String> keys = tokenMap.keySet();for (String s : keys) {text.set(s);writableCount.set(tokenMap.get(s));context.write(text,writableCount);}}
}
從上面的代碼示例中可以看到,在對map方法的所有調用中,映射器都跟蹤唯一字數。 通過跟蹤唯一令牌及其計數,應該大大減少發送給reducer的記錄數量,這反過來又可以改善MapReduce作業的運行時間。 這樣可以達到與使用MapReduce框架提供的Combiner Function選項相同的效果,但是在這種情況下,可以確保將調用組合代碼。 但是這種方法也有一些警告。 在地圖調用之間保持狀態可能會出現問題,并且絕對違反“地圖”功能的功能精神。 同樣,通過保持所有映射器的狀態,取決于作業中使用的數據,內存可能是另一個要解決的問題。 最終,必須權衡所有權衡以確定最佳方法。
結果
現在,讓我們看一下不同映射器的一些結果。 由于作業是在偽分布式模式下運行的,因此實際的運行時間是無關緊要的,但是我們仍然可以推斷出使用本地聚合會如何影響在實際集群上運行的MapReduce作業的效率。
每個令牌映射器:
12/09/13 21:25:32 INFO mapred.JobClient: Reduce shuffle bytes=366010
12/09/13 21:25:32 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:25:32 INFO mapred.JobClient: Spilled Records=63118
12/09/13 21:25:32 INFO mapred.JobClient: Map output bytes=302886
在Mapper精簡選項1中:
12/09/13 21:28:15 INFO mapred.JobClient: Reduce shuffle bytes=354112
12/09/13 21:28:15 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:28:15 INFO mapred.JobClient: Spilled Records=60704
12/09/13 21:28:15 INFO mapred.JobClient: Map output bytes=293402
在Mapper精簡選項2中:
12/09/13 21:30:49 INFO mapred.JobClient: Reduce shuffle bytes=105885
12/09/13 21:30:49 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:30:49 INFO mapred.JobClient: Spilled Records=15314
12/09/13 21:30:49 INFO mapred.JobClient: Map output bytes=90565
組合器選項:
12/09/13 21:22:18 INFO mapred.JobClient: Reduce shuffle bytes=105885
12/09/13 21:22:18 INFO mapred.JobClient: Reduce output records=7657
12/09/13 21:22:18 INFO mapred.JobClient: Spilled Records=15314
12/09/13 21:22:18 INFO mapred.JobClient: Map output bytes=302886
12/09/13 21:22:18 INFO mapred.JobClient: Combine input records=31559
12/09/13 21:22:18 INFO mapred.JobClient: Combine output records=7657
不出所料,沒有合并的Mapper的結果最差,緊隨其后的是第一個映射器內的合并選項(盡管如果在運行字數統計之前將數據清理干凈,這些結果本來可以更好)。 第二個映射器內合并選項和合并器功能實際上具有相同的結果。 重要的事實是,作為前兩個選項,兩者產生的結果都減少了2/3,減少了混洗字節。 將通過網絡發送到縮減程序的字節數量減少該數量一定會對MapReduce作業的效率產生積極影響。 這里要牢記一點,那就是合并器/映射器合并不能僅在所有MapReduce作業中使用,在這種情況下,字數計數非常適合于這種增強,但這可能并不總是正確的。
結論
正如您所看到的,在尋求提高MapReduce作業的性能時,需要認真考慮使用映射器內合并或Hadoop合并器功能的好處。 至于哪種方法,則要權衡每種方法的權衡。
相關鏈接
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼
- MRUnit用于單元測試Apache Hadoop映射減少工作
- Gutenberg項目提供了大量純文本格式的書籍,非常適合在本地測試Hadoop作業。
祝您編程愉快,別忘了分享!
參考: 《 隨機編碼》博客上的JCG合作伙伴 Bill Bejeck 提供的MapReduce數據密集型文本處理功能 。
翻譯自: https://www.javacodegeeks.com/2012/09/mapreduce-working-through-data.html