spark消费kafka产生数据堆积怎么处理_SparkStreaming读取Kafka的两种方式
本文主要從以下幾個方面介紹SparkStreaming讀取Kafka的兩種方式:
一、SparkStreaming簡介
二、Kafka簡介
三、Redis簡介(可用于保存歷史數(shù)據(jù)或偏移量數(shù)據(jù))
四、SparkStreaming讀取Kafka數(shù)據(jù)的兩種方式
五、演示Demo
一、SparkStreaming簡介
可以參考這篇文章:SparkStreaming 詳解
二、Kafka簡介
可以參考這篇文章:Kafka(分布式發(fā)布訂閱消息系統(tǒng)) 簡介
三、Redis簡介
可以參考這篇文章:Redis簡介
四、SparkStreaming讀取Kafka數(shù)據(jù)的兩種方式
spark streaming提供了兩種獲取方式,一種是利用接收器(receiver)和kafaka的高層API實(shí)現(xiàn)。
一種是不利用接收器,直接用kafka底層的API來實(shí)現(xiàn)(spark1.3以后引入)。
1、reciver鏈接方式(有些問題,開發(fā)中不采用這種方式)
- 用KafkaUtils.createDstream創(chuàng)建鏈接。Receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。
- Receiver方式是通過zookeeper來連接kafka隊列,調(diào)用Kafka高階API,offset存儲在zookeeper,由Receiver維護(hù)。
- 在executor上會有receiver從kafka接收數(shù)據(jù)并存儲在Spark executor中,在到了batch時間后觸發(fā)job去處理接收到的數(shù)據(jù),1個receiver占用1個core使用wal預(yù)寫機(jī)制,因?yàn)樾枰褂胔dfs等存儲,因此會降低性能。
receiver方式
基于Receiver方式存在的問題:
- 啟用WAL機(jī)制,每次處理之前需要將該batch內(nèi)的數(shù)據(jù)備份到checkpoint目錄中,這降低了數(shù)據(jù)處理效率,同時加重了Receiver的壓力;另外由于數(shù)據(jù)備份機(jī)制,會受到負(fù)載影響,負(fù)載一高就會出現(xiàn)延遲的風(fēng)險,導(dǎo)致應(yīng)用崩潰。
- 采用MEMORY_AND_DISK_SER降低對內(nèi)存的要求,但是在一定程度上影響了計算的速度。
- 單Receiver內(nèi)存。由于Receiver是屬于Executor的一部分,為了提高吞吐量,提高Receiver的內(nèi)存。但是在每次batch計算中,參與計算的batch并不會使用這么多內(nèi)存,導(dǎo)致資源嚴(yán)重浪費(fèi)。
- 提高并行度,采用多個Receiver來保存kafka的數(shù)據(jù)。Receiver讀取數(shù)據(jù)是異步的,不會參與計算。如果提高了并行度來平衡吞吐量很不劃算。
- Receiver和計算的Executor是異步的,在遇到網(wǎng)絡(luò)等因素時,會導(dǎo)致計算出現(xiàn)延遲,計算隊列一直在增加,而Receiver一直在接收數(shù)據(jù),這非常容易導(dǎo)致程序崩潰。
- 在程序失敗恢復(fù)時,有可能出現(xiàn)數(shù)據(jù)部分落地,但是程序失敗,未更新offsets的情況,這會導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)。
2、Direct直連方式(開發(fā)中使用的方式)
- 使用KafkaUtils.createDirectStream創(chuàng)建鏈接。這種方式定期從kafka的topic下對應(yīng)的partition中查詢最新偏移量,并在每個批次中根據(jù)相應(yīng)的定義的偏移范圍進(jìn)行處理。Spark通過調(diào)用kafka簡單的消費(fèi)者API讀取一定范圍的數(shù)據(jù)。
- Direct方式是直接連接kafka分區(qū)來獲取數(shù)據(jù)。從每個分區(qū)直接讀取數(shù)據(jù)大大提高了并行能力Direct方式調(diào)用Kafka低階API(底層API),offset自己存儲和維護(hù),默認(rèn)由Spark維護(hù)在checkpoint中,消除了與zk不一致的情況當(dāng)然也可以自己手動維護(hù),把offset存在mysql、redis中所以基于Direct模式可以在開發(fā)中使用,且借助Direct模式的特點(diǎn)+手動操作可以保證數(shù)據(jù)的Exactly once 精準(zhǔn)一次
基于Direct方式的優(yōu)勢:
- 簡化并行讀取:如果要讀取多個partition,不需要創(chuàng)建多個輸入DStream然后對他們進(jìn)行union操作。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會并行從kafka中讀取數(shù)據(jù)。所以在kafka partition和RDD partition之間,有一一對應(yīng)的關(guān)系。
- 高性能:如果要保證數(shù)據(jù)零丟失,在基于Receiver的方式中,需要開啟WAL機(jī)制。這種方式其實(shí)效率很低,因?yàn)閿?shù)據(jù)實(shí)際被復(fù)制了兩份,kafka自己本身就有高可靠的機(jī)制,會對數(shù)據(jù)復(fù)制一份,而這里又會復(fù)制一份到WAL中。而基于Direct的方式,不依賴于Receiver,不需要開啟WAL機(jī)制,只要kafka中做了數(shù)據(jù)的復(fù)制,那么就可以通過kafka的副本進(jìn)行恢復(fù)。
- 強(qiáng)一致語義:基于Receiver的方式,使用kafka的高階API來在Zookeeper中保存消費(fèi)過的offset。這是消費(fèi)kafka數(shù)據(jù)的傳統(tǒng)方式。這種方式配合WAL機(jī)制,可以保證數(shù)據(jù)零丟失的高可靠性,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次。因?yàn)镾park和Zookeeper之間可能是不同步的。基于Direct的方式,使用kafka的簡單api,Spark Streaming自己就負(fù)責(zé)追蹤消費(fèi)的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)時消費(fèi)一次且僅消費(fèi)一次。
- 降低資源:Direct不需要Receiver,其申請的Executors全部參與到計算任務(wù)中;而Receiver則需要專門的Receivers來讀取kafka數(shù)據(jù)且不參與計算。因此相同的資源申請,Direct能夠支持更大的業(yè)務(wù)。Receiver與其他Executor是異步的,并持續(xù)不斷接收數(shù)據(jù),對于小業(yè)務(wù)量的場景還好,如果遇到大業(yè)務(wù)量時,需要提高Receiver的內(nèi)存,但是參與計算的Executor并不需要那么多的內(nèi)存,而Direct因?yàn)闆]有Receiver,而是在計算的時候讀取數(shù)據(jù),然后直接計算,所以對內(nèi)存的要求很低。
- 魯棒性更好:基于Receiver方式需要Receiver來異步持續(xù)不斷的讀取數(shù)據(jù),因此遇到網(wǎng)絡(luò)、存儲負(fù)載等因素,導(dǎo)致實(shí)時任務(wù)出現(xiàn)堆積,但Receiver卻還在持續(xù)讀取數(shù)據(jù),此種情況容易導(dǎo)致計算崩潰。Direct則沒有這種顧慮,其Driver在觸發(fā)batch計算任務(wù)時,才會讀取數(shù)據(jù)并計算,隊列出現(xiàn)堆積并不不會引起程序的失敗。
基于Direct方式的不足:
- Direct方式需要采用checkpoint或者第三方存儲來維護(hù)offset,而不是像Receiver那樣,通過Zookeeper來維護(hù)offsets,提高了用戶的開發(fā)成本。
- 基于Receiver方式指定topic指定consumer的消費(fèi)情況均能夠通過Zookeeper來監(jiān)控,而Direct則沒有這么便利,如果想做監(jiān)控并可視化,則需要投入人力開發(fā)。
五、演示Demo
1、reciver鏈接方式
package xxximport org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * Receiver鏈接方式 */object KafkaWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" val groupId = "g1" val topic = Map[String, Int]("test1" -> 1) //創(chuàng)建DStream,需要KafkaDStream val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic, StorageLevel.MEMORY_AND_DISK_SER) //對數(shù)據(jù)進(jìn)行處理 //Kafak的ReceiverInputDStream[(String, String)]里面裝的是一個元組(key是寫入的key,value是實(shí)際寫入的內(nèi)容) val lines: DStream[String] = data.map(_._2) //對DSteam進(jìn)行操作,操作這個抽象(代理,描述),就像操作一個本地的集合一樣,類似于RDD val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //打印結(jié)果(Action) reduced.print() //啟動sparksteaming程序 ssc.start() //等待優(yōu)雅的退出 ssc.awaitTermination() }}2、直連方式(用zookeeper存儲偏移量)
步驟:
準(zhǔn)備zookeeper集群存儲讀取到額kafka數(shù)據(jù)的每個分區(qū)的偏移量
調(diào)用KafkaUtils.createDirectStream建立直連鏈接
讀取zookeeper集群中的已經(jīng)存儲的每個數(shù)據(jù)分區(qū)地偏移量,根據(jù)該偏移量繼續(xù)讀取數(shù)據(jù)。或者從頭(當(dāng)前)位置讀取數(shù)據(jù)
調(diào)用kafkaStream.transform遍歷每個RDD,獲取該RDD對應(yīng)數(shù)據(jù)的偏移量
對RDD進(jìn)行操作,并將zookeeper中保存的數(shù)據(jù)偏移量進(jìn)行更新
package sparkStreamingAndKafkaimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}/** * 直連方式,用zookeeper存偏移量 */object KafkaDirection1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val group = "group1" // 分組 val topic = "wordCount" // topic val brokerList = "slave1:9092,slave2:9092,slave3:9092" // broker集群,sparkStream的Task直接連到kafka分區(qū)上 val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" // zookeeper集群,用于記錄偏移量(也可以選擇MySQL、Redis等記錄偏移量) val topics = Set(topic) // 創(chuàng)建stream時使用的topic名字集合,sparkStreaming可同時消費(fèi)多個topic val topicDirs = new ZKGroupTopicDirs(group, topic) // 創(chuàng)建一個ZKGroupTopicDirs對象,其實(shí)就是指定往zookeeper中寫入數(shù)據(jù)的目錄,該目錄用于保存偏移量 val zkTopicPath: String = topicDirs.consumerOffsetDir // 獲取zookeeper中的路徑"/group1/offsets/wordCount/" // 準(zhǔn)備kafka參數(shù) val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString // 偏移量最開始的時候從哪讀,SmallestTimeString表示從頭開始讀, // LargestTimeString表示從啟動時刻產(chǎn)生的數(shù)據(jù)讀 ) val zkClient = new ZkClient(zkQuorum) // zookeeper的客戶端,可以從zk中讀取偏移量數(shù)據(jù),并更新偏移量 val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 檢查該路徑下是否保存有數(shù)據(jù)(偏移量), // 例如:/group1/offsets/wordCount/2/1003 表示2號分區(qū)有偏移量1003 var kafkaStream: InputDStream[(String, String)] = null // 如果zookeeper中保存有偏移量offfset,則利用這個偏移量作為kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (numOfzkChildren > 0){ // 如果保存過offset for (i 1003 fromOffsets += (tp -> fromOffset.toLong) // 將topic不同分區(qū)所對應(yīng)的偏移量放入集合中 } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進(jìn)行 transform,最終 kafka 的數(shù)據(jù)都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) // 讀數(shù)據(jù)的規(guī)則 //通過KafkaUtils創(chuàng)建直連的DStream(fromOffsets參數(shù)的作用是:按照前面計算好了的偏移量繼續(xù)消費(fèi)數(shù)據(jù)) // 泛型參數(shù)說明: //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 處理完成后Dstream中的數(shù)據(jù)類型 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 沒有保存過offset,相當(dāng)于從頭讀 //如果未保存,根據(jù) kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范圍 var offsetRanges = Array[OffsetRange]() //從kafka讀取的消息,DStream的Transform方法可以將當(dāng)前批次的RDD獲取出來 //該transform方法計算獲取到當(dāng)前批次RDD,然后將RDD的偏移量取出來,然后在將RDD返回到DStream val transform: DStream[(String, String)] = kafkaStream.transform { rdd => //得到該 RDD對應(yīng) kafka 的消息的 offset //該RDD是一個KafkaRDD,可以獲得它的偏移量的范圍 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 偏移量范圍 rdd // 不對RDD進(jìn)行操作,再放回DStream } // DStream 是RDD的工廠,每隔一段時間產(chǎn)生一個RDD val messages: DStream[String] = transform.map(_._2) //依次迭代DStream中的RDD messages.foreachRDD { rdd => // foreachRDD,每隔一段時間產(chǎn)生一個RDD rdd.foreachPartition(partition => // foreachPartition 每個分區(qū)一個連接鏈接 partition.foreach(x => { // foreach 分區(qū)中的每條數(shù)據(jù) println(x) }) ) // 更新偏移量offset for (o但是,在這個方案中,為了獲取偏移量需要遍歷RDD,后續(xù)又要遍歷RDD操作RDD,代碼冗余
3、直連方式(獲取數(shù)據(jù)偏移量的同時處理數(shù)據(jù))
package xxximport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}/** * 直連方式,用zookeeper存偏移量(獲取偏移量的同時,對數(shù)據(jù)進(jìn)行操作) */object kafkaDirection2 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val group = "group3" // 分組 val topic = "wordCount" // topic val brokerList = "slave1:9092,slave2:9092,slave3:9092" // broker集群,sparkStream的Task直接連到kafka分區(qū)上 val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" // zookeeper集群,用于記錄偏移量(也可以選擇MySQL、Redis等記錄偏移量) val topics = Set(topic) // 創(chuàng)建stream時使用的topic名字集合,sparkStreaming可同時消費(fèi)多個topic val topicDirs = new ZKGroupTopicDirs(group, topic) // 創(chuàng)建一個ZKGroupTopicDirs對象,其實(shí)就是指定往zookeeper中寫入數(shù)據(jù)的目錄,該目錄用于保存偏移量 val zkTopicPath: String = topicDirs.consumerOffsetDir // 獲取zookeeper中的路徑"/group1/offsets/wordCount/" // 準(zhǔn)備kafka參數(shù) val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString // 偏移量最開始的時候從哪讀,SmallestTimeString表示從頭開始讀, // LargestTimeString表示從啟動時刻產(chǎn)生的數(shù)據(jù)讀 ) val zkClient = new ZkClient(zkQuorum) // zookeeper的客戶端,可以從zk中讀取偏移量數(shù)據(jù),并更新偏移量 val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 檢查該路徑下是否保存有數(shù)據(jù)(偏移量), // 例如:/group1/offsets/wordCount/2/1003 表示2號分區(qū)有偏移量1003 var kafkaStream: InputDStream[(String, String)] = null // 如果zookeeper中保存有偏移量offfset,則利用這個偏移量作為kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (numOfzkChildren > 0){ // 如果保存過offset for (i 1003 fromOffsets += (tp -> fromOffset.toLong) // 將topic不同分區(qū)所對應(yīng)的偏移量放入集合中 } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進(jìn)行 transform,最終 kafka 的數(shù)據(jù)都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) // 讀數(shù)據(jù)的規(guī)則 //通過KafkaUtils創(chuàng)建直連的DStream(fromOffsets參數(shù)的作用是:按照前面計算好了的偏移量繼續(xù)消費(fèi)數(shù)據(jù)) // 泛型參數(shù)說明: //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 處理完成后Dstream中的數(shù)據(jù)類型 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 沒有保存過offset,相當(dāng)于從頭讀 //如果未保存,根據(jù) kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范圍 var offsetRanges = Array[OffsetRange]() // 獲取偏移量的同時處理數(shù)據(jù) // 直連方式只有在kakaDstream中的RDD才能獲取偏移量,那么就不能調(diào)用DStream的Transformation // 所以只能在KafkaStream中調(diào)用foreachRDD,獲取RDD的偏移量,然后就是對RDD進(jìn)行操作了 //依次迭代DStream中的RDD // 如果使用直連方式進(jìn)行累加數(shù)據(jù),就需要在外部的數(shù)據(jù)庫中進(jìn)行累加(用kay-value的內(nèi)存數(shù)據(jù)庫,NoSQL型數(shù)據(jù)庫 Redis) kafkaStream.foreachRDD { kafkaRDD =>{ // 只有kafkaRDD可以強(qiáng)轉(zhuǎn)成HashOffSetRanges,并獲取偏移量 val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = kafkaRDD.map(_._2) val words: RDD[String] = lines.flatMap(u => { u.split(" ") }) val wordsAndOne: RDD[(String, Int)] = words.map(word => { (word, 1) }) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey((a, b) => { a + b }) reduced.foreach(println) // 更新偏移量offset for (o但是該方案,無法獲取歷史數(shù)據(jù)。這里統(tǒng)計到的wordcount只是某一時間片內(nèi)對應(yīng)數(shù)據(jù)的統(tǒng)計結(jié)果,并不包含歷史數(shù)據(jù)。
4、直連方式,zookeeper存儲偏移量數(shù)據(jù),redis存儲歷史數(shù)據(jù)。
redis的連接池:
package xxximport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}object JedisConnectePool { val config = new JedisPoolConfig() //最大連接數(shù), config.setMaxTotal(20) //最大空閑連接數(shù), config.setMaxIdle(10) //當(dāng)調(diào)用borrow Object方法時,是否進(jìn)行有效性檢查 --> config.setTestOnBorrow(true) //10000代表超時時間(10秒) val pool = new JedisPool(config, "192.168.247.8", 6379, 10000, "123") def getConnection():Jedis={ pool.getResource }}package xxximport jedis.JedisConnectionPoolimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import redis.clients.jedis.Jedis/** * 直連方式,在獲取RDD偏移量的同時操作偏移量,并且能夠wordcount統(tǒng)計時包含歷史統(tǒng)計數(shù)據(jù) */object kafkaDirection3 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(3)) val group = "group2" // 分組 val topic = "wordCount" // topic val brokerList = "slave1:9092,slave2:9092,slave3:9092" // broker集群,sparkStream的Task直接連到kafka分區(qū)上 val zkQuorum = "slave2:2181,slave3:2181,slave4:2181" // zookeeper集群,用于記錄偏移量(也可以選擇MySQL、Redis等記錄偏移量) val topics = Set(topic) // 創(chuàng)建stream時使用的topic名字集合,sparkStreaming可同時消費(fèi)多個topic val topicDirs = new ZKGroupTopicDirs(group, topic) // 創(chuàng)建一個ZKGroupTopicDirs對象,其實(shí)就是指定往zookeeper中寫入數(shù)據(jù)的目錄,該目錄用于保存偏移量 val zkTopicPath: String = topicDirs.consumerOffsetDir // 獲取zookeeper中的路徑"/group1/offsets/wordCount/" // 準(zhǔn)備kafka參數(shù) val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString // 偏移量最開始的時候從哪讀,SmallestTimeString表示從頭開始讀, // LargestTimeString表示從啟動時刻產(chǎn)生的數(shù)據(jù)讀 ) val zkClient = new ZkClient(zkQuorum) // zookeeper的客戶端,可以從zk中讀取偏移量數(shù)據(jù),并更新偏移量 val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 檢查該路徑下是否保存有數(shù)據(jù)(偏移量), // 例如:/group1/offsets/wordCount/2/1003 表示2號分區(qū)有偏移量1003 var kafkaStream: InputDStream[(String, String)] = null // 如果zookeeper中保存有偏移量offfset,則利用這個偏移量作為kafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() if (numOfzkChildren > 0){ // 如果保存過offset for (i 1003 fromOffsets += (tp -> fromOffset.toLong) // 將topic不同分區(qū)所對應(yīng)的偏移量放入集合中 } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進(jìn)行 transform,最終 kafka 的數(shù)據(jù)都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) // 讀數(shù)據(jù)的規(guī)則 //通過KafkaUtils創(chuàng)建直連的DStream(fromOffsets參數(shù)的作用是:按照前面計算好了的偏移量繼續(xù)消費(fèi)數(shù)據(jù)) // 泛型參數(shù)說明: //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 處理完成后Dstream中的數(shù)據(jù)類型 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ // 沒有保存過offset,相當(dāng)于從頭讀 //如果未保存,根據(jù) kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset //[String, String, StringDecoder, StringDecoder] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的范圍 var offsetRanges = Array[OffsetRange]() // 直連方式只有在kakaDstream中的RDD才能獲取偏移量,那么就不能調(diào)用DStream的Transformation // 所以只能在KafkaStream中調(diào)用foreachRDD,獲取RDD的偏移量,然后就是對RDD進(jìn)行操作了 //依次迭代DStream中的RDD // 如果使用直連方式進(jìn)行累加數(shù)據(jù),就需要在外部的數(shù)據(jù)庫中進(jìn)行累加(用kay-value的內(nèi)存數(shù)據(jù)庫,NoSQL型數(shù)據(jù)庫 Redis) kafkaStream.foreachRDD { kafkaRDD =>{ // 只有kafkaRDD可以強(qiáng)轉(zhuǎn)成HashOffSetRanges,并獲取偏移量 val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = kafkaRDD.map(_._2) val words: RDD[String] = lines.flatMap(u => { u.split(" ") }) val wordsAndOne: RDD[(String, Int)] = words.map(word => { (word, 1) }) val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey((a, b) => { a + b }) val stated: RDD[(String, Int)] = reduced.map(u => { // 獲取redis存放的歷史統(tǒng)計數(shù)據(jù) val conn: Jedis = JedisConnectionPool.getConnection() val str: String = conn.get(u._1) var num = 0 if(str != null){ num = str.toInt } val value: Int = u._2 val value1: Int = num+value // 更新redis中的統(tǒng)計數(shù)據(jù) conn.set(u._1, value1.toString) conn.close() (u._1, value1) }) stated.foreach(println) // 更新偏移量offset for (o 創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的spark消费kafka产生数据堆积怎么处理_SparkStreaming读取Kafka的两种方式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql 插入优化_MySQL批量SQ
- 下一篇: 使用frp工具实现内网穿透以及配置多个s