
Spark Streaming + Kafka on Spark on YARN: the pitfalls I hit (solved), fqzzzzz's blog

Published: 2024/1/18

Project scenario:

Use Spark Streaming to receive data from Kafka and compute on it, then package the job, upload it to Linux and submit it with spark-submit.


Errors encountered:

1. Error 1:

Failed to add file:/usr/local/spark-yarn/./myapp/sparkDemo04.jar to Spark environment
java.io.FileNotFoundException: Jar D:\usr\local\spark-yarn\myapp\sparkDemo04.jar not found
WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

2. Running the code in YARN mode from IntelliJ IDEA on Windows fails with the following error:

WARN CheckpointReader: Error reading checkpoint from file hdfs://hadoop102:9000/checkpoint6/checkpoint-1637834226000 java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.MappedDStream.mapFunc of type scala.Function1 in instance of org.apache.spark.streaming.dstream.MappedDStream

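3. Various "class not found" errors for Kafka classes. I won't discuss these further below. The fix is to download the corresponding jars and put them into the jars directory of the Spark installation. A common example is: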

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater

All of these can be fixed by adding the missing jar. If the error occurs in spark-shell, append --jars <path to jar> to the spark-shell command.
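Before copying jars around, it can help to confirm whether a class is actually visible to the driver JVM. Below is a minimal sketch that can be pasted into spark-shell or any Scala program. `ClasspathCheck` and `onClasspath` are hypothetical helper names. Note that `Class.forName` only reports what the current JVM's class loader can see, not what the executors see.

```scala
object ClasspathCheck {
  // Returns true if the named class can be loaded by this object's class loader.
  // initialize = false avoids running the class's static initializers.
  def onClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: NoClassDefFoundError   => false
    }
}
```

For example, `ClasspathCheck.onClasspath("org.apache.spark.kafka010.KafkaConfigUpdater")` returning `false` in spark-shell means the jar providing that class has not been added yet.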
The original code:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object stream05_kafka {

  object SparkKafka {
    def createSSC(): StreamingContext = {
      // TODO create the environment object
      // StreamingContext(conf, batchInterval): the first argument is the environment config,
      // the second is the batch (data collection) interval
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka2")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      // The key was misspelled "resoursemanager", so it was silently ignored.
      // 8088 is the ResourceManager web UI port; the RM RPC address defaults to port 8032.
      // Normally this value comes from yarn-site.xml via HADOOP_CONF_DIR.
      sparkConf.set("spark.hadoop.yarn.resourcemanager.address", "hadoop103:8032")
      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint6")

      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      // TODO processing logic
      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara))

      val num: DStream[String] = kafkaDS.map(_.value())
      // Each record is "key,up,down"; keep running totals (up, down, up + down) per key
      val result = num.map { line =>
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }.updateStateByKey((queueValue: Seq[(Int, Int, Int)], buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })
      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    val streamingContext = StreamingContext.getActiveOrCreate(
      "hdfs://hadoop102:9000/checkpoint6", () => SparkKafka.createSSC())
    streamingContext.start()
    // 2. Wait for shutdown
    streamingContext.awaitTermination()
  }
}
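The parsing step and the function passed to updateStateByKey are plain Scala, so they can be checked without Kafka or a cluster. Below is a minimal sketch that factors them out. `FlowLogic`, `parseLine` and `updateFlow` are hypothetical names. It assumes every record has the form `key,up,down`, as the parsing code above implies.

```scala
object FlowLogic {
  // Per-key state: (up, down, up + down)
  type Flow = (Int, Int, Int)

  // Parse "key,up,down" into (key, (up, down, up + down)), mirroring the map step
  def parseLine(line: String): (String, Flow) = {
    val flows = line.split(",")
    val up = flows(1).toInt
    val down = flows(2).toInt
    (flows(0), (up, down, up + down))
  }

  // Same semantics as the updateStateByKey lambda: add this batch's
  // up/down values to the previous state (or to zero if there is none)
  def updateFlow(batch: Seq[Flow], state: Option[Flow]): Option[Flow] = {
    val (prevUp, prevDown, _) = state.getOrElse((0, 0, 0))
    val up = prevUp + batch.map(_._1).sum
    val down = prevDown + batch.map(_._2).sum
    Some((up, down, up + down))
  }
}
```

For example, `FlowLogic.updateFlow(Seq((1, 2, 3), (4, 5, 9)), Some((10, 10, 20)))` returns `Some((15, 17, 32))`. In the job itself, the lambdas could then delegate to these functions, for example `num.map(FlowLogic.parseLine).updateStateByKey(FlowLogic.updateFlow _)`.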

Cause analysis:

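First, to package the job and run it with spark-submit in YARN mode on Linux, the master must be set to yarn. Whether it runs in client or cluster mode is chosen at submit time (--deploy-mode), and the default is client. I had hard-coded local. As a result, everything worked on Windows at first: the job connected to Kafka, received data and computed results. On Linux, however, it did not work. The root cause was that the job never connected to YARN.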
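One way to avoid hard-coding the master is to let spark-submit supply it and fall back to local mode only when nothing was supplied, for example when launching from the IDE. According to the Spark configuration docs, values set directly on a SparkConf take precedence over spark-submit flags. That is why a hard-coded `setMaster("local[*]")` wins even when `--master yarn` is passed. Below is a sketch using `SparkConf.setIfMissing`; `ConfFactory` is a hypothetical name.

```scala
import org.apache.spark.SparkConf

object ConfFactory {
  // spark-submit --master yarn [--deploy-mode client|cluster] sets spark.master,
  // which new SparkConf() picks up from the spark.* system properties.
  // setIfMissing applies the local fallback only when no master was provided.
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .setIfMissing("spark.master", "local[*]")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
}
```

With this, the same jar runs locally from the IDE and on YARN when submitted with something like `spark-submit --master yarn --deploy-mode cluster --class <main class> sparkDemo04.jar`.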
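1. Error 1 occurs because, when the Spark job is submitted from Windows, Spark cannot find your jar. Consider the spark-submit command: it requires you to specify both the jar and the main class.
2. Is this a serialization problem, or a problem of broadcast variables not being suitable for checkpointing? From what I read, broadcast variable contents are hard to recover once written to HDFS. The error can be traced to StreamingContext.getActiveOrCreate. Sometimes recovery from the checkpoint works, and sometimes it fails with this error. I haven't found a fix, so I simply switched to a new checkpoint path. In production, a streaming job is usually stopped only for code upgrades, so I didn't dig deeper; I'd welcome an explanation from anyone who knows. My guess is that something goes wrong in deserialization when the checkpoint data is read back.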
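The Spark Streaming programming guide notes that checkpoints contain serialized Scala/Java objects. As a result, checkpoint data written by one build of an application may fail to deserialize once the code is changed and recompiled. The guide recommends starting the upgraded application with a different checkpoint directory or deleting the old one, which matches the workaround above.

Below is a sketch that makes the switch explicit by tying the directory to an application version. `VersionedCheckpoint`, `appVersion` and the `checkpoint-<version>` naming scheme are hypothetical. Separately, `getActiveOrCreate` also accepts a `createOnError` flag. That flag starts a fresh context instead of throwing when the checkpoint cannot be read, at the cost of losing the saved state.

```scala
import org.apache.spark.streaming.StreamingContext

object VersionedCheckpoint {
  // Hypothetical scheme: bump appVersion whenever the job is rebuilt with changed
  // logic, so a new build never tries to deserialize an older build's checkpoint.
  def checkpointDir(base: String, appVersion: String): String =
    s"$base/checkpoint-$appVersion"

  // createSSC must call ssc.checkpoint(dir) with the same directory it receives.
  def getOrCreate(base: String, appVersion: String)(
      createSSC: String => StreamingContext): StreamingContext = {
    val dir = checkpointDir(base, appVersion)
    StreamingContext.getActiveOrCreate(dir, () => createSSC(dir))
  }
}
```

Usage would look like `VersionedCheckpoint.getOrCreate("hdfs://hadoop102:9000", "v7")(dir => createSSC(dir))`. This assumes `createSSC` is changed to take the checkpoint directory as a parameter.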

Solution:

Fix for error 1: to run the job from Windows, first package the program with mvn package or Build Artifacts. Then pass the jar's path to sparkConf.setJars, and the job runs normally on Windows.
Fix for error 2: I simply switched to a new checkpoint directory.
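setJars is needed only when launching from the IDE on Windows. The post also notes that it must be left out of the build that is packaged and submitted. One option is therefore to make the call conditional. Below is a sketch; the `SPARK_APP_JAR` environment variable and the `IdeJars` name are hypothetical, chosen only for illustration.

```scala
import org.apache.spark.SparkConf

object IdeJars {
  // Hypothetical convention: set SPARK_APP_JAR in the IDE run configuration to the
  // jar built by mvn package (or Build Artifacts). Under spark-submit the variable
  // is unset, so setJars is skipped and spark-submit ships the application jar itself.
  def withIdeJar(conf: SparkConf): SparkConf =
    sys.env.get("SPARK_APP_JAR") match {
      case Some(jar) => conf.setJars(Seq(jar))
      case None      => conf
    }
}
```

For example: `val sparkConf = IdeJars.withIdeJar(new SparkConf().setMaster("yarn").setAppName("kafka2"))`.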
最后貼一下我最終成功運(yùn)行的代碼

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object stream05_kafka {

  object SparkKafka {
    def createSSC(): StreamingContext = {
      // TODO create the environment object
      // StreamingContext(conf, batchInterval): the first argument is the environment config,
      // the second is the batch (data collection) interval
      // Master is yarn; client vs. cluster mode is chosen with --deploy-mode at submit time
      val sparkConf = new SparkConf()
        .setMaster("yarn")
        .setAppName("kafka2")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      // The key was misspelled "resoursemanager", so it was silently ignored.
      // 8088 is the ResourceManager web UI port; the RM RPC address defaults to port 8032.
      // Normally this value comes from yarn-site.xml via HADOOP_CONF_DIR.
      sparkConf.set("spark.hadoop.yarn.resourcemanager.address", "hadoop103:8032")
      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint7")

      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      // TODO processing logic
      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara))

      val num: DStream[String] = kafkaDS.map(_.value())
      // Each record is "key,up,down"; keep running totals (up, down, up + down) per key
      val result = num.map { line =>
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }.updateStateByKey((queueValue: Seq[(Int, Int, Int)], buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })
      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    val streamingContext = StreamingContext.getActiveOrCreate(
      "hdfs://hadoop102:9000/checkpoint7", () => SparkKafka.createSSC())
    // new Thread(new MonitorStop(streamingContext)).start()
    streamingContext.start()
    // 2. Wait for shutdown
    streamingContext.awaitTermination()
  }
}

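Also, do not call setJars in the build you package for submission, or it will still fail. I've forgotten the exact error, because I wrote this post after solving the problem and didn't record many of the errors. If I remember correctly, it may be an error like this: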

cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

Open question:

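To track down this bug I spent a whole day going back and forth between the YARN logs and the Spark logs. What gave me the biggest headache was that after leaving spark-submit with Ctrl+Z, the spark-submit process kept running in the background. I even suspect that my kill -9 corrupted the checkpoint and caused recovery to fail. Does anyone know how to properly terminate the SparkSubmit process?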
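Two facts bear on this question:

- Ctrl+Z in a terminal suspends the foreground process (SIGTSTP) rather than terminating it, so the suspended spark-submit is still alive. `yarn application -kill <applicationId>` stops the application on the YARN side.
- `kill -9` (SIGKILL) cannot be caught, so JVM shutdown hooks never run. The graceful stop requested by `spark.streaming.stopGracefullyOnShutdown` relies on those hooks.

The commented-out `MonitorStop` line in the final code hints at another common approach: a thread that watches for a stop signal and stops the context gracefully. Its implementation is not shown in the post, so the version below is a hypothetical sketch. The constructor parameters, the HDFS marker-file path and the polling interval are all assumptions.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

import java.net.URI

// Hypothetical MonitorStop: polls for a marker file and, once it exists,
// stops the StreamingContext gracefully (received batches are processed first).
class MonitorStop(ssc: StreamingContext, markerPath: String, pollMillis: Long = 5000L)
    extends Runnable {

  override def run(): Unit = {
    val fs = FileSystem.get(new URI(markerPath), new Configuration())
    val marker = new Path(markerPath)
    // Leave the loop once the context has stopped, whoever stopped it
    while (ssc.getState() != StreamingContextState.STOPPED) {
      Thread.sleep(pollMillis)
      if (ssc.getState() == StreamingContextState.ACTIVE && fs.exists(marker)) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }
}
```

It would be started before `streamingContext.start()` with something like `new Thread(new MonitorStop(streamingContext, "hdfs://hadoop102:9000/stopSpark/kafka2")).start()`. Running `hdfs dfs -touchz /stopSpark/kafka2` then requests the stop. Remove the marker before the next run, or the new job stops as soon as it becomes active.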
