日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming 实战案例(二) Transformation操作

發布時間:2024/1/23 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming 实战案例(二) Transformation操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本節主要內容

本節部分內容來自官方文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • DStream Transformation操作
  • 1. Transformation操作

    TransformationMeaning
    map(func)對DStream中的各個元素進行func函數操作,然后返回一個新的DStream.
    flatMap(func)與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項
    filter(func)過濾出所有函數func返回值為true的DStream元素并返回一個新的DStream
    repartition(numPartitions)增加或減少DStream中的分區數,從而改變DStream的并行度
    union(otherStream)將源DStream和輸入參數為otherDStream的元素合并,并返回一個新的DStream.
    count()通過對DStreaim中的各個RDD中的元素進行計數,然后返回只有一個元素的RDD構成的DStream
    reduce(func)對源DStream中的各個RDD中的元素利用func進行聚合操作,然后返回只有一個元素的RDD構成的新的DStream.
    countByValue()對于元素類型為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數
    reduceByKey(func, [numTasks])利用func函數對源DStream中的key進行聚合操作,然后返回新的(K,V)對構成的DStream
    join(otherStream, [numTasks])輸入為(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream
    cogroup(otherStream, [numTasks])輸入為(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream
    transform(func)通過RDD-to-RDD函數作用于源碼DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD
    updateStateByKey(func)根據于key的前置狀態和key的新值,對key進行更新,返回一個新狀態的DStream

    具體示例:

    //讀取本地文件~/streaming文件夾val lines = ssc.textFileStream(args(0))val words = lines.flatMap(_.split(" "))val wordMap = words.map(x => (x, 1))val wordCounts=wordMap.reduceByKey(_ + _)val filteredWordCounts=wordCounts.filter(_._2>1)val numOfCount=filteredWordCounts.count()val countByValue=words.countByValue()val union=words.union(word1)val transform=words.transform(x=>x.map(x=>(x,1))) //顯式原文件lines.print() //打印flatMap結果words.print() //打印map結果wordMap.print() //打印reduceByKey結果wordCounts.print() //打印filter結果filteredWordCounts.print() //打印count結果numOfCount.print() //打印countByValue結果countByValue.print() //打印union結果union.print() //打印transform結果transform.print()
    • 1

    下面的代碼是運行時添加的文件內容

    root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt
    • 1

    下面是前面各個函數的結果

    ------------------------------------------- lines.print() ------------------------------------------- A B C D A B------------------------------------------- flatMap結果 ------------------------------------------- A B C D A B------------------------------------------- map結果 ------------------------------------------- (A,1) (B,1) (C,1) (D,1) (A,1) (B,1)------------------------------------------- reduceByKey結果 ------------------------------------------- (B,2) (D,1) (A,2) (C,1)------------------------------------------- filter結果 ------------------------------------------- (B,2) (A,2)------------------------------------------- count結果 ------------------------------------------- 2------------------------------------------- countByValue結果 ------------------------------------------- (B,2) (D,1) (A,2) (C,1)------------------------------------------- union結果 ------------------------------------------- A B C D A B A B C D ...------------------------------------------- transform結果 ------------------------------------------- (A,1) (B,1) (C,1) (D,1) (A,1) (B,1)
    • 1

    示例2:
    上節課中演示的WordCount代碼并沒有只是對輸入的單詞進行分開計數,沒有記錄前一次計數的狀態,如果想要連續地進行計數,則可以使用updateStateByKey方法來進行。下面的代碼主要給大家演示如何updateStateByKey的方法。

    import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner import org.apache.spark.streaming._object StatefulNetworkWordCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")System.exit(1)}//函數字面量,輸入的當前值與前一次的狀態結果進行累加val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.sumval previousCount = state.getOrElse(0)Some(currentCount + previousCount)}//輸入類型為K,V,S,返回值類型為K,S//V對應為帶求和的值,S為前一次的狀態val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))}val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]")//每一秒處理一次val ssc = new StreamingContext(sparkConf, Seconds(1))//當前目錄為checkpoint結果目錄,后面會講checkpoint在Spark Streaming中的應用ssc.checkpoint(".")//RDD的初始化結果val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))//使用Socket作為輸入源,本例ip為localhost,端口為9999val lines = ssc.socketTextStream(args(0), args(1).toInt)//flatMap操作val words = lines.flatMap(_.split(" "))//map操作val wordDstream = words.map(x => (x, 1))//updateStateByKey函數使用val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)stateDstream.print()ssc.start()ssc.awaitTermination()} }

    下圖是初始時的值:

    使用下列命令啟動netcat server

    root@sparkmaster:~/streaming# nc -lk 9999

    然后輸入

    root@sparkmaster:~/streaming# nc -lk 9999 hello

    將得到下圖的結果

    然后再輸入world,

    root@sparkmaster:~/streaming# nc -lk 9999 hello world
    • 1

    則將得到下列結果

    總結

    以上是生活随笔為你收集整理的Spark Streaming 实战案例(二) Transformation操作的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 日本欧美在线 | 亚洲国产精品一区二区三区 | 新亚洲天堂 | 午夜电影天堂 | 99视频免费看 | 人人妻人人爽一区二区三区 | 国产乡下妇女做爰视频 | 公侵犯一区二区三区四区中文字幕 | 自拍偷拍亚洲天堂 | 少妇人妻一区二区三区 | 少妇aaaa| 99伊人| 欧美黄一级 | 欧美乱插 | 欧美在线一区二区 | 无码人妻精品一区二区三 | 日本高清不卡在线 | 中文字幕网站 | 亚洲美女自拍偷拍 | 探花视频在线免费观看 | 噜噜吧噜噜色 | 日韩一区二区三区免费视频 | www国产精品 | 国产精品第 | 欧美日韩一级黄色片 | 天堂视频一区二区 | 日韩av女优在线观看 | 农民工hdxxxx性中国 | 综合爱爱网 | 成人精品久久久午夜福利 | 国产日韩精品一区二区三区在线 | 欧美超碰在线观看 | 欧美午夜精品久久久久久人妖 | 亚洲性欧美 | 片多多在线观看 | brazzers精品成人一区 | 中文字幕精品一区久久久久 | 美女免费看片 | 午夜视频大全 | 国产日韩精品在线观看 | 最新免费av | 欧美三级一区二区 | 日韩成人激情 | 国产91久久精品一区二区 | 二级毛片在线观看 | 成人av电影在线观看 | 在线免费观看视频a | 国产精品久久久久久亚洲影视 | 成人午夜看片 | 经典一区二区三区 | 美女黄色片网站 | 成人娱乐网 | 日日夜夜爱 | av一级二级 | 三浦惠理子aⅴ一二三区 | 国产91网址 | 丰满人妻综合一区二区三区 | 激情精品| 又大又粗欧美黑人aaaaa片 | 日韩视频在线播放 | 日本视频在线 | 性a视频| 国产精品传媒一区二区 | 免费av在线播放 | 中国黄色1级片 | 992tv人人草| 91尤物视频在线观看 | 国产精品白浆一区二小说 | 色综合狠狠 | 蜜桃久久av一区 | 五十路毛片 | 一本免费视频 | 88av视频| 中文字幕高清在线免费播放 | 免费一级片在线观看 | 先锋资源一区 | 国产中文字幕视频 | 国产网红无码精品视频 | 潘金莲激情呻吟欲求不满视频 | 日韩一区二区三区免费视频 | 欧美精品在线第一页 | 五月天导航 | 国产aa毛片 | 国产成人精品无码高潮 | a√天堂网 | 中文字幕一区二区在线观看 | www.黄色网址 | 亚洲国产精品va在线看黑人 | 丰满肉嫩西川结衣av | 亚洲色图久久 | 天天色天天色 | 韩国成人理伦片免费播放 | 经典三级在线视频 | 观看免费av | 久久午夜精品 | 91精彩视频 | 欧美日韩亚洲免费 | 成人视频在线观看 | 欧美成a |