日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone)

發(fā)布時間:2024/2/28 数据库 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在大數(shù)據(jù)平臺中,經(jīng)常需要做數(shù)據(jù)的ETL,從傳統(tǒng)關(guān)系型數(shù)據(jù)庫RDBMS中抽取數(shù)據(jù)到HDFS中。之前開發(fā)數(shù)據(jù)湖新版本時使用Spark SQL來完成ETL的工作,但是遇到了 Spark SQL?不支持某些數(shù)據(jù)類型(比如ORACLE中的Timestamp with local Timezone)的問題。

一、系統(tǒng)環(huán)境

  • Spark 版本:2.1.0.cloudera1

  • JDK 版本:Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131

  • ORACLE JDBC driver 版本:ojdbc7.jar

  • Scala 版本:2.11.8

?

二、Spark SQL讀數(shù)據(jù)庫表遇到的不支持某些數(shù)據(jù)類型

Spark SQL 讀取傳統(tǒng)的關(guān)系型數(shù)據(jù)庫同樣需要用到?JDBC,畢竟這是提供的訪問數(shù)據(jù)庫官方 API。Spark要讀取數(shù)據(jù)庫需要解決兩個問題:

  • 分布式讀取;

  • 原始表數(shù)據(jù)到DataFrame的映射。

2.1 業(yè)務(wù)代碼

public class Config {// spark-jdbc parameter namespublic static String JDBC_PARA_URL = "url";public static String JDBC_PARA_USER = "user";public static String JDBC_PARA_PASSWORD = "password";public static String JDBC_PARA_DRIVER = "driver";public static String JDBC_PARA_TABLE = "dbtable";public static String JDBC_PARA_FETCH_SIZE = "fetchsize"; } import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql._// 主類 object Main {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("yarn").appName("test")getOrCreate()val sqlContext = sparkSession.sqlContextval sc = sparkSession.sparkContextval partitionNum = 16val fetchSize = 1000val jdbcUrl = "..."val userName = "..."val schema_table = "..."val password = "..."val jdbcDriver = "oracle.jdbc.driver.OracleDriver"// 注意需要將oracle jdbc driver jar放置在spark lib jars目錄下,或者spark2-submit提交spark application時添加--jars參數(shù)val jdbcDF = sqlContext.read.format("jdbc").options(Map(Config.JDBC_PARA_URL -> jdbcUrl,Config.JDBC_PARA_USER -> userName,Config.JDBC_PARA_TABLE -> schema_table,Config.JDBC_PARA_PASSWORD -> password,Config.JDBC_PARA_DRIVER -> jdbcDriver,Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()val rdd = jdbcDF.rddrdd.count()...... }

2.2 部分?jǐn)?shù)據(jù)類型不支持

比如ORACLE中的Timestamp with local Timezone?和?FLOAT(126)


三、解決方法:自定義JdbcDialects

3.1 什么是JdbcDialects ?

Spark SQL 中的?org.apache.spark.sql.jdbc package?中有個類?JdbcDialects.scala,該類定義了Spark DataType 和 SQLType 之間的映射關(guān)系,分析該類的源碼可知,該類是一個抽象類,包含以下幾個方法:

  • def canHandle(url : String):判斷該JdbcDialect 實例是否能夠處理該jdbc url;

  • getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):輸入數(shù)據(jù)庫中的SQLType,得到對應(yīng)的Spark DataType的mapping關(guān)系;

  • getJDBCType(dt: DataType):輸入Spark 的DataType,得到對應(yīng)的數(shù)據(jù)庫的SQLType;

  • quoteIdentifier(colName: String):引用標(biāo)識符,用來放置某些字段名用了數(shù)據(jù)庫的保留字(有些用戶會使用數(shù)據(jù)庫的保留字作為列名);

  • 其他......。

該類還有一個伴生對象,其中包含3個方法:

  • get(url: String):根據(jù)database的url獲取JdbcDialect 對象;

  • unregisterDialect(dialect: JdbcDialect):將已注冊的JdbcDialect 注銷;

  • registerDialect(dialect: JdbcDialect):注冊一個JdbcDialect。

3.2 解決步驟

  • 使用get(url: String)方法獲取當(dāng)前的 JdbcDialect 對象;

  • 將當(dāng)前的 JdbcDialect 對象?unregistered?掉;

  • new 一個 JdbcDialect 對象,并重寫方法(主要是getCatalystType()方法,因為其定義了數(shù)據(jù)庫 SQLType 到 Spark DataType 的映射關(guān)系),修改映射關(guān)系,將不支持的 SQLType 以其他的支持的數(shù)據(jù)類型返回比如StringType,這樣就能夠解決問題了;

  • register新創(chuàng)建的 JdbcDialect 對象

  • 3.3 解決方案的業(yè)務(wù)代碼

    ?

    object SaicSparkJdbcDialect {def useMyJdbcDIalect(jdbcUrl:String,dbType:String): Unit ={val logger = LoggerFactory.getLogger(classOf[SaicSparkJdbcDialect])// 將當(dāng)前的 JdbcDialect 對象unregistered掉val dialect = JdbcDialectsJdbcDialects.unregisterDialect(dialect.get(jdbcUrl))if (dbType.equals("ORACLE")) {val OracleDialect = new JdbcDialect {// 只能處理ORACLE數(shù)據(jù)庫override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")// 修改數(shù)據(jù)庫 SQLType 到 Spark DataType 的映射關(guān)系(從數(shù)據(jù)庫讀取到Spark中)override def getCatalystType(sqlType: Int, typeName: String, size: Int,md: MetadataBuilder): Option[DataType] = {if (sqlType==Types.TIMESTAMP || sqlType== -101 || sqlType== -102) {// 將不支持的 Timestamp with local Timezone 以TimestampType形式返回Some(TimestampType)} else if (sqlType == Types.BLOB) {Some(BinaryType)} else {Some(StringType)}}// 該方法定義的是數(shù)據(jù)庫Spark DataType 到 SQLType 的映射關(guān)系,此處不需要做修改override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {case StringType => Some(JdbcType("VARCHAR2(2000)", java.sql.Types.VARCHAR))case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))case TimestampType => Some(JdbcType("DATE", java.sql.Types.TIMESTAMP))case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))case _ => None}override def quoteIdentifier(colName: String): String = {colName}}// register新創(chuàng)建的 JdbcDialect 對象JdbcDialects.registerDialect(OracleDialect)}

    本文來自:https://www.jianshu.com/p/20b82891aac9?

    總結(jié)

    以上是生活随笔為你收集整理的Spark SQL读数据库时不支持某些数据类型的问题(Timestamp with local Timezone)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。