Background
An existing requirement calls for using Spark to query data in HBase and sync it to an intermediate analysis database. This note records a simple example of integrating Spark with HBase.
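The example below assumes the Spark SQL and HBase client artifacts are on the classpath. A build.sbt sketch for reference (the version numbers are illustrative assumptions, not tested pins):

// build.sbt -- versions are illustrative assumptions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"       % "3.3.0" % "provided",
  "org.apache.hbase"  % "hbase-client"    % "2.4.17",
  "org.apache.hbase"  % "hbase-mapreduce" % "2.4.17" // provides TableInputFormat for the distributed variant below
)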
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object ReadHBaseData {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName("ReadHBaseData")
      .master("local")
      .getOrCreate()

    // Create the HBase configuration and set the connection parameters
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Open an HBase connection and get the table
    val connection = ConnectionFactory.createConnection(conf)
    val tableName = "my_table"
    val table = connection.getTable(TableName.valueOf(tableName))

    // Build a Scan restricted to the column family and columns we need
    val scan = new Scan()
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"))
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column2"))

    // Run the scan, collect the results on the driver,
    // and parallelize them into an RDD
    val scanner = table.getScanner(scan)
    val rows = scanner.iterator().asScala.map { result =>
      val rowKey = Bytes.toString(result.getRow)
      val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
      val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
      (rowKey, value1, value2)
    }.toList
    val rdd = spark.sparkContext.parallelize(rows)

    // Convert the RDD to a DataFrame and show its contents
    val df = spark.createDataFrame(rdd).toDF("rowKey", "value1", "value2")
    df.show()

    // Release HBase resources and stop Spark
    scanner.close()
    table.close()
    connection.close()
    spark.stop()
  }
}
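Note that the example above pulls every row through a single ResultScanner on the driver, which is fine for small tables but does not scale. A minimal sketch of a distributed alternative, assuming the same table and column layout (my_table, cf:column1, cf:column2) and the hbase-mapreduce artifact on the classpath, uses TableInputFormat with newAPIHadoopRDD so that each Spark task scans its own HBase region:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object ReadHBaseDistributed {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadHBaseDistributed")
      .master("local")
      .getOrCreate()

    // Point TableInputFormat at the table and restrict the scan to the two columns
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "my_table")
    conf.set(TableInputFormat.SCAN_COLUMNS, "cf:column1 cf:column2")

    // Each Spark partition scans one HBase region; nothing funnels through the driver
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Extract row key and column values into plain tuples
    val rowsRDD = hbaseRDD.map { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
      val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
      (rowKey, value1, value2)
    }

    val df = spark.createDataFrame(rowsRDD).toDF("rowKey", "value1", "value2")
    df.show()

    spark.stop()
  }
}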
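The requirement also mentions syncing the result into an intermediate analysis database. Once the data is a DataFrame, Spark's built-in JDBC data source can write it out. A minimal sketch, assuming a hypothetical MySQL target (the URL, table name hbase_snapshot, and credentials below are placeholders, and the matching JDBC driver must be on the classpath):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "analysis_user")     // placeholder credentials
props.setProperty("password", "analysis_pass") // placeholder credentials
props.setProperty("driver", "com.mysql.cj.jdbc.Driver")

// Overwrite the staging table in the analysis database with the scanned rows
df.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://analysis-db:3306/analysis", "hbase_snapshot", props)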