日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > windows >内容正文

windows

reducebykeyandwindow java_Spark Streaming笔记整理(三):DS的transformation与output操作

發(fā)布時(shí)間:2024/9/27 windows 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 reducebykeyandwindow java_Spark Streaming笔记整理(三):DS的transformation与output操作 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

[TOC]

DStream的各種transformation

Transformation Meaning

map(func) 對(duì)DStream中的各個(gè)元素進(jìn)行func函數(shù)操作,然后返回一個(gè)新的DStream.

flatMap(func) 與map方法類(lèi)似,只不過(guò)各個(gè)輸入項(xiàng)可以被輸出為零個(gè)或多個(gè)輸出項(xiàng)

filter(func) 過(guò)濾出所有函數(shù)func返回值為true的DStream元素并返回一個(gè)新的DStream

repartition(numPartitions) 增加或減少DStream中的分區(qū)數(shù),從而改變DStream的并行度

union(otherStream) 將源DStream和輸入?yún)?shù)為otherDStream的元素合并,并返回一個(gè)新的DStream.

count() 通過(guò)對(duì)DStreaim中的各個(gè)RDD中的元素進(jìn)行計(jì)數(shù),然后返回只有一個(gè)元素的RDD構(gòu)成的DStream

reduce(func) 對(duì)源DStream中的各個(gè)RDD中的元素利用func進(jìn)行聚合操作,然后返回只有一個(gè)元素的RDD構(gòu)成的新的DStream.

countByValue() 對(duì)于元素類(lèi)型為K的DStream,返回一個(gè)元素為(K,Long)鍵值對(duì)形式的新的DStream,Long對(duì)應(yīng)的值為源DStream中各個(gè)RDD的key出現(xiàn)的次數(shù)

reduceByKey(func, [numTasks]) 利用func函數(shù)對(duì)源DStream中的key進(jìn)行聚合操作,然后返回新的(K,V)對(duì)構(gòu)成的DStream

join(otherStream, [numTasks]) 輸入為(K,V)、(K,W)類(lèi)型的DStream,返回一個(gè)新的(K,(V,W)類(lèi)型的DStream

cogroup(otherStream, [numTasks]) 輸入為(K,V)、(K,W)類(lèi)型的DStream,返回一個(gè)新的 (K, Seq[V], Seq[W]) 元組類(lèi)型的DStream

transform(func) 通過(guò)RDD-to-RDD函數(shù)作用于源碼DStream中的各個(gè)RDD,可以是任意的RDD操作,從而返回一個(gè)新的RDD

updateStateByKey(func) 根據(jù)于key的前置狀態(tài)和key的新值,對(duì)key進(jìn)行更新,返回一個(gè)新?tīng)顟B(tài)的Dstream

Window 函數(shù):

可以看到很多都是在RDD中已經(jīng)有的transformation算子操作,所以這里只關(guān)注transform、updateStateByKey和window函數(shù)

transformation之transform操作

DStream transform

1、transform操作,應(yīng)用在DStream上時(shí),可以用于執(zhí)行任意的RDD到RDD的轉(zhuǎn)換操作。它可以用于實(shí)現(xiàn),DStream API中所沒(méi)有提供的操作。比如說(shuō),DStream API中,并沒(méi)有提供將一個(gè)DStream中的每個(gè)batch,與一個(gè)特定的RDD進(jìn)行join的操作。但是我們自己就可以使用transform操作來(lái)實(shí)現(xiàn)該功能。

2、DStream.join(),只能join其他DStream。在DStream每個(gè)batch的RDD計(jì)算出來(lái)之后,會(huì)去跟其他DStream的RDD進(jìn)行join。

案例

測(cè)試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/**

* 使用Transformation之transform來(lái)完成在線黑名單過(guò)濾

* 需求:

* 將日志數(shù)據(jù)中來(lái)自于ip["27.19.74.143", "110.52.250.126"]實(shí)時(shí)過(guò)濾掉

* 數(shù)據(jù)格式

* 27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127

*/

object _06SparkStreamingTransformOps {

def main(args: Array[String]): Unit = {

if (args == null || args.length < 2) {

System.err.println(

"""Parameter Errors! Usage:

|hostname: 監(jiān)聽(tīng)的網(wǎng)絡(luò)socket的主機(jī)名或ip地址

|port: 監(jiān)聽(tīng)的網(wǎng)絡(luò)socket的端口

""".stripMargin)

System.exit(-1)

}

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)

.setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(2))

val hostname = args(0).trim

val port = args(1).trim.toInt

//黑名單數(shù)據(jù)

