Spark _29_SparkStreaming初始
生活随笔
收集整理的這篇文章主要介紹了
Spark _29_SparkStreaming初始
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
SparkStreaming簡介
SparkStreaming是流式處理框架,是Spark API的擴展,支持可擴展、高吞吐量、容錯的實時數據流處理,實時數據的來源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高級功能的復雜算子來處理流數據。例如:map,reduce,join,window 。最終,處理后的數據可以存放在文件系統,數據庫等,方便實時展現。
SparkStreaming與Storm的區別
?
SparkStreaming初始
- SparkStreaming初始理解
注意:
- receiver ?task是7*24小時一直在執行,一直接受數據,將一段時間內接收來的數據保存到batch中。假設batchInterval為5s,那么會將接收來的數據每隔5秒封裝到一個batch中,batch沒有分布式計算特性,這一個batch的數據又被封裝到一個RDD中,RDD最終封裝到一個DStream中。
例如:假設batchInterval為5秒,每隔5秒通過SparkStreaming將得到一個DStream,在第6秒的時候計算這5秒的數據,假設執行任務的時間是3秒,那么第6~9秒一邊在接收數據,一邊在計算任務,9~10秒只是在接收數據。然后在第11秒的時候重復上面的操作。
- 如果job執行的時間大于batchInterval會有什么樣的問題?
如果接受過來的數據設置的級別是僅內存,接收來的數據會越堆積越多,最后可能會導致OOM(如果設置StorageLevel包含disk, 則內存存放不下的數據會溢寫至disk, 加大延遲?)。
SparkStreaming代碼
代碼注意事項:
- 啟動socket server 服務器:nc –lk 9999
- receiver模式下接受數據,local的模擬線程必須大于等于2,一個線程用來receiver用來接受數據,另一個線程用來執行job。
- Durations時間設置就是我們能接收的延遲度。這個需要根據集群的資源情況以及任務的執行情況來調節。
- 創建JavaStreamingContext有兩種方式(SparkConf,SparkContext)
- 所有的代碼邏輯完成后要有一個output operation類算子。
- JavaStreamingContext.start() Streaming框架啟動后不能再次添加業務邏輯。
- JavaStreamingContext.stop() 無參的stop方法將SparkContext一同關閉,stop(false),不會關閉SparkContext。
- JavaStreamingContext.stop()停止之后不能再調用start。
javaAPI:
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnline"); /*** 在創建streaminContext的時候 設置batch Interval*/ JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String s) {return Arrays.asList(s.split(" "));} });JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String s) {return new Tuple2<String, Integer>(s, 1);} });JavaPairDStream<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer i1, Integer i2) {return i1 + i2;} });//outputoperator類的算子 counts.print();jsc.start();//等待spark程序被終止jsc.awaitTermination();jsc.stop(false);scalaAPI:
package com.sparkStreamingimport org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}object StreamTest {def main(args: Array[String]): Unit = {//在這里將local線程設置兩個,否則會只能接受數據,無法處理val conf = new SparkConf().setMaster("local[2]").setAppName("st") // val sc = new SparkContext(conf)val ssc = new StreamingContext(conf,Durations.seconds(5)) // new StreamingContext(sc,Durations.seconds(5))ssc.sparkContext.setLogLevel("error")val lines = ssc.socketTextStream("henu1",9999)val words = lines.flatMap(one => {one.split(" ")})val map = words.map(one => {(one,1)})val result = map.reduceByKey(_+_)result.print(100)ssc.start()ssc.awaitTermination()ssc.stop()} }【注】啟動前先在對應虛擬機啟動
[root@henu1 ~]# nc -lk 9999如果沒有nc。下載
[root@henu1 ~]# yum install -y nc?再執行? ?nc -lk 9999
?否則報錯:
?
?然后在linux端輸入字段:
idea端:
?
在瀏覽器輸入localhost:4040,看其UI頁面:
具體就不多說了。
總結
以上是生活随笔為你收集整理的Spark _29_SparkStreaming初始的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一些 Linux 系统故障修复和修复技巧
- 下一篇: Spark _28_窗口函数