日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming

發(fā)布時間:2024/9/5 编程问答 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

  spark streaming介紹
  Spark streaming是Spark核心API的一個擴展,它對實時流式數(shù)據(jù)的處理具有可擴展性、高吞吐量、可容錯性等特點。我們可以從kafka、flume、witter、 ZeroMQ、Kinesis等源獲取數(shù)據(jù),也可以通過由 高階函數(shù)map、reduce、join、window等組成的復(fù)雜算法計算出數(shù)據(jù)。最后,處理后的數(shù)據(jù)可以推送到文件系統(tǒng)、數(shù)據(jù)庫、實時儀表盤中.

為什么使用spark streaming
很多大數(shù)據(jù)應(yīng)用程序需要實時處理數(shù)據(jù)流。思考:
我們知道spark和storm都能處理實時數(shù)據(jù),可是spark是如何處理實時數(shù)據(jù)的,spark包含比較多組件:包括

    • spark core
    • Spark SQL
    • Spark Streaming
    • GraphX
    • MLlib

spark core中包含RDD、DataFrame和DataSet等,因此spark sql是為了兼容hive而產(chǎn)生的sql語句,GraphX提供的分布式圖計算框架,MLlib提供的機器學(xué)習(xí)框架。因此spark所謂的實時處理數(shù)據(jù)則是通過spark streaming來實現(xiàn)的。

什么是StreamingContext
為了初始化Spark Streaming程序,一個StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。一個StreamingContext 對象可以用SparkConf對象創(chuàng)建。StreamingContext這里可能不理解,其實跟SparkContext也差不多的。(可參考讓你真正理解什么是SparkContext, SQLContext 和HiveContext)。同理也有hadoop Context,它們都是全文對象,并且會獲取配置文件信息。那么配置文件有哪些?比如hadoop的core-site.xml,hdfs-site.xml等,spark如spark-defaults.conf等。這時候我們可能對StreamingContext有了一定的認識。下面一個例子

為了初始化Spark Streaming程序,一個StreamingContext對象必需被創(chuàng)建,它是Spark Streaming所有流操作的主要入口。
一個StreamingContext 對象可以用SparkConf對象創(chuàng)建。

import org.apache.spark._ impoty org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc=new StreamingContext(conf,Seconds(1)) appName表示你的應(yīng)用程序顯示在集群UI上的名字,master 是一個Spark、Mesos、YARN集群URL 或者一個特殊字符串“l(fā)ocal”,它表示程序用本地模式運行。當(dāng)程序運行在集群中時,你并不希望在程序中硬編碼 master ,而是希望用 sparksubmit啟動應(yīng)用程序,并從 spark-submit 中得到 master 的值。對于本地測試或者單元測試,你可以傳遞“l(fā)ocal”字符串在同
一個進程內(nèi)運行Spark Streaming。需要注意的是,它在內(nèi)部創(chuàng)建了一個SparkContext對象,你可以通過 ssc.sparkContext訪問這個SparkContext對象。
批時間片需要根據(jù)你的程序的潛在需求以及集群的可用資源來設(shè)定,你可以在性能調(diào)優(yōu)那一節(jié)獲取詳細的信息.可以利用已經(jīng)存在的 SparkContext 對象創(chuàng)建 StreamingContext 對象。 當(dāng)一個上下文(context)定義之后,你必須按照以下幾步進行操作

  • 定義輸入源;
  • 準(zhǔn)備好流計算指令;
  • 利用 streamingContext.start() 方法接收和處理數(shù)據(jù);
  • 處理過程將一直持續(xù),直到 streamingContext.stop() 方法被調(diào)用。



幾點需要注意的地方:
  • 一旦一個context已經(jīng)啟動,就不能有新的流算子建立或者是添加到context中。
  • 一旦一個context已經(jīng)停止,它就不能再重新啟動
  • 在JVM中,同一時間只能有一個StreamingContext處于活躍狀態(tài)
  • 在StreamingContext上調(diào)用 stop() 方法,也會關(guān)閉SparkContext對象。如果只想僅關(guān)閉StreamingContext對象,設(shè)
  • 置 stop() 的可選參數(shù)為false
  • 一個SparkContext對象可以重復(fù)利用去創(chuàng)建多個StreamingContext對象,前提條件是前面的StreamingContext在后面
  • StreamingContext創(chuàng)建之前關(guān)閉(不關(guān)閉SparkContext)。
  什么是DStream Spark Streaming支持一個高層的抽象,叫做離散流( discretized stream )或者 DStream ,它代表連續(xù)的數(shù)據(jù)流。DStream既可以利用從Kafka, Flume和Kinesis等源獲取的輸入數(shù)據(jù)流創(chuàng)建,也可以 在其他DStream的基礎(chǔ)上通過高階函數(shù)獲得。在內(nèi)部,DStream是由一系列RDDs組成。 舉例:
一個簡單的基于Streaming的workCount代碼如下: package com.debugo.example import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkConf ?? object WordCountStreaming { ??def main(args: Array[String]): Unit ={ ????val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077") ?? ????//create the streaming context ????val? ssc = new StreamingContext(sparkConf, Seconds(30)) ?? ????//process file when new file be found. ????val lines = ssc.textFileStream("file:///home/spark/data") ????val words = lines.flatMap(_.split(" ")) ????val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//這里不是rdd,而是dstream ????wordCounts.print() ????ssc.start() ????ssc.awaitTermination() ??} }
這段代碼實現(xiàn)了當(dāng)指定的路徑有新文件生成時,就會對這些文件執(zhí)行wordcount,并把結(jié)果print。具體流程如下:

代碼詮釋:
使用Spark Streaming就需要創(chuàng)建StreamingContext對象(類似SparkContext)。創(chuàng)建StreamingContext對象所需的參數(shù)與SparkContext基本一致,包括設(shè)定Master節(jié)點(setMaster),設(shè)定應(yīng)用名稱(setAppName)。第二個參數(shù)Seconds(30),指定了Spark Streaming處理數(shù)據(jù)的時間間隔為30秒。需要根據(jù)具體應(yīng)用需要和集群處理能力進行設(shè)置。

轉(zhuǎn)載地址:http://www.aboutyun.com/thread-21141-1-1.html

轉(zhuǎn)載于:https://www.cnblogs.com/chengzhihua/p/9512634.html

總結(jié)

以上是生活随笔為你收集整理的Spark Streaming的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。