val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))

// val blacklist = List("27.19.74.143", "110.52.250.126")

val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist)

val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)

// 如果用到一個(gè)DStream和rdd進(jìn)行操作,無(wú)法使用dstream直接操作,只能使用transform來(lái)進(jìn)行操作

val filteredDStream:DStream[String] = linesDStream.transform(rdd => {

val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => {

(line.split("##")(0), line)

}}

/** A(M) B(N)兩張表:

* across join

* 交叉連接,沒(méi)有on條件的連接,會(huì)產(chǎn)生笛卡爾積(M*N條記錄) 不能用

* inner join

* 等值連接,取A表和B表的交集,也就是獲取在A和B中都有的數(shù)據(jù),沒(méi)有的剔除掉 不能用

* left outer join

* 外鏈接:最常用就是左外連接(將左表中所有的數(shù)據(jù)保留,右表中能夠?qū)?yīng)上的數(shù)據(jù)正常顯示,在右表中對(duì)應(yīng)不上,顯示為null)

* 可以通過(guò)非空判斷是左外連接達(dá)到inner join的結(jié)果

*/

val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD)

joinedInfoRDD.filter{case (ip, (line, joined)) => {

joined == None

}}//執(zhí)行過(guò)濾操作

.map{case (ip, (line, joined)) => line}

})

filteredDStream.print()

ssc.start()

ssc.awaitTermination()

ssc.stop() // stop中的boolean參數(shù),設(shè)置為true,關(guān)閉該ssc對(duì)應(yīng)的SparkContext,默認(rèn)為false,只關(guān)閉自身

}

}

nc中產(chǎn)生數(shù)據(jù):

[uplooking@uplooking01 ~]$ nc -lk 4893

27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582

110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603

8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

輸出結(jié)果如下:

-------------------------------------------

Time: 1526006084000 ms

-------------------------------------------

8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

transformation之updateStateByKey操作

概述

1、Spark Streaming的updateStateByKey可以DStream中的數(shù)據(jù)進(jìn)行按key做reduce操作,然后對(duì)各個(gè)批次的數(shù)據(jù)進(jìn)行累加。

2、 updateStateByKey 解釋

以DStream中的數(shù)據(jù)進(jìn)行按key做reduce操作,然后對(duì)各個(gè)批次的數(shù)據(jù)進(jìn)行累加在有新的數(shù)據(jù)信息進(jìn)入或更新時(shí),可以讓用戶保持想要的任何狀。使用這個(gè)功能需要完成兩步:

1) 定義狀態(tài):可以是任意數(shù)據(jù)類(lèi)型

2) 定義狀態(tài)更新函數(shù):用一個(gè)函數(shù)指定如何使用先前的狀態(tài),從輸入流中的新值更新?tīng)顟B(tài)。對(duì)于有狀態(tài)操作,要不斷的把當(dāng)前和歷史的時(shí)間切片的RDD累加計(jì)算,隨著時(shí)間的流失,計(jì)算的數(shù)據(jù)規(guī)模會(huì)變得越來(lái)越大

3、要思考的是如果數(shù)據(jù)量很大的時(shí)候,或者對(duì)性能的要求極為苛刻的情況下,可以考慮將數(shù)據(jù)放在Redis或者tachyon或者ignite上

4、注意,updateStateByKey操作,要求必須開(kāi)啟Checkpoint機(jī)制。

案例

Scala版

測(cè)試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

* 狀態(tài)函數(shù)updateStateByKey

* 更新key的狀態(tài)(就是key對(duì)應(yīng)的value)

*

* 通常的作用,計(jì)算某個(gè)key截止到當(dāng)前位置的狀態(tài)

* 統(tǒng)計(jì)截止到目前為止的word對(duì)應(yīng)count

* 要想完成截止到目前為止的操作,必須將歷史的數(shù)據(jù)和當(dāng)前最新的數(shù)據(jù)累計(jì)起來(lái),所以需要一個(gè)地方來(lái)存放歷史數(shù)據(jù)

* 這個(gè)地方就是checkpoint目錄

*

*/

object _07SparkStreamingUpdateStateByKeyOps {

def main(args: Array[String]): Unit = {

if (args == null || args.length < 2) {

System.err.println(

"""Parameter Errors! Usage:

|hostname: 監(jiān)聽(tīng)的網(wǎng)絡(luò)socket的主機(jī)名或ip地址

|port: 監(jiān)聽(tīng)的網(wǎng)絡(luò)socket的端口

""".stripMargin)

System.exit(-1)

}

val hostname = args(0).trim

val port = args(1).trim.toInt

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName)

.setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(2))

ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb")

