日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成

發布時間:2025/3/21 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark 編程模型

在Spark 中, 我們通過對分布式數據集的操作來表達計算意圖 ,這些計算會自動在集群上

井行執行 這樣的數據集被稱為彈性分布式數據集 Resilient Distributed Dataset ),簡稱 RDD

RDD 是Spark 分布式數據和計算的基本抽象。在 Spark 中,對數據的所有操作不外乎創建

RDD 、轉換己有 RDD 以及調用 RDD 操作進行求值 rdd 和wordrnap 都是 MapPartition RDD 類型 RD ,而 wordreduce ShuffiedRDD 類型的 RDD

RDD 支持2種 類型的操作 轉換操作( Transformation Operation )和行動操作 Action

Operation )。有些資料還會細分為創建操作、轉換操作、控制操作和行動操作4 類型。轉換

操作會由一個 RDD 生成一個新的 RDD 行動操作會對 RDD 算出 個結果,并把結果返回驅

動器程序,或者把結果存儲到外部存儲系統中 轉換操作和行動操作的區別在于 Spark 計算 RDD

的方式不同 雖然可以在任何時候定義新的 RDD ,但 Spark 只會惰性計算這些 RDD 。它們只有

第一次在 個行動操作中用到時才會 正計算。

轉換操作和行動操作的對比

通過轉換操作,從己有的 RDD 中派生出新的 RDD Spark 會使用譜系圖( Lineage Graph,

很多資料也會翻譯為“血統”)來記錄這些不同 RDD 間的依賴關系 Spark 要用這些信息

來按需計算每個 RDD ,也可以依賴譜系圖在持久化的 RD 丟失部分數據時恢復丟失的數據。

行動操作會把最終求得的結果返回驅動器程序,或者寫入外部存儲系統。由于行動操作需要生

產實際的輸出,所以它們會強制執行那些求值必須用到的 RDD 轉換操作。

Spark 中RDD 計算是以分區( Part ion )為單位的,將 RDD 分為很 個分區分布到集群

的節點中,分區的多少涉及對這個 RD 進行并行計算的粒度。如圖 12-2 所示 實線方框 A、B、C、D、E、F、G陰影背景的矩形 表示分區。 A、B、C、D、E、F、G之間的依賴關系構成整個應用的譜系圖。

依賴關系還可以分為窄依賴和寬依賴。窄依賴 Narrow ependen cie )是指每個父 RDD 的

分區都至多被一個RDD 的分區使用, 而寬依賴( Wide Dependencies )是指多個子 RDD 的分區依賴一個父 RDD 分區。圖 12-2 中,C和D 之間是窄依賴,而 A和B之間是寬依賴。 RDD中行動操作的執行會以寬依賴為分界來構建各個調度階段,各個調度階段 內部的窄依賴、前后鏈接構成流水線。圖中的 個虛線方框分別代表了 個不同的調度階段。 對于執行失敗的任 ,只 要它對應的調度階段的父類信息仍然可用,那么該 務就會分散

到其他節點重新執行。如果某些調 階段不可用,則重新提交相應的任務,并以并行方式計算

丟失的地方。在整個作業中,如果某個任務執行緩慢, 則系統會在其他節點上執行該任務的副 本,并取最先得到的結果作為最終的結果。

下面就以 12 節中相同的單詞統計程序為例來分析 Spark 的編程模型,與 12.1 節中所不

同的是, 這里是 個完整的 Scala 程序,程序對應的 Maven 依賴如下

單詞統計程序如代碼清單 12-1 示。

代碼清單 12-1 單詞統計程序

main() 方法主體的第①和第②行中首先創建一個 SparkConf 對象來配置應用程序,然后基于這個 SparkConf 建了一個 SparkContext 象。一旦有了 SparkContext ,就可 以用它創建RDD 第③行代碼中調用 sc textFile ()來創 建一個代表文 件中各行文本的 RDD 第④行中

