java導出hbase表數據_通用MapReduce程序復制HBase表數據

編寫MR程序,讓其可以適合大部分的HBase表數據導入到HBase表數據。其中包括可以設置版本數、可以設置輸入表的列導入設置(選取其中某幾列)、可以設置輸出表的列導出設置(選取其中某幾列)。

原始表test1數據如下:

b4d7516b06e845aed13292d5e5c2b89c.png

每個row key都有兩個版本的數據,這里只顯示了row key為1的數據

在hbase shell 中創建數據表:

create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、無列導入設置、無列導出設置的數據

create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、無列導入設置、有列導出設置的數據

create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存無版本、有列導入設置、無列導出設置的數據

create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、無列導入設置、無列導出設置的數據

create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、無列導入設置、有列導出設置的數據

create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列導入設置、無列導出設置的數據

create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列導入設置、有列導出設置的數據

main函數入口:

package GeneralHBaseToHBase;

import org.apache.hadoop.util.ToolRunner;

public class DriverTest {

public static void main(String[] args) throws Exception {

// 無版本設置、無列導入設置,無列導出設置

String[] myArgs1= new String[]{

"test1", // 輸入表

"test2", // 輸出表

"0", // 版本大小數,如果值為0,則為默認從輸入表導出最新的數據到輸出表

"-1", // 列導入設置,如果為-1 ,則沒有設置列導入

"-1" // 列導出設置,如果為-1,則沒有設置列導出

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs1);

// 無版本設置、有列導入設置,無列導出設置

String[] myArgs2= new String[]{

"test1",

"test3",

"0",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs2);

// 無版本設置,無列導入設置,有列導出設置

String[] myArgs3= new String[]{

"test1",

"test4",

"0",

"-1",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs3);

// 有版本設置,無列導入設置,無列導出設置

String[] myArgs4= new String[]{

"test1",

"test5",

"2",

"-1",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs4);

// 有版本設置、有列導入設置,無列導出設置

String[] myArgs5= new String[]{

"test1",

"test6",

"2",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs5);

// 有版本設置、無列導入設置,有列導出設置

String[] myArgs6= new String[]{

"test1",

"test7",

"2",

"-1",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs6);

// 有版本設置、有列導入設置,有列導出設置

String[] myArgs7= new String[]{

"test1",

"test8",

"2",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs7);

}

}

driver:

package GeneralHBaseToHBase;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import util.JarUtil;

public class HBaseDriver extends Configured implements Tool{

public static String FROMTABLE=""; //導入表

public static String TOTABLE=""; //導出表

public static String SETVERSION=""; //是否設置版本

// args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}

@Override

public int run(String[] args) throws Exception {

if(args.length!=5){

System.err.println("Usage:\n demo.job.HBaseDriver "

+ ""

+"< versions >"

+ " like or "

+ " like or ");

return -1;

}

Configuration conf = getConf();

FROMTABLE = args[0];

TOTABLE = args[1];

SETVERSION = args[2];

conf.set("SETVERSION", SETVERSION);

if(!args[3].equals("-1")){

conf.set("COLUMNFROMTABLE", args[3]);

}

if(!args[4].equals("-1")){

conf.set("COLUMNTOTABLE", args[4]);

}

String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;

Job job = Job.getInstance(conf, jobName);

job.setJarByClass(HBaseDriver.class);

Scan scan = new Scan();

// 判斷是否需要設置版本

if(SETVERSION != "0" || SETVERSION != "1"){

scan.setMaxVersions(Integer.parseInt(SETVERSION));

}

// 設置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型

TableMapReduceUtil.initTableMapperJob(

FROMTABLE,

scan,

HBaseToHBaseMapper.class,

ImmutableBytesWritable.class,

Put.class,

job);

// 設置HBase表輸出:表名,reducer類

TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);

// 沒有 reducers, 直接寫入到 輸出文件

job.setNumReduceTasks(0);

return job.waitForCompletion(true) ? 0 : 1;

}

