1.需求分析
我們仍然以美國各個氣象站每年的氣溫數據集為例,現在要求使用MapReduce讀取該數據集,然后批量寫入HBase數據庫,最后利用HBase shell根據行鍵即席查詢氣溫數據。
2.數據集準備
?數據集的文件名為temperature.log,里面包含美國各個氣象站每年的氣溫數據,數據的第一列為氣象站ID,第二列為年份,第三列為氣溫值。具體樣本數據如下所示: 03103,1980,41 03103,1981,98 03103,1982,70 03103,1983,74 03103,1984,77
3.代碼實現
Mapper:
public static class MyMapper extends Mapper<LongWritable, Text,LongWritable,Text>{private Text word = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//解析每條氣溫記錄String[] records = value.toString().split(",");int length = records.length;if(length==3){//設置HBase行鍵rowKeyString rowKey = records[0]+":"+records[1];word.set(rowKey+","+value.toString());context.write(key,word);}}}
?Reducer:
public static class MyReducer extends TableReducer<LongWritable,Text, ImmutableBytesWritable>{
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for(Text value:values){
String[] splits = value.toString().split(",");
String rowKey=splits[0];
//獲取第一列作為RowKey
Put put =new Put(Bytes.toBytes(splits[0]));
//獲取其他列作為HBase列簇中的字段
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(splits[0]));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("year"), Bytes.toBytes(splits[1]));
put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("temperature"), Bytes.toBytes(splits[2]));
ImmutableBytesWritable keys = new ImmutableBytesWritable(rowKey.getBytes());
context.write(keys,put);
}
}}
Main:
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//設置HBase配置連接
Configuration conf= HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
conf.set("hbase.zookeeper.property.clientPort", "2181");
Job job = new Job(conf, "BatchImportHBase");
job.setJobName("BatchImportHBase");
job.setJarByClass(BatchImportHBase.class);
job.setMapperClass(MyMapper.class);
//執行reducer類寫入HBase
TableMapReduceUtil.initTableReducerJob("temperature", MyReducer.class, job, null, null, null, null, false);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.waitForCompletion(true) ;
}
4.測試運行
1. 創建HBase表 hbase(main):002:0> create 'temperature','cf'
2.?提交MapReduce作業
3.?根據Rowkey查詢氣溫數據