rdd flatMap(_.split(””)) .map(x=>幟, ))這一段內容的依賴關系是窄依賴,而reduceByKey(_+ _)操 作對單詞進行計數時屬于寬依賴。第⑤行中將排序后的結果存儲起來。最后第⑦行中使用 top()

方法來關閉應用。

在SPARK_HOME/bin 目錄中還有一個 spark-submit 腳本,用于將應用 快速部署到Spark 集

群。 比如這里的 WordCount 程序 當我 希望通過 park-submit 進行部署 ,只需要將應用打

包成 jar 包(即下面示例中的 wordcount. )井上傳到 Spark 集群 然后通 spark-submit 進行

部署即 ,示例如下

[root@no del spark)# bin/spark- submit --class scala.spark.demo . WordCount wordcount . jar --executor- memory lG --master spark : //localhost : 7077 2018 - 08 - 06 15:39 : 54 WARN NativeCodeLoader:62 - Unable to load native hadoop library for your platform . .. using builtin- ] ava classes where applicable 2018-08 - 06 15 : 39 : 55 INFO SparkContext : 54 - Running Spark vers on 2 . 3 . 1 2018 - 08 - 06 15 : 39 : 55 INFO SparkContext : 54 - Submitted applicat on WordCount 2018 - 08 - 06 15:39 : 55 INFO SecurityManager : 54 - Chang ng view acls to : root 2018 - 08 - 06 1 5 : 39 : 55 I NFO SecurityManager 54- Chang ng modify acls to : root ( .... ;占略若干)2018 - 08 - 07 12 : 25 : 47 INFO AbstractConnector : 318 - Stopped Spark@62 99e2cl {HTTP /1 . 1 , [http/ 1 . l) } { 0. 0 . 0. 0: 4 04 0} 2018 - 08 - 07 12 : 25 : 47 INFO SparkUI : 54 - Stopped Spark web UI at http: // 10 . 199 . 172 . 111 : 4040 2018 - 08-07 12 : 25 : 47 INFO MapOutp tTrackerMasterEndpo nt 54 - MapOutputTrackerMasterEndpoint stop ped' 2018 - 08-07 12 : 25:47 INFO MemoryStore : 54 - MemoryStore cleared 2018 - 08-07 12 : 25 : 47 INFO BlockManager:54 - BlockManager stopped 2018 - 08 - 07 12 : 25 : 47 INFO BlockManagerMaster : 54 - BlockManagerMaster stopped 2018 - 08 - 07 12 : 25 : 47 INFO OutputCommitCoordinator OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped ! 2018 08-06 15 : 46 : 57 INFO SparkCo text 54 - Successfully stopped SparkContext 20 1 8 - 08-06 1 5 : 46:57 INFO ShutdownHookManager : 54 Shutdown hook called 2018 - 08 - 06 15 : 46 : 57 INFO ShutdownHoo kManager : 54 - Delet ng directory /tmp/spark- fa955139-270c-4899 - 82b7 - 4959983alcb0 2018 - 08 - 06 15 : 46 : 57 INFO ShutdownHookManager : 54 - Deleting directory /tmp/spark-3f359966- 2167 - 4bb9 - 863a - 2d8a8d5e8fbe

實例中的--class 用來指定應用程序 主類,這里為 eal ark.demo.WordCount;

execu or memory 用來指定執行器節點的內容,這里設置為 lG 。最后得到的輸出結果如

下所示。

[root@node l spark) # ls /tmp/spar k part 00000 SUCCESS [root@nodel spark)# cat /tmp/spark/part-00000 (, 91) (# ' 37) (the , 19) (in, 7) (to , 7) (for, 6) (if, 5) (then, 5) (under, 4) (stty, 4) (not, 4)

Spark 的運行結構

在分布式環境下 Spark 集群采用的是主從架構。如圖 12-3 示,在一個Spark 集群中,

