
Spark SQL: Executing Spark SQL Queries Programmatically (Inferring the Schema via Reflection, or Specifying the Schema Directly with StructType)

發(fā)布時(shí)間:2024/9/27 数据库 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark-sql:以编程方式执行Spark SQL查询(通过反射的方式推断出Schema,通过StrutType直接指定Schema) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1. Writing the Spark SQL Query Program

Before starting, create a Maven project. The process is described at: http://blog.csdn.net/tototuzuoquan/article/details/74571374

在這里:http://blog.csdn.net/tototuzuoquan/article/details/74907124,可以知道Spark Shell中使用SQL完成查詢,下面通過在自定義程序中編寫Spark SQL查詢程序。首先在maven項(xiàng)目的pom.xml中添加Spark SQL的依賴。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

The final pom.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Preparing the Input Data

The contents of person.txt are as follows:

1 zhangsan 19
2 lisi 20
3 wangwu 28
4 zhaoliu 26
5 tianqi 24
6 chengnong 55
7 zhouxingchi 58
8 mayun 50
9 yangliying 30
10 lilianjie 51
11 zhanghuimei 35
12 lian 53
13 zhangyimou 54
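Each record is a single space-separated line of (id, name, age). The per-line parsing that both programs below rely on can be sketched on its own, without Spark:

```scala
// Minimal sketch of the per-line parsing used in the programs below.
// The sample line comes from person.txt above.
object ParseDemo {
  def main(args: Array[String]): Unit = {
    val line = "1 zhangsan 19"
    val fields = line.split(" ")
    // Convert the id and age columns to Int; keep the name as a String.
    val (id, name, age) = (fields(0).toInt, fields(1), fields(2).toInt)
    println(s"$id, $name, $age")  // 1, zhangsan, 19
  }
}
```

Note that `split(" ")` assumes exactly one space between columns; extra whitespace would produce empty fields and make `toInt` throw.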

3. Inferring the Schema via Reflection

package cn.toto.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/10.
  */
object InferringSchema {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the app name (add setMaster("local") when
    // running locally; remove it when submitting to a cluster)
    val conf = new SparkConf().setAppName("SQL-1").setMaster("local")
    // SQLContext depends on a SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path and split each line into fields
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))
    // Associate the RDD with the Person case class
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import the implicit conversions; without this import the RDD cannot
    // be converted to a DataFrame
    import sqlContext.implicits._
    // Convert the RDD to a DataFrame
    val personDF = personRDD.toDF
    // Register a temporary table
    personDF.registerTempTable("t_person")
    // Run the SQL query
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    // Save the result as JSON at the given path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}

// The case class must be declared outside the object
case class Person(id: Int, name: String, age: Int)

Run configuration: pass the path to person.txt as the first argument (args(0)) and the output directory as the second (args(1)).

Run the program: the two oldest records are written as JSON to the output directory.
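As a quick sanity check, the JSON files written by df.write.json can be read back with the same SQLContext. This is a minimal sketch, assuming the sqlContext from the program above and whatever output path was passed as args(1):

```scala
// Hypothetical verification step: load the JSON result back into a DataFrame.
// `sqlContext` and `args(1)` are assumed from the surrounding program.
val result = sqlContext.read.json(args(1))
result.show()
// With the sample person.txt, the two oldest records are
// zhouxingchi (age 58) and chengnong (age 55).
```

Reading the data back also demonstrates that JSON files carry enough type information for Spark to re-infer a usable schema.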

Package the program as a jar, upload it to the Spark cluster, and submit the job (first remove setMaster("local") from the code):

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-submit --class cn.toto.spark.InferringSchema \
    --master spark://hadoop1:7077,hadoop2:7077 \
    /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar \
    hdfs://mycluster/person.txt hdfs://mycluster/out

4. Specifying the Schema Directly with StructType

The code is as follows:

package cn.toto.spark

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/10.
  */
object SpecifyingSchema {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the app name
    val conf = new SparkConf().setAppName("SQL-2").setMaster("local")
    // SQLContext depends on a SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path and split each line into fields
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // Specify the schema of each field directly via StructType; this is
    // effectively the table's description
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register a temporary table
    personDataFrame.registerTempTable("t_person")
    // Run the SQL query
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // Save the result as JSON at the given path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}
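A StructType is an ordinary Scala value, so the schema can be built and inspected programmatically before any data is attached to it. A minimal sketch (only the spark-sql types are needed, no cluster):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaDemo {
  def main(args: Array[String]): Unit = {
    // Same schema as in the program above; the third argument marks
    // each field as nullable.
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    // The schema knows its own field names and types:
    println(schema.fieldNames.mkString(", "))  // id, name, age
    println(schema("age").dataType)            // IntegerType
  }
}
```

This is what makes the StructType approach useful when column names or types are only known at runtime (for example, read from a config file): the list passed to StructType can be computed instead of hard-coded.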

Run configuration: as before, args(0) is the input file and args(1) is the output directory.

After running, the four oldest records are written as JSON to the output directory.

Summary

That covers both ways of executing Spark SQL queries programmatically: inferring the schema via reflection with a case class, and specifying the schema directly with StructType. Hopefully it helps you solve the problems you run into.
