- 問題描述
? ? ? ? 需要連接的表如下:其中左邊是child,右邊是parent,我們要做的是找出grandchild和grandparent的對應關系,為此需要進行表的連接。
Tom Lucy
Tom Jim
Lucy David
Lucy Lili
Jim Lilei
Jim SuSan
Lily Green
Lily Bians
Green Well
Green MillShell
Havid James
James LiT
Richard Cheng
Cheng LiHua
- 思路分析
誠然,在寫MR程序的時候要結合MR數據處理的一些特性。例如如果我們用默認的TextInputFormat來處理傳入的文件數據,傳入的格式是key為行號,value為這一行的值(如上例中的第一行,key為0,value為[Tom,Lucy]),在shuffle過程中,我們的值如果有相同的key,會merge到一起(這一點很重要!)。我們利用shuffle階段的特性,merge到一組的數據夠成一組關系,然后我們在這組關系中想辦法區分晚輩和長輩,最后對merge里的value一一作處理,分離出grandchild和grandparent的關系。
例如,Tom Lucy傳入處理后我們將其反轉,成為Lucy Tom輸出。當然,輸出的時候,為了達到join的效果,我們要輸出兩份,因為join要兩個表,一個表為L1:child parent,一個表為L2:child parent,為了達到關聯的目的和利用shuffle階段的特性,我們需要將L1反轉,把parent放在前面,這樣L1表中的parent和L2表中的child如果字段是相同的那么在shuffle階段就能merge到一起。還有,為了區分merge到一起后如何區分child和parent,我們把L1表中反轉后的child(未來的 grandchild)字段后面加一個1,L2表中parent(未來的grandparent)字段后加2。
1 package com.test.join; 2 3 import java.io.IOException; 4 import java.util.ArrayList; 5 import java.util.Iterator; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer; 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 16 public class STJoin { 17 18 public static class STJoinMapper extends Mapper<Object, Text, Text, Text>{ 19 20 @Override 21 protected void map(Object key, Text value, Context context) 22 throws IOException, InterruptedException { 23 // TODO Auto-generated method stub 24 String[] rela = value.toString().trim().split(" ",2); 25 if(rela.length!=2) 26 return; 27 String child = rela[0]; 28 String parent = rela[1]; 29 context.write(new Text(parent), new Text((child+"1"))); 30 context.write(new Text(child), new Text((parent+"2"))); 31 32 } 33 34 } 35 public static class STJoinReducer extends Reducer<Text, Text, Text, Text>{ 36 37 @Override 38 protected void reduce(Text arg0, Iterable<Text> arg1,Context context) 39 throws IOException, InterruptedException { 40 // TODO Auto-generated method stub 41 ArrayList<String> grandParent = new ArrayList<>(); 42 ArrayList<String> grandChild = new ArrayList<>(); 43 Iterator<Text> iterator = arg1.iterator(); 44 while(iterator.hasNext()){ 45 String text = iterator.next().toString(); 46 if(text.endsWith("1")) 47 grandChild.add(text.substring(0, text.length()-1)); 48 if(text.endsWith("2")) 49 grandParent.add(text.substring(0, text.length()-1)); 50 } 51 52 for(String grandparent:grandParent){ 53 for(String grandchild:grandChild){ 54 context.write(new Text(grandchild), new Text(grandparent)); 55 } 56 } 57 } 58 } 59 60 61 public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException { 62 Configuration conf = new Configuration(); 63 Job job = new Job(conf,"STJoin"); 64 job.setMapperClass(STJoinMapper.class); 65 job.setReducerClass(STJoinReducer.class); 66 job.setOutputKeyClass(Text.class); 67 job.setOutputValueClass(Text.class); 68 FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinFile")); 69 FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinResult")); 70 71 System.exit(job.waitForCompletion(true)?0:1); 72 } 73 }
- 結果顯示
?
Richard LiHua
Lily Well
Lily MillShell
Havid LiT
Tom Lilei
Tom SuSan
Tom Lili
Tom David
?以上代碼在hadoop1.0.3平臺實現