jdbc不能识别别名_Spark基础:读写JDBC
Spark SQL支持通過JDBC直接讀取數據庫中的數據,這個特性是基于JdbcRDD實現。返回值作為DataFrame返回,這樣可以直接使用Spark SQL并跟其他的數據源進行join操作。JDBC數據源可以很簡單的通過Java或者Python,而不需要提供ClassTag。注意這與Spark SQL JDBC server不同,后者是基于Spark SQL執行查詢。
要保證能使用需要把對應的jdbc驅動放到spark的classpath中。比如,想要連接postgres可以在啟動命令中添加jars:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar遠程數據庫的表可以加載成DataFrame或者注冊成Spark SQL的臨時表,用戶可以在數據源選項中配置JDBC相關的連接參數。user和password一般是必須提供的參數,另外一些參數可以參考下面的列表:
url
JDBC連接url,比如jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable
需要讀取或者寫入的JDBC表,注意里面的內容將會作為from后面的部分,比如 select * from 。注意不能同時配置dbtable和query。
query
query用于指定from后面的子查詢,拼接成的sql如下:SELECT FROM () spark_gen_alias 。注意dbtable和query不能同時使用;不允許同時使用partitionColumn和query。
注意:這里的dbtable和query其實沒有太大的區別,只是query會默認套一層別名而已。
driver
jdbc驅動driver
partitionColumn, lowerBound, upperBound
指定時這三項需要同時存在,描述了worker如何并行讀取數據庫。其中partitionColumn必須是數字、date、timestamp,lowerBound和upperBound只是決定了分區的步長,而不會過濾數據,因此表中所有的數據都會被分區返回。該參數僅用于讀。
numPartitions
讀寫時的最大分區數。這也決定了連接JDBC的最大連接數,如果并行度超過該配置,將會使用coalesce(partition)來降低并行度。
queryTimeout
driver執行statement的等待時間,0意味著沒有限制。寫入的時候這個選項依賴于底層是如何實現setQueryTimeout的,比如h2 driver會檢查每個query。默認是0
fetchSize
fetch的大小,決定了每一個fetch,拉取多少數據量。這個參數幫助針對默認比較小的驅動進行調優,比如oracle默認是10行。僅用于讀操作。
batchSize
batch大小,決定插入時的并發大小,默認1000。
isolationLvel
事務隔離的等級,作用于當前連接。可以配置成NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 依賴于底層jdbc提供的事務隔離,默認是READ_UNCOMMITTED。這個選項僅用于寫操作。
sessionInitStatment
每個數據庫session創建前執行的操作,用于初始化。如定義一些觸發器操作。如 BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;
truncate
寫操作選項,當使用SaveMode.Overwrite時,該選項用于是否直接刪除并重建表。當表結構發現變化的時候會失效。默認是false。
cascadeTruncate
寫操作選項,是否開啟級聯刪除。
createTableOptions
寫操作選項,一般用于配置特殊的分區或者數據庫配置,比如 CREATE TABLE t (name string) ENGINE=InnoDB
createTableColumnTypes
配置數據庫字段的類型,比如 name CHAR(64), comments VARCHAR(1024),僅支持spark sql中支持的數據類型。
customSchema
自定義讀取的schema信息,比如 id DECIMAL(38, 0), name STRING 。可以配置部分字段,其他的使用默認的類型映射,比如 id DECIMAL(38, 0)。僅用于讀操作。
pushDownPredicate
該選項用于開啟或禁用jdbc數據源的謂詞下推。默認是true。如果配置為false,那么所有的filter操作都會由spark來完成。當過濾操作用spark更快時,一般才會關閉下推功能。
// 加載jdbc val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").load()// 使用propeties val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password")val jdbcDF2 = spark.read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// 指定自定義的schema信息 connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") val jdbcDF3 = spark.read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// 保存jdbc jdbcDF.write.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").save()jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// 指定自定義schema映射 jdbcDF.write.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)總結
以上是生活随笔為你收集整理的jdbc不能识别别名_Spark基础:读写JDBC的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab 数据降维和重构_核主成分分
- 下一篇: verilog for循环_HDLBit