日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

如何将自定义数据源集成到Apache Spark中

發(fā)布時間:2023/12/3 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何将自定义数据源集成到Apache Spark中 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

如今,流數(shù)據(jù)是一個熱門話題,而Apache Spark是出色的流框架。 在此博客文章中,我將向您展示如何將自定義數(shù)據(jù)源集成到Spark中。

Spark Streaming使我們能夠從各種來源進行流傳輸,同時使用相同的簡潔API訪問數(shù)據(jù)流,執(zhí)行SQL查詢或創(chuàng)建機器學(xué)習(xí)算法。 這些功能使Spark成為流式(或任何類型的工作流)應(yīng)用程序的首選框架,因為我們可以使用框架的所有方面。

面臨的挑戰(zhàn)是弄清楚如何將自定義數(shù)據(jù)源集成到Spark中,以便我們能夠利用其強大功能而無需更改為更多標準源。 更改似乎是合乎邏輯的,但是在某些情況下,這樣做是不可能或不方便的。

流式自定義接收器

Spark提供了不同的擴展點,正如我們在此處擴展Data Source API以便將自定義數(shù)據(jù)存儲集成到Spark SQL中所看到的那樣。

在此示例中,我們將做同樣的事情,但是我們還將擴展流API,以便我們可以從任何地方流。

為了實現(xiàn)我們的自定義接收器,我們需要擴展Receiver [A]類。 請注意,它具有類型注釋,因此我們可以從流客戶端的角度對DStream實施類型安全。

我們將使用此自定義接收器來流式傳輸我們的應(yīng)用程序之一通過套接字發(fā)送的訂單。

通過網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)的結(jié)構(gòu)如下所示:

1 5 1 1 2 2 1 1 2 1 1 4 1 1 2 2 1 2 2

我們首先接收訂單ID和訂單總金額,然后接收訂單的行項目。 第一個值是商品ID,第二個是訂單ID(與訂單ID值匹配),然后是商品成本。 在此示例中,我們有兩個訂單。 第一個只有四個項目,第二個只有一個項目。

這個想法是將所有這些隱藏在我們的Spark應(yīng)用程序中,因此它在DStream上收到的是在流上定義的完整順序,如下所示:

val orderStream: DStream[Order] = .....val orderStream: DStream[Order] = .....

同時,我們還使用接收器來流式傳輸我們的自定義流式源。 即使它通過套接字發(fā)送數(shù)據(jù),使用來自Spark的標準套接字流也將非常復(fù)雜,因為我們將無法控制數(shù)據(jù)的輸入方式,并且會遇到在應(yīng)用程序上遵循順序的問題本身。 這可能非常復(fù)雜,因為一旦進入應(yīng)用程序空間,我們便會并行運行,并且很難同步所有這些傳入數(shù)據(jù)。 但是,在接收方空間中,很容易從原始輸入文本創(chuàng)建訂單。

讓我們看看我們的初始實現(xiàn)是什么樣的。

case class Order(id: Int, total: Int, items: List[Item] = null) case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = .... }case class Order(id: Int, total: Int, items: List[Item] = null) case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = .... }

我們的OrderReceiver擴展了Receiver [Order],它使我們可以在Spark內(nèi)部存儲Order(帶注釋的類型)。 我們還需要實現(xiàn)onStart()和onStop()方法。 請注意,onStart()創(chuàng)建一個線程,因此它是非阻塞的,這對于正確的行為非常重要。

現(xiàn)在,讓我們看一下接收方法,真正發(fā)生魔術(shù)的地方。

def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}

在這里,我們創(chuàng)建一個套接字并將其指向源,然后我們就可以簡單地開始讀取它,直到調(diào)度了stop命令,或者套接字上沒有更多數(shù)據(jù)為止。 請注意,我們正在讀取與之前定義的結(jié)構(gòu)相同的結(jié)構(gòu)(如何發(fā)送數(shù)據(jù))。 完全閱讀訂單后,我們將調(diào)用store(…),以便將其保存到Spark中。

除了在我們的應(yīng)用程序中使用我們的接收器外,這里別無所要做:

val config = new SparkConf().setAppName("streaming") val sc = new SparkContext(config) val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))val config = new SparkConf().setAppName("streaming") val sc = new SparkContext(config) val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))

請注意我們是如何使用自定義OrderReceiver創(chuàng)建流的(僅為了清楚起見,對val流進行了注釋,但這不是必需的)。 從現(xiàn)在開始,我們將流(DString [Order])用作我們在任何其他應(yīng)用程序中使用的任何其他流。

stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id)) order.items.foreach(println)}}stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id)) order.items.foreach(println)}}

摘要

當(dāng)處理生成無盡數(shù)據(jù)的源時,Spark Streaming非常方便。 您可以使用與Spark SQL和系統(tǒng)中其他組件相同的API,但它也足夠靈活,可以擴展以滿足您的特定需求。

翻譯自: https://www.javacodegeeks.com/2016/05/integrate-custom-data-sources-apache-spark.html

總結(jié)

以上是生活随笔為你收集整理的如何将自定义数据源集成到Apache Spark中的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。