轉載請注明:http://www.cnblogs.com/demievil/p/4059141.html
我的github博客:http://demievil.github.io/
做項目的時候遇到一個問題,在Mapper和Reducer方法中處理目標數據時,先要去檢索和匹配一個已存在的標簽庫,再對所處理的字段打標簽。因為標簽庫不是很大,沒必要用HBase。我的實現方法是把標簽庫存儲成HDFS上的文件,用分布式緩存存儲,這樣讓每個slave都能讀取到這個文件。
main方法中的配置:
//分布式緩存要存儲的文件路徑 String cachePath[] = {"hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv","hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"}; //向分布式緩存中添加文件job.addCacheFile(new Path(cachePath[0]).toUri());job.addCacheFile(new Path(cachePath[1]).toUri());
參考上面代碼即可向分布式緩存中添加文件。
在Mapper和Reducer方法中讀取分布式緩存文件:
/** 重寫Mapper的setup方法,獲取分布式緩存中的文件*/@Overrideprotected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubsuper.setup(context);URI[] cacheFile = context.getCacheFiles();Path tagSetPath = new Path(cacheFile[0]);Path tagedUrlPath = new Path(cacheFile[1]);文件操作(如把內容讀到set或map中);}@Override public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {在map()中使用讀取出的數據;}
同樣,如果在Reducer中也要讀取分布式緩存文件,示例如下:
/** 重寫Reducer的setup方法,獲取分布式緩存中的文件*/@Overrideprotected void setup(Context context) throws IOException, InterruptedException {super.setup(context);mos = new MultipleOutputs<Text, Text>(context);URI[] cacheFile = context.getCacheFiles();Path tagSetPath = new Path(cacheFile[0]);Path tagSetPath = new Path(cacheFile[1]);文件讀取操作;}@Overridepublic void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {while(values.iterator().hasNext()){使用讀取出的數據;}context.write(key, new Text(sb.toString()));}
?
以上。
?