日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密

發(fā)布時間:2025/6/15 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

? ?Spark Streaming的DStream為我們提供了一個updateStateByKey方法,它的主要功能是可以隨著時間的流逝在Spark Streaming中為每一個key維護一份state狀態(tài),通過更新函數(shù)對該key的狀態(tài)不斷更新。對每一個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經(jīng)存在的key進行state的狀態(tài)更新(對每個新出現(xiàn)的key,會同樣執(zhí)行state的更新函數(shù)操作),但是如果通過更新函數(shù)對state更新后返回none的話,此時刻key對應(yīng)的state狀態(tài)被刪除掉,需要特別說明的是state可以是任意類型的數(shù)據(jù)結(jié)構(gòu),這就為我們的計算帶來無限的想象空間;

? 重點來了!!!如果要不斷的更新每個key的state,就一定會涉及到狀態(tài)的保存和容錯,這個時候就需要開啟checkpoint機制和功能,需要說明的是checkpoint可以保存一切可以存儲在文件系統(tǒng)上的內(nèi)容,例如:程序未處理的數(shù)據(jù)及已經(jīng)擁有的狀態(tài)。

? 補充說明:關(guān)于流式處理對歷史狀態(tài)進行保存和更新具有重大實用意義,例如進行廣告(投放廣告和運營廣告效果評估的價值意義,熱點隨時追蹤、熱力圖)

? 簡單的來說,如果我們需要進行wordcount,每個batchInterval都會計算出新的一批數(shù)據(jù),這批數(shù)據(jù)如何更新到以前計算的結(jié)果上?updateStateByKey就能實現(xiàn)此功能。

函數(shù)定義如下:

def?updateStateByKey[S:?ClassTag](updateFunc:?(Seq[V],?Option[S])?=>?Option[S]):?DStream[(K,?S)]?=?ssc.withScope?{updateStateByKey(updateFunc,?defaultPartitioner()) }

updateStateByKey 需要傳入一個函數(shù),該函數(shù)有兩個參數(shù)Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對應(yīng)的以前的值。返回的時一個key的最新值。


下面我們用實例演示:

package?com.dt.spark.streamingimport?org.apache.spark.SparkConf import?org.apache.spark.streaming.{Seconds,?StreamingContext}/***?Created?by?Administrator?on?2016/5/3.*/ object?UpdateStateByKeyDemo?{def?main(args:?Array[String])?{val?conf?=?new?SparkConf().setAppName("UpdateStateByKeyDemo")val?ssc?=?new?StreamingContext(conf,Seconds(20))//要使用updateStateByKey方法,必須設(shè)置Checkpoint。ssc.checkpoint("/checkpoint/")val?socketLines?=?ssc.socketTextStream("spark-master",9999)socketLines.flatMap(_.split(",")).map(word=>(word,1)).updateStateByKey((currValues:Seq[Int],preValue:Option[Int])?=>{val?currValue?=?currValues.sumSome(currValue?+?preValue.getOrElse(0))}).print()ssc.start()ssc.awaitTermination()ssc.stop()} }

打包上傳至spark集群。


打開nc,發(fā)送測試數(shù)據(jù)

root@spark-master:~#?nc?-lk?9999 hadoop,spark,scala,hive hadoop,Hbase,spark

運行spark 程序

root@spark-master:~#?/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit?--class?com.dt.spark.streaming.UpdateStateByKeyDemo??--master?spark://spark-master:7077?./spark.jar


查看運行結(jié)果:

------------------------------------------- Time:?1462282180000?ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)


我們在nc中再輸入一些數(shù)據(jù)

root@spark-master:~#?nc?-lk?9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark

再次查看結(jié)果:

------------------------------------------- Time:?1462282200000?ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)


可見,它將我們兩次統(tǒng)計結(jié)果合并了。


備注:

1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark?
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博:?http://www.weibo.com/ilovepains

轉(zhuǎn)載于:https://blog.51cto.com/lqding/1769852

總結(jié)

以上是生活随笔為你收集整理的第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。