Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone)
在大數(shù)據(jù)平臺中,經(jīng)常需要做數(shù)據(jù)的ETL,從傳統(tǒng)關(guān)系型數(shù)據(jù)庫RDBMS中抽取數(shù)據(jù)到HDFS中。之前開發(fā)數(shù)據(jù)湖新版本時使用Spark SQL來完成ETL的工作,但是遇到了 Spark SQL?不支持某些數(shù)據(jù)類型(比如ORACLE中的Timestamp with local Timezone)的問題。
一、系統(tǒng)環(huán)境
-
Spark 版本:2.1.0.cloudera1
-
JDK 版本:Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131
-
ORACLE JDBC driver 版本:ojdbc7.jar
-
Scala 版本:2.11.8
?
二、Spark SQL讀數(shù)據(jù)庫表遇到的不支持某些數(shù)據(jù)類型
Spark SQL 讀取傳統(tǒng)的關(guān)系型數(shù)據(jù)庫同樣需要用到?JDBC,畢竟這是提供的訪問數(shù)據(jù)庫官方 API。Spark要讀取數(shù)據(jù)庫需要解決兩個問題:
-
分布式讀取;
-
原始表數(shù)據(jù)到DataFrame的映射。
2.1 業(yè)務(wù)代碼
public class Config {// spark-jdbc parameter namespublic static String JDBC_PARA_URL = "url";public static String JDBC_PARA_USER = "user";public static String JDBC_PARA_PASSWORD = "password";public static String JDBC_PARA_DRIVER = "driver";public static String JDBC_PARA_TABLE = "dbtable";public static String JDBC_PARA_FETCH_SIZE = "fetchsize"; } import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql._// 主類 object Main {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("yarn").appName("test")getOrCreate()val sqlContext = sparkSession.sqlContextval sc = sparkSession.sparkContextval partitionNum = 16val fetchSize = 1000val jdbcUrl = "..."val userName = "..."val schema_table = "..."val password = "..."val jdbcDriver = "oracle.jdbc.driver.OracleDriver"// 注意需要將oracle jdbc driver jar放置在spark lib jars目錄下,或者spark2-submit提交spark application時添加--jars參數(shù)val jdbcDF = sqlContext.read.format("jdbc").options(Map(Config.JDBC_PARA_URL -> jdbcUrl,Config.JDBC_PARA_USER -> userName,Config.JDBC_PARA_TABLE -> schema_table,Config.JDBC_PARA_PASSWORD -> password,Config.JDBC_PARA_DRIVER -> jdbcDriver,Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()val rdd = jdbcDF.rddrdd.count()...... }2.2 部分?jǐn)?shù)據(jù)類型不支持
比如ORACLE中的Timestamp with local Timezone?和?FLOAT(126)。
三、解決方法:自定義JdbcDialects
3.1 什么是JdbcDialects ?
Spark SQL 中的?org.apache.spark.sql.jdbc package?中有個類?JdbcDialects.scala,該類定義了Spark DataType 和 SQLType 之間的映射關(guān)系,分析該類的源碼可知,該類是一個抽象類,包含以下幾個方法:
-
def canHandle(url : String):判斷該JdbcDialect 實例是否能夠處理該jdbc url;
-
getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):輸入數(shù)據(jù)庫中的SQLType,得到對應(yīng)的Spark DataType的mapping關(guān)系;
-
getJDBCType(dt: DataType):輸入Spark 的DataType,得到對應(yīng)的數(shù)據(jù)庫的SQLType;
-
quoteIdentifier(colName: String):引用標(biāo)識符,用來放置某些字段名用了數(shù)據(jù)庫的保留字(有些用戶會使用數(shù)據(jù)庫的保留字作為列名);
-
其他......。
該類還有一個伴生對象,其中包含3個方法:
-
get(url: String):根據(jù)database的url獲取JdbcDialect 對象;
-
unregisterDialect(dialect: JdbcDialect):將已注冊的JdbcDialect 注銷;
-
registerDialect(dialect: JdbcDialect):注冊一個JdbcDialect。
3.2 解決步驟
使用get(url: String)方法獲取當(dāng)前的 JdbcDialect 對象;
將當(dāng)前的 JdbcDialect 對象?unregistered?掉;
new 一個 JdbcDialect 對象,并重寫方法(主要是getCatalystType()方法,因為其定義了數(shù)據(jù)庫 SQLType 到 Spark DataType 的映射關(guān)系),修改映射關(guān)系,將不支持的 SQLType 以其他的支持的數(shù)據(jù)類型返回比如StringType,這樣就能夠解決問題了;
register新創(chuàng)建的 JdbcDialect 對象。
3.3 解決方案的業(yè)務(wù)代碼
?
object SaicSparkJdbcDialect {def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect])// 將當(dāng)前的 JdbcDialect 對象unregistered掉val dialect = JdbcDialectsJdbcDialects.unregisterDialect(dialect.get(jdbcUrl))if (dbType.equals("ORACLE")) {val OracleDialect = new JdbcDialect {// 只能處理ORACLE數(shù)據(jù)庫override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")// 修改數(shù)據(jù)庫 SQLType 到 Spark DataType 的映射關(guān)系(從數(shù)據(jù)庫讀取到Spark中)override def getCatalystType(sqlType: Int, typeName: String, size: Int,md: MetadataBuilder): Option[DataType] = {if (sqlType==Types.TIMESTAMP || sqlType== -101 || sqlType== -102) {// 將不支持的 Timestamp with local Timezone 以TimestampType形式返回Some(TimestampType)} else if (sqlType == Types.BLOB) {Some(BinaryType)} else {Some(StringType)}}// 該方法定義的是數(shù)據(jù)庫Spark DataType 到 SQLType 的映射關(guān)系,此處不需要做修改override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))case _ => None}override def quoteIdentifier(colName: String): String = {colName}}// register新創(chuàng)建的 JdbcDialect 對象JdbcDialects.registerDialect(OracleDialect)}本文來自:https://www.jianshu.com/p/20b82891aac9?
總結(jié)
以上是生活随笔為你收集整理的Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无意看到,当真给力!记住:永远不要在My
- 下一篇: 大剑无锋之new一个对象背后发生了什么?