日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

好程序员大数据教程:SparkShell和IDEA中编写Spark程序

發布時間:2025/5/22 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 好程序员大数据教程:SparkShell和IDEA中编写Spark程序 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

好程序員大數據教程:SparkShell和IDEA中編寫Spark程序,spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶可以在該命令行下用Scala編寫Spark程序。spark-shell程序一般用作Spark程序測試練習來用。spark-shell屬于Spark的特殊應用程序,我們可以在這個特殊的應用程序中提交應用程序

spark-shell啟動有兩種模式,local模式和cluster模式,分別為

local模式:

spark-shell

local模式僅在本機啟動一個SparkSubmit進程,沒有與集群建立聯系,雖然進程中有SparkSubmit但是不會被提交到集群紅

?

Cluster模式(集群模式):

spark-shell \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 1

后兩個命令不是必須的 --master這條命令是必須的(除非在jar包中已經指可以不指定,不然就必須指定)

退出shell

千萬不要ctrl+c spark-shell 正確退出 :quit 千萬不要ctrl+c退出 這樣是錯誤的 若使用了ctrl+c退出 使用命令查看監聽端口 netstat - apn | grep 4040 在使用kill -9 端口號 殺死即可

3.25.11 spark2.2shell和spark1.6shell對比

?

ps:啟動spark-shell若是集群模式,在webUI會有一個一直執行的任務

通過IDEA創建Spark工程

ps:工程創建之前步驟省略,在scala中已經講解,直接默認是創建好工程的

對工程中的pom.xml文件配置

<!-- 聲明公有的屬性 -->


<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.7.1</hadoop.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>

<!-- 聲明并引入公有的依賴 -->


<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>

Spark實現WordCount程序


Scala版本
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")

//創建sparkContext對象


val sc = new SparkContext(conf)

//通過sparkcontext對象就可以處理數據


//讀取文件 參數是一個String類型的字符串 傳入的是路徑


val lines: RDD[String] = sc.textFile(“dir/wordcount”)

//切分數據


val words: RDD[String] = lines.flatMap(_.split(" "))

//將每一個單詞生成元組 (單詞,1)


val tuples: RDD[(String, Int)] = words.map((_,1))

//spark中提供一個算子 reduceByKey 相同key 為一組進行求和 計算value


val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)

//對當前這個結果進行排序 sortBy 和scala中sotrBy是不一樣的 多了一個參數


//默認是升序 false就是降序


val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)

//將數據提交到集群存儲 無法返回值


sorted.foreach(println)

//回收資源停止sc,結束任務


sc.stop()
}
}


Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class JavaWordCount {
public static void main(String[] args) {

//1.先創建conf對象進行配置主要是設置名稱,為了設置運行模式


SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");

//2.創建context對象


JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("dir/file");

//進行切分數據 flatMapFunction是具體實現類


JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
List<String> splited = Arrays.asList(s.split(" "));
return splited.iterator();
}

});

//將數據生成元組


//第一個泛型是輸入的數據類型 后兩個參數是輸出參數元組的數據


JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String,Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<String, Integer>(s, 1);}});復制代碼//聚合


JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer,Integer>() {復制代碼

@Override

//第一個Integer是相同key對應的value


//第二個Integer是相同key 對應的value


public Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});復制代碼//因為Java api沒有提供sortBy算子,此時需要將元組中的數據進行位置調換,然后在排序,排完序在換回


//第一次交換是為了排序


JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {return tup.swap();}});復制代碼//排序


JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);復制代碼//第二次交換是為了最終結果 <單詞,數量>


JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception{return tuple2.swap();}});System.out.println(res.collect());res.saveAsTextFile("out1");jsc.stop();} }復制代碼


總結

以上是生活随笔為你收集整理的好程序员大数据教程:SparkShell和IDEA中编写Spark程序的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。