Spark Streaming Integration with Flume in Practice
Spark Streaming can be connected to Flume in two ways (a short sketch contrasting the two API calls follows this list):
- Poll: Spark Streaming pulls data from Flume.
- Push: Flume pushes messages to Spark Streaming.
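Both modes go through the `FlumeUtils` helper from the spark-streaming-flume module. The following is only a minimal orientation sketch contrasting the two factory calls; the host names and port are the example values used later in this article, and the complete programs are given in the steps below.

```scala
import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeModesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeModesSketch").setMaster("local[2]")
    val scc = new StreamingContext(conf, Seconds(5))

    // Poll mode: Spark acts as the client and pulls batches from Flume's SparkSink
    val pollStream = FlumeUtils.createPollingStream(
      scc, Seq(new InetSocketAddress("hdp-node-01", 8888)), StorageLevel.MEMORY_AND_DISK)

    // Push mode: Spark starts an Avro receiver and Flume's avro sink pushes events to it
    val pushStream = FlumeUtils.createStream(
      scc, "172.16.43.63", 8888, StorageLevel.MEMORY_AND_DISK)
  }
}
```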
Poll mode: Spark Streaming pulls data from Flume
1. Install Flume 1.6 or later
2. Download the dependency jar
Place spark-streaming-flume-sink_2.11-2.0.2.jar into Flume's lib directory.
3. Generate data
Prepare a data file data.txt in the /root/data directory on the server.
vi data.txt

```
hadoop spark hive spark hadoop sqoop flume redis flume hadoop solr kafka solr hadoop
```

4. Configure the collection plan
vi flume-poll.conf

```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000

# sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hdp-node-01
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
```

The sink type org.apache.spark.streaming.flume.sink.SparkSink comes from the jar copied into Flume's lib directory in step 2; it buffers events and exposes them on hdp-node-01:8888 for the Spark Streaming application to poll.

5. Add the dependency
Add the following to the project's pom.xml:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.0.2</version>
</dependency>
```
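For sbt-based projects, the equivalent dependency line would look roughly like this (adjust the artifact's Scala suffix to match your project's Scala version):

```scala
// sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "2.0.2"
```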
6. Code implementation

```scala
package cn.cheng.spark

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming integration with Flume: pull (Poll) mode
 */
object SparkStreaming_Flume_Poll {

  // newValues: all the 1s for the same word in the (word, 1) pairs of the current batch
  // runningCount: the historical running total for the same key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // Configure SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // Build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // Build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Set the checkpoint directory (required by updateStateByKey)
    scc.checkpoint("./")
    // Flume addresses; more than one can be given
    val address = Seq(new InetSocketAddress("192.168.200.160", 8888))
    // Pull data from Flume
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)
    // The data sits in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // Word count with state kept across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
```
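To make the updateStateByKey semantics concrete, the following self-contained sketch reuses the same updateFunction logic and traces how counts accumulate across batches (the batch values are made up for illustration):

```scala
object UpdateFunctionDemo {
  // Same logic as updateFunction above: add this batch's 1s to the running total
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  def main(args: Array[String]): Unit = {
    // First batch: the word appeared 3 times and there is no previous state
    println(updateFunction(Seq(1, 1, 1), None))   // Some(3)
    // Second batch: the word appeared twice more; the previous total was 3
    println(updateFunction(Seq(1, 1), Some(3)))   // Some(5)
  }
}
```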
7. Start Flume

```shell
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console
```

8. Start the Spark Streaming application
Run the SparkStreaming_Flume_Poll main class, for example from the IDE in local mode (the code sets the master to local[2]), or package the project and submit it with spark-submit.
9. Check the results
The running word counts are printed to the console every 5 seconds; for the data.txt above you should see pairs such as (hadoop,4) and (spark,2).
Push mode: Flume pushes messages to Spark Streaming
1. Configure the collection plan
vi flume-push.conf

```properties
# push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000

# sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
```

Note: the hostname and port in this file are the IP address and port of the server where the Spark application runs; the avro sink pushes events to the receiver that the application starts on that address.
2. Code implementation
```scala
package cn.cheng.spark

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming integration with Flume: push mode
 */
object SparkStreaming_Flume_Push {

  // newValues: all the 1s for the same word in the (word, 1) pairs of the current batch
  // runningCount: the historical running total for the same key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // Configure SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")
    // Build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // Build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Set the log level
    sc.setLogLevel("WARN")
    // Set the checkpoint directory (required by updateStateByKey)
    scc.checkpoint("./")
    // Flume pushes data to this receiver.
    // The host is the IP of the server where this application runs; it must match the Flume configuration.
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(scc, "172.16.43.63", 8888, StorageLevel.MEMORY_AND_DISK)
    // The data sits in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // Word count with state kept across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
```

3. Start the Spark Streaming application
In push mode the application should be started first, so that its receiver is already listening on 172.16.43.63:8888 before Flume begins pushing events.
4. Generate data
```shell
cp data.txt data2.txt
```

5. Start Flume
```shell
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console
```

6. Check the results
Summary
The above covers both the Poll and Push approaches to integrating Spark Streaming with Flume; hopefully it helps you solve the problems you encounter.