Spark Streaming 实战案例(二) Transformation操作
生活随笔
收集整理的這篇文章主要介紹了
Spark Streaming 实战案例(二) Transformation操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
本節主要內容
本節部分內容來自官方文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html
1. Transformation操作
| map(func) | 對DStream中的各個元素進行func函數操作,然后返回一個新的DStream. |
| flatMap(func) | 與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項 |
| filter(func) | 過濾出所有函數func返回值為true的DStream元素并返回一個新的DStream |
| repartition(numPartitions) | 增加或減少DStream中的分區數,從而改變DStream的并行度 |
| union(otherStream) | 將源DStream和輸入參數為otherDStream的元素合并,并返回一個新的DStream. |
| count() | 通過對DStreaim中的各個RDD中的元素進行計數,然后返回只有一個元素的RDD構成的DStream |
| reduce(func) | 對源DStream中的各個RDD中的元素利用func進行聚合操作,然后返回只有一個元素的RDD構成的新的DStream. |
| countByValue() | 對于元素類型為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數 |
| reduceByKey(func, [numTasks]) | 利用func函數對源DStream中的key進行聚合操作,然后返回新的(K,V)對構成的DStream |
| join(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream |
| cogroup(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream |
| transform(func) | 通過RDD-to-RDD函數作用于源碼DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD |
| updateStateByKey(func) | 根據于key的前置狀態和key的新值,對key進行更新,返回一個新狀態的DStream |
具體示例:
//讀取本地文件~/streaming文件夾val lines = ssc.textFileStream(args(0))val words = lines.flatMap(_.split(" "))val wordMap = words.map(x => (x, 1))val wordCounts=wordMap.reduceByKey(_ + _)val filteredWordCounts=wordCounts.filter(_._2>1)val numOfCount=filteredWordCounts.count()val countByValue=words.countByValue()val union=words.union(word1)val transform=words.transform(x=>x.map(x=>(x,1))) //顯式原文件lines.print() //打印flatMap結果words.print() //打印map結果wordMap.print() //打印reduceByKey結果wordCounts.print() //打印filter結果filteredWordCounts.print() //打印count結果numOfCount.print() //打印countByValue結果countByValue.print() //打印union結果union.print() //打印transform結果transform.print()- 1
下面的代碼是運行時添加的文件內容
root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt- 1
下面是前面各個函數的結果
------------------------------------------- lines.print() ------------------------------------------- A B C D A B------------------------------------------- flatMap結果 ------------------------------------------- A B C D A B------------------------------------------- map結果 ------------------------------------------- (A,1) (B,1) (C,1) (D,1) (A,1) (B,1)------------------------------------------- reduceByKey結果 ------------------------------------------- (B,2) (D,1) (A,2) (C,1)------------------------------------------- filter結果 ------------------------------------------- (B,2) (A,2)------------------------------------------- count結果 ------------------------------------------- 2------------------------------------------- countByValue結果 ------------------------------------------- (B,2) (D,1) (A,2) (C,1)------------------------------------------- union結果 ------------------------------------------- A B C D A B A B C D ...------------------------------------------- transform結果 ------------------------------------------- (A,1) (B,1) (C,1) (D,1) (A,1) (B,1)- 1
示例2:
上節課中演示的WordCount代碼并沒有只是對輸入的單詞進行分開計數,沒有記錄前一次計數的狀態,如果想要連續地進行計數,則可以使用updateStateByKey方法來進行。下面的代碼主要給大家演示如何updateStateByKey的方法。
下圖是初始時的值:
使用下列命令啟動netcat server
然后輸入
root@sparkmaster:~/streaming# nc -lk 9999 hello將得到下圖的結果
然后再輸入world,
root@sparkmaster:~/streaming# nc -lk 9999 hello world- 1
則將得到下列結果
總結
以上是生活随笔為你收集整理的Spark Streaming 实战案例(二) Transformation操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: arcgis api for jav
- 下一篇: Scala入门到精通——第九节 继承与组