日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果

發(fā)布時(shí)間:2024/9/27 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1、安裝flume
2、到Spark-Streaming官網(wǎng)下載poll方式的Sink
3、將sink放入到flume的lib包里面
4、先啟動(dòng)flume(多個(gè)),然后在啟動(dòng)Streaming程序

下載spark-flume
http://spark.apache.org/documentation.html
到Spark-1.6.2中
http://spark.apache.org/docs/1.6.2/,

搜一下flume

最后在安裝的flume中加入:commons-lang3-3.3.2.jar、scala-library-2.10.5.jar、spark-streaming-flume-sink_2.10-1.6.1.jar,效果如右側(cè):

同步到集群中的其它的flume中:

[root@hadoop1 lib]# pwd /home/tuzq/software/apache-flume-1.6.0-bin/lib [root@hadoop1 lib]# scp -r * root@hadoop2:$PWD [root@hadoop1 lib]# scp -r * root@hadoop3:$PWD [root@hadoop1 lib]# scp -r * root@hadoop4:$PWD [root@hadoop1 lib]# scp -r * root@hadoop5:$PWD

編寫flume的配置文件:

[root@hadoop1 agentconf]# pwd /home/tuzq/software/apache-flume-1.6.0-bin/agentconf [root@hadoop1 agentconf]# vim flume-poll.conf

其中flume-poll.conf的內(nèi)容如下:

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/tuzq/software/flumedata a1.sources.r1.fileHeader = true# Describe the sink a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink #表示從這里拉數(shù)據(jù) a1.sinks.k1.hostname = hadoop1 a1.sinks.k1.port = 8888# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

啟動(dòng)flume.

[root@hadoop1 apache-flume-1.6.0-bin]# cd /home/tuzq/software/apache-flume-1.6.0-bin [root@hadoop1 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c agentconf/ -f agentconf/flume-poll.conf -Dflume.root.logger=WARN,console

啟動(dòng)后的效果如下:

這樣,一直啟動(dòng)Flume

然后編寫從Flume中讀取數(shù)據(jù)的程序。
pom文件的內(nèi)容如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.spark</groupId><artifactId>bigdata</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.10.6</scala.version><spark.version>1.6.2</spark.version><hadoop.version>2.6.4</hadoop.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>${spark.version}</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-make:transitive</arg><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

編寫代碼:

package cn.toto.sparkimport java.net.InetSocketAddressimport org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by toto on 2017/7/13.*/ object FlumeStreamingWordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FlumeStreamingWordCount").setMaster("local[2]")//創(chuàng)建StreamingContext并設(shè)置產(chǎn)生批次的間隔時(shí)間val ssc = new StreamingContext(conf,Seconds(15))//從Socket端口中創(chuàng)建RDD,這里的SocketAddress可以傳遞多個(gè)val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =FlumeUtils.createPollingStream(ssc, Array(new InetSocketAddress("hadoop1", 8888)),StorageLevel.MEMORY_AND_DISK)//去取Flume中的數(shù)據(jù)val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" "))val wordAndOne : DStream[(String,Int)] = words.map((_,1))val result : DStream[(String,Int)] = wordAndOne.reduceByKey(_+_)//打印result.print()//開啟程序ssc.start()//等待結(jié)束ssc.awaitTermination()} }

啟動(dòng)程序。然后往Flume監(jiān)控的flumedata目錄下放入文件,如:

其中1.txt的內(nèi)容如下:

最后在IDEA的控制臺(tái)中觀察結(jié)果:

總結(jié)

以上是生活随笔為你收集整理的Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 精品国产aⅴ一区二区三区四川人 | 欧美福利视频在线 | 老妇女性较大毛片 | 午夜a视频 | 天堂素人 | 国语对白永久免费 | 97视频在线观看免费高清完整版在线观看 | 久久男女视频 | 成人爱爱网站 | 久久国产乱 | 欧美调教视频 | 最新国产精品自拍 | 超碰在线观看97 | 日本黄网站在线观看 | 天天做天天爱天天爽综合网 | 国产最新在线 | av香蕉| 久久久视频在线 | 久久精品国产一区二区 | 国产中文字幕网 | 久久天天操 | 韩国主播青草55部完整 | 亚洲性久久| 亚洲精品国产乱伦 | 欧美日韩中文字幕一区 | 亚洲免费成人在线 | 欧美日韩一区精品 | 亚洲国产无码精品 | 97香蕉超级碰碰久久免费软件 | 五月婷婷激情综合 | 亚洲最大视频网站 | 一区二区内射 | 狠狠干2019| 黄色av大片 | 一区二区久久精品66国产精品 | 日本伦理片在线播放 | 亚洲精品久久久中文字幕 | 伊人精品在线视频 | 国产欧美日韩久久 | 麻豆精品网站 | 中文在线观看av | 亚洲天天操 | 99久久综合网 | 亚洲av无码一区二区三区在线 | 粗大的内捧猛烈进出 | 天天躁狠狠躁狠狠躁夜夜躁68 | 97人妻精品一区二区三区软件 | 性欧美一区二区三区 | cao国产| 成人做爰9片免费视频 | 免费高清成人 | 亚洲最大综合网 | 亚洲色鬼| 欧美性精品 | 欧美日韩一区二区三区不卡 | 怡红院成人影院 | 国产精品美女久久久久av爽 | av资源免费看 | 一区二区三区免费在线 | 亚洲不卡免费视频 | 亚洲在线免费观看视频 | 超碰人人草人人干 | 日韩av一区在线 | 国产欧美日韩综合精品一区二区 | 麻豆av在线看 | 在线播放黄色av | 男人天堂你懂的 | 日日弄天天弄美女bbbb | 在线看黄网址 | 亚洲天堂男 | 国产片黄色 | 精品h | av国产成人 | 亚洲综合免费观看高清完整版 | 亚洲天堂免费在线 | 我爱我色成人网 | 国产成人91精品 | 9色91| 国产精品人人爽人人爽 | 国产网红在线观看 | 久久精品人人做人人爽 | 亚洲午夜久久久久 | 欧美日韩不卡合集视频 | 亚洲AV无码精品黑人黑人 | 伊人网综合网 | 欧美精品第1页 | 最新国产黄色网址 | 免费在线看黄网站 | 水蜜桃影库 | 99久久久无码国产精品免费麻豆 | 理论片大全免费理伦片 | 国产精品久久久一区 | 国产日韩欧美精品一区 | 综合亚洲网 | 黑人性高潮 | 99国产精品国产免费观看 | 亚洲国产成人无码av在线 | 丹丹的呻吟声1一7 | 无码人妻精品一区二区 |