Spark踩坑記——數據庫(Hbase+Mysql)轉

轉自:http://www.cnblogs.com/xlturing/p/spark.html

前言

在使用Spark Streaming的過程中對于計算產生結果的進行持久化時,我們往往需要操作數據庫,去統計或者改變一些值。最近一個實時消費者處理任務,在使用spark streaming進行實時的數據流處理時,我需要將計算好的數據更新到hbase和mysql中,所以本文對spark操作hbase和mysql的內容進行總結,并且對自己踩到的一些坑進行記錄。

Spark Streaming持久化設計模式

DStreams輸出操作

  • print:打印driver結點上每個Dstream中的前10個batch元素,常用于開發和調試
  • saveAsTextFiles(prefix, [suffix]):將當前Dstream保存為文件,每個interval batch的文件名命名規則基于prefix和suffix:"prefix-TIME_IN_MS[.suffix]".
  • saveAsObjectFiles(prefix, [suffix]):將當前的Dstream內容作為Java可序列化對象的序列化文件進行保存,每個interval batch的文件命名規則基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
  • saveAsHadoopFiles(prefix, [suffix]):將Dstream以hadoop文件的形式進行保存,每個interval batch的文件命名規則基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
  • foreachRDD(func):最通用的輸出操作,可以對從數據流中產生的每一個RDD應用函數_fun_。通常_fun_會將每個RDD中的數據保存到外部系統,如:將RDD保存到文件,或者通過網絡連接保存到數據庫。值得注意的是:_fun_執行在跑應用的driver進程中,并且通常會包含RDD action以促使數據流RDD開始計算。

使用foreachRDD的設計模式

dstream.foreachRDD對于開發而言提供了很大的靈活性,但在使用時也要避免很多常見的坑。我們通常將數據保存到外部系統中的流程是:建立遠程連接->通過連接傳輸數據到遠程系統->關閉連接。針對這個流程我們很直接的想到了下面的程序代碼:

dstream.foreachRDD { rdd =>val connection = createNewConnection()  // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }

在spark踩坑記——初試中,對spark的worker和driver進行了整理,我們知道在集群模式下,上述代碼中的connection需要通過序列化對象的形式從driver發送到worker,但是connection是無法在機器之間傳遞的,即connection是無法序列化的,這樣可能會引起_serialization errors (connection object not serializable)_的錯誤。為了避免這種錯誤,我們將conenction在worker當中建立,代碼如下:

dstream.foreachRDD { rdd =>rdd.foreach { record =>val connection = createNewConnection()connection.send(record)connection.close() } }

似乎這樣問題解決了?但是細想下,我們在每個rdd的每條記錄當中都進行了connection的建立和關閉,這會導致不必要的高負荷并且降低整個系統的吞吐量。所以一個更好的方式是使用_rdd.foreachPartition_即對于每一個rdd的partition建立唯一的連接(注:每個partition是內的rdd是運行在同一worker之上的),代碼如下:

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()}
}

這樣我們降低了頻繁建立連接的負載,通常我們在連接數據庫時會使用連接池,把連接池的概念引入,代碼優化如下:

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>// ConnectionPool is a static, lazily initialized pool of connectionsval connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => connection.send(record))ConnectionPool.returnConnection(connection)  // return to the pool for future reuse}
}

通過持有一個靜態連接池對象,我們可以重復利用connection而進一步優化了連接建立的開銷,從而降低了負載。另外值得注意的是,同數據庫的連接池類似,我們這里所說的連接池同樣應該是lazy的按需建立連接,并且及時的收回超時的連接。
另外值得注意的是:

  • 如果在spark streaming中使用了多次foreachRDD,它們之間是按照程序順序向下執行的
  • Dstream對于輸出操作的執行策略是lazy的,所以如果我們在foreachRDD中不添加任何RDD action,那么系統僅僅會接收數據然后將數據丟棄。

Spark訪問Hbase

上面我們闡述了將spark streaming的Dstream輸出到外部系統的基本設計模式,這里我們闡述如何將Dstream輸出到Hbase集群。

Hbase通用連接類

