Java Spark之创建RDD的两种方式和操作RDD
首先看看思維導(dǎo)圖,我的spark是1.6.1版本,jdk是1.7版本?
spark是什么??
Spark是基于內(nèi)存計算的大數(shù)據(jù)并行計算框架。Spark基于內(nèi)存計算,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark 部署在大量廉價硬件之上,形成集群。
下載和安裝?
可以看我之前發(fā)表的博客?
Spark安裝
安裝成功后運行示例程序
在spark安裝目錄下examples/src/main目錄中。 運行的一個Java或Scala示例程序,使用bin/run-example <class> [params]
./bin/run-example SparkPi 10
啟動spark-shell時的參數(shù)?
./bin/spark-shell –master local[2]?
參數(shù)master 表名主機master在分布式集群中的URL?
local【2】 表示在本地通過開啟2個線程運行
運行模式?
四種:?
1.Mesos?
2.Hadoop YARN?
3.spark?
4.local
一般我們用的是local和spark模式
首先建立maven工程加入整個項目所用到的包的maven依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
? <modelVersion>4.0.0</modelVersion>
? <groupId>sparkday01</groupId>
? <artifactId>sparkday01</artifactId>
? <version>0.0.1-SNAPSHOT</version>
? <packaging>jar</packaging>
? <name>sparkday01</name>
? <url>http://maven.apache.org</url>
? <properties>
? ? <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
? </properties>
? <dependencies>
? ? <dependency>
? ? ? <groupId>junit</groupId>
? ? ? <artifactId>junit</artifactId>
? ? ? <version>4.12</version>
? ? ? <scope>test</scope>
? ? </dependency>
? ? ?<dependency>
? ? ? <groupId>org.apache.spark</groupId>
? ? ? <artifactId>spark-core_2.10</artifactId>
? ? ? <version>1.6.1</version>
? ? ?</dependency>
? ? <dependency>
? ? ? <groupId>org.apache.hadoop</groupId>
? ? ? <artifactId>hadoop-client</artifactId>
? ? ? <version>2.6.4</version>
? ? </dependency>
? ? <dependency>
? ? <groupId>org.apache.spark</groupId>
? ? <artifactId>spark-sql_2.10</artifactId>
? ? <version>1.6.1</version>
? ? </dependency>
? </dependencies>
</project>
下面開始初始化spark
spark程序需要做的第一件事情,就是創(chuàng)建一個SparkContext對象,它將告訴spark如何訪問一個集群,而要創(chuàng)建一個SparkContext對象,你首先要創(chuàng)建一個SparkConf對象,該對象訪問了你的應(yīng)用程序的信息
比如下面的代碼是運行在spark模式下
public class sparkTestCon {
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf=new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); ? ? //因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
? ? ? ? System.out.println(sc);
? ? }
}
下面是運行在本機,把上面的第6行代碼改為如下
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
快速入門
可以參看我的博客,轉(zhuǎn)載的一篇文章?
Spark快速入門
Spark編程
每一個spark應(yīng)用程序都包含一個驅(qū)動程序(driver program ),他會運行用戶的main函數(shù),并在集群上執(zhí)行各種并行操作(parallel operations)
spark提供的最主要的抽象概念有兩種:?
彈性分布式數(shù)據(jù)集(resilient distributed dataset)簡稱RDD ,他是一個元素集合,被分區(qū)地分布到集群的不同節(jié)點上,可以被并行操作,RDDS可以從hdfs(或者任意其他的支持Hadoop的文件系統(tǒng))上的一個文件開始創(chuàng)建,或者通過轉(zhuǎn)換驅(qū)動程序中已經(jīng)存在的Scala集合得到,用戶也可以讓spark將一個RDD持久化到內(nèi)存中,使其能再并行操作中被有效地重復(fù)使用,最后RDD能自動從節(jié)點故障中恢復(fù)
spark的第二個抽象概念是共享變量(shared variables),它可以在并行操作中使用,在默認情況下,當(dāng)spark將一個函數(shù)以任務(wù)集的形式在不同的節(jié)點上并行運行時,會將該函數(shù)所使用的每個變量拷貝傳遞給每一個任務(wù)中,有時候,一個變量需要在任務(wù)之間,或者驅(qū)動程序之間進行共享,spark支持兩種共享變量:?
廣播變量(broadcast variables),它可以在所有節(jié)點的內(nèi)存中緩存一個值。?
累加器(accumulators):只能用于做加法的變量,例如計算器或求和器
RDD的創(chuàng)建有兩種方式?
1.引用外部文件系統(tǒng)的數(shù)據(jù)集(HDFS)?
2.并行化一個已經(jīng)存在于驅(qū)動程序中的集合(并行集合,是通過對于驅(qū)動程序中的集合調(diào)用JavaSparkContext.parallelize來構(gòu)建的RDD)
第一種方式創(chuàng)建?
下面通過代碼來理解RDD和怎么操作RDD
package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
?* 引用外部文件系統(tǒng)的數(shù)據(jù)集(HDFS)創(chuàng)建RDD
?* ?匿名內(nèi)部類定義函數(shù)傳給spark
?* @author 湯高
?*
?*/
public class RDDOps {
? ? //完成對所有行的長度求和
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf=new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); ? ? //因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
? ? ? ? System.out.println(sc);
? ? ? ? //通過hdfs上的文件定義一個RDD 這個數(shù)據(jù)暫時還沒有加載到內(nèi)存,也沒有在上面執(zhí)行動作,lines僅僅指向這個文件
? ? ? ? JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
? ? ? ? //定義lineLengths作為Map轉(zhuǎn)換的結(jié)果 由于惰性,不會立即計算lineLengths
? ? ? ? //第一個參數(shù)為傳入的內(nèi)容,第二個參數(shù)為函數(shù)操作完后返回的結(jié)果類型
? ? ? ? JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
? ? ? ? ? public Integer call(String s) {?
? ? ? ? ? ? ? System.out.println("每行長度"+s.length());
? ? ? ? ? ? ? return s.length(); }
? ? ? ? });
? ? ? ? //運行reduce ?這是一個動作action ?這時候,spark才將計算拆分成不同的task,
? ? ? ? //并運行在獨立的機器上,每臺機器運行他自己的map部分和本地的reducation,并返回結(jié)果集給去驅(qū)動程序
? ? ? ? int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
? ? ? ? ? public Integer call(Integer a, Integer b) { return a + b; }
? ? ? ? });
? ? ? ? System.out.println(totalLength);
? ? ? ? //為了以后復(fù)用 ?持久化到內(nèi)存...
? ? ? ? lineLengths.persist(StorageLevel.MEMORY_ONLY());
? ? }
}
如果覺得剛剛那種寫法難以理解,可以看看第二種寫法
package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
?* 引用外部文件系統(tǒng)的數(shù)據(jù)集(HDFS)創(chuàng)建RDD?
?* ?外部類定義函數(shù)傳給spark
?* @author 湯高
?*
?*/
public class RDDOps2 {
? ? // 完成對所有行的長度求和
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf = new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
? ? ? ? System.out.println(sc);
? ? ? ? //通過hdfs上的文件定義一個RDD 這個數(shù)據(jù)暫時還沒有加載到內(nèi)存,也沒有在上面執(zhí)行動作,lines僅僅指向這個文件
? ? ? ? JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
? ? ? ? //定義lineLengths作為Map轉(zhuǎn)換的結(jié)果 由于惰性,不會立即計算lineLengths
? ? ? ? JavaRDD<Integer> lineLengths = lines.map(new GetLength());
? ? ? ? //運行reduce ?這是一個動作action ?這時候,spark才將計算拆分成不同的task,
? ? ? ? ? ? ? ? //并運行在獨立的機器上,每臺機器運行他自己的map部分和本地的reducation,并返回結(jié)果集給去驅(qū)動程序
? ? ? ? int totalLength = lineLengths.reduce(new Sum());
? ? ? ? System.out.println("總長度"+totalLength);
? ? ? ? // 為了以后復(fù)用 持久化到內(nèi)存...
? ? ? ? lineLengths.persist(StorageLevel.MEMORY_ONLY());
? ? }
? ? //定義map函數(shù)
? ? //第一個參數(shù)為傳入的內(nèi)容,第二個參數(shù)為函數(shù)操作完后返回的結(jié)果類型
? ? static class GetLength implements Function<String, Integer> {
? ? ? ? public Integer call(String s) {
? ? ? ? ? ? return s.length();
? ? ? ? }
? ? }
? ? //定義reduce函數(shù)?
? ? //第一個參數(shù)為內(nèi)容,第三個參數(shù)為函數(shù)操作完后返回的結(jié)果類型
? ? static class Sum implements Function2<Integer, Integer, Integer> {
? ? ? ? public Integer call(Integer a, Integer b) {
? ? ? ? ? ? return a + b;
? ? ? ? }
? ? }
}
第二種方式創(chuàng)建RDD
package com.tg.spark;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
?* 并行化一個已經(jīng)存在于驅(qū)動程序中的集合創(chuàng)建RDD
?* @author 湯高
?*
?*/
public class RDDOps3 {
? ? // 完成對所有數(shù)求和
? ? public static void main(String[] args) {
? ? ? ? SparkConf conf = new SparkConf();
? ? ? ? conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
? ? ? ? JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
? ? ? ? System.out.println(sc);
? ? ? ? List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
? ? ? ? //并行集合,是通過對于驅(qū)動程序中的集合調(diào)用JavaSparkContext.parallelize來構(gòu)建的RDD
? ? ? ? JavaRDD<Integer> distData = sc.parallelize(data);
? ? ? ? JavaRDD<Integer> lineLengths = distData.map(new GetLength());
? ? ? ? // 運行reduce 這是一個動作action 這時候,spark才將計算拆分成不同的task,
? ? ? ? // 并運行在獨立的機器上,每臺機器運行他自己的map部分和本地的reducation,并返回結(jié)果集給去驅(qū)動程序
? ? ? ? int totalLength = lineLengths.reduce(new Sum());
? ? ? ? System.out.println("總和" + totalLength);
? ? ? ? // 為了以后復(fù)用 持久化到內(nèi)存...
? ? ? ? lineLengths.persist(StorageLevel.MEMORY_ONLY());
? ? }
? ? // 定義map函數(shù)
? ? static class GetLength implements Function<Integer, Integer> {
? ? ? ? @Override
? ? ? ? public Integer call(Integer a) throws Exception {
? ? ? ? ? ? return a;
? ? ? ? }
? ? }
? ? // 定義reduce函數(shù)
? ? static class Sum implements Function2<Integer, Integer, Integer> {
? ? ? ? public Integer call(Integer a, Integer b) {
? ? ? ? ? ? return a + b;
? ? ? ? }
? ? }
}
注意:上面的寫法是基于jdk1.7或者更低版本?
基于jdk1.8有更簡單的寫法?
下面是官方文檔的說明
Note: In this guide, we’ll often use the concise Java 8 lambda syntax to specify Java functions, but in older versions of Java you can implement the interfaces in the org.apache.spark.api.java.function package. We describe passing functions to Spark in more detail below.
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the org.apache.spark.api.java.function package. There are two ways to create such functions:
Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark.
In Java 8, use lambda expressions to concisely define an implementation.
所以如果要完成上面第一種創(chuàng)建方式,在jdk1.8中可以簡單的這么寫
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
要完成第二種方式的創(chuàng)建,簡單的這么寫
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
主要不同就是在jdk1.7中我們要自己寫一個函數(shù)傳到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中寫lambda表達式
好了,今天就寫到這里,以后的更多內(nèi)容后面再寫?
碼字不易,轉(zhuǎn)載請指明出處http://blog.csdn.net/tanggao1314/article/details/51570452
參考資料?
Spark編程指南
?
總結(jié)
以上是生活随笔為你收集整理的Java Spark之创建RDD的两种方式和操作RDD的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark RDD创建操作
- 下一篇: 使用Java客户端操作elasticse