SparkStreaming -Kafka数据源
生活随笔
收集整理的這篇文章主要介紹了
SparkStreaming -Kafka数据源
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
SparkStreaming處理kafka作為數(shù)據(jù)源
所以我們要創(chuàng)建的是kafka的Dstream,那么就要使用到KafkaUtils下的createStream,先來看一下ctrl點進去查看,然后來寫參數(shù)
package date_10_16_SparkStreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils object kafkaSource {def main(args: Array[String]): Unit = {//使用SparkStreaming完成wordcount//配置對象val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount")//實時數(shù)據(jù)分析的環(huán)境對象//StreamingContext需要兩個參數(shù),一個conf,一個是采集周期val streamingContext = new StreamingContext(conf,Seconds(5))//從kafka采集數(shù)據(jù)val kafkaStream = KafkaUtils.createStream(streamingContext,"chun1:2181","chun",Map("chun"->3))//將采集的數(shù)據(jù)進行分解(扁平化)val wordToSumDstream = kafkaStream.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_)wordToSumDstream.print()//這里不能停止采集功能,也就是streamingContext不能結束//可以簡單理解為啟動采集器streamingContext.start()//Driver等待采集器,采集器不挺Driver不停止streamingContext.awaitTermination()} }開啟kafka,輸入數(shù)據(jù)
kafka-console-producer.sh --broker-list chun1:2181 --topic chun a a a a a a a a a a a a a a aidea里查看結果
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結
以上是生活随笔為你收集整理的SparkStreaming -Kafka数据源的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 哈弗酷狗内饰官图发布:坦克300同款档杆
- 下一篇: flume学习-含安装