Scala連接Hbase是通過zookeeper獲取信息,所以在配置時需要提供zookeeper的相關信息,如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.client.ConnectionFactory object HbaseUtil extends Serializable { private val conf = HBaseConfiguration.create() private val para = Conf.hbaseConfig // Conf為配置類,獲取hbase的配置 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181")) conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts private val connection = ConnectionFactory.createConnection(conf) def getHbaseConn: Connection = connection }

根據網上資料,Hbase的連接的特殊性我們并沒有使用連接池

Hbase輸出操作

我們以put操作為例,演示將上述設計模式應用到Hbase輸出操作當中:

dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {val connection = HbaseUtil.getHbaseConn // 獲取Hbase連接partitionRecords.foreach(data => {val tableName = TableName.valueOf("tableName")val t = connection.getTable(tableName)try { val put = new Put(Bytes.toBytes(_rowKey_)) // row key // column, qualifier, value put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes) Try(t.put(put)).getOrElse(t.close()) // do some log(顯示在worker上) } catch { case e: Exception => // log error e.printStackTrace() } finally { t.close() } }) }) // do some log(顯示在driver上) } })

關于Hbase的其他操作可以參考Spark 下操作 HBase(1.0.0 新 API)

填坑記錄

重點記錄在連接Hbase過程中配置HConstants.ZOOKEEPER_QUORUM的問題:

  • 由于Hbase的連接不能直接使用ip地址進行訪問,往往需要配置hosts,例如我在上述代碼段中127-0-0-1(任意),我們在hosts中需要配置

    127-0-0-1 127.0.0.1
  • 在單機情況下,我們只需要配置一臺zookeeper所在Hbase的hosts即可,但是當切換到Hbase集群是遇到一個詭異的bug
    問題描述:在foreachRDD中將Dstream保存到Hbase時會卡住,并且沒有任何錯誤信息爆出(沒錯!它就是卡住,沒反應)
    問題分析:由于Hbase集群有多臺機器,而我們只配置了一臺Hbase機器的hosts,這樣導致Spark集群在訪問Hbase時不斷的去尋找但卻找不到就卡在那里
    解決方式:對每個worker上的hosts配置了所有hbase的節點ip,問題解決

Spark訪問Mysql

同訪問Hbase類似,我們也需要有一個可序列化的類來建立Mysql連接,這里我們利用了Mysql的C3P0連接池

MySQL通用連接類

import java.sql.Connection
import java.util.Propertiesimport com.mchange.v2.c3p0.ComboPooledDataSource class MysqlPool extends Serializable { private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true) private val conf = Conf.mysqlConfig try { cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8")); cpds.setDriverClass("com.mysql.jdbc.Driver"); cpds.setUser(conf.get("username").getOrElse("root")); cpds.setPassword(conf.get("password").getOrElse("")) cpds.setMaxPoolSize(200) cpds.setMinPoolSize(20) cpds.setAcquireIncrement(5) cpds.setMaxStatements(180) } catch { case e: Exception => e.printStackTrace() } def getConnection: Connection = { try { return cpds.getConnection(); } catch { case ex: Exception => ex.printStackTrace() null } } } object MysqlManager { var mysqlManager: MysqlPool = _ def getMysqlManager: MysqlPool = { synchronized { if (mysqlManager == null) { mysqlManager = new MysqlPool } } mysqlManager } }

我們利用c3p0建立Mysql連接池,然后訪問的時候每次從連接池中取出連接用于數據傳輸。

Mysql輸出操作

同樣利用之前的foreachRDD設計模式,將Dstream輸出到mysql的代碼如下:

dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {//從連接池中獲取一個連接val conn = MysqlManager.getMysqlManager.getConnectionval statement = conn.createStatementtry {conn.setAutoCommit(false)partitionRecords.foreach(record => { val sql = "insert into table..." // 需要執行的sql操作 statement.addBatch(sql) }) statement.executeBatch conn.commit } catch { case e: Exception => // do some log } finally { statement.close() conn.close() } }) } })

值得注意的是:

  • 我們在提交Mysql的操作的時候,并不是每條記錄提交一次,而是采用了批量提交的形式,所以需要將conn.setAutoCommit(false),這樣可以進一步提高mysql的效率。
  • 如果我們更新Mysql中帶索引的字段時,會導致更新速度較慢,這種情況應想辦法避免,如果不可避免,那就硬上吧(T^T)

部署

提供一下Spark連接Mysql和Hbase所需要的jar包的maven配置:

<dependency><!-- Hbase --><groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.0</version> </dependency> <dependency><!-- Mysql --> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.31</version> </dependency> <dependency> <groupId>c3p0</groupId> <artifactId>c3p0</artifactId> <version>0.9.1.2</version> </dependency>

參考文獻:

  1. Spark Streaming Programming Guide
  2. HBase介紹
  3. Spark 下操作 HBase(1.0.0 新 API)
  4. Spark開發快速入門
  5. kafka->spark->streaming->mysql(scala)實時數據處理示例
  6. Spark Streaming 中使用c3p0連接池操作mysql數據庫

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/397747.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/397747.shtml
英文地址,請注明出處:http://en.pswp.cn/news/397747.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

解決Failed to connect session for conifg 故障

服務器升級openssh之后jenkins構建報錯了&#xff0c;報錯信息如下&#xff1a; Failed to connet or change directory jenkins.plugins.publish_over.BapPublisherException:Failed to connect session for config.....Message [Algorithm negotiation fail] 升級前ssh版本&a…

78oa mysql_78oa系統版本升級方法

可升級版本預覽升級方法&#xff1a;1、備份數據庫、附件目錄、二次開發程序打開開始菜單——控制面板——管理工具——服務&#xff0c;右鍵點擊停止 78oa mysql service 服務&#xff0c;完整復制【D:\78OA\server\modules\storage\data\78oa】(數據庫)文件夾至備份區域。完整…

Excel導出顯示服務器意外,C# 調用Excel 出現服務器出現意外狀況. (異常來自 HRESULT:0x80010105 (RPC_E_SERVERFAULT)...

C# 調用Excel 出現服務器出現意外狀況. (異常來自 HRESULT:0x80010105 (RPC_E_SERVERFAULT)htmlprivate Microsoft.Office.Interop.Excel.Application xApp;private Microsoft.Office.Interop.Excel.Workbook xBook;服務器//變量xApp new Microsoft.Office.Interop.Excel.Appl…

列表、元組、字典、集合的定義、操作與綜合練習

l[A,B,C] t{A,B,C}l.append(B)print(l)scores[66,77,88]d{A:66,B:77,C:88} d[B]99 d[D]111 d.pop(C) print(d)s1{A,B,C} s2{A,C,D} print(s1&s2) print(s1|s2) 轉載于:https://www.cnblogs.com/chenjunyu666/p/9147417.html

xargs

find /tmp/ -name "*.log" -mtime 4 | xargs -i -t mv {} /home/ find /tmp/ -name "*.log" -mtime 4 -print0 | xargs -0 rm -f xargs(1) xargs是給命令傳遞參數的一個過濾器&#xff0c;也是組合多個命令的一個工具。它把一個數據流分割為一些足夠小的塊…

export mysql home_mysql的Linux下安裝筆記

注&#xff1a;在5.7之后MySQL不在生成my-default.cnf配置。tar -xzvf mysql-5.7.28-linux-glibc2.12-x86_64.tar.gzmv mysql-5.7.28-linux-glibc2.12-x86_64.tar.gz/ /usr/local/mysql新建 useradd mysql新建文件夾mkdir /usr/local/mysql/data生成配置&#xff1a;./mysqld -…

[轉]DevExpress GridControl 關于使用CardView的一點小結

最近項目里需要顯示商品的一系列圖片&#xff0c;打算用CardView來顯示&#xff0c;由于第一次使用&#xff0c;遇到許多問題&#xff0c;發現網上這方面的資源很少&#xff0c;所以把自己的一點點實際經驗小結一下&#xff0c;供自己和大家以后參考。 1、選擇CardView&#xf…

thinkphp5 ajax搜索+分頁

<center> <table > <tr> 水果名稱<input type"text" name"f_name" class"f_name"> 水果分類 &…

EventBus學習

EventBus是android 下高效的發布/訂閱事件總線機制&#xff0c;可以代替傳統的Intent&#xff0c;Handler&#xff0c;BroadCast 或者Fragment&#xff0c;Activity&#xff0c;Service&#xff0c;線程之間傳遞數據&#xff0c;是一種發布訂閱設計模式&#xff08;觀察者模式&…

Uediter的引用和取值

頁面應用Uediter控件&#xff0c;代碼如下&#xff1a; <tr><td align"center" class"xwnr_j"><asp: TextBox ID "txtContent" TextMode "MultiLine" Height "274px" Width "95%" runat"serv…

java程序 構建mycircle類_Java語言程序設計(十九)對象和類的應用實例

1.我們定義一個Circle類并使用該類創建對象&#xff0c;我們創建三個圓對象&#xff0c;1.0&#xff0c;25和125&#xff0c;然后顯示這三個圓的半徑和面積&#xff0c;將第二個對象的半徑改為100&#xff0c;然后顯示它的新半徑和面積。程序清單如下&#xff1a;package testc…

Django拋錯不存在(DoesNotExist)

from django.core.exceptions import ObjectDoesNotExist try:disabledusers.objects.get(sAMAccountNameliu) except ObjectDoesNotExist:print a except modelname.DoesNotExist:轉載于:https://www.cnblogs.com/dreamer-fish/p/5835465.html

mysql ddl dql_mysql DDL、DML、DCL、DQL區分

mysql [Structure Query Language] 的組成分4個部分&#xff1a;DDL [Data Mefinition Language] 數據定義語言DML [Data Manipulation Language]  數據操縱語言DCL [Data Control Language] 數據控制語言DQL [Data Query Language ] 數據查詢語言1、…

hiho圖的聯通性(自留)

無向圖割邊割點算法 而當(u,v)為樹邊且low[v]>dfn[u]時&#xff0c;表示v節點只能通過該邊(u,v)與u連通&#xff0c;那么(u,v)即為割邊。 1 void dfs(int u) {2 //記錄dfs遍歷次序3 static int counter 0; 4 5 //記錄節點u的子樹數6 int children …

《Git權威指南》筆記2

2019獨角獸企業重金招聘Python工程師標準>>> ###Git克隆 Git使用git clone命令實現版本庫克隆&#xff0c;主要有如下3種用法&#xff1a; 1&#xff09;git clone <repository> <direcctory> 將repository指向的版本庫創建一個克隆島directory目錄。目…

SQL數據庫掛起 SQL數據庫附加報錯 SQL數據庫824錯誤修復

SQL數據庫掛起 SQL數據庫附加報錯 SQL數據庫824錯誤修復 數據類型 MSSQL 2012數據大小 4.5 GB故障檢測 附加數據庫提示824錯誤 一般是由于斷電非法關機導致頁面損壞。客戶要求 恢復數據庫數據 ERP可直接使用。修復結果 文件傳來后 檢測發現頁面沒有及時正常關閉導致SQL認為頁不…

查找算法

a. 線性查找&#xff1a;從數據中&#xff0c;第一個元素開始查找&#xff0c;將其與查找的值進行比對&#xff0c;如果相同&#xff0c;就停止查找&#xff0c;如果不相同&#xff0c;則繼續下一個元素的比對。直到查找到匹配的值&#xff0c;或者是有數據遍歷完畢&#xff0c…

mysql測試數據圖表_mysql測試數據表

1.截取至后盾人用于mysql數據測試請在navicat中執行一下命令生成測試數據表/*Navicat Premium Data TransferSource Server : 我的本地連接Source Server Type : MySQLSource Server Version : 50726Source Host : localhost:3306Source Schema : laravelTarget Server Type : …

常用歸檔壓縮命令

1. 打包tar打包表示把一堆文件變成一個tar ####打包工具-f ####指定生成包的名字-c ####創建包-v ####顯示創建過程-t ####查看包中內容-x ####解包-r ####添加文件到包中--delete filename ##刪除包中指定文件--get filename ##取出包中指定文件cffrcvf 等組合使用2. 壓縮…

spring集合的注入

<bean id"date" class"java.util.Date"></bean> <bean id"test" class"test.Test"> <!--注入list-->   <property name"list">     <list>       <value>1</valu…