Spark对Kafka两种连接方式的对比——Receiver和Direct
在知乎 Flink 取代 Spark Streaming 的實(shí)戰(zhàn)之路中,提到
因此下面對(duì)兩種方式進(jìn)行詳細(xì)說(shuō)明一下。
Receiver方式
Receiver:接收器模式是使用Kafka高級(jí)Consumer API實(shí)現(xiàn)的。與所有接收器一樣,從Kafka通過(guò)Receiver接收的數(shù)據(jù)存儲(chǔ)在Spark Executor的內(nèi)存中,然后由Spark Streaming啟動(dòng)的job來(lái)處理數(shù)據(jù)。然而默認(rèn)配置下,這種方式可能會(huì)因?yàn)榈讓拥氖《鴣G失數(shù)據(jù)(請(qǐng)參閱接收器可靠性)。如果要啟用高可靠機(jī)制,確保零數(shù)據(jù)丟失,要啟用Spark Streaming的預(yù)寫(xiě)日志機(jī)制(Write Ahead Log,(已引入)在Spark 1.2)。該機(jī)制會(huì)同步地將接收到的Kafka數(shù)據(jù)保存到分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫(xiě)日志中,以便底層節(jié)點(diǎn)在發(fā)生故障時(shí)也可以使用預(yù)寫(xiě)日志中的數(shù)據(jù)進(jìn)行恢復(fù)。
單點(diǎn)讀數(shù)據(jù),讀到的數(shù)據(jù)會(huì)緩存到executor的cache里,增大了內(nèi)存使用的壓力。
在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)。
特點(diǎn)
在spark的executor中,啟動(dòng)一個(gè)接收器,專(zhuān)門(mén)用于讀取kafka的數(shù)據(jù),然后存入到內(nèi)存中,供sparkStreaming消費(fèi)
1、為了保證數(shù)據(jù)0丟失,WAL,數(shù)據(jù)會(huì)保存2份,有冗余
2、Receiver是單點(diǎn)讀數(shù)據(jù),如果掛掉,程序不能運(yùn)行
3、數(shù)據(jù)讀到executor內(nèi)存中,增大了內(nèi)存使用的壓力,如果消費(fèi)不及時(shí),會(huì)造成數(shù)據(jù)積壓
如下圖:
詳細(xì)圖示:
還有幾個(gè)需要注意的點(diǎn):
1、Kafka中topic的partition與Spark Streaming中生成的RDD的partition無(wú)關(guān),因此,在KafkaUtils.createStream()中,增加某個(gè)topic的partition的數(shù)量,只會(huì)增加單個(gè)Receiver消費(fèi)topic的線程數(shù),也就是讀取Kafka中topic partition的線程數(shù)量,它不會(huì)增加Spark在處理數(shù)據(jù)時(shí)的并行性。
2、可以使用不同的consumer group和topic創(chuàng)建多個(gè)Kafka輸入DStream,以使用多個(gè)receiver并行接收數(shù)據(jù)。
3、如果已使用HDFS等復(fù)制文件系統(tǒng)啟用了“預(yù)讀日志”,則接收的數(shù)據(jù)已在日志中復(fù)制。因此,輸入流的存儲(chǔ)級(jí)別的存儲(chǔ)級(jí)別StorageLevel.MEMORY_AND_DISK_SER(即,使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。
Direct方式
Direct:直連模式,在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式?jīng)]有receiver這一層,其會(huì)周期性的獲取Kafka中每個(gè)topic的每個(gè)partition中的最新offsets,并且相應(yīng)的定義要在每個(gè)batch中處理偏移范圍,當(dāng)啟動(dòng)處理數(shù)據(jù)的作業(yè)時(shí),kafka的簡(jiǎn)單的消費(fèi)者api用于從kafka讀取定義的偏移范圍 。其形式如下圖:
這種方法相較于Receiver方式的優(yōu)勢(shì)在于:
1、簡(jiǎn)化的并行:在Receiver的方式中我們提到創(chuàng)建多個(gè)Receiver之后利用union來(lái)合并成一個(gè)Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對(duì)應(yīng)的并行讀取Kafka數(shù)據(jù),這種映射關(guān)系也更利于理解和優(yōu)化。
2、高效:在Receiver的方式中,為了達(dá)到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費(fèi)!而第二種方式不存在這個(gè)問(wèn)題,只要我們Kafka的數(shù)據(jù)保留時(shí)間足夠長(zhǎng),我們都能夠從Kafka進(jìn)行數(shù)據(jù)恢復(fù)。
3、精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)。而第二種方式,直接使用了簡(jiǎn)單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進(jìn)行記錄,消除了這種不一致性。
請(qǐng)注意,此方法的一個(gè)缺點(diǎn)是它不會(huì)更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka監(jiān)視工具將不會(huì)顯示進(jìn)度。但是,您可以在每個(gè)批處理中訪問(wèn)此方法處理的偏移量,并自行更新Zookeeper。
直連模式特點(diǎn):tatch time 每隔一段時(shí)間,去kafka讀取一批數(shù)據(jù),然后消費(fèi)
?????????簡(jiǎn)化并行度,rdd的分區(qū)數(shù)量=topic的分區(qū)數(shù)量
?????????數(shù)據(jù)存儲(chǔ)于kafka中,沒(méi)有數(shù)據(jù)冗余
?????????不存在單點(diǎn)問(wèn)題
?????????效率高
?????????可以實(shí)現(xiàn)僅消費(fèi)一次的語(yǔ)義 exactly-once語(yǔ)義
詳細(xì)圖示:
總結(jié)
以上是生活随笔為你收集整理的Spark对Kafka两种连接方式的对比——Receiver和Direct的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 谈一谈RDD 持久化的三个算子:cach
- 下一篇: 利剑无意之scala小考核