SparkSQL之DataFrame API
生活随笔
收集整理的這篇文章主要介紹了
SparkSQL之DataFrame API
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
測試文件
測試文件employees.json,內容如下:
{"name":"Michael", "salary":3000, "age": 28} {"name":"Andy", "salary":4500} {"name":"Justin", "salary":3500} {"name":"Berta", "salary":4000} {"name":"vincent", "salary":90000}CREATE DataFrame
package cn.ac.iie.sparkimport org.apache.spark.sql.SparkSession/*** DataFrame API基本操作*/object DataFrameApp {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()// 本地文件系統或者HDFS都支持// 將Json文件加載成一個DataFrameval peopleDF = spark.read.format("json").load("file:///E:/test/employees.json")// 輸出DataFrame對應的Schema信息peopleDF.printSchema()// 默認展示數據集前20條記錄peopleDF.show()//查詢某列的所有數據,相當于mysql中的 select name frompeopleDF.select("name").show()peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 10).show()peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()// 根據某一列的值進行過濾: select * from table where age > 20peopleDF.filter(peopleDF.col("age") > 20).show()// 根據某一列進行分組,然后在進行聚合操作:select age, count(1) from table group by agepeopleDF.groupBy(peopleDF.col("age")).count().show()spark.stop()} }show()方法默認展示前20條記錄,如果要展示多條,則寫為show(100)
printSchema
peopleDF.printSchema()
show
peopleDF.show()
select
查詢某一列數據
peopleDF.select("name").show()
查詢某幾列所有的數據
peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 10).show(),并且還可以對某一列數據進行相應的計算。
給某列名起別名
peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
filter
peopleDF.filter(peopleDF.col("age") > 20).show()
group
根據某一列進行分組,然后在進行聚合操作.
原始數據:
peopleDF.groupBy(peopleDF.col("age")).count().show()
總結
以上是生活随笔為你收集整理的SparkSQL之DataFrame API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark SQL之DataFrame概
- 下一篇: Spark SQL之RDD转DataFr