Spark Streaming
spark streaming介紹
Spark streaming是Spark核心API的一個擴展,它對實時流式數(shù)據(jù)的處理具有可擴展性、高吞吐量、可容錯性等特點。我們可以從kafka、flume、witter、 ZeroMQ、Kinesis等源獲取數(shù)據(jù),也可以通過由 高階函數(shù)map、reduce、join、window等組成的復(fù)雜算法計算出數(shù)據(jù)。最后,處理后的數(shù)據(jù)可以推送到文件系統(tǒng)、數(shù)據(jù)庫、實時儀表盤中.
為什么使用spark streaming
很多大數(shù)據(jù)應(yīng)用程序需要實時處理數(shù)據(jù)流。思考:
我們知道spark和storm都能處理實時數(shù)據(jù),可是spark是如何處理實時數(shù)據(jù)的,spark包含比較多組件:包括
- spark core
- Spark SQL
- Spark Streaming
- GraphX
- MLlib
spark core中包含RDD、DataFrame和DataSet等,因此spark sql是為了兼容hive而產(chǎn)生的sql語句,GraphX提供的分布式圖計算框架,MLlib提供的機器學(xué)習(xí)框架。因此spark所謂的實時處理數(shù)據(jù)則是通過spark streaming來實現(xiàn)的。
什么是StreamingContext
為了初始化Spark Streaming程序,一個StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。一個StreamingContext 對象可以用SparkConf對象創(chuàng)建。StreamingContext這里可能不理解,其實跟SparkContext也差不多的。(可參考讓你真正理解什么是SparkContext, SQLContext 和HiveContext)。同理也有hadoop Context,它們都是全文對象,并且會獲取配置文件信息。那么配置文件有哪些?比如hadoop的core-site.xml,hdfs-site.xml等,spark如spark-defaults.conf等。這時候我們可能對StreamingContext有了一定的認識。下面一個例子
為了初始化Spark Streaming程序,一個StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。
一個StreamingContext 對象可以用SparkConf對象創(chuàng)建。
一個進程內(nèi)運行Spark Streaming。需要注意的是,它在內(nèi)部創(chuàng)建了一個SparkContext對象,你可以通過 ssc.sparkContext訪問這個SparkContext對象。
批時間片需要根據(jù)你的程序的潛在需求以及集群的可用資源來設(shè)定,你可以在性能調(diào)優(yōu)那一節(jié)獲取詳細的信息.可以利用已經(jīng)存在的 SparkContext 對象創(chuàng)建 StreamingContext 對象。 當(dāng)一個上下文(context)定義之后,你必須按照以下幾步進行操作
- 定義輸入源;
- 準(zhǔn)備好流計算指令;
- 利用 streamingContext.start() 方法接收和處理數(shù)據(jù);
- 處理過程將一直持續(xù),直到 streamingContext.stop() 方法被調(diào)用。
幾點需要注意的地方:
- 一旦一個context已經(jīng)啟動,就不能有新的流算子建立或者是添加到context中。
- 一旦一個context已經(jīng)停止,它就不能再重新啟動
- 在JVM中,同一時間只能有一個StreamingContext處于活躍狀態(tài)
- 在StreamingContext上調(diào)用 stop() 方法,也會關(guān)閉SparkContext對象。如果只想僅關(guān)閉StreamingContext對象,設(shè)
- 置 stop() 的可選參數(shù)為false
- 一個SparkContext對象可以重復(fù)利用去創(chuàng)建多個StreamingContext對象,前提條件是前面的StreamingContext在后面
- StreamingContext創(chuàng)建之前關(guān)閉(不關(guān)閉SparkContext)。
一個簡單的基于Streaming的workCount代碼如下: package com.debugo.example import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkConf ?? object WordCountStreaming { ??def main(args: Array[String]): Unit ={ ????val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077") ?? ????//create the streaming context ????val? ssc = new StreamingContext(sparkConf, Seconds(30)) ?? ????//process file when new file be found. ????val lines = ssc.textFileStream("file:///home/spark/data") ????val words = lines.flatMap(_.split(" ")) ????val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//這里不是rdd,而是dstream ????wordCounts.print() ????ssc.start() ????ssc.awaitTermination() ??} }
這段代碼實現(xiàn)了當(dāng)指定的路徑有新文件生成時,就會對這些文件執(zhí)行wordcount,并把結(jié)果print。具體流程如下:
代碼詮釋:
使用Spark Streaming就需要創(chuàng)建StreamingContext對象(類似SparkContext)。創(chuàng)建StreamingContext對象所需的參數(shù)與SparkContext基本一致,包括設(shè)定Master節(jié)點(setMaster),設(shè)定應(yīng)用名稱(setAppName)。第二個參數(shù)Seconds(30),指定了Spark Streaming處理數(shù)據(jù)的時間間隔為30秒。需要根據(jù)具體應(yīng)用需要和集群處理能力進行設(shè)置。
轉(zhuǎn)載于:https://www.cnblogs.com/chengzhihua/p/9512634.html
總結(jié)
以上是生活随笔為你收集整理的Spark Streaming的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [JSOI2010] 满汉全席
- 下一篇: (3)LoraWAN:链路控制、SF B