// 接收到的當(dāng)前批次的數(shù)據(jù)

val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)

// 這是記錄下來(lái)的當(dāng)前批次的數(shù)據(jù)

val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

val usbDStream:DStream[(String, Int)] = rbkDStream.updateStateByKey(updateFunc)

usbDStream.print()

ssc.start()

ssc.awaitTermination()

ssc.stop() // stop中的boolean參數(shù),設(shè)置為true,關(guān)閉該ssc對(duì)應(yīng)的SparkContext,默認(rèn)為false,只關(guān)閉自身

}

/**

* @param seq 當(dāng)前批次的key對(duì)應(yīng)的數(shù)據(jù)

* @param history 歷史key對(duì)應(yīng)的數(shù)據(jù),可能有可能沒(méi)有

* @return

*/

def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = {

var sum = seq.sum

if(history.isDefined) {

sum += history.get

}

Option[Int](sum)

}

}

nc產(chǎn)生數(shù)據(jù):

[uplooking@uplooking01 ~]$ nc -lk 4893

hello hello

hello you hello he hello me

輸出結(jié)果如下:

-------------------------------------------

Time: 1526009358000 ms

-------------------------------------------

(hello,2)

18/05/11 11:29:18 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000:

-------------------------------------------

Time: 1526009360000 ms

-------------------------------------------

(hello,5)

(me,1)

(you,1)

(he,1)

18/05/11 11:29:20 INFO WriteAheadLogManager for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000:

-------------------------------------------

Time: 1526009362000 ms

-------------------------------------------

(hello,5)

(me,1)

(you,1)

(he,1)

Java版

用法略有不同,主要是 狀態(tài)更新函數(shù)的寫(xiě)法上有區(qū)別,如下:

package cn.xpleaf.bigdata.spark.java.streaming.p1;

import com.google.common.base.Optional;

import org.apache.log4j.Level;

import org.apache.log4j.Logger;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import java.util.Arrays;

import java.util.List;

public class _02SparkStreamingUpdateStateByKeyOps {

public static void main(String[] args) {

if(args == null || args.length < 2) {

System.err.println("Parameter Errors! Usage: ");

System.exit(-1);

}

Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

SparkConf conf = new SparkConf()

.setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName())

.setMaster("local[2]");

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));

jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb");

String hostname = args[0].trim();

int port = Integer.valueOf(args[1].trim());

JavaReceiverInputDStream lineDStream = jsc.socketTextStream(hostname, port);//默認(rèn)的持久化級(jí)別:MEMORY_AND_DISK_SER_2

JavaDStream wordsDStream = lineDStream.flatMap(new FlatMapFunction() {

@Override

public Iterable call(String line) throws Exception {

return Arrays.asList(line.split(" "));

}

});

JavaPairDStream pairsDStream = wordsDStream.mapToPair(word -> {

return new Tuple2(word, 1);

});

JavaPairDStream rbkDStream = pairsDStream.reduceByKey(new Function2() {

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

// 做歷史的累計(jì)操作

JavaPairDStream usbDStream = rbkDStream.updateStateByKey(new Function2, Optional, Optional>() {

@Override

public Optional call(List current, Optional history) throws Exception {

int sum = 0;

for (int i : current) {

sum += i;

}

if (history.isPresent()) {

sum += history.get();

}

return Optional.of(sum);

}

});

usbDStream.print();

jsc.start();//啟動(dòng)流式計(jì)算

jsc.awaitTermination();//等待執(zhí)行結(jié)束

jsc.close();

}

}

transformation之window操作

DStream window 滑動(dòng)窗口

Spark Streaming提供了滑動(dòng)窗口操作的支持,從而讓我們可以對(duì)一個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù)執(zhí)行計(jì)算操作。每次掉落在窗口內(nèi)的RDD的數(shù)據(jù),會(huì)被聚合起來(lái)執(zhí)行計(jì)算操作,然后生成的RDD,會(huì)作為window DStream的一個(gè)RDD。比如下圖中,就是對(duì)每三秒鐘的數(shù)據(jù)執(zhí)行一次滑動(dòng)窗口計(jì)算,這3秒內(nèi)的3個(gè)RDD會(huì)被聚合起來(lái)進(jìn)行處理,然后過(guò)了兩秒鐘,又會(huì)對(duì)最近三秒內(nèi)的數(shù)據(jù)執(zhí)行滑動(dòng)窗口計(jì)算。所以每個(gè)滑動(dòng)窗口操作,都必須指定兩個(gè)參數(shù),窗口長(zhǎng)度以及滑動(dòng)間隔,而且這兩個(gè)參數(shù)值都必須是batch間隔的整數(shù)倍。

