spark和HSQL的连接join方式
本文主要介紹spark join相關(guān)操作。
講述spark連接相關(guān)的三個(gè)方法join,left-outer-join,right-outer-join,在這之前,我們用hiveSQL先跑出了結(jié)果以方便進(jìn)行對(duì)比。
我們以實(shí)例來(lái)進(jìn)行說(shuō)明。我的實(shí)現(xiàn)步驟記錄如下。
?
1、數(shù)據(jù)準(zhǔn)備
2、HSQL描述
3、Spark描述
?
1、數(shù)據(jù)準(zhǔn)備
我們準(zhǔn)備兩張Hive表,分別是orders(訂單表)和drivers(司機(jī)表),通過(guò)driver_id字段進(jìn)行關(guān)聯(lián)。數(shù)據(jù)如下:
orders
orders表有兩個(gè)字段,訂單id:order_id和司機(jī)id:driver_id。司機(jī)id將作為連接鍵。
通過(guò)select可以看到三條數(shù)據(jù)。
hive (gulfstream_test)> select * from orders; OK orders.order_id orders.driver_id 1000 5000 1001 5001 1002 5002 Time taken: 0.387 seconds, Fetched: 3 row(s)?
drivers
drivers表由兩個(gè)字段,司機(jī)id:driver_id和車(chē)輛id:car_id。司機(jī)id將作為連接鍵。
通過(guò)select可以看到兩條數(shù)據(jù)。
hive (gulfstream_test)> select * from drivers; OK drivers.driver_id drivers.car_id 5000 100 5003 103 Time taken: 0.036 seconds, Fetched: 2 row(s)?
?
2、HSQL描述
JOIN
自然連接,輸出連接鍵匹配的記錄。
可以看到,通過(guò)driver_id匹配的數(shù)據(jù)只有一條。
hive (gulfstream_test)> select * from orders t1 join drivers t2 on (t1.driver_id = t2.driver_id) ; OK t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 Time taken: 36.079 seconds, Fetched: 1 row(s)?
LEFT OUTER JOIN
左外鏈接,輸出連接鍵匹配的記錄,左側(cè)的表無(wú)論匹配與否都輸出。
可以看到,通過(guò)driver_id匹配的數(shù)據(jù)只有一條,不過(guò)所有orders表中的記錄都被輸出了,drivers中未能匹配的字段被置為空。
hive (gulfstream_test)> select * from orders t1 left outer join drivers t2 on (t1.driver_id = t2.driver_id) ; OK t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 1001 5001 NULL NULL 1002 5002 NULL NULL Time taken: 36.063 seconds, Fetched: 3 row(s)?
RIGHT OUTER JOIN
右外連接,輸出連接鍵匹配的記錄,右側(cè)的表無(wú)論匹配與否都輸出。
可以看到,通過(guò)driver_id匹配的數(shù)據(jù)只有一條,不過(guò)所有drivers表中的記錄都被輸出了,orders中未能匹配的字段被置為空。
hive (gulfstream_test)> select * from orders t1 right outer join drivers t2 on (t1.driver_id = t2.driver_id) ; OK t1.order_id t1.driver_id t2.driver_id t2.car_id 1000 5000 5000 100 NULL NULL 5003 103 Time taken: 30.089 seconds, Fetched: 2 row(s)?
?
3、Spark描述
spark實(shí)現(xiàn)join的方式也是通過(guò)RDD的算子,spark同樣提供了三個(gè)算子join,leftOuterJoin,rightOuterJoin。
在下面給出的例子中,我們通過(guò)spark-hive讀取了Hive中orders表和drivers表中的數(shù)據(jù),這時(shí)候數(shù)據(jù)的表現(xiàn)形式是DataFrame,如果要使用Join操作:
1)首先需要先將DataFrame轉(zhuǎn)化成了JavaRDD。
2)不過(guò),JavaRDD其實(shí)是沒(méi)有join算子的,下面還需要通過(guò)mapToPair算子將JavaRDD轉(zhuǎn)換成JavaPairRDD,這樣就可以使用Join了。?
下面例子中給出了三種join操作的實(shí)現(xiàn)方式,在join之后,通過(guò)collect()函數(shù)把數(shù)據(jù)拉到Driver端本地,并通過(guò)標(biāo)準(zhǔn)輸出打印。
需要指出的是
1)join算子(join,leftOuterJoin,rightOuterJoin)只能通過(guò)PairRDD使用;
2)join算子操作的Tuple2<Object1, Object2>類(lèi)型中,Object1是連接鍵,我只試過(guò)Integer和String,Object2比較靈活,甚至可以是整個(gè)Row。
這里我們使用driver_id作為連接鍵。 所以在輸出Tuple2的時(shí)候,我們將driver_id放在了前面。
?
Join.java
/* * spark-submit --queue=root.zhiliangbu_prod_datamonitor spark-join-1.0-SNAPSHOT-jar-with-dependencies.jar * */ public class Join implements Serializable {private transient JavaSparkContext javaSparkContext;private transient HiveContext hiveContext;/** 初始化Load* 創(chuàng)建sparkContext, sqlContext, hiveContext* */public Join() {initSparckContext();initHiveContext();}/** 創(chuàng)建sparkContext* */private void initSparckContext() {String warehouseLocation = System.getProperty("user.dir");SparkConf sparkConf = new SparkConf().setAppName("spark-join").set("spark.sql.warehouse.dir", warehouseLocation).setMaster("yarn-client");javaSparkContext = new JavaSparkContext(sparkConf);}/** 創(chuàng)建hiveContext* 用于讀取Hive中的數(shù)據(jù)* */private void initHiveContext() {hiveContext = new HiveContext(javaSparkContext);}public void join() {/** 生成rdd1* */String query1 = "select * from gulfstream_test.orders";DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id");JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {@Overridepublic Tuple2<String, String> call(Row row) throws Exception {String orderId = (String)row.get(0);String driverId = (String)row.get(1);return new Tuple2<String, String>(driverId, orderId);}});/** 生成rdd2* */String query2 = "select * from gulfstream_test.drivers";DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id");JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {@Overridepublic Tuple2<String, String> call(Row row) throws Exception {String driverId = (String)row.get(0);String carId = (String)row.get(1);return new Tuple2<String, String>(driverId, carId);}});/** join* */System.out.println(" ****************** join *******************");JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2);Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator();while (it1.hasNext()) {Tuple2<String, Tuple2<String, String>> item = it1.next();System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );}/** leftOuterJoin* */System.out.println(" ****************** leftOuterJoin *******************");JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2);Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator();while (it2.hasNext()) {Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next();System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );}/** rightOuterJoin* */System.out.println(" ****************** rightOuterJoin *******************");JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2);Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator();while (it3.hasNext()) {Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next();System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );}}public static void main(String[] args) {Join sj = new Join();sj.join();}}?
執(zhí)行結(jié)果
其中Optional.absent()表示的就是null,可以看到和HSQL是一致的。
Application ID is application_1508228032068_2746260, trackingURL: http://10.93.21.21:4040****************** join ******************* driver_id:5000, order_id:1000, car_id:100 ****************** leftOuterJoin ******************* driver_id:5001, order_id:1001, car_id:Optional.absent() driver_id:5002, order_id:1002, car_id:Optional.absent() driver_id:5000, order_id:1000, car_id:Optional.of(100)****************** rightOuterJoin ******************* driver_id:5003, order_id:Optional.absent(), car_id:103 driver_id:5000, order_id:Optional.of(1000), car_id:100由于數(shù)據(jù)量不大,我沒(méi)有從執(zhí)行效率上進(jìn)行考量。
根據(jù)經(jīng)驗(yàn),一般在數(shù)據(jù)量較大的情況下,HSQL的執(zhí)行效率會(huì)高一些,如果數(shù)據(jù)量較小,Spark會(huì)快。?
總結(jié)
以上是生活随笔為你收集整理的spark和HSQL的连接join方式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: spark学习:java版JavaRDD
- 下一篇: HDFS、MR、Kafka、Storm、