有一個節點負責中央協調,調度各個分布式工作節點,這個中央協調節點被稱為驅動器( Driver

節點 與之對應的工作節點被稱為執行器( Executor )節點。驅動器節點可以和大量的執行器節

點進行通信 它們都作為獨立的進程運行。驅動器節點和所有的執行器節點一起被稱為 Spark

應用( Application)。

Spark 應用通過一個叫作集群管理器( luster Manager )的外部服務在集群中的機器上啟動。 Spark 自帶的集群管理器被稱為獨立集群管理器 Spark 也能運行 YARN Mesos Kubemetes 這類開源集群管理器上Spark 驅動器節點是執行程序中的 main()方法的進程。它執行用戶編寫的用來創建 SparkContext RDD ,以及進行 RDD 轉換操作和行動操作的代碼。其實,當啟動 park-shell 時,就啟動了一個 park 驅動程序。驅動程序一旦停止 Spark 應用也就結束了

Kafka與Spark trea ing 的整合

采用 Spark Stre ming 流式處理 fka 中的數據,首先需要把數據從 Kafka 中接收過 ,然

后轉換為 Spark Streaming 中的 DStrea 。接收數據的方式一共有兩種:利用接收器Receiver 的方式接收數據和直接Kafka中讀取數據 。

Receiver 方式通過KafkaUtils. creates trea ()方法來創建一個DS tream 對象 ,它不關注消費的位移的處理,Receive方式的結構如圖 12-9所示 但這種方式在 Spark 任務執行異常 導致 數據丟失,如果要保證數據的可靠性,則需要開啟預寫式日志,簡稱 AL (Write Ahead Logs) , 只有收到的數據被持久化到 WAL 之后才會更新 Kafka 中的消費位移。收 的數據 WAL儲存

位置信息被可靠地存儲,如果期間出現故障,那么這些信息被用來從錯誤中恢復,并繼續處理

數據。

WAL 的方式可以保證從 Kafka 中接收的數據不被丟失 但是在某些異常情況下,一些數據

被可靠地保存到了 WAL 中,但是還沒有來得及更新消費位移,這樣會造成 Kafka 中的數據被

Spark 拉取 了不止一次。同時在 Receiver 方式中 Spark的RDD 分區 Kafka 的分區并不是相

關的,因此增加 Kafk 中主題的分區數并不能增加 Spark處理的并行度,僅僅增加了接收器接

收數據的并行度

Direct 方式是從 Spark 1.3 開始引入的,它通過 KafkaUtil s.createDire ctStream() 方法創建一個

DStream 象, Direct 方式的結構如圖 12-10 所示。該方式中 Kafka 的一個分區與 SparkRDD對應,通過定期掃描所訂閱的 afka 每個主題的每個分區的最新偏移量以確定當前批處理數據偏 移范圍。與 Rec iver 方式相比, irect 方式不需要維護一份WAL 數據,由 park Streaming 程序自控制位移的處理,通常通過檢查點機制處理消費位移,這樣可以保證 Kafka 中的數據只 會被 Spark 拉取一次。

下面使用一個簡單的例子來演示 Spark Streaming和Kafka 的集成。在該示例中,每秒往

Kafka寫入一個0到9之間的隨機數,通過 Spark Streaming從Kafka 中獲取數據并實 計算批次間隔內的數據的數值之和

往Kafk 中寫入隨 數的主要代碼如下:

Random random = new Random( ); wh le (true) { String msg = String.val ueOf( r andom.nextint(lO) ); ProducerRecord, String> message = new ProducerRecord<>(topic , msg ); producer.send(message) . get() ; TimeUnit.SECONDS . sleep(l) ;

Kafka與Spark Streaming的集成示例如代碼清單 12-3所示,代碼中的批次間隔設置為 2s

示例中的主題 topic spark 包含4個分區。

代碼清單12-3 Kafka與Spa Streaming的集成示例

其實,kafka的設計實現,涉及到太多的底層技術,為了能夠把它吃透,需要花大量的時間和精力。

在這里,送大家一張 Kafka 學習框架,分為 Kafka 入門、Kafka 的基本使用、客戶端詳解、Kafka 原理介紹、Kafka 運維與監控以及高級 Kafka 應用。

需要這份的kafka朋友們轉發收藏+關注私信“資料”立即獲取

總結

以上是生活随笔為你收集整理的spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。