private static Configuration configuration;

public static Configuration getConfiguration(){

if(configuration==null){

/**

* TODO 了解如何直接從Windows提交代碼到Hadoop集群

* 并修改其中的配置為實際配置

*/

configuration = new Configuration();

configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平臺提交任務

configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode

configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架

configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager

configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定資源分配器

configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver

configuration.set("hbase.master", "master:16000");

configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");

configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

configuration.set("hbase.zookeeper.property.clientPort", "2181");

//TODO 需export->jar file ; 設置正確的jar包所在位置

configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 設置jar包路徑

}

return configuration;

}

}

mapper:

package GeneralHBaseToHBase;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Map.Entry;

import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class HBaseToHBaseMapper extends TableMapper {

Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);

private static int versionNum = 0;

private static String[] columnFromTable = null;

private static String[] columnToTable = null;

private static String column1 = null;

private static String column2 = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

Configuration conf = context.getConfiguration();

versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));

column1 = conf.get("COLUMNFROMTABLE",null);

if(!(column1 == null)){

columnFromTable = column1.split(",");

}

column2 = conf.get("COLUMNTOTABLE",null);

if(!(column2 == null)){

columnToTable = column2.split(",");

}

}

@Override

protected void map(ImmutableBytesWritable key, Result value,

Context context)

throws IOException, InterruptedException {

context.write(key, resultToPut(key,value));

}

/***

* 把key,value轉換為Put

* @param key

* @param value

* @return

* @throws IOException

*/

private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {

HashMap fTableMap = new HashMap<>();

HashMap tTableMap = new HashMap<>();

Put put = new Put(key.get());

if(! (columnFromTable == null || columnFromTable.length == 0)){

fTableMap = getFamilyAndColumn(columnFromTable);

}

if(! (columnToTable == null || columnToTable.length == 0)){

tTableMap = getFamilyAndColumn(columnToTable);

}

if(versionNum==0){

if(fTableMap.size() == 0){

if(tTableMap.size() == 0){

for (Cell kv : value.rawCells()) {

put.add(kv); // 沒有設置版本,沒有設置列導入,沒有設置列導出

}

return put;

} else{

return getPut(put, value, tTableMap); // 無版本、無列導入、有列導出

}

} else {

if(tTableMap.size() == 0){

return getPut(put, value, fTableMap);// 無版本、有列導入、無列導出

} else {

return getPut(put, value, tTableMap);// 無版本、有列導入、有列導出

}

}

} else{

if(fTableMap.size() == 0){

if(tTableMap.size() == 0){

return getPut1(put, value); // 有版本,無列導入,無列導出

}else{

return getPut2(put, value, tTableMap); //有版本,無列導入,有列導出

}

}else{

if(tTableMap.size() == 0){

return getPut2(put,value,fTableMap);// 有版本,有列導入,無列導出

}else{

return getPut2(put,value,tTableMap); // 有版本,有列導入,有列導出

}

}

}

}

/***

* 無版本設置的情況下,對于有列導入或者列導出

* @param put

* @param value

* @param tableMap

* @return

* @throws IOException

*/

private Put getPut(Put put,Result value,HashMap tableMap) throws IOException{

for(Cell kv : value.rawCells()){

byte[] family = kv.getFamily();

if(tableMap.containsKey(new String(family))){

String columnStr = tableMap.get(new String(family));

ArrayList columnBy = toByte(columnStr);

if(columnBy.contains(new String(kv.getQualifier()))){

put.add(kv); //沒有設置版本,沒有設置列導入,有設置列導出

}

}

}

return put;

}

/***

* (有版本,無列導入,有列導出)或者(有版本,有列導入,無列導出)

* @param put

* @param value

* @param tTableMap

* @return

*/

