日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Spark基本操作SparkSession,DatasetRow,JavaRDDRow

發布時間:2024/1/17 java 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark基本操作SparkSession,DatasetRow,JavaRDDRow 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、Spark創建

1.創建SparkSession

/**

* local[*]表示使用本機的所有處理器創建工作節點

* spark.driver.memory spark的驅動器內存

* Spark2.2好像是需要最小2G

*/

SparkSession session = SparkSession.builder() .appName("sparkAnalysis").master("local[*]").config("spark.driver.memory","2147480000").getOrCreate();

2.創建可以連接hive的SparkSession(由于一般使用SparkSubmit進行提交任務,在sparkSubmit時候設置master,故可以不用配置master)

SparkSession?sparkSession =?SparkSession

.builder()

.appName("hive")

.config("spark.driver.memory","2147480000")

.enableHiveSupport()

.getOrCreate();

3.SparkSubmit 的 shell腳本

/data/spark/spark-2.2.0-bin-hadoop2.7/bin/spark-submit --master spark://elcndc2sc39t:7077 --class com.enc.analysis.core.AlgorithmExecute /data/upload/analysis/analysisFrame-1.1.0.jar $1 $2

--master表示master路徑,--class表示入口的類的全路徑 /data/upload/analysis/analysisFrame-1.1.0.jar 表示計算框架jar包的全路徑 $!,$2..是自定義的shell命令進行傳參,傳遞的參數會在入口類的main方法的String[] args中

?

二、利用Spark讀取jdbc

Properties?connectionProperties =?new?Properties();

String url =?"jdbc:mysql://"?+?"mysql服務器地址"?+?":"?+?"mysql端口"?+?"/"?+?"數據庫名?useUnicode=true&characterEncoding=utf-8";

String driver =?"com.mysql.jdbc.Driver";

connectionProperties.setProperty("user",?"用戶名");// 設置用戶名

connectionProperties.setProperty("password",?"密碼");// 設置密碼

connectionProperties.setProperty("driver",?driver);

connectionProperties.setProperty("url",url);

SparkSession spark = SparkSessionUtils.getLocalSession();

Dataset<Row> dataset = spark.read().jdbc(connectionProperties.getProperty("url"),"表名",connectionProperties).persist();

dataset.show();

?

三、Spark 的 map操作

/**

* 將Dataset<Row>轉化為List<Map>形式

*/

Dataset<Row> dataset = spark.read().jdbc(connectionProperties.getProperty("url"),"cq_jqxx",connectionProperties).persist();

Dataset<Map> mapDataset = dataset.map(new?MapFunction<Row,?Map>() {

@Override

public?Map?call(Row row)?throws?Exception {

HashMap hashMap =?new?HashMap();

//這是一個遍歷操作,row即表示為當前行數據,get(i)表示當前行的第幾列

hashMap.put(row.get(0),row.get(1));

return?hashMap;

}

//轉換為基本類型時用Encoders>STRING()等對應的基本類型

// 當使用Encoders.javaSerialization()時當前類需要實現序列化

},Encoders.javaSerialization(Map.class));

List<Map> maps = mapDataset.collectAsList();

?

四、Dataset<Row>相關類型的互相轉換

1.java中List轉為數組結構(由于經常使用到)

List<String> list =?new?ArrayList<>();

String[] strings = list.toArray(new?String[list.size()]);

2.Dataset<Row>轉為JavaRDD

JavaRDD<Row> rowJavaRDD = dataset.javaRDD();

3.JavaRDD<ROW>轉為Dataset<ROW>

Dataset<Row> dataFrame = sparkSession.createDataFrame(rowJavaRDD,?Row.class);

4.利用內部類實現Row轉為自己需要的Row,例如將某行進行分詞變為String[]

Dataset<Row> select = dataset.select("label",?"message");

JavaRDD<WordParticiple> map = select.javaRDD().map(WordParticiple::parseWordParticiple);

Dataset<Row> wordParticiple = spark.createDataFrame(map,WordParticiple.class);

內部類對象

public static class?WordParticiple{

private?String?label;

private?String[]?message;

public?WordParticiple(String label,?String[] message) {

this.label?= label;

this.message?= message;

}

public?WordParticiple() {

}

public?String?getLabel() {

return?label;

}

public void?setLabel(String label) {

this.label?= label;

}

public?String[]?getMessage() {

return?message;

}

public void?setMessage(String[] message) {

this.message?= message;

}

public static?WordParticiple?parseWordParticiple(Row row)?throws?IOException {

String string = row.getString(1);

String[] split = TermTokenizer.split(string);

return new?WordParticiple(row.get(0).toString(),split);

}

}

總結

以上是生活随笔為你收集整理的Spark基本操作SparkSession,DatasetRow,JavaRDDRow的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。