Spark介绍
Spark
Spark 是什么?
Apache Spark?是用于大規模數據處理的快速和通用引擎.
速度:在內存中,運行程序比Hadoop MapReduce快100倍,在磁盤上則要快10倍.
Apache Spark具有支持非循環數據流和內存計算的高級DAG執行引擎.
易用:可以使用Java,Scala,Python,R快速編寫程序.
Spark提供80+高級操作方法,可以輕松構建并行應用程序.
Spark提供了一堆庫,包括SQL和DataFrame,MLlib,GraphX和Spark Streaming。您可以在相同的應用程序中無縫地組合這些庫. Spark在Hadoop,Mesos,獨立或云端運行。它可以訪問各種數據源,包括HDFS,Cassandra,HBase和S3
一,RDD 彈性分布式數據集
定義, TA 容錯的,并行的數據結構,存儲到磁盤和內存,控制數據分區。本質上是一個只讀的分區記錄集合,RDD包含多個分區,每個分區是一個dataset片段.
依賴, RDD可以相互依賴。如果RDD的每個分區最多只能被一個Child RDD的一個分區使用,窄依賴;若多個Child RDD分區都可以依賴,寬依賴.
首先,窄依賴被劃分到同一個stage,支持在同一個cluster node上以管道形式執行多條命令,eg,先map,緊接著filter.相反,寬依賴由于依賴的上游節點不止一個,往往跨界點傳輸數據.
其次從容災角度講,窄依賴的只需要執行父RDD的丟失分區的計算即可恢復.而寬依賴需要考慮恢復所有父RDD的丟失分區.
?本質, RDD是Spark中的抽象數據結構類型,從編程的角度來看,RDD可以簡單看成是一個數組。和普通數組的區別是,RDD中的數據是分區存儲的,這樣不同分區的數據就可以分布在不同的機器上,同時可以被并行處理。因此,Spark應用程序所做的無非是把需要處理的數據轉換為RDD,然后對RDD進行一系列的變換和操作從而得到結果。本質是一個抽象類,如下:
abstract class RDD[T: ClassTag](@transient private var _sc: SparkContext,@transient private var deps: Seq[Dependency[_]]) extends Serializable with Logging {}| transformation | map() | 函數應用于RDD每一個元素,返回值是新的RDD |
| transformation | flatMap() | 函數應用于RDD每一個元素,將元素數據進行拆分變成迭代器返回值是新的RDD |
| transformation | filter() | 過濾,返回值是新的RDD |
| transformation | distinct() | 去重,返回值是新的RDD |
| transformation | union() | 并集,返回值是新的RDD |
| transformation | intersection() | 交集,返回值是新的RDD |
| transformation | subtract() | 原RDD里和參數RDD里相同的元素去掉 |
| transformation | cartesian() | 函數應用于RDD每一個元素,返回值是新的RDD |
| action | collect() | 返回RDD所有元素 |
| action | count() | RDD里元素個數 |
| action | countByValue() | 各元素在RDD中出現次數 |
| action | reduce() | 并行整合所有RDD數據,例如求和操作 |
| action | fold(0)(func) | 和reduce功能一樣,不過fold帶有初始值 |
| action | aggregate(0)(seqOp,combop) | 和reduce功能一樣,但是返回的RDD數據類型和原RDD不一樣 |
| action | foreach(func) | 對RDD每個元素都是使用特定函數 |
DAG 有向無環圖
容錯處理
傳統關系型數據庫:采用日志記錄容災,數據恢復都依賴于重新執行日志中的SQL;
Hadoop:通過把數據備份到其他機器來容災;
RDD:本身是一個不可變的數據集,當某個worker節點上的任務失敗時,可以利用DAG重新調度計算這個失敗的任務,由于不用復制數據,從而大大降低了網絡通信.在流式計算場景中,Spark需要記錄日志和檢查點,以便利用checkpoint和日志對數據進行恢復;
二,Discretized Streams (DStreams)
DStream是一系列連續的RDD,是Spark Streaming提供的基本抽象如下圖所示:?
對DStream應用的任何操作都將轉換為底層RDD上的操作
三,Initializing StreamingContext
要初始化Spark Streaming程序,必須創建一個StreamingContext對象,它是所有Spark Streaming功能的主要入口.
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));?
四,Input DStreams and Receivers
Spark Streaming提供兩類內置流式傳輸源:
基本數據源:StreamingContext API中直接提供的源.比如:文件系統和套接字連接.(file 和 socket) 高級源:Kafka,Flume,Kinesis等資源可以通過額外的實用類來獲得.
Spark Streaming 提供兩種接收器:
可靠的接收器 - 當數據已被接收并且通過復制存儲在Spark中時,可靠的接收器正確地向可靠的源發送確認。 不可靠的接收器 - 不可靠的接收器不向源發送確認。這可以用于不支持確認的源,或者甚至當不需要或需要進入確認的復雜性時,用于可靠的源。
五,Transformations on DStreams
| map(func) | 通過func傳遞源DStream的每個元素,返回新的DStream |
| flatMap(func) | 與map類似,但每個輸入項可以映射到0個或更多的輸出項 |
| filter(func) | 過濾 |
| repartition(numPartitions) | 通過修改分區來更改DStream中的并發數 |
| union(otherStream) | 求兩個DStream的并集 |
| count() | 計算源DStream的每個RDD中的元素數量,返回RDD的新DStream |
| reduce(func) | 使用函數func聚合源DStream的每個RDD中的元素來返回單個元素RDD的新DStream |
| countByValue() | 根據value計算key. |
| reduceByKey(func, [numTasks]) | 根據Key進行特定的計算 |
| join(otherStream, [numTasks]) | 當(K,V)和(K,W)對的兩個DStream被調用時,返回一個新的(K,(V,W))對的DStream與每個鍵的所有元素對 |
| transform(func) | 通過對源DStream的每個RDD應用RDD到RDD函數來返回新的DStream。這可以用于對DStream進行任意RDD操作 |
| updateStateByKey(func) | 返回一個新的“狀態”DStream,其中每個key的狀態通過在key的先前狀態應用給定的功能和key的新值來更新。這可以用于維護每個key的任意狀態數據 |
六,Output Operations on DStreams
| print() | 打印10個元素,用于調試 |
| saveAsTextFiles(prefix, [suffix]) | 將此DStream的內容另存為文本文件。每個批處理間隔的文件名是根據前綴和后綴“prefix-TIME_IN_MS [.suffix]”生成的 |
| saveAsObjectFiles(prefix, [suffix]) | 將此DStream的內容保存為序列化Java對象的SequenceFiles。每個批處理間隔的文件名是根據前綴和后綴“prefix-TIME_IN_MS [.suffix]”生成的。 |
| saveAsHadoopFiles(prefix, [suffix]) | 將此DStream的內容另存為Hadoop文件。每個批處理間隔的文件名是根據前綴和后綴“prefix-TIME_IN_MS [.suffix]”生成的。 |
| foreachRDD(func) | 對從流中生成的每個RDD應用函數func的最通用的輸出運算符。此功能應將每個RDD中的數據推送到外部系統,例如將RDD保存到文件,或將其通過網絡寫入數據庫 |
轉載:https://www.2cto.com/net/201711/695363.html
總結
- 上一篇: 【转载保存】Netty实现单客户端多连接
- 下一篇: jedis StreamEntryID参