日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

《Scala机器学习》一一3.3 应用

發布時間:2024/4/14 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 《Scala机器学习》一一3.3 应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本節書摘來自華章計算機《Scala機器學習》一書中的第3章,第3.3節,作者:[美] 亞歷克斯·科茲洛夫(Alex Kozlov),更多章節內容可以訪問云棲社區“華章計算機”公眾號查看。

3.3 應用

下面會介紹Spark/Scala中的一些實際示例和庫,具體會從一個非常經典的單詞計數問題開始。
3.3.1 單詞計數
大多數現代機器學習算法需要多次傳遞數據。如果數據能存放在單臺機器的內存中,則該數據會容易獲得,并且不會呈現性能瓶頸。如果數據太大,單臺機器的內存容納不下,則可保存在磁盤(或數據庫)上,這樣雖然可得到更大的存儲空間,但存取速度大約會降為原來的1/100。另外還有一種方式就是分割數據集,將其存儲在網絡中的多臺機器上,并通過網絡來傳輸結果。雖然對這種方式仍有爭議,但分析表明,對于大多數實際系統而言,如果能有效地在多個CPU之間拆分工作負載,則通過一組網絡連接節點存儲數據比從單個節點上的硬盤重復存儲和讀取數據略有優勢。
磁盤的平均帶寬約為100 MB/s,由于磁盤的轉速和緩存不同,其傳輸時會有幾毫秒的延遲。相對于直接從內存中讀取數據,速度要降為原來的1/100左右,當然,這也會取決于數據大小和緩存的實現?,F代數據總線可以超過10 GB/s的速度傳輸數據。而網絡速度仍然落后于直接的內存訪問,特別是標準網絡層中TCP/IP內核的開銷會對網絡速度影響很大。但專用硬件可以達到每秒幾十吉字節,如果并行運行,則可能和從內存讀取一樣快。當前的網絡傳輸速度介于1~10 GB/s之間,但在實際應用中仍然比磁盤更快。因此,可以將數據分配到集群節點中所有機器的內存中,并在集群上執行迭代機器學習算法。
但內存也有一個問題:在節點出現故障并重新啟動后,內存中的數據不會跨節點持久保存。一個流行的大數據框架Hadoop解決了這個問題。Hadoop受益于Dean/Ghemawat的論文(Jeff Dean和Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, OSDI, 2004.),這篇文章提出使用磁盤層持久性來保證容錯和存儲中間結果。Hadoop MapReduce程序首先會在數據集的每一行上運行map函數,得到一個或多個鍵/值對。然后按鍵值對這些鍵/值對進行排序、分組和聚合,使得具有相同鍵的記錄最終會在同一個reducer上處理,該reducer可能在一個(或多個)節點上運行。reducer會使用一個reduce函數,遍歷同一個鍵對應的所有值,并將它們聚合在一起。如果reducer因為一些原因失敗,由于其中間結果持久保存,則可以丟棄部分計算,然后可從檢查點保存的結果重新開始reduce計算。很多簡單的類ETL應用程序僅在保留非常少的狀態信息的情況下才遍歷數據集,這些狀態信息是從一個記錄到另一個記錄的。
單詞計數是MapReduce的經典應用程序。該程序可統計文檔中每個單詞的出現次數。在Scala中,對排好序的單詞列表采用foldLeft方法,很容易得到單詞計數。

如果運行這個程序,會輸出(字,計數)這樣的元組列表。該程序會按行來分詞,并對得到的單詞排序,然后將每個單詞與(字,計數)元組列表中的最新條目(entry)進行匹配。同樣的計算在MapReduce中會表示成如下形式:

首先需要按行處理文本,將行拆分成單詞,并生成(word,1)對。這個任務很容易并行化。為了并行化全局計數,需對計數部分進行劃分,具體的分解通過對單詞子集分配計數任務來實現。在Hadoop中需計算單詞的哈希值,并根據哈希值來劃分工作。
一旦map任務找到給定哈希的所有條目,它就可以將鍵/值對發送到reducer,在MapReduce中,發送部分通常稱為shuffle。從所有mapper中接收完所有的鍵/值對后,reducer才會組合這些值(如果可能,在mapper中也可部分組合這些值),并對整個聚合進行計算,在這種情況下只進行求和。單個reducer將查看給定單詞的所有值。
下面介紹Spark中單詞計數程序的日志輸出(Spark在默認情況下輸出的日志會非常冗長,為了輸出關鍵的日志信息,可將conf /log4j.properties文件中的INFO替換為ERROR或FATAL):

