第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密
? ?Spark Streaming的DStream為我們提供了一個updateStateByKey方法,它的主要功能是可以隨著時間的流逝在Spark Streaming中為每一個key維護一份state狀態(tài),通過更新函數(shù)對該key的狀態(tài)不斷更新。對每一個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經(jīng)存在的key進行state的狀態(tài)更新(對每個新出現(xiàn)的key,會同樣執(zhí)行state的更新函數(shù)操作),但是如果通過更新函數(shù)對state更新后返回none的話,此時刻key對應(yīng)的state狀態(tài)被刪除掉,需要特別說明的是state可以是任意類型的數(shù)據(jù)結(jié)構(gòu),這就為我們的計算帶來無限的想象空間;
? 重點來了!!!如果要不斷的更新每個key的state,就一定會涉及到狀態(tài)的保存和容錯,這個時候就需要開啟checkpoint機制和功能,需要說明的是checkpoint可以保存一切可以存儲在文件系統(tǒng)上的內(nèi)容,例如:程序未處理的數(shù)據(jù)及已經(jīng)擁有的狀態(tài)。
? 補充說明:關(guān)于流式處理對歷史狀態(tài)進行保存和更新具有重大實用意義,例如進行廣告(投放廣告和運營廣告效果評估的價值意義,熱點隨時追蹤、熱力圖)
? 簡單的來說,如果我們需要進行wordcount,每個batchInterval都會計算出新的一批數(shù)據(jù),這批數(shù)據(jù)如何更新到以前計算的結(jié)果上?updateStateByKey就能實現(xiàn)此功能。
函數(shù)定義如下:
def?updateStateByKey[S:?ClassTag](updateFunc:?(Seq[V],?Option[S])?=>?Option[S]):?DStream[(K,?S)]?=?ssc.withScope?{updateStateByKey(updateFunc,?defaultPartitioner()) }updateStateByKey 需要傳入一個函數(shù),該函數(shù)有兩個參數(shù)Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key對應(yīng)的以前的值。返回的時一個key的最新值。
下面我們用實例演示:
package?com.dt.spark.streamingimport?org.apache.spark.SparkConf import?org.apache.spark.streaming.{Seconds,?StreamingContext}/***?Created?by?Administrator?on?2016/5/3.*/ object?UpdateStateByKeyDemo?{def?main(args:?Array[String])?{val?conf?=?new?SparkConf().setAppName("UpdateStateByKeyDemo")val?ssc?=?new?StreamingContext(conf,Seconds(20))//要使用updateStateByKey方法,必須設(shè)置Checkpoint。ssc.checkpoint("/checkpoint/")val?socketLines?=?ssc.socketTextStream("spark-master",9999)socketLines.flatMap(_.split(",")).map(word=>(word,1)).updateStateByKey((currValues:Seq[Int],preValue:Option[Int])?=>{val?currValue?=?currValues.sumSome(currValue?+?preValue.getOrElse(0))}).print()ssc.start()ssc.awaitTermination()ssc.stop()} }打包上傳至spark集群。
打開nc,發(fā)送測試數(shù)據(jù)
root@spark-master:~#?nc?-lk?9999 hadoop,spark,scala,hive hadoop,Hbase,spark運行spark 程序
root@spark-master:~#?/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit?--class?com.dt.spark.streaming.UpdateStateByKeyDemo??--master?spark://spark-master:7077?./spark.jar查看運行結(jié)果:
------------------------------------------- Time:?1462282180000?ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)我們在nc中再輸入一些數(shù)據(jù)
root@spark-master:~#?nc?-lk?9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark再次查看結(jié)果:
------------------------------------------- Time:?1462282200000?ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)可見,它將我們兩次統(tǒng)計結(jié)果合并了。
備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark?
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博:?http://www.weibo.com/ilovepains
轉(zhuǎn)載于:https://blog.51cto.com/lqding/1769852
總結(jié)
以上是生活随笔為你收集整理的第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一步一步学lucene——(第四步:搜索
- 下一篇: [android] 切换按钮-自定义控件