大数据学习系列----基于Spark Streaming流式计算
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
個(gè)性化的需求
隨著互聯(lián)網(wǎng)知識(shí)信息指數(shù)級(jí)膨脹,個(gè)性化的需求對(duì)于用戶(hù)來(lái)說(shuō)越來(lái)越重要,通過(guò)推薦算法和用戶(hù)點(diǎn)擊行為的流式計(jì)算可以很簡(jiǎn)單的做出一個(gè)商用的推薦系統(tǒng)。
流程
spark streaming從kafka讀取用戶(hù)行為數(shù)據(jù),過(guò)濾數(shù)據(jù)后從redis中拉取物品相似度矩陣,從db或緩存中獲取用戶(hù)歷史行為,通過(guò)協(xié)同過(guò)濾進(jìn)行興趣/ctr候選集計(jì)算,將結(jié)果緩存到redis,異步持久化到db,通過(guò)接口進(jìn)行數(shù)據(jù)展示。
開(kāi)發(fā)包使用KafkaUtils類(lèi)
設(shè)置消費(fèi)者offset
數(shù)據(jù)從kafka拉取時(shí),可能因?yàn)槌绦虍惓?#xff0c;造成數(shù)據(jù)丟失或不一致,可以通過(guò)kafka把數(shù)據(jù)重新拉取,可以指定offset讀取。
從kafka拉取數(shù)據(jù),轉(zhuǎn)換為spark streaming中的數(shù)據(jù)結(jié)構(gòu)DStream。 接收數(shù)據(jù)有兩種:
receiver方式
基本的使用kafka高階api,接收的所有數(shù)據(jù)存儲(chǔ)在spark的executor中,之后spark streaming提交的job會(huì)處理這些數(shù)據(jù)。
reveiver方式,spark中partiton和kafka的partition并不是相關(guān)的,如果加大每個(gè)topic的partition數(shù)量,僅僅增加線(xiàn)程來(lái)處理由單一receiver消費(fèi)的主題,但并沒(méi)有增加spark在處理數(shù)據(jù)上的并行度。
對(duì)于不同的group和topic,可以使用多個(gè)receiver創(chuàng)建不同的DStream來(lái)提升并行度,之后利用union來(lái)統(tǒng)一成一個(gè)DStream。
直接讀取方式
Direct方式,沒(méi)有receiver這一層,會(huì)周期性的獲取kafka中每個(gè)topic的每個(gè)partition中新的offset,之后根據(jù)設(shè)定的maxRatePerPartition來(lái)處理每個(gè)batch。
相較于receiver方式的優(yōu)勢(shì)是:
receiver方式,是從zk獲取offset值,zk保存了當(dāng)前消費(fèi)的offset值,如果重新啟動(dòng)開(kāi)始消費(fèi)會(huì)接著上次offset繼續(xù)消費(fèi)。 direct方式中,直接從kafka來(lái)讀取數(shù)據(jù),offset要自己記錄,可以通過(guò)checkpoint,數(shù)據(jù)庫(kù),文件記錄,或者寫(xiě)回到zk。
調(diào)優(yōu)
如果批處理時(shí)間設(shè)置短,產(chǎn)生的job并不能在這期間完成,就會(huì)造成數(shù)據(jù)不斷累積,導(dǎo)致spark streaming阻塞。
spark streaming中的DStream如果被反復(fù)利用,最好使用cache(),將數(shù)據(jù)流緩存起來(lái),防止過(guò)度調(diào)度造成網(wǎng)絡(luò)開(kāi)銷(xiāo)。
設(shè)置合理的GC,并行垃圾回收。
轉(zhuǎn)載于:https://my.oschina.net/u/1000241/blog/1619063
總結(jié)
以上是生活随笔為你收集整理的大数据学习系列----基于Spark Streaming流式计算的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 恋与制作人专家怎么获得(汉典恋字的基本解
- 下一篇: redis3.0.7_sds.c_sds