這個過程發生的唯一的事情是元數據操作,Spark不會觸及數據本身,它會估計數據集的大小和分區數。默認情況下是HDFS塊數,但是可使用minPartitions參數明確指定最小分區數:

下面定義另一個RDD,它源于linesRdd:

在2 GB的文本數據(共有40 291行,353 087個單詞)上執行單詞計算程序時,進行讀取、分詞和按詞分組所花的時間不到1秒。通過擴展日志記錄可看到以下內容:
Spark打開幾個端口與執行器和用戶通信
Spark UI運行的端口為4040(可通過http://localhost: 4040打開)
可從本地或分布式存儲(HDFS、Cassandra和S3)中讀取文件
如果Spark構建時支持Hive,它會連接到Hive上
Spark使用惰性求值(僅當輸出請求時)來執行管道
Spark使用內部調度器將作業拆分為任務,優化執行任務,然后執行它們
結果存儲在RDD中,可用集合方法來保存或導入到執行shell的節點的RAM中
并行性能調整的原則是在不同節點或線程之間分割工作負載,使得開銷相對較小,而且要保持負載平衡。
3.3.2 基于流的單詞計數
Spark支持對輸入流進行監聽,能對其進行分區,并以接近實時的方式來計算聚合。目前支持來自Kafka、Flume、HDFS/S3、Kinesis、Twitter,以及傳統的MQ(如ZeroMQ和MQTT)的數據流。在Spark中,流的傳輸是以小批量(micro-batch)方式進行的。在Spark內部會將輸入數據分成小批量,通常按大小的不同,有些所花的時間不到1秒,有些卻要幾分鐘,然后會對這些小批量數據執行RDD聚合操作。
下面擴展前面介紹的Flume示例。這需要修改Flume配置文件來創建一個Spark輪詢槽(polling sink),用這種槽來替代HDFS:

現在不用寫入HDFS,Flume將會等待Spark的輪詢數據:

為了運行程序,在一個窗口中啟動Flume代理:

然后在另一個窗口運行FlumeWordCount對象:

現在任何輸入到netcat連接的文本都將被分詞并在6秒的滑動窗口上按每2秒計算單詞的量:

Spark/Scala允許在不同的流之間無縫切換。例如,Kafka發布/訂閱主題模型類似于如下形式:

要啟動Kafka代理,首先下載最新發布的二進制包并啟動ZooKeeper。ZooKeeper是一個分布式服務協調器,即使Kafka部署在單節點上也需要它:

在另一個窗口中啟動Kafka服務器:

運行KafkaWordCount對象:

現在將單詞流發布到Kafka主題中,這需要再開啟一個計數窗口:

從上面的結果可以看出程序每兩秒輸出一次。Spark流有時被稱為小批次處理(micro-batch processing)。數據流有許多其他應用程序(和框架),但要完全討論清楚會涉及很多內容,因此需要單獨進行介紹。在第5章會討論一些數據流上的機器學習問題。下面將介紹更傳統的類SQL接口。
3.3.3 Spark SQL和數據框
數據框(Data Frame)相對較新,在Spark的1.3版本中才引入,它允許人們使用標準的SQL語言來分析數據。在第1章就使用了一些SQL命令來進行數據分析。SQL對于簡單的數據分析和聚合非常有用。
最新的調查結果表明大約有70%的Spark用戶使用DataFrame。雖然DataFrame最近成為表格數據最流行的工作框架,但它是一個相對重量級的對象。DataFrame使用的管道在執行速度上可能比基于Scala的vector或LabeledPoint(這兩個對象將在下一章討論)的速度慢得多。來自多名開發人員的證據表明:響應時間可為幾十或幾百毫秒,這與具體查詢有關,若是更簡單的對象會小于1毫秒。
Spark為SQL實現了自己的shell,這是除標準Scala REPL shell以外的另一個shell??赏ㄟ^./bin/spark-sql來運行該shell,還可通過這種shell來訪問Hive/Impala或關系數據庫表:

在標準Spark的REPL中,可以通過運行相同的查詢來執行以下命令:

總結

以上是生活随笔為你收集整理的《Scala机器学习》一一3.3 应用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。