
Spark Streaming: reading data from Kafka for real-time word counting, and counting URL occurrences

Published: 2024/9/27

1. Create a Maven project

For the project-creation steps, see: http://blog.csdn.net/tototuzuoquan/article/details/74571374

2. Start Kafka

A. Install the Kafka cluster: http://blog.csdn.net/tototuzuoquan/article/details/73430874
B. Create the topic, etc.: http://blog.csdn.net/tototuzuoquan/article/details/73430874

3. Write the POM file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4. Write the code

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by toto on 2017/7/13.
 * Reads data from Kafka and computes word counts.
 */
object KafkaWordCount {
  /**
   * String      : the word
   * Seq[Int]    : the word's counts in the current batch
   * Option[Int] : the accumulated historical count
   */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    // The args are passed in from IDEA via "Program arguments":
    //   zkQuorum   : the ZooKeeper cluster
    //   group      : the consumer group
    //   topics     : the Kafka topic(s)
    //   numThreads : the number of threads
    // e.g. hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1
    // Note: the topic must be created beforehand.
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("E:\\wordcount\\outcheckpoint")
    //"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
    //"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Persist to memory and disk, serialized
    val data: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    // Records read from Kafka are (key, value) pairs; _._2 takes the value
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
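The state-update logic inside `updateFunc` can be understood without a cluster. The sketch below (plain Scala, no Spark, with made-up batch data) shows how a batch's per-word counts are folded into the running totals that `updateStateByKey` maintains:

```scala
object UpdateFuncDemo {
  // Same shape as updateFunc above: (word, counts in this batch, historical total)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
    iter.flatMap { case (word, batchCounts, state) =>
      Some(batchCounts.sum + state.getOrElse(0)).map(total => (word, total))
    }

  def main(args: Array[String]): Unit = {
    // "spark" appeared twice in this batch and 3 times historically;
    // "kafka" is new, so its state is None
    val batch = Iterator(
      ("spark", Seq(1, 1), Some(3)),
      ("kafka", Seq(1), None)
    )
    // Prints (spark,5) and (kafka,1)
    updateFunc(batch).foreach(println)
  }
}
```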

5. Configure the run parameters in IDEA:


Parameter description:

hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1

hadoop11:2181,hadoop12:2181,hadoop13:2181 : ZooKeeper cluster address
g1        : consumer group
wordcount : Kafka topic
1         : number of threads (1)

6. Start Kafka and send messages to it

Start Kafka:

[root@hadoop1 kafka]# pwd
/home/tuzq/software/kafka/servers/kafka
[root@hadoop1 kafka]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

Create the topic:

[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic wordcount
Created topic "wordcount".

List the topics:

bin/kafka-topics.sh --list --zookeeper hadoop11:2181

Start a console producer and send some messages (my Kafka brokers run on hadoop1, hadoop2, and hadoop3):

[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordcount
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
Anger begins with folly,and ends in repentance

Run the program with spark-submit:

# Launch the Spark Streaming application
bin/spark-submit --class cn.toto.spark.KafkaWordCount /root/streaming-1.0.jar hadoop11:2181 group1 wordcount 1
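The jar passed to spark-submit is the shaded jar produced by the maven-shade-plugin configured in the POM above. A typical build-and-copy step might look like the following (the jar name `bigdata-1.0-SNAPSHOT.jar` follows from the POM's artifactId/version; the target path `/root/streaming-1.0.jar` matches the submit command, and the host is illustrative):

```shell
# Build the shaded jar on the development machine
mvn clean package
# Copy it to the cluster node under the name used by spark-submit
scp target/bigdata-1.0-SNAPSHOT.jar root@hadoop11:/root/streaming-1.0.jar
```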

7. Check the results


8. Another example: counting URL occurrences

package cn.toto.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by toto on 2017/7/14.
 */
object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
  }

  def main(args: Array[String]) {
    // Receive the command-line arguments
    val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    // Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("UrlCount")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    // Set the checkpoint directory
    ssc.checkpoint(hdfs)
    // Set the topic information
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Pull data from Kafka and create the DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
      StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split each line and extract the URL the user clicked (the 7th field)
    val urls = lines.map(x => (x.split(" ")(6), 1))
    // Count the URL hits
    val result = urls.updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
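The `x.split(" ")(6)` above assumes each Kafka record is an access-log line whose seventh whitespace-separated field is the requested URL. With a Common Log Format line (the sample line below is made up), the extraction works like this:

```scala
object UrlFieldDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical access-log line in Common Log Format; the fields after
    // splitting on spaces are: ip, -, -, [date, tz], "GET, url, protocol", status, bytes
    val line = """192.168.1.5 - - [14/Jul/2017:10:00:00 +0800] "GET /index.html HTTP/1.1" 200 1024"""
    // Index 6 of the split is the requested URL
    val url = line.split(" ")(6)
    println(url)  // prints /index.html
  }
}
```

If your log format differs, the field index (and possibly the delimiter) must be adjusted accordingly.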

Summary

The above is the full walkthrough of reading data from Kafka with Spark Streaming for real-time word counting and for counting URL occurrences. Hopefully it helps you solve the problems you ran into.
