日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark Streaming整合flume实战

發布時間:2024/1/17 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Streaming整合flume实战 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark Streaming對接Flume有兩種方式

  • Poll:Spark Streaming從flume 中拉取數據
  • Push:Flume將消息Push推給Spark Streaming

1、安裝flume1.6以上

2、下載依賴包

spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目錄下

3、生成數據

服務器上的 /root/data目錄下準備數據文件data.txt

vi data.txthadoop spark hive spark hadoop sqoop flume redis flume hadoop solr kafka solr hadoop

4、配置采集方案

vi flume-poll.confa1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.hostname=hdp-node-01 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000

5、添加依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>2.0.2</version> </dependency>

6、代碼實現

package cn.cheng.spark import java.net.InetSocketAddress import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume 拉模式Poll*/ object SparkStreaming_Flume_Poll {//newValues 表示當前批次匯總成的(word,1)中相同單詞的所有的1//runningCount 歷史的所有相同key的value總和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf參數val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")//構建sparkContext對象val sc: SparkContext = new SparkContext(sparkConf)//構建StreamingContext對象,每個批處理的時間間隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//設置checkpointscc.checkpoint("./")//設置flume的地址,可以設置多臺val address=Seq(new InetSocketAddress("192.168.200.160",8888))// 從flume中拉取數據val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,address,StorageLevel.MEMORY_AND_DISK)//獲取flume中數據,數據存在event的body中,轉化為Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//實現單詞匯總val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}}

7、啟動flume

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console

8、啟動spark-streaming應用程序

9、查看結果

?

flume將消息Push推給Spark Streaming

1、配置采集方案

vi flume-push.conf#push mode a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = avro a1.sinks.k1.hostname=172.16.43.63 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000

注意配置文件中指明的hostname和port是spark應用程序所在服務器的ip地址和端口。

2、代碼實現

package cn.cheng.sparkimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume 推模式Push*/ object SparkStreaming_Flume_Push {//newValues 表示當前批次匯總成的(word,1)中相同單詞的所有的1//runningCount 歷史的所有相同key的value總和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf參數val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")//構建sparkContext對象val sc: SparkContext = new SparkContext(sparkConf)//構建StreamingContext對象,每個批處理的時間間隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//設置日志輸出級別sc.setLogLevel("WARN")//設置檢查點目錄scc.checkpoint("./")//flume推數據過來// 當前應用程序部署的服務器ip地址,跟flume配置文件保持一致val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc,"172.16.43.63",8888,StorageLevel.MEMORY_AND_DISK)//獲取flume中數據,數據存在event的body中,轉化為Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//實現單詞匯總val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}} }

3、啟動spark-streaming應用程序

4、生成數據

cp data.txt data2.txt

5、啟動flume

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console

6、查看結果

?

總結

以上是生活随笔為你收集整理的Spark Streaming整合flume实战的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美精品99久久 | 69福利社区 | 华人永久免费视频 | 天堂岛av| h视频免费在线观看 | 涩涩涩在线视频 | 看全黄大色黄大片美女人 | 综合图区亚洲 | av官网| 久久艹在线观看 | 免费av网站在线看 | 91嫩草精品 | 欧洲日韩一区二区三区 | 欧美久久一级 | 久久久久久久人妻无码中文字幕爆 | 六月天综合网 | 国内av自拍 | 精品国产一二 | 色哒哒影院 | 成人精品久久 | 久久亚洲成人av | 亚洲麻豆av| 亚洲午夜精品久久久久久app | 一级免费a | 亚洲自偷自偷偷色无码中文 | 成年在线观看视频 | 在线成年人视频 | 亚洲精品99久久久久中文字幕 | 中文写幕一区二区三区免费观成熟 | 日韩av免费| 久久精品国内 | 免费的理伦片在线播放 | 亚洲先锋影音 | 亚洲av日韩av在线观看 | 日韩视频免费在线 | 嫩草午夜少妇在线影视 | 久草视频在线资源站 | 日本精品一区二区三区视频 | 国产日韩激情 | 激情福利网 | 国产视频最新 | 欧美美女黄色 | 在线免费观看视频网站 | 一区二区视频免费看 | 成人午夜视频免费 | 国产精品jizz在线观看老狼 | 亚洲最新网址 | 双性尿奴穿贞c带憋尿 | 欧美综合视频 | 欧美视频不卡 | 99成人精品 | 极品少妇xxxx精品少妇偷拍 | 久久国产视频网 | 亚洲天堂第一 | 黄色一级片网站 | 北京少妇xxxx做受 | 精品人妻一区二区三区三区四区 | 一卡二卡三卡四卡 | 毛片在哪看 | 啪啪网页| 超碰人人干人人 | 国产精品久久久久久久久免费软件 | 国产一区二区网 | 欧美人与动物xxxx | 人人爱国产 | 国产精品久久国产愉拍 | 欧美日本一道本 | xxxx日本黄色 | 亚洲天堂一级 | 一级特黄aa | 国产亚洲精品久久久久久无几年桃 | 成人中文在线 | 无码国产精品久久一区免费 | 国产二区三区 | 夜操操 | 91美女片黄在线观看 | av免费在线不卡 | 亚洲av无码国产精品永久一区 | 我色综合 | 无码粉嫩虎白一线天在线观看 | 99这里只有精品 | 久久精品1 | 啪啪网站免费观看 | av香港经典三级级 在线 | 2021av视频 | 青青草原综合网 | 在线日韩三级 | av在线播放一区二区三区 | 国产成人专区 | 九色网站在线观看 | 天天操天天操天天操天天操 | 孕期1ⅴ1高h | 黄色小说视频网站 | 国产精品资源在线 | 91精品人妻一区二区三区四区 | 国产成人在线视频观看 | 日本在线精品视频 | 欧美黄色a视频 | 黄色www|