慕课网 Spark SQL Log Analysis - 4. A Smooth Transition from Hive to Spark SQL


4.1 SQLContext / HiveContext / SparkSession

1. SQLContext

Documentation for the older version: spark.apache.org/docs/1.6.1/

  • SQLContext example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
 * Using SQLContext.
 * Note: IDEA runs locally while the test data lives on the server.
 * Can development and testing still be done locally?
 */
object SQLContextApp {

  def main(args: Array[String]): Unit = {
    val path = args(0)

    // 1) Create the corresponding context
    val sparkConf = new SparkConf()
    // In test/production, AppName and Master are supplied via the submit script
    sparkConf.setAppName("SQLContextApp")
      .setMaster("local[2]")
      .set("spark.driver.bindAddress", "127.0.0.1")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // 2) Processing: read the JSON file
    val people = sqlContext.read.format("json").load(path)
    people.printSchema()
    people.show()

    // 3) Release resources
    sc.stop()
  }
}
```
  • Package the project:

```bash
mvn clean package -DskipTests
```
  • Submit the Spark application to the environment. Documentation: spark.apache.org/docs/1.6.1/…

```bash
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

./bin/spark-submit \
  --class com.gwf.spark.SQLContextApp \
  --master local[2] \
  /Users/gaowenfeng/Downloads/MySparkSqlProject/target/sql-1.0.jar \
  file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
```
  • Script submission: put the command above into a shell script, grant it execute permission, and run it.

2. Using HiveContext

To use a HiveContext, you do not need to have an existing Hive setup

The code is almost identical to the SQLContext example above; just replace SQLContext with HiveContext. At run time, however, the MySQL driver jar has to be put on the classpath with --jars so the Hive metastore can be reached (a sketch follows below).
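This is only a rough sketch, not the course's exact code: the table name emp is an assumption (any table registered in your Hive metastore works), and AppName/Master are again left to the submit script.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveContextApp {

  def main(args: Array[String]): Unit = {
    // AppName and Master are expected to come from the submit script
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    // Query a table registered in the Hive metastore ("emp" is just an example)
    hiveContext.table("emp").show()

    sc.stop()
  }
}
```

When submitting, add the MySQL connector jar with --jars as described above, otherwise the metastore cannot be reached.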

3. SparkSession

```scala
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val path = args(0)

  val spark = SparkSession
    .builder()
    .appName("SQLContextApp")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .master("local[2]")
    .getOrCreate()

  val people = spark.read.format("json").load(path)
  people.printSchema()
  people.show()

  spark.stop()
}
```
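SparkSession is the unified entry point in Spark 2.x and also takes over the role of HiveContext: Hive support just has to be enabled on the builder. The snippet below is a sketch rather than course code; it assumes hive-site.xml is on the classpath and that a table named emp exists in the metastore.

```scala
import org.apache.spark.sql.SparkSession

object SparkSessionHiveApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkSessionHiveApp")
      .master("local[2]")
      .enableHiveSupport() // replaces HiveContext in Spark 2.x
      .getOrCreate()

    // "emp" is only an example table name in the Hive metastore
    spark.sql("select * from emp").show()

    spark.stop()
  }
}
```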

4.2 Using spark-shell / spark-sql

  • Add hive-site.xml to the conf directory
  • Pass the MySQL driver jar with --jars
  • Start either shell:

```bash
# spark-shell
spark-shell --master local[2] \
  --jars /Users/gaowenfeng/.m2/repository/mysql/mysql-connector-java/5.1.45/mysql-connector-java-5.1.45.jar
# inside the shell, run spark.sql("<sql statement>").show (see the sketch below)

# spark-sql
spark-sql --master local[2] \
  --jars /Users/gaowenfeng/.m2/repository/mysql/mysql-connector-java/5.1.45/mysql-connector-java-5.1.45.jar
# SQL statements can be executed directly
```
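Inside spark-shell, the pre-built spark session picks up Hive support from hive-site.xml, so Hive tables can be queried directly from the Scala prompt. A small sketch (emp is only an example table name; use any table from your metastore):

```scala
// typed at the spark-shell prompt; `spark` is the SparkSession the shell creates for you
spark.sql("show tables").show()

// query an existing Hive table ("emp" is just an example)
val emp = spark.sql("select * from emp")
emp.printSchema()
emp.show()
```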

Analyzing the execution plan to understand the Spark SQL architecture:

```sql
create table t(key string, value string);

explain extended
select a.key * (2+3), b.value
from t a join t b on a.key = b.key and a.key > 3;
```

```
# Parsed into a logical plan
== Parsed Logical Plan ==
# unresolvedalias: nothing has been resolved yet
'Project [unresolvedalias(('a.key * (2 + 3)), None), 'b.value]       # the two selected columns
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))                  # the conditions after ON
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation `t`
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation `t`

# Analysis (this step talks to the underlying metastore)
== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double, value: string   # a.key and (2+3) are each cast to double
Project [(cast(key#8 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]   # the two selected columns
+- Join Inner, ((key#8 = key#10) && (cast(key#8 as int) > 3))
   :- SubqueryAlias a
   :  +- SubqueryAlias t                                              # now resolved to a concrete table in the metastore
   :     +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
   +- SubqueryAlias b
      +- SubqueryAlias t
         +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]

# Optimization
== Optimized Logical Plan ==
Project [(cast(key#8 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]
+- Join Inner, (key#8 = key#10)
   :- Project [key#8]
   :  +- Filter (isnotnull(key#8) && (cast(key#8 as int) > 3))        # a.key > 3 is pushed down: filter first, then join
   :     +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
   +- Filter (isnotnull(key#10) && (cast(key#10 as int) > 3))
      +- CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]

# Physical plan
== Physical Plan ==
*Project [(cast(key#8 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#12, value#11]
+- *SortMergeJoin [key#8], [key#10], Inner
   :- *Sort [key#8 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#8, 200)
   :     +- *Filter (isnotnull(key#8) && (cast(key#8 as int) > 3))
   :        +- HiveTableScan [key#8], CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#8, value#9]
   +- *Sort [key#10 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#10, 200)
         +- *Filter (isnotnull(key#10) && (cast(key#10 as int) > 3))
            +- HiveTableScan [key#10, value#11], CatalogRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#10, value#11]
```
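For reference, the same four plans can also be printed from code instead of the spark-sql shell: Dataset.explain(true) emits the parsed, analyzed, optimized, and physical plans. A small sketch, assuming a SparkSession with Hive support and the table t created above:

```scala
// `spark` is a SparkSession built with .enableHiveSupport()
val df = spark.sql(
  "select a.key * (2+3), b.value from t a join t b on a.key = b.key and a.key > 3")

// true = extended output: parsed, analyzed, optimized and physical plans
df.explain(true)
```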

4.3 Using thriftserver / beeline

  • Start the thriftserver; the default port is 10000

```bash
# hive.server2.thrift.port changes the listening port (default 10000)
# hive.server2.thrift.bind.host changes the bind host
./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
```
  • Start beeline: beeline -u jdbc:hive2://localhost:10000 -n gaowenfeng
    • http://localhost:4040/sqlserver/ lists every SQL statement that has been executed, together with its execution plan

    • http://localhost:4040/SQL/execution/ shows the detailed execution information for each query

3. Differences between thriftserver and spark-shell / spark-sql:

  • Each spark-shell or spark-sql session is its own Spark application
  • No matter how many clients (beeline / code) connect, the thriftserver is always a single Spark application; this solves the data-sharing problem, since multiple clients can share the same data

4.4 Programmatic access via JDBC

1. Add the Maven dependency

```xml
<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.2.1.spark2</version>
</dependency>
```

2. Write code that accesses the thriftserver

Note: when developing against JDBC, the thriftserver must be started first.

```scala
import java.sql.DriverManager

def main(args: Array[String]): Unit = {
  // Register the Hive JDBC driver
  Class.forName("org.apache.hive.jdbc.HiveDriver")

  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "gaowenfeng", "")
  val pstmt = conn.prepareStatement("select * from emp")
  val rs = pstmt.executeQuery()

  while (rs.next()) {
    println(rs.getInt("id") + "\t" + rs.getString("name"))
  }

  rs.close()
  pstmt.close()
  conn.close()
}
```
