SparkStreaming - Custom Data Source (Custom Receiver)
// Declare the receiver
// 1) Extend Receiver
// 2) Override the onStart and onStop methods
package date_10_16_SparkStreaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object MyReceiver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    // Read lines through the custom receiver and count words in each 5-second batch
    val socketLineDstream = streamingContext.receiverStream(new MyReceiver1("chun1", 9999))
    val wordToSumDstream = socketLineDstream
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordToSumDstream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

class MyReceiver1(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  var socket: Socket = null

  // Connect to host:port and store each line until the stream ends or "END" arrives
  def receive(): Unit = {
    socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
    // In Scala an assignment evaluates to Unit, so the Java idiom
    // `while ((line = reader.readLine()) != null)` compares Unit with null
    // and never stops at end of stream; read the line explicitly instead.
    var line: String = reader.readLine()
    while (line != null && !"END".equals(line)) {
      store(line)
      line = reader.readLine()
    }
  }

  // onStart must not block, so the receive loop runs on its own thread
  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = receive()
    }).start()
  }

  override def onStop(): Unit = {
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
}
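The receive loop above hands lines to Spark until the stream ends or the sentinel line END arrives. The following is a minimal, Spark-free sketch of that termination logic, so it can be tried without a cluster or a socket. The names `ReadLoopSketch`, `readUntilEnd`, and `sink` are hypothetical and do not appear in the original post; in the receiver, `sink` would correspond to `store`.

```scala
import java.io.{BufferedReader, StringReader}
import scala.collection.mutable.ListBuffer

object ReadLoopSketch {
  // Hypothetical helper (not in the original post): passes each line to `sink`
  // until readLine() returns null (end of stream) or the sentinel "END" is read.
  def readUntilEnd(reader: BufferedReader, sink: String => Unit): Unit =
    Iterator
      .continually(reader.readLine())
      .takeWhile(line => line != null && line != "END")
      .foreach(sink)

  def main(args: Array[String]): Unit = {
    // An in-memory reader stands in for the socket's input stream
    val input = new BufferedReader(new StringReader("hello spark\nhello scala\nEND\nignored"))
    val collected = ListBuffer.empty[String]
    readUntilEnd(input, line => collected += line)
    collected.foreach(println) // only the two lines before END are printed
  }
}
```

Using `Iterator.continually` with `takeWhile` avoids the mutable `line` variable and keeps both stop conditions in one predicate.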