1.紅色的矩形就是一個(gè)窗口,窗口hold的是一段時(shí)間內(nèi)的數(shù)據(jù)流。

2.這里面每一個(gè)time都是時(shí)間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個(gè)單位時(shí)間,窗口會(huì)slide一次。

所以基于窗口的操作,需要指定2個(gè)參數(shù):

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

1.窗口大小,個(gè)人感覺(jué)是一段時(shí)間內(nèi)數(shù)據(jù)的容器。

2.滑動(dòng)間隔,就是我們可以理解的cron表達(dá)式吧。

舉個(gè)例子吧:

還是以最著名的wordcount舉例,每隔10秒,統(tǒng)計(jì)一下過(guò)去30秒過(guò)來(lái)的數(shù)據(jù)。

// Reduce last 30 seconds of data, every 10 seconds

val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

DSstream window滑動(dòng)容器功能

window 對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行自定義的計(jì)算

countByWindow 對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行count操作

reduceByWindow 對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行reduce操作

reduceByKeyAndWindow 對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行reduceByKey操作

countByValueAndWindow 對(duì)每個(gè)滑動(dòng)窗口的數(shù)據(jù)執(zhí)行countByValue操作

案例

測(cè)試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

*窗口函數(shù)window

* 每隔多長(zhǎng)時(shí)間(滑動(dòng)頻率slideDuration)統(tǒng)計(jì)過(guò)去多長(zhǎng)時(shí)間(窗口長(zhǎng)度windowDuration)中的數(shù)據(jù)

* 需要注意的就是窗口長(zhǎng)度和滑動(dòng)頻率

* windowDuration = M*batchInterval,

slideDuration = N*batchInterval

*/

object _08SparkStreamingWindowOps {

def main(args: Array[String]): Unit = {

if (args == null || args.length < 2) {

System.err.println(

"""Parameter Errors! Usage:

|hostname: 監(jiān)聽(tīng)的網(wǎng)絡(luò)socket的主機(jī)名或ip地址

|port: 監(jiān)聽(tīng)的網(wǎng)絡(luò)socket的端口

""".stripMargin)

System.exit(-1)

}

val hostname = args(0).trim

val port = args(1).trim.toInt

Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

val conf = new SparkConf()

.setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName)

.setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(2))

// 接收到的當(dāng)前批次的數(shù)據(jù)

val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)

val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1))

// 每隔4s,統(tǒng)計(jì)過(guò)去6s中產(chǎn)生的數(shù)據(jù)

val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))

retDStream.print()

ssc.start()

ssc.awaitTermination()

ssc.stop() // stop中的boolean參數(shù),設(shè)置為true,關(guān)閉該ssc對(duì)應(yīng)的SparkContext,默認(rèn)為false,只關(guān)閉自身

}

}

nc產(chǎn)生數(shù)據(jù):

[uplooking@uplooking01 ~]$ nc -lk 4893

hello you

hello he

hello me

hello you

hello he

輸出結(jié)果如下:

-------------------------------------------

Time: 1526016316000 ms

-------------------------------------------

(hello,4)

(me,1)

(you,2)

(he,1)

-------------------------------------------

Time: 1526016320000 ms

-------------------------------------------

(hello,5)

(me,1)

(you,2)

(he,2)

-------------------------------------------

Time: 1526016324000 ms

-------------------------------------------

DStream的output操作以及foreachRDD

DStream output操作

1、print 打印每個(gè)batch中的前10個(gè)元素,主要用于測(cè)試,或者是不需要執(zhí)行什么output操作時(shí),用于簡(jiǎn)單觸發(fā)一下job。

2、saveAsTextFile(prefix, [suffix]) 將每個(gè)batch的數(shù)據(jù)保存到文件中。每個(gè)batch的文件的命名格式為:prefix-TIME_IN_MS[.suffix]

3、saveAsObjectFile 同上,但是將每個(gè)batch的數(shù)據(jù)以序列化對(duì)象的方式,保存到SequenceFile中。

4、saveAsHadoopFile 同上,將數(shù)據(jù)保存到Hadoop文件中

5、foreachRDD 最常用的output操作,遍歷DStream中的每個(gè)產(chǎn)生的RDD,進(jìn)行處理。可以將每個(gè)RDD中的數(shù)據(jù)寫(xiě)入外部存儲(chǔ),比如文件、數(shù)據(jù)庫(kù)、緩存等。通常在其中,是針對(duì)RDD執(zhí)行action操作的,比如foreach。

