Various pitfalls of integrating Spark Streaming with Kafka in spark-on-yarn mode (solved), from fqzzzzz's blog
項(xiàng)目場(chǎng)景:
使用sparkStream接收kafka的數(shù)據(jù)進(jìn)行計(jì)算,并且打包上傳到linux進(jìn)行spark任務(wù)的submit
錯(cuò)誤集合:
1.錯(cuò)誤1:
```
Failed to add file:/usr/local/spark-yarn/./myapp/sparkDemo04.jar to Spark environment
java.io.FileNotFoundException: Jar D:\usr\local\spark-yarn\myapp\sparkDemo04.jar not found
WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
```
2. Running the code in yarn mode from IDEA on Windows fails with the following error:
```
WARN CheckpointReader: Error reading checkpoint from file hdfs://hadoop102:9000/checkpoint6/checkpoint-1637834226000
java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.MappedDStream.mapFunc of type scala.Function1 in instance of org.apache.spark.streaming.dstream.MappedDStream
```
3.報(bào)的一些kafka包notfound的問(wèn)題,這個(gè)下面就不討論了,只需要把對(duì)應(yīng)的包下載后放到spark目錄下的jars文件中即可,比如常見(jiàn)的
```
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
```
都可以通過(guò)添加包的方式解決,如果是spark shell里面出現(xiàn)這種錯(cuò)誤,則需要在輸入spark-shell命令時(shí),在后面添加 --jars 包路徑 最初的代碼:
The original code:

```scala
import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object stream05_kafka {

  object SparkKafka {
    def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka2")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      sparkConf.set("spark.hadoop.yarn.resourcemanager.address", "hadoop103:8088")

      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint6")

      // Kafka consumer configuration
      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara)
        )

      val num: DStream[String] = kafkaDS.map(_.value())

      // Each record is "key,up,down"; keep a running (up, down, total) per key.
      val result = num.map(line => {
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }).updateStateByKey((queueValue, buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })

      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    val streamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint6", () => createSSC())
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```
Cause analysis:
First, note that if you want to package the job and submit it on Linux in yarn mode, the master must be set to yarn; whether it runs as yarn-client or yarn-cluster is specified at submit time, and client is the default. I had written local here, which is why everything worked on Windows (connecting to Kafka and computing) but failed on Linux: at bottom, the job was never connecting to YARN.
1. Error 1 happens because Spark cannot find your jar when the task is submitted on Windows; recall that the spark-submit command has to be given a jar and a main class.
2. This one is either a serialization problem or a case of broadcast variables not working with checkpoints. From what I found, broadcast variable contents are hard to recover once written to HDFS. The error can be traced to StreamingContext.getActiveOrCreate: sometimes recovery from the checkpoint works, sometimes it throws. I have not found a real fix, so I simply switched to a new checkpoint path. In production, streaming jobs are normally only stopped for code upgrades anyway, so I did not dig deeper; I would welcome an explanation from anyone who knows. My guess is that deserialization of the checkpoint data fails.
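To make that concrete, here is what a YARN submission might look like; the main class is inferred from the imports in this post's code and the jar path from the earlier error message, so treat both as placeholders:

```shell
# Submit to YARN; --deploy-mode selects client (the default) or cluster
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.study.stream05_kafka \
  /usr/local/spark-yarn/myapp/sparkDemo04.jar
```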
Solution:
錯(cuò)誤1的解決:所以如果要在windows下運(yùn)行,需要先使用mvn package或者build artifacts對(duì)程序進(jìn)行打包,然后對(duì)sparkConf.setJars指定包的路徑,這樣在windows下就可以正常運(yùn)行了 錯(cuò)誤2的解決:這里我就換檢查點(diǎn)了 最后貼一下我最終成功運(yùn)行的代碼
Finally, the version of the code that eventually ran successfully:

```scala
import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object stream05_kafka {

  object SparkKafka {
    def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
      // Master is now yarn, and Kryo is used for serialization
      val sparkConf = new SparkConf()
        .setMaster("yarn")
        .setAppName("kafka2")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      sparkConf.set("spark.hadoop.yarn.resourcemanager.address", "hadoop103:8088")

      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      // Fresh checkpoint path (checkpoint6 could no longer be recovered)
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint7")

      // Kafka consumer configuration
      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara)
        )

      val num: DStream[String] = kafkaDS.map(_.value())

      // Each record is "key,up,down"; keep a running (up, down, total) per key.
      val result = num.map(line => {
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }).updateStateByKey((queueValue, buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })

      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    val streamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint7", () => createSSC())
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```
另外,打包的時(shí)候不要添加setJars,否則還是會(huì)報(bào)錯(cuò),報(bào)的是什么已經(jīng)忘了,這篇博客也是在我解決問(wèn)題之后寫(xiě)的,沒(méi)有記錄太多報(bào)錯(cuò),如果我沒(méi)記錯(cuò)的話可能會(huì)報(bào)這種錯(cuò)誤
```
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
```
Remaining confusion:
為了解決這個(gè)bug,也是在yarn日志和spark日志來(lái)回看,看了一天,最讓我頭疼的就是spark-submit使用control+z退出后,spark-submit進(jìn)行還會(huì)在后臺(tái)運(yùn)行,我都懷疑是不是我的kill -9 操作使檢查點(diǎn)損壞導(dǎo)致數(shù)據(jù)恢復(fù)失敗的,請(qǐng)問(wèn)各路大神怎么才能結(jié)束sparkSubmit進(jìn)程?