
Spark 1.4: loading MySQL data, creating DataFrames, and problems with the join methods


First, we use the new API method to connect to MySQL, load the data, and create the DataFrames:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{SaveMode, DataFrame}
import org.apache.spark.sql.hive.HiveContext
import scala.collection.mutable.ArrayBuffer
import java.sql.DriverManager
import java.sql.Connection

val sqlContext = new HiveContext(sc)
val mySQLUrl = "jdbc:mysql://10.180.211.100:3306/appcocdb?user=appcoc&password=Asia123"

// Load each MySQL table into a DataFrame and cache it
val CI_MDA_SYS_TABLE = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()
val CI_MDA_SYS_TABLE_COLUMN = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()
val CI_LABEL_EXT_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_EXT_INFO").cache()
val CI_LABEL_INFO = sqlContext.jdbc(mySQLUrl,"CI_LABEL_INFO").cache()
val CI_APPROVE_STATUS = sqlContext.jdbc(mySQLUrl,"CI_APPROVE_STATUS").cache()
val DIM_COC_LABEL_COUNT_RULES = sqlContext.jdbc(mySQLUrl,"DIM_COC_LABEL_COUNT_RULES").cache()
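Side note: the sqlContext.jdbc(...) calls above still work in Spark 1.4 but are, as far as I can tell, already flagged as deprecated there; the DataFrameReader form below should be equivalent. This is just a sketch reusing the same URL and table names, with the credentials left inside the URL as above:

import java.util.Properties

val props = new Properties()  // user/password are already embedded in mySQLUrl
val CI_MDA_SYS_TABLE = sqlContext.read.jdbc(mySQLUrl, "CI_MDA_SYS_TABLE", props).cache()
val CI_LABEL_INFO = sqlContext.read.jdbc(mySQLUrl, "CI_LABEL_INFO", props).cache()
// ...and the same for the remaining tables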



Then join the tables on their shared IDs:

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"),"inner").cache()
labels.join(CI_LABEL_EXT_INFO,CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"),"inner").cache()
labels.join(CI_LABEL_INFO,CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"),"inner").cache()
labels.join(CI_APPROVE_STATUS,CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"),"inner").cache()
labels.filter(
  CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === 107 and
  (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) and
  (CI_LABEL_EXT_INFO("COUNT_RULES_CODE") isNotNull) and
  CI_MDA_SYS_TABLE("UPDATE_CYCLE") === 1
).cache()

And then it blew up with errors: at the third join the ID column could not be found, which is really odd:

At a loss, I then tried the join signature specified in the official Spark 1.4 API:

import sqlContext.implicits._  // needed for the $"col" column syntax used below

val labels = CI_MDA_SYS_TABLE.join(CI_MDA_SYS_TABLE_COLUMN,"TABLE_ID")
labels.join(CI_LABEL_EXT_INFO,"COLUMN_ID")
labels.join(CI_LABEL_INFO,"LABEL_ID")
labels.join(CI_APPROVE_STATUS).where($"LABEL_ID" === $"RESOURCE_ID")

Again it blew up; the ID still could not be found.


In the end, with no better option, I went back to the old approach: register temporary tables over JDBC (a kind of soft link to the source), load the data through SQL, and that works. This I do not understand.

val CI_MDA_SYS_TABLE_DDL = s"""
    CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url     '${mySQLUrl}',
      dbtable 'CI_MDA_SYS_TABLE'
    )""".stripMargin

sqlContext.sql(CI_MDA_SYS_TABLE_DDL)
val CI_MDA_SYS_TABLE = sql("SELECT * FROM CI_MDA_SYS_TABLE").cache()
//val CI_MDA_SYS_TABLE = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE").cache()

val CI_MDA_SYS_TABLE_COLUMN_DDL = s"""
    CREATE TEMPORARY TABLE CI_MDA_SYS_TABLE_COLUMN
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url     '${mySQLUrl}',
      dbtable 'CI_MDA_SYS_TABLE_COLUMN'
    )""".stripMargin

sqlContext.sql(CI_MDA_SYS_TABLE_COLUMN_DDL)
val CI_MDA_SYS_TABLE_COLUMN = sql("SELECT * FROM CI_MDA_SYS_TABLE_COLUMN").cache()
//val CI_MDA_SYS_TABLE_COLUMN = sqlContext.jdbc(mySQLUrl,"CI_MDA_SYS_TABLE_COLUMN").cache()

.........

So the problem was eventually solved this way, but why loading the DataFrames directly does not work remains to be investigated.
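A guess in hindsight, not re-verified against the original data: join returns a new DataFrame, and in the snippets above the result of each subsequent join is never assigned back to labels, so the third join still runs against the original two-table result, which has no columns from CI_LABEL_EXT_INFO; that would explain why the ID suddenly "cannot be found". A chained version would look roughly like this sketch:

val labels = CI_MDA_SYS_TABLE
  .join(CI_MDA_SYS_TABLE_COLUMN, CI_MDA_SYS_TABLE("TABLE_ID") === CI_MDA_SYS_TABLE_COLUMN("TABLE_ID"))
  .join(CI_LABEL_EXT_INFO, CI_MDA_SYS_TABLE_COLUMN("COLUMN_ID") === CI_LABEL_EXT_INFO("COLUMN_ID"))
  .join(CI_LABEL_INFO, CI_LABEL_EXT_INFO("LABEL_ID") === CI_LABEL_INFO("LABEL_ID"))
  .join(CI_APPROVE_STATUS, CI_LABEL_INFO("LABEL_ID") === CI_APPROVE_STATUS("RESOURCE_ID"))
  .filter(
    CI_APPROVE_STATUS("CURR_APPROVE_STATUS_ID") === 107 &&
    (CI_LABEL_INFO("DATA_STATUS_ID") === 1 || CI_LABEL_INFO("DATA_STATUS_ID") === 2) &&
    CI_LABEL_EXT_INFO("COUNT_RULES_CODE").isNotNull &&
    CI_MDA_SYS_TABLE("UPDATE_CYCLE") === 1)
  .cache()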


As a bonus, here is the fix for another problem. If you hit an error like this:

15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp9:49897 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_3_piece0 on cbg6aocdp5:45978 in memory (size: 8.4 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.176.238.11:38968 in memory (size: 8.2 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_2_piece0 on cbg6aocdp4:55199 in memory (size: 8.2 KB, free: 1060.3 MB)
15/11/19 10:57:12 INFO ContextCleaner: Cleaned shuffle 0
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.176.238.11:38968 in memory (size: 6.5 KB, free: 4.7 GB)
15/11/19 10:57:12 INFO BlockManagerInfo: Removed broadcast_1_piece0 on cbg6aocdp8:55706 in memory (size: 6.5 KB, free: 1060.3 MB)
TARGET_TABLE_CODE:========================IT03
Exception in thread "main" java.lang.RuntimeException: Error in configuring objectat org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)at scala.Option.getOrElse(Option.scala:120)at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)at 
org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:218)at main.asiainfo.coc.impl.IndexMakerObj$$anonfun$makeIndexsAndLabels$1.apply(IndexMakerObj.scala:137)at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)at main.asiainfo.coc.impl.IndexMakerObj$.makeIndexsAndLabels(IndexMakerObj.scala:137)at main.asiainfo.coc.CocDss$.main(CocDss.scala:23)at main.asiainfo.coc.CocDss.main(CocDss.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:606)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetExceptionat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:606)at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)... 71 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)... 76 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not foundat org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018)at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)... 78 more

One look at the final "Caused by" tells you what is going on: the Hadoop data is LZO-compressed, and for Spark to read it the hadoop-lzo jar must be put on the classpath.
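A minimal way to do that (the jar path and your-app.jar below are placeholders for wherever hadoop-lzo and your application jar actually sit on your cluster; the main class is the one from the stack trace above) is to hand the jar to spark-submit and put it on both the driver and executor classpaths:

# /path/to/hadoop-lzo.jar and your-app.jar are placeholders
spark-submit \
  --class main.asiainfo.coc.CocDss \
  --jars /path/to/hadoop-lzo.jar \
  --driver-class-path /path/to/hadoop-lzo.jar \
  --conf spark.executor.extraClassPath=/path/to/hadoop-lzo.jar \
  your-app.jar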


Reposted from: https://www.cnblogs.com/yangsy0915/p/4978975.html