private Put getPut2(Put put,Result value,HashMap tableMap){

NavigableMap>> map=value.getMap();

for(byte[] family:map.keySet()){

if(tableMap.containsKey(new String(family))){

String columnStr = tableMap.get(new String(family));

log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);

ArrayList columnBy = toByte(columnStr);

NavigableMap> familyMap = map.get(family);//列簇作為key獲取其中的列相關數據

for(byte[] column:familyMap.keySet()){ //根據列名循壞

log.info("!!!!!!!!!!!"+new String(column));

if(columnBy.contains(new String(column))){

NavigableMap valuesMap = familyMap.get(column);

for(Entry s:valuesMap.entrySet()){//獲取列對應的不同版本數據,默認最新的一個

System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));

put.addColumn(family, column, s.getKey(),s.getValue());

}

}

}

}

}

return put;

}

/***

* 有版本、無列導入、無列導出

* @param put

* @param value

* @return

*/

private Put getPut1(Put put,Result value){

NavigableMap>> map=value.getMap();

for(byte[] family:map.keySet()){

NavigableMap> familyMap = map.get(family);//列簇作為key獲取其中的列相關數據

for(byte[] column:familyMap.keySet()){ //根據列名循壞

NavigableMap valuesMap = familyMap.get(column);

for(Entry s:valuesMap.entrySet()){ //獲取列對應的不同版本數據,默認最新的一個

put.addColumn(family, column, s.getKey(),s.getValue());

}

}

}

return put;

}

// str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}

/***

* 得到列簇名與列名的k,v形式的map

* @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}

* @return map => {"cf1" => "c1,c2,c10,c11,c14"}

*/

private static HashMap getFamilyAndColumn(String[] str){

HashMap map = new HashMap<>();

HashSet set = new HashSet<>();

for(String s : str){

set.add(s.split(":")[0]);

}

Object[] ob = set.toArray();

for(int i=0; i

String family = String.valueOf(ob[i]);

String columns = "";

for(int j=0;j < str.length;j++){

if(family.equals(str[j].split(":")[0])){

columns += str[j].split(":")[1]+",";

}

}

map.put(family, columns.substring(0, columns.length()-1));

}

return map;

}

private static ArrayList toByte(String s){

ArrayList b = new ArrayList<>();

String[] sarr = s.split(",");

for(int i=0;i

b.add(sarr[i]);

}

return b;

}

}

程序運行完之后,在hbase shell中查看每個表,看是否數據導入正確:

test2:(無版本、無列導入設置、無列導出設置)

663c31cae4e6e0014af4b367ca742ec4.png

test3 (無版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)

8fed56c4365c50b1bf7384c7c3e90809.png