DStream foreachRDD詳解

相關(guān)內(nèi)容其實(shí)在Spark開(kāi)發(fā)調(diào)優(yōu)中已經(jīng)有相關(guān)的說(shuō)明。

通常在foreachRDD中,都會(huì)創(chuàng)建一個(gè)Connection,比如JDBC Connection,然后通過(guò)Connection將數(shù)據(jù)寫(xiě)入外部存儲(chǔ)。

誤區(qū)一:在RDD的foreach操作外部,創(chuàng)建Connection

這種方式是錯(cuò)誤的,因?yàn)樗鼤?huì)導(dǎo)致Connection對(duì)象被序列化后傳輸?shù)矫總€(gè)Task中。而這種Connection對(duì)象,實(shí)際上一般是不支持序列化的,也就無(wú)法被傳輸。

dstream.foreachRDD { rdd =>

val connection = createNewConnection()

rdd.foreach { record => connection.send(record)

}

}

誤區(qū)二:在RDD的foreach操作內(nèi)部,創(chuàng)建Connection

這種方式是可以的,但是效率低下。因?yàn)樗鼤?huì)導(dǎo)致對(duì)于RDD中的每一條數(shù)據(jù),都創(chuàng)建一個(gè)Connection對(duì)象。而通常來(lái)說(shuō),Connection的創(chuàng)建,是很消耗性能的。

dstream.foreachRDD { rdd =>

rdd.foreach { record =>

val connection = createNewConnection()

connection.send(record)

connection.close()

}

}

DStream foreachRDD合理使用

合理方式一:使用RDD的foreachPartition操作,并且在該操作內(nèi)部,創(chuàng)建Connection對(duì)象,這樣就相當(dāng)于是,為RDD的每個(gè)partition創(chuàng)建一個(gè)Connection對(duì)象,節(jié)省資源的多了。

dstream.foreachRDD { rdd =>

rdd.foreachPartition { partitionOfRecords =>

val connection = createNewConnection()

partitionOfRecords.foreach(record => connection.send(record))

connection.close()

}

}

合理方式二:自己手動(dòng)封裝一個(gè)靜態(tài)連接池,使用RDD的foreachPartition操作,并且在該操作內(nèi)部,從靜態(tài)連接池中,通過(guò)靜態(tài)方法,獲取到一個(gè)連接,使用之后再還回去。這樣的話,甚至在多個(gè)RDD的partition之間,也可以復(fù)用連接了。而且可以讓連接池采取懶創(chuàng)建的策略,并且空閑一段時(shí)間后,將其釋放掉。

dstream.foreachRDD { rdd =>

rdd.foreachPartition { partitionOfRecords =>

val connection = ConnectionPool.getConnection()

partitionOfRecords.foreach(record => connection.send(record))

ConnectionPool.returnConnection(connection)

}

}

foreachRDD 與foreachPartition實(shí)現(xiàn)實(shí)戰(zhàn)

需要注意的是:

(1)、你最好使用forEachPartition函數(shù)來(lái)遍歷RDD,并且在每臺(tái)Work上面創(chuàng)建數(shù)據(jù)庫(kù)的connection。

(2)、如果你的數(shù)據(jù)庫(kù)并發(fā)受限,可以通過(guò)控制數(shù)據(jù)的分區(qū)來(lái)減少并發(fā)。

(3)、在插入MySQL的時(shí)候最好使用批量插入。

(4),確保你寫(xiě)入的數(shù)據(jù)庫(kù)過(guò)程能夠處理失敗,因?yàn)槟悴迦霐?shù)據(jù)庫(kù)的過(guò)程可能會(huì)經(jīng)過(guò)網(wǎng)絡(luò),這可能導(dǎo)致數(shù)據(jù)插入數(shù)據(jù)庫(kù)失敗。

(5)、不建議將你的RDD數(shù)據(jù)寫(xiě)入到MySQL等關(guān)系型數(shù)據(jù)庫(kù)中。

這部分內(nèi)容其實(shí)可以參考開(kāi)發(fā)調(diào)優(yōu)部分的案例,只是那里并沒(méi)有foreachRDD,因?yàn)槠洳](méi)有使用DStream,但是原理是一樣的,因?yàn)樽罱K都是針對(duì)RDD來(lái)進(jìn)行操作的。

總結(jié)

以上是生活随笔為你收集整理的reducebykeyandwindow java_Spark Streaming笔记整理(三):DS的transformation与output操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。