Spark踩坑记——数据库(Hbase+Mysql)转
轉(zhuǎn)自:http://www.cnblogs.com/xlturing/p/spark.html
前言
在使用Spark Streaming的過程中對于計算產(chǎn)生結(jié)果的進(jìn)行持久化時,我們往往需要操作數(shù)據(jù)庫,去統(tǒng)計或者改變一些值。最近一個實時消費者處理任務(wù),在使用spark streaming進(jìn)行實時的數(shù)據(jù)流處理時,我需要將計算好的數(shù)據(jù)更新到hbase和mysql中,所以本文對spark操作hbase和mysql的內(nèi)容進(jìn)行總結(jié),并且對自己踩到的一些坑進(jìn)行記錄。
Spark Streaming持久化設(shè)計模式
DStreams輸出操作
- print:打印driver結(jié)點上每個Dstream中的前10個batch元素,常用于開發(fā)和調(diào)試
- saveAsTextFiles(prefix, [suffix]):將當(dāng)前Dstream保存為文件,每個interval batch的文件名命名規(guī)則基于prefix和suffix:"prefix-TIME_IN_MS[.suffix]".
- saveAsObjectFiles(prefix, [suffix]):將當(dāng)前的Dstream內(nèi)容作為Java可序列化對象的序列化文件進(jìn)行保存,每個interval batch的文件命名規(guī)則基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
- saveAsHadoopFiles(prefix, [suffix]):將Dstream以hadoop文件的形式進(jìn)行保存,每個interval batch的文件命名規(guī)則基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
- foreachRDD(func):最通用的輸出操作,可以對從數(shù)據(jù)流中產(chǎn)生的每一個RDD應(yīng)用函數(shù)_fun_。通常_fun_會將每個RDD中的數(shù)據(jù)保存到外部系統(tǒng),如:將RDD保存到文件,或者通過網(wǎng)絡(luò)連接保存到數(shù)據(jù)庫。值得注意的是:_fun_執(zhí)行在跑應(yīng)用的driver進(jìn)程中,并且通常會包含RDD action以促使數(shù)據(jù)流RDD開始計算。
使用foreachRDD的設(shè)計模式
dstream.foreachRDD對于開發(fā)而言提供了很大的靈活性,但在使用時也要避免很多常見的坑。我們通常將數(shù)據(jù)保存到外部系統(tǒng)中的流程是:建立遠(yuǎn)程連接->通過連接傳輸數(shù)據(jù)到遠(yuǎn)程系統(tǒng)->關(guān)閉連接。針對這個流程我們很直接的想到了下面的程序代碼:
dstream.foreachRDD { rdd =>val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }在spark踩坑記——初試中,對spark的worker和driver進(jìn)行了整理,我們知道在集群模式下,上述代碼中的connection需要通過序列化對象的形式從driver發(fā)送到worker,但是connection是無法在機(jī)器之間傳遞的,即connection是無法序列化的,這樣可能會引起_serialization errors (connection object not serializable)_的錯誤。為了避免這種錯誤,我們將conenction在worker當(dāng)中建立,代碼如下:
dstream.foreachRDD { rdd =>rdd.foreach { record =>val connection = createNewConnection()connection.send(record)connection.close() } }似乎這樣問題解決了?但是細(xì)想下,我們在每個rdd的每條記錄當(dāng)中都進(jìn)行了connection的建立和關(guān)閉,這會導(dǎo)致不必要的高負(fù)荷并且降低整個系統(tǒng)的吞吐量。所以一個更好的方式是使用_rdd.foreachPartition_即對于每一個rdd的partition建立唯一的連接(注:每個partition是內(nèi)的rdd是運行在同一worker之上的),代碼如下:
dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()} }這樣我們降低了頻繁建立連接的負(fù)載,通常我們在連接數(shù)據(jù)庫時會使用連接池,把連接池的概念引入,代碼優(yōu)化如下:
dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>// ConnectionPool is a static, lazily initialized pool of connectionsval connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => connection.send(record))ConnectionPool.returnConnection(connection) // return to the pool for future reuse} }通過持有一個靜態(tài)連接池對象,我們可以重復(fù)利用connection而進(jìn)一步優(yōu)化了連接建立的開銷,從而降低了負(fù)載。另外值得注意的是,同數(shù)據(jù)庫的連接池類似,我們這里所說的連接池同樣應(yīng)該是lazy的按需建立連接,并且及時的收回超時的連接。
另外值得注意的是:
- 如果在spark streaming中使用了多次foreachRDD,它們之間是按照程序順序向下執(zhí)行的
- Dstream對于輸出操作的執(zhí)行策略是lazy的,所以如果我們在foreachRDD中不添加任何RDD action,那么系統(tǒng)僅僅會接收數(shù)據(jù)然后將數(shù)據(jù)丟棄。
Spark訪問Hbase
上面我們闡述了將spark streaming的Dstream輸出到外部系統(tǒng)的基本設(shè)計模式,這里我們闡述如何將Dstream輸出到Hbase集群。
Hbase通用連接類
Scala連接Hbase是通過zookeeper獲取信息,所以在配置時需要提供zookeeper的相關(guān)信息,如下:
import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Connection import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.client.ConnectionFactory object HbaseUtil extends Serializable { private val conf = HBaseConfiguration.create() private val para = Conf.hbaseConfig // Conf為配置類,獲取hbase的配置 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181")) conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts private val connection = ConnectionFactory.createConnection(conf) def getHbaseConn: Connection = connection }根據(jù)網(wǎng)上資料,Hbase的連接的特殊性我們并沒有使用連接池
Hbase輸出操作
我們以put操作為例,演示將上述設(shè)計模式應(yīng)用到Hbase輸出操作當(dāng)中:
dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {val connection = HbaseUtil.getHbaseConn // 獲取Hbase連接partitionRecords.foreach(data => {val tableName = TableName.valueOf("tableName")val t = connection.getTable(tableName)try { val put = new Put(Bytes.toBytes(_rowKey_)) // row key // column, qualifier, value put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes) Try(t.put(put)).getOrElse(t.close()) // do some log(顯示在worker上) } catch { case e: Exception => // log error e.printStackTrace() } finally { t.close() } }) }) // do some log(顯示在driver上) } })關(guān)于Hbase的其他操作可以參考Spark 下操作 HBase(1.0.0 新 API)
填坑記錄
重點記錄在連接Hbase過程中配置HConstants.ZOOKEEPER_QUORUM的問題:
-
由于Hbase的連接不能直接使用ip地址進(jìn)行訪問,往往需要配置hosts,例如我在上述代碼段中127-0-0-1(任意),我們在hosts中需要配置
127-0-0-1 127.0.0.1 -
在單機(jī)情況下,我們只需要配置一臺zookeeper所在Hbase的hosts即可,但是當(dāng)切換到Hbase集群是遇到一個詭異的bug
問題描述:在foreachRDD中將Dstream保存到Hbase時會卡住,并且沒有任何錯誤信息爆出(沒錯!它就是卡住,沒反應(yīng))
問題分析:由于Hbase集群有多臺機(jī)器,而我們只配置了一臺Hbase機(jī)器的hosts,這樣導(dǎo)致Spark集群在訪問Hbase時不斷的去尋找但卻找不到就卡在那里
解決方式:對每個worker上的hosts配置了所有hbase的節(jié)點ip,問題解決
Spark訪問Mysql
同訪問Hbase類似,我們也需要有一個可序列化的類來建立Mysql連接,這里我們利用了Mysql的C3P0連接池
MySQL通用連接類
import java.sql.Connection import java.util.Propertiesimport com.mchange.v2.c3p0.ComboPooledDataSource class MysqlPool extends Serializable { private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true) private val conf = Conf.mysqlConfig try { cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8")); cpds.setDriverClass("com.mysql.jdbc.Driver"); cpds.setUser(conf.get("username").getOrElse("root")); cpds.setPassword(conf.get("password").getOrElse("")) cpds.setMaxPoolSize(200) cpds.setMinPoolSize(20) cpds.setAcquireIncrement(5) cpds.setMaxStatements(180) } catch { case e: Exception => e.printStackTrace() } def getConnection: Connection = { try { return cpds.getConnection(); } catch { case ex: Exception => ex.printStackTrace() null } } } object MysqlManager { var mysqlManager: MysqlPool = _ def getMysqlManager: MysqlPool = { synchronized { if (mysqlManager == null) { mysqlManager = new MysqlPool } } mysqlManager } }我們利用c3p0建立Mysql連接池,然后訪問的時候每次從連接池中取出連接用于數(shù)據(jù)傳輸。
Mysql輸出操作
同樣利用之前的foreachRDD設(shè)計模式,將Dstream輸出到mysql的代碼如下:
dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {//從連接池中獲取一個連接val conn = MysqlManager.getMysqlManager.getConnectionval statement = conn.createStatementtry {conn.setAutoCommit(false)partitionRecords.foreach(record => { val sql = "insert into table..." // 需要執(zhí)行的sql操作 statement.addBatch(sql) }) statement.executeBatch conn.commit } catch { case e: Exception => // do some log } finally { statement.close() conn.close() } }) } })值得注意的是:
- 我們在提交Mysql的操作的時候,并不是每條記錄提交一次,而是采用了批量提交的形式,所以需要將conn.setAutoCommit(false),這樣可以進(jìn)一步提高mysql的效率。
- 如果我們更新Mysql中帶索引的字段時,會導(dǎo)致更新速度較慢,這種情況應(yīng)想辦法避免,如果不可避免,那就硬上吧(T^T)
部署
提供一下Spark連接Mysql和Hbase所需要的jar包的maven配置:
<dependency><!-- Hbase --><groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.0</version> </dependency> <dependency><!-- Mysql --> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.31</version> </dependency> <dependency> <groupId>c3p0</groupId> <artifactId>c3p0</artifactId> <version>0.9.1.2</version> </dependency>參考文獻(xiàn):
總結(jié)
以上是生活随笔為你收集整理的Spark踩坑记——数据库(Hbase+Mysql)转的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 孕妇梦到摘草莓是胎梦吗
- 下一篇: SQL 语句优化--IN语句优化案例