test4(無版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

95dd81da9857f9d75e6f182eca8ed7f0.png

test5(有版本、無列導入設置、無列導出設置)

b37a5ec4bcc01f3bd824c1d692a83cc9.png

test6(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)

7acc004cefa773a006865c0612567622.png

test7(有版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

4c95796f2116410dbf6804a957d4012a.png

test8(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

6649a92bc17c809fac6a0ef6dd2fc035.png

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/443070.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/443070.shtml
英文地址,請注明出處:http://en.pswp.cn/news/443070.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

java雙語試卷_Java程序設計基礎(雙語)試題題目及答案,課程2021最新期末考試題庫,章節測驗答案...

若二項式(x&#xff0b;13x)n的展開式中含3x的項是第三項&#xff0c;則n的值是______&#xff0e;(x2&#xff0b;1ax)6(a&#xff1e;0)的展開式中常數項是15&#xff0c;那么展開式中所有項系數和是______&#xff0e;(x2&#xff0b;1ax)6(a&#xff1e;0)的展開式中常數項…

java服務器和linux_在Linux下開一個Java服務器(使用CatServer Pro)

引言Linux開服具有快速&#xff0c;高效&#xff0c;性能等特點&#xff0c;而Windows雖然簡單&#xff0c;但是不具備Linux良好的性能。本教程就說明一下簡單的Linux開服方式(需要教程的人&#xff0c;如果你學會后&#xff0c;請無償幫助更多的人。)服務器準備首先。先準備一…

java中js九個隱含對象_第九章 JSP標簽——《跟我學Shiro》

Shiro提供了JSTL標簽用于在JSP/GSP頁面進行權限控制&#xff0c;如根據登錄用戶顯示相應的頁面按鈕。導入標簽庫標簽庫定義在shiro-web.jar包下的META-INF/shiro.tld中定義。guest標簽歡迎游客訪問&#xff0c;登錄用戶沒有身份驗證時顯示相應信息&#xff0c;即游客訪問信息。…

java中jsp標準動作_JavaBean和jsp標準動作

一.JavaBean 1.理解&#xff1a;可以重用的java類 2.分類 1)封裝數據的bean(相當于實體類) 2)封裝業務的bean(一般就是實現增刪改查) 3.注意&#xff1a; 1)封裝數據的bean一般要滿足如下兩個條件 1.implements Serializable&#xff1a;實現序列化接口 2.擁有一個無參的public…

我的世界java版游戲崩潰_我的世界全攻略之-游戲崩潰的解決方法

我的世界崩潰怎么辦&#xff1f;下面吾愛網小編給大家帶來我的世界無法正常啟動的解決方法,需要的朋友可以參考下。我的世界作為許多玩家都十分喜愛的模擬經營沙盤類游戲,經常有玩家反映在玩我的世界的時候,游戲總是會出現崩潰或者無法啟動的情況,玩家在遇到的時候不知道怎么辦…

手寫實現java中的trim_JS中字符串trim()使用示例

示例一&#xff1a;測試JS擴展方法// 合并多個空白為一個空白String.prototype.ResetBlank function() { //對字符串擴展var regEx /\s/g;return this.replace(regEx, );};window.onload function(){var str "你 在他想還好嗎?";alert(str);str str.ResetBlan…

java excil表格開發_JAVA導出Excel電子表格的方法

JAVA導出Excel電子表格的方法package com.qingruxu.excel;import java.io.File;import java.io.IOException;import java.net.URL;import jxl.Sheet;import jxl.Workbook;import jxl.format.Border;import jxl.read.biff.BiffException;import jxl.write.Blank;import jxl.writ…

vue3 新項目 - 搭建路由router

創建router/index 文件 main.ts 安裝 router 然后 在 app下面 去 設置 路由出口

java json中的注釋_如何使用Java中的@Expose注釋從JSON中排除字段?

Gson Expose批注可用于標記要公開或不公開(串行化或反序列化)的字段。 expose注釋可以取兩個參數和每個參數是可以采取任一值的布爾真或假。為了使GSON對Expose批注做出反應&#xff0c;我們必須使用GsonBuilder類創建一個Gson實例&#xff0c;并需要調用excludeFieldsWithoutE…

java 屏蔽郵箱_使用javamail發送郵件的時候如何阻止附件內容輸出到控制臺

我在使用JavaMail發送帶附件的郵件時候&#xff0c;每次到了Transport.sendMessage()這一步&#xff0c;控制臺就會輸出附件內容&#xff0c;請問如何設置可以取消輸出呢&#xff1f;public void sendFileAttachedMail(String fromMail, String toMail, String fromMailPwd, St…

java如何獲得相反的顏色_javascript – 如何根據當前顏色生成相反的顏色?

更新&#xff1a;GitHub上的生產就緒代碼.我就是這樣做的&#xff1a;>將HEX轉換為RGB>反轉R,G和B組件>將每個組件轉換回HEX>用零和輸出填充每個組件.function invertColor(hex) {if (hex.indexOf(#) 0) {hex hex.slice(1);}// convert 3-digit hex to 6-digits.…

php暫停循環,在特定數量的遞歸循環后,PHP停止執行

我遇到的問題很奇怪。 在執行遞歸循環時會發生這種情況。 使用for循環或任何其他迭代執行相同任務時&#xff0c;不會發生這種情況。在?21 000次以下遞歸調用函數時&#xff0c;一切正常。 超過此數字時會出現問題。我的工作代碼&#xff1a;foo();function foo($i 1) {if ($…

thinkphp3 php jwt,thinkphp框架使用JWTtoken的方法詳解

本文實例講述了thinkphp框架使用JWTtoken的方法。分享給大家供大家參考&#xff0c;具體如下&#xff1a;簡介一&#xff1a;JWT介紹&#xff1a;全稱JSON Web Token&#xff0c;基于JSON的開放標準((RFC 7519) &#xff0c;以token的方式代替傳統的Cookie-Session模式&#xf…

php 64位編碼解碼,php base64 編碼和解碼

/*** 自定義規則方式編碼和解碼**/public function test_changinttoStr () {$intvalue1 1232344234;$intStr ;$str "Y 9 L F k g y 5 R o v i x I 1 a O f 8 U h d s 3 r 4 D M p l Q Z J X P q e b E 0 W S j B n 7 w V z m N 2 G c 6 T H C A K u t";$seq expl…

matlab能力處理,書+程序《MATLAB圖像處理:能力提高與應用案例》趙小川

【實例簡介】MATLAB圖像處理能力提高與應用案例 [趙小川 編著] 2014年版,書程序(僅供學習)【實例截圖】【核心代碼】c3c59b5e-16c1-4bda-938e-8fcfb2ff870d└── 《MATLAB圖像處理&#xff1a;能力提高與應用案例》書程序├── 1.1 圖像多分辨率金字塔.zip├── 1.2圖像的矩…

wamp php啟動不成功,wamp的mysql 啟動失敗解決

wamp啟動失敗&#xff0c;查看原因是mysql 啟動失敗首先查看mysql的啟動日志命令&#xff1a;mysqld --console知道error報錯的地方&#xff1a;然后百度了此報錯&#xff0c;解決方法在my.ini中添加innodb_force_recovery 1發現這個會影響insert需要設置為 innodb_force_reco…

php復選框樣式,如何自定義checkbox樣式?附代碼

本篇文章給大家帶來的內容是關于如何自定義checkbox樣式&#xff1f;附代碼&#xff0c;有一定的參考價值&#xff0c;有需要的朋友可以參考一下&#xff0c;希望對你有所幫助。修改原生checkbox樣式。效果原理1.利用CSS3屬性 appearance。該屬性(強制)更改(改變)默認(原生)樣式…

JAVA用數據留給出師表排序,如果諸葛亮會編程,用Java寫出師表...

繼上一篇 "如果諸葛亮用C#寫出師表..."后&#xff0c;站長想自己的第一語言是Java&#xff0c;雖然平時工作上用的不多&#xff0c;也用Java實現一遍吧&#xff0c;改改就是了&#xff0c;無非就是:C#的Console.WriteLine改為Java的System.out.println&#xff1b;C#…

python 橫坐標旋轉,python 橫坐標旋轉

數據旋轉公式x1cos(angle)*x-sin(angle)*y;y1cos(angle)*ysin(angle)*x;從數學上來說&#xff0c;此公式可以用來計算某個點繞另外一點旋轉一定角度后的坐標&#xff0c;例如&#xff1a;A(x&#xff0c;y)繞B(a&#xff0c;b)旋轉β度后的位置為C(c&#xff0c;d)&#xff0c;…

php 數組區刪除重復的,php – 從數組中刪除重復的項目

我使用下面的代碼行來遍歷數據庫中的一個表&#xff1a;$items_thread $connection -> fetch_all($sql);如果我打印出來的陣列&#xff1a;print_r($items_thread);我會得到這個&#xff1a;Array([0] > Array([RecipientID] > 3[RecipientScreenname] > Tom L[Re…