日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark总结——转载

發布時間:2024/4/17 编程问答 55 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark总结——转载 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

轉載自: ? ?spark總結??

?

第一個Spark程序

/**
* 功能:用spark實現的單詞計數程序
* 環境:spark 1.6.1, scala 2.10.4
*/

// 導入相關類庫
import org.apache.spark._

object WordCount {
def main(args: Array[String]) {
// 建立spark運行上下文
val sc = new SparkContext("local[3]", "WordCount", new SparkConf())

// 加載數據,創建RDD
val inRDD = sc.textFile("words.txt", 3)

// 對RDD進行轉換,得到最終結果
val res = inRDD.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _)

// 將計算結果collect到driver節點,并打印
res.collect.foreach(println)

// 停止spark運行上下文
sc.stop()
}
}

關于RDD

彈性分布式數據集(RDD)是分布式處理的一個數據集的抽象,RDD是只讀的,在RDD之上的操作都是并行的。實際上,RDD只是一個邏輯實體,其中存儲了分布式數據集的一些信息,并沒有包含所謂的“物理數據”,“物理數據”只有在RDD被計算并持久化之后才存在于內存或磁盤中。RDD的重要內部屬性有:

  • 計算RDD分區的函數。
  • 所依賴的直接父RDD列表。
  • RDD分區及其地址列表。
  • RDD分區器。
  • RDD分區優先位置。

RDD操作起來與Scala集合類型沒有太大差別,這就是Spark追求的目標:像編寫單機程序一樣編寫分布式程序,但它們的數據和運行模型有很大的不同,用戶需要具備更強的系統把控能力和分布式系統知識。

Transformation與Action

RDD提供了兩種類型的操作:transformation操作(轉化操作)和action操作(行動操作)。transformation操作是得到一個新的RDD,方式很多,比如從數據源生成一個新的RDD,從RDD生成一個新的RDD。action操作則是得到其他數據類型的結果。

所有的transformation都是采用的懶策略,就是如果只是將transformation提交是不會執行計算的,spark在內部只是用新的RDD記錄這些transformation操作并形成RDD對象的有向無環圖(DAG),計算只有在action被提交的時候才被觸發。實際上,我們不應該把RDD看作存放著特定數據的數據集,而最好把每個RDD當作我們通過transformation操作構建出來的、記錄如何計算數據的指令列表。

RDD的action算子會觸發一個新的job,spark會在DAG中尋找是否有cached或者persisted的中間結果,如果沒有找到,那么就會重新執行這些中間過程以重新計算該RDD。因此,如果想在多個action操作中重用同一個RDD,那么最好使用?cache()/persist()將RDD緩存在內存中,但如果RDD過大,那么最好使用?persist(StorageLevel.MEMORY_AND_DISK)?代替。注意cache/persist僅僅是設置RDD的存儲等級,因此你應該在第一次調用action之前調用cache/persist。cache/persist使得中間計算結果存在內存中,這個才是說為啥Spark是內存計算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允許你把中間結果放內存里。

在spark程序中打印日志時,尤其需要注意打印日志的代碼很有可能使用到了action算子,如果沒有緩存中間RDD就可能導致程序的效率大大降低。另外,如果一個RDD的計算過程中有抽樣、隨機值或者其他形式的變化,那么一定要緩存中間結果,否則程序執行結果可能都是不準確的!

參考鏈接及進一步閱讀:

  • Spark會把數據都載入到內存么?
  • Using Spark’s cache for correctness, not just performance

RDD持久化(緩存)

正如在轉化和行動操作部分所說的一樣,為了避免在一個RDD上多次調用action操作從而可能導致的重新計算,我們應該將該RDD在第一次調用action之前進行持久化。對RDD進行持久化對于迭代式和交互式應用非常有好處,好處大大滴有。

持久化可以使用cache()或者persist()。默認情況下的緩存級別為MEMORY_ONLY,spark會將對象直接緩存在JVM的堆空間中,而不經過序列化處理。我們可以給persist()傳遞持久化級別參數以指定的方式持久化RDD。MEMORY_AND_DISK持久化級別盡量將RDD緩存在內存中,如果內存緩存不下了,就將剩余分區緩存在磁盤中。MEMORY_ONLY_SER將RDD進行序列化處理(每個分區序列化為一個字節數組)然后緩存在內存中。還有MEMORY_AND_DISK_SER等等很多選項。選擇持久化級別的原則是:盡量選擇緩存在內存中,如果內存不夠,則首選序列化內存方式,除非RDD分區重算開銷比緩存到磁盤來的更大(很多時候,重算RDD分區會比從磁盤中讀取要快)或者序列化之后內存還是不夠用,否則不推薦緩存到磁盤上。

如果要緩存的數據太多,內存中放不下,spark會自動利用最近最少使用(LRU)策略把最老的分區從內存中移除。對于僅放在內存中的緩存級別,下次要用到已被移除的分區時,這些分區就需要重新計算。對于使用內存與磁盤的緩存級別,被移除的分區都會被寫入磁盤。

另外,RDD還有一個unpersist()方法,用于手動把持久化的RDD從緩存中移除。

環境變量SPARK_LOCAL_DIRS用來設置RDD持久化到磁盤的目錄,它同時也是shuffle的緩存目錄。

各種RDD與RDD操作

基本RDD

抽象類RDD包含了各種數據類型的RDD都適用的通用操作。下面對基本類型RDD的操作進行分門別類地介紹。

針對各個元素的轉化操作:

  • map: 對各個元素進行映射操作。
  • flatMap: 對各個元素進行映射操作,并將最后結果展平。
  • filter: 過濾不滿足條件的元素。filter操作可能會引起數據傾斜,甚至可能導致空分區,新形成的RDD將會包含這些可能生成的空分區。所有這些都可能會導致問題,要想解決它們,最好在filter之后重新分區。

偽集合操作:

盡管RDD不是嚴格意義上的集合,但它支持許多數學上的集合操作。注意:這些操作都要求操作的RDD是相同的數據類型的。

  • distinct: 對RDD中的元素進行去重處理。需要注意的是,distinct操作開銷很大,因為它需要shuffle所有數據,以確保每一個元素都只有一份。
  • union: 返回一個包含兩個或多個RDD中所有元素的RDD。spark的union并不會去重,這點與數學上的不同。
  • intersection: 返回兩個RDD中都有的元素。intersection會在運行時除去所有重復的元素,因此它也需要shuffle,性能要差一些。
  • subtract: 返回一個由只存在于第一個RDD中而不存在于第二個RDD中的所有元素組成的RDD。它也需要shuffle。
  • cartesian: 計算兩個RDD的笛卡爾積。需要注意的是,求大規模RDD的笛卡爾積開銷巨大。
  • sample: 對RDD進行采樣,返回一個采樣RDD。

基于分區的轉化操作:

  • glom: 將每個分區中的所有元素都形成一個數組。如果在處理當前元素時需要使用前后的元素,該操作將會非常有用,不過有時我們可能還需要將分區邊界的數據收集起來并廣播到各節點以備使用。
  • mapPartitions: 基于分區的map,spark會為操作分區的函數該分區的元素的迭代器。
  • mapPartitionsWithIndex: 與mapPartitions不同之處在于帶有分區的序號。

管道(pipe)操作:

spark在RDD上提供了pipe()方法。通過pipe(),你可以使用任意語言將RDD中的各元素從標準輸入流中以字符串形式讀出,并將這些元素執行任何你需要的操作,然后把結果以字符串形式寫入標準輸出,這個過程就是RDD的轉化操作過程。

使用pipe()的方法很簡單,假如我們有一個用其他語言寫成的從標準輸入接收數據并將處理結果寫入標準輸出的可執行腳本,我們只需要將該腳本分發到各個節點相同路徑下,并將其路徑作為pipe()的參數傳入即可。

行動操作:

  • foreach: 對每個元素進行操作,并不會返回結果。
  • foreachPartition: 基于分區的foreach操作,操作分區元素的迭代器,并不會返回結果。
  • reduce: 對RDD中所有元素進行規約,最終得到一個規約結果。reduce接收的規約函數要求其返回值類型與RDD中元素類型相同。
  • fold: 與reduce類似,不同的是,它接受一個“初始值”來作為每個分區第一次調用時的結果。fold同樣要求規約函數返回值類型與RDD元素類型相同。
  • aggregate: 與reduce和fold類似,但它把我們從返回值類型必須與所操作的RDD元素類型相同的限制中解放出來。
  • count: 返回RDD元素個數。
  • collect: 收集RDD的元素到driver節點,如果數據有序,那么collect得到的數據也會是有序的。大數據量最好不要使用RDD的collect,因為它會在本機上生成一個新的Array,以存儲來自各個節點的所有數據,此時更好的辦法是將數據存儲在HDFS等分布式持久化層上。
  • take: 返回指定數量的元素到driver節點。它會嘗試只訪問盡量少的分區,因此該操作會得到一個不均衡的集合。需要注意的是,該操作返回元素的順序與你預期的可能不一樣。
  • top: 如果為元素定義了順序,就可以使用top返回前幾個元素。
  • takeSample: 返回采樣數據。

鍵值對RDD

PairRDDFunctions封裝了用于操作鍵值對RDD的一些功能函數。一些文件讀取操作(sc.sequenceFile()等)會直接返回RDD[(K, V)]類型。在RDD上使用map操作也可以將一個RDD轉換為RDD[(K, V)]類型。在用Scala書寫的Spark程序中,RDD[(K, V)]類型到PairRDDFunctions類型的轉換一般由隱式轉換函數完成。

基本類型RDD的操作同樣適用于鍵值對RDD。下面對鍵值對類型RDD特有的操作進行分門別類地介紹。

針對各個元素的轉化操作:

  • mapValues: 對各個鍵值對的值進行映射。該操作會保留RDD的分區信息。
  • flatMapValues: 對各個鍵值對的值進行映射,并將最后結果展平。該操作會保留RDD的分區信息。

聚合操作:

  • reduceByKey: 與reduce相當類似,它們都接收一個函數,并使用該函數對值進行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是對鍵相同的值進行規約,并最終形成RDD[(K, V)],而不像reduce那樣返回單獨一個“值”。
  • foldByKey: 與fold類似,就像reduceByKey之于reduce那樣。熟悉MapReduce中的合并器(combiner)概念的你可能已經注意到,reduceByKey和foldByKey會在為每個鍵計算全局的總結果之前先自動在每臺機器上進行本地合并。用戶不需要指定合并器。更泛化的combineByKey可以讓你自定義合并的行為。
  • combineByKey: 是最常用的基于鍵進行聚合的函數,大多數基于鍵聚合的函數都是用它實現的。與aggregate一樣,combineByKey可以讓用戶返回與輸入數據的類型不同的返回值。combineByKey的內部實現分為三步來完成:首先根據是否需要在map端進行combine操作決定是否對RDD先進行一次mapPartitions操作(利用createCombiner、mergeValue、mergeCombiners三個函數)來達到減少shuffle數據量的作用。第二步根據partitioner對MapPartitionsRDD進行shuffle操作。最后在reduce端對shuffle的結果再進行一次combine操作。

數據分組:

  • groupBy: 根據自定義的東東進行分組。groupBy是基本RDD就有的操作。
  • groupByKey: 根據鍵對數據進行分組。雖然groupByKey+reduce也可以實現reduceByKey一樣的效果,但是請你記住:groupByKey是低效的,而reduceByKey會在本地先進行聚合,然后再通過網絡傳輸求得最終結果。

在執行聚合或分組操作時,可以指定分區數以對并行度進行調優。

連接:

  • cogroup: 可以對多個RDD進行連接、分組、甚至求鍵的交集。其他的連接操作都是基于cogroup實現的。
  • join: 對數據進行內連接,也即當兩個鍵值對RDD中都存在對應鍵時才輸出。當一個輸入對應的某個鍵有多個值時,生成的鍵值對RDD會包含來自兩個輸入RDD的每一組相對應的記錄,也即笛卡爾積。
  • leftOuterJoin: 即左外連接,源RDD的每一個鍵都有對應的記錄,第二個RDD的值可能缺失,因此用Option表示。
  • rightOuterJoin: 即右外連接,與左外連接相反。
  • fullOuterJoin: 即全外連接,它是是左右外連接的并集。

如果一個RDD需要在多次連接操作中使用,對該RDD分區并持久化分區后的RDD是有益的,它可以避免不必要的shuffle。

數據排序:

在基本類型RDD中,sortBy()可以用來排序,max()和min()則可以用來方便地獲取最大值和最小值。另外,在OrderedRDDFunctions中,存在一個sortByKey()可以方便地對鍵值對RDD進行排序,通過spark提供的隱式轉換函數可以將RDD自動地轉換為OrderedRDDFunctions,并隨意地使用它的排序功能。

行動操作:

鍵值對RDD提供了一些額外的行動操作供我們隨意使用。如下:

  • countByKey: 對每個鍵對應的元素分別計數。
  • collectAsMap: 將結果以Map的形式返回,以便查詢。
  • lookup: 返回給定鍵對應的所有值。

數值RDD

DoubleRDDFunctions為包含數值數據的RDD提供了一些描述性的統計操作,RDD可以通過隱式轉換方便地使用這些方便的功能。

這些數值操作是通過流式算法實現的,允許以每次一個元素的方式構建出模型。這些統計數據都會在調用stats()時通過一次遍歷數據計算出來,并以StatCounter對象返回。如果你只想計算這些統計數據中的一個,也可以直接對RDD調用對應的方法。更多信息參見Spark API。

RDD依賴、窄寬依賴

RDD依賴與DAG

一系列轉化操作形成RDD的有向無環圖(DAG),行動操作觸發作業的提交與執行。每個RDD維護了其對直接父RDD(一個或多個)的依賴,其中包含了父RDD的引用和依賴類型信息,通過dependencies()我們可以獲取對應RDD的依賴,其返回一個依賴列表。

通過RDD的父RDD引用就可以從DAG上向前回溯找到其所有的祖先RDD。spark提供了toDebugString方法來查看RDD的譜系。對于如下一段簡單的代碼:

val input = sc.parallelize(1 to 10)
val repartitioned = input.repartition(2)
val sum = repartitioned.sum

我們就可以通過在RDD上調用toDebugString來查看其依賴以及轉化關系,結果如下:

// input.toDebugString
res0: String = (4) ParallelCollectionRDD[0] at parallelize at <console>:21 []

// repartitioned.toDebugString
res1: String =
(2) MapPartitionsRDD[4] at repartition at <console>:23 []
| CoalescedRDD[3] at repartition at <console>:23 []
| ShuffledRDD[2] at repartition at <console>:23 []
+-(4) MapPartitionsRDD[1] at repartition at <console>:23 []
| ParallelCollectionRDD[0] at parallelize at <console>:21 []

上述repartitioned的依賴鏈存在兩個縮進等級。同一縮進等級的轉化操作構成一個Stage(階段),它們不需要混洗(shuffle)數據,并可以流水線執行(pipelining)。

窄依賴和寬依賴

spark中RDD之間的依賴分為窄(Narrow)依賴寬(Wide)依賴兩種。我們先放出一張示意圖:

窄依賴指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區,或多個父RDD的分區對應于一個子RDD的分區。圖中,map/filter和union屬于第一類,對輸入進行協同劃分(co-partitioned)的join屬于第二類。

寬依賴指子RDD的分區依賴于父RDD的多個或所有分區,這是因為shuffle類操作,如圖中的groupByKey和未經協同劃分的join。

窄依賴對優化很有利。邏輯上,每個RDD的算子都是一個fork/join(此join非上文的join算子,而是指同步多個并行任務的barrier(路障)): 把計算fork到每個分區,算完后join,然后fork/join下一個RDD的算子。如果直接翻譯到物理實現,是很不經濟的:一是每一個RDD(即使 是中間結果)都需要物化到內存或存儲中,費時費空間;二是join作為全局的barrier,是很昂貴的,會被最慢的那個節點拖死。如果子RDD的分區到父RDD的分區是窄依賴,就可以實施經典的fusion優化,把兩個fork/join合為一個;如果連續的變換算子序列都是窄依賴,就可以把很多個fork/join并為一個,不但減少了大量的全局barrier,而且無需物化很多中間結果RDD,這將極大地提升性能。Spark把這個叫做流水線(pipeline)優化。關于流水線優化,從MapPartitionsRDD中compute()的實現就可以看出端倪,該compute方法只是對迭代器進行復合,復合就是嵌套,因此數據處理過程就是對每條記錄進行同樣的嵌套處理直接得出所需結果,而沒有中間計算結果,同時也要注意:依賴過長將導致嵌套過深,從而可能導致棧溢出。

轉換算子序列一碰上shuffle類操作,寬依賴就發生了,流水線優化終止。在具體實現 中,DAGScheduler從當前算子往前回溯依賴圖,一碰到寬依賴,就生成一個stage來容納已遍歷的算子序列。在這個stage里,可以安全地實施流水線優化。然后,又從那個寬依賴開始繼續回溯,生成下一個stage。

另外,寬窄依賴的劃分對spark的容錯也具有重要作用,參見本文容錯機制部分。

DAG到任務的劃分

用戶代碼定義RDD的有向無環圖,行動操作把DAG轉譯為執行計劃,進一步生成任務在集群中調度執行。

具體地說,RDD的一系列轉化操作形成RDD的DAG,在RDD上調用行動操作將觸發一個Job(作業)的運行,Job根據DAG中RDD之間的依賴關系(寬依賴/窄依賴,也即是否發生shuffle)的不同將DAG劃分為多個Stage(階段),一個Stage對應DAG中的一個或多個RDD,一個Stage對應多個RDD是因為發生了流水線執行(pipelining),一旦Stage劃分出來,Task(任務)就會被創建出來并發給內部的調度器,進而分發到各個executor執行,一個Stage會啟動很多Task,每個Task都是在不同的數據分區上做同樣的事情(即執行同樣的代碼段),Stage是按照依賴順序處理的,而Task則是獨立地啟動來計算出RDD的一部分,一旦Job的最后一個Stage結束,一個行動操作也就執行完畢了。

Stage分為兩種:ShuffleMapStageResultStageShuffleMapStage是非最終stage,后面還有其他的stage,所以它的輸出一定是需要shuffle并作為后續stage的輸入。ShuffleMapStage的最后Task就是ShuffleMapTaskResultStage是一個Job的最后一個Stage,直接生成結果或存儲。ResultStage的最后Task就是ResultTask。一個Job含有一個或多個Stage,最后一個為ResultTask,其他都為ShuffleMapStage。

RDD不能嵌套

RDD嵌套是不被支持的,也即不能在一個RDD操作的內部再使用RDD。如果在一個RDD的操作中,需要訪問另一個RDD的內容,你可以嘗試join操作,或者將數據量較小的那個RDD廣播(broadcast)出去。

你同時也應該注意到:join操作可能是低效的,將其中一個較小的RDD廣播出去然后再join可以避免不必要的shuffle,俗稱“小表廣播”。

使用其他分區數據

由于RDD不能嵌套,這使得“在計算一個分區時,訪問另一個分區的數據”成為一件困難的事情。那么有什么好的解決辦法嗎?請繼續看。

spark依賴于RDD這種抽象模型進行粗粒度的并行計算,一般情況下每個節點的每次計算都是針對單一記錄,當然也可以使用 RDD.mapPartition 來對分區進行處理,但都限制在一個分區內(當然更是一個節點內)。

spark的worker節點相互之間不能直接進行通信,如果在一個節點的計算中需要使用到另一個分區的數據,那么還是有一定的困難的。

你可以將整個RDD的數據全部廣播(如果數據集很大,這可不是好辦法),或者廣播一些其他輔助信息;也可以從所有節點均可以訪問到的文件(hdfs文件)或者數據庫(關系型數據庫或者hbase)中讀取;更進一步或許你應該修改你的并行方案,使之滿足“可針對拆分得到的小數據塊進行并行的獨立的計算,然后歸并得到大數據塊的計算結果”的MapReduce準則,在“劃分大的數據,并行獨立計算,歸并得到結果”的過程中可能存在數據冗余之類的,但它可以解決一次性沒法計算的大數據,并最終提高計算效率,hadoop和spark都依賴于MapReduce準則。

對RDD進行分區

何時進行分區?

spark程序可以通過控制RDD分區方式來減少通信開銷。分區并不是對所有應用都是有好處的,如果給定RDD只需要被掃描一次,我們完全沒有必要對其預先進行分區處理。只有當數據集多次在諸如連接這種基于鍵的操作中使用時,分區才會有幫助,同時記得將分區得到的新RDD持久化哦。

更多的分區意味著更多的并行任務(Task)數。對于shuffle過程,如果分區中數據量過大可能會引起OOM,這時可以將RDD劃分為更多的分區,這同時也將導致更多的并行任務。spark通過線程池的方式復用executor JVM進程,每個Task作為一個線程存在于線程池中,這樣就減少了線程的啟動開銷,可以高效地支持單個executor內的多任務執行,這樣你就可以放心地將任務數量設置成比該應用分配到的CPU cores還要多的數量了。

如何分區與分區信息

在創建RDD的時候,可以指定分區的個數,如果沒有指定,則分區個數是系統默認值,即該程序所分配到的CPU核心數。在Java/Scala中,你可以使用rdd.getNumPartitions(1.6.0+)或rdd.partitions.size()來獲取分區個數。

對基本類型RDD進行重新分區,可以通過repartition()函數,只需要指定重分區的分區數即可。repartition操作會引起shuffle,因此spark提供了一個優化版的repartition,叫做coalesce(),它允許你指定是否需要shuffle。在使用coalesce時,需要注意以下幾個問題:

  • coalesce默認shuffle為false,這將形成窄依賴,例如我們將1000個分區重新分到100個中時,并不會引起shuffle,而是原來的10個分區合并形成1個分區。
  • 但是對于從很多個(比如1000個)分區重新分到很少的(比如1個)分區這種極端情況,數據將會分布到很少節點(對于從1000到1的重新分區,則是1個節點)上運行,完全無法開掘集群的并行能力,為了規避這個問題,可以設置shuffle為true。由于shuffle可以分隔stage,這就保證了上一階段stage中的任務仍是很多個分區在并行計算,不這樣設置的話,則兩個上下游的任務將合并成一個stage進行計算,這個stage便會在很少的分區中進行計算。
  • 如果當前每個分區的數據量過大,需要將分區數量增加,以利于充分利用并行,這時我們可以設置shuffle為true。對于數據分布不均而需要重分區的情況也是如此。spark默認使用hash分區器將數據重新分區。

對RDD進行預置的hash分區,需將RDD轉換為RDD[(key,value)]類型,然后就可以通過隱式轉換為PairRDDFunctions,進而可以通過如下形式將RDD哈希分區,HashPartitioner會根據RDD中每個(key,value)中的key得出該記錄對應的新的分區號:

PairRDDFunctions.partitionBy(new HashPartitioner(n))

另外,spark還提供了一個范圍分區器,叫做RangePartitioner。范圍分區器爭取將所有的分區盡可能分配得到相同多的數據,并且所有分區內數據的上界是有序的。

一個RDD可能存在分區器也可能沒有,我們可以通過RDD的partitioner屬性來獲取其分區器,它返回一個Option對象。

如何進行自定義分區

spark允許你通過提供一個自定義的Partitioner對象來控制RDD的分區方式,這可以讓你利用領域知識進一步減少通信開銷。

要實現自定義的分區器,你需要繼承Partitioner類,并實現下面三個方法即可:

  • numPartitions: 返回創建出來的分區數。
  • getPartition: 返回給定鍵的分區編號(0到numPartitions-1)。
  • equals: Java判斷相等性的標準方法。這個方法的實現非常重要,spark需要用這個方法來檢查你的分區器對象是否和其他分區器實例相同,這樣spark才可以判斷兩個RDD的分區方式是否相同。

影響分區方式的操作

spark內部知道各操作會如何影響分區方式,并將會對數據進行分區的操作的結果RDD自動設置為對應的分區器。

不過轉化操作的結果并不一定會按照已知的分區方式分區,這時輸出的RDD可能就會丟失分區信息。例如,由于map()或flatMap()函數理論上可以改變元素的鍵,因此當你對一個哈希分區的鍵值對RDD調用map/flatMap時,結果RDD就不會再有分區方式信息。不過,spark提供了另外兩個操作mapValues()和flatMapValues()作為替代方法,它們可以保證每個二元組的鍵保持不變。

這里列出了所有會為生成的結果RDD設好分區方式的操作:cogroup()、?join()、?leftOuterJoin()、?rightOuterJoin()、?fullOuterJoin()、groupWith()、?groupByKey()、?reduceByKey()、combineByKey()、?partitionBy()、?sortBy()、?sortByKey()、?mapValues()(如果父RDD有分區方式的話)、?flatMapValues()(如果父RDD有分區方式的話)、?filter()(如果父RDD有分區方式的話) 等。其他所有操作生成的結果都不會存在特定的分區方式。

最后,對于二元操作,輸出數據的分區方式取決于父RDD的分區方式。默認情況下,結果會采用哈希分區,分區的數量和操作的并行度一樣。不過,如果其中一個父RDD已經設置過分區方式,那么結果就會采用那種分區方式;如果兩個父RDD都設置過分區方式,結果RDD會采用第一個父RDD的分區方式。

從分區中獲益的操作

spark的許多操作都引入了將數據根據鍵跨節點進行shuffle的過程。所有這些操作都會從數據分區中獲益。這些操作主要有:cogroup()、?join()、?leftOuterJoin()、?rightOuterJoin()、fullOuterJoin()、?groupWith()、?groupByKey()、?reduceByKey()、?combineByKey()、?lookup()等。

RDD分區優先位置

RDD分區優先位置與spark的調度有關,在spark進行任務調度的時候,會盡可能將任務分配到數據塊所存儲的節點。我們可以通過RDD的preferredLocations()來獲取指定分區的優先位置,返回值是該分區的優先位置列表。

數據加載與保存

從程序中的集合生成

sc.parallelize()可用于從程序中的集合產生RDD。sc.makeRDD()也是在程序中生成RDD,不過其還允許指定每一個RDD分區的優先位置。

以上這些方式一般用于原型開發和測試,因為它們需要把你的整個數據集先放在一臺機器(driver節點)的內存中,從而限制了只能用較小的數據量。

從文本文件加載數據

sc.textFile()默認從hdfs中讀取文件,在路徑前面加上hdfs://可顯式表示從hdfs中讀取文件,在路徑前面加上file://表示從本地文件系統讀。給sc.textFile()傳遞的文件路徑可以是如下幾種情形:

  • 一個文件路徑,這時候只裝載指定的文件。
  • 一個目錄路徑,這時候只裝載指定目錄下面的所有文件(不包括子目錄下面的文件)。
  • 通過通配符的形式加載多個文件或者加載多個目錄下面的所有文件。

如果想一次性讀取一個目錄下面的多個文件并想知道數據來自哪個文件,可以使用sc.wholeTextFiles。它會返回一個鍵值對RDD,其中鍵是輸入文件的文件名。由于該函數會將一個文件作為RDD的一個元素進行讀取,因此所讀取的文件不能太大,以便其可以在一個機器上裝得下。

同其他transform算子一樣,文本讀取操作也是惰性的并由action算子觸發,如果發生重新計算,那么讀取數據的操作也可能會被再次執行。另外,在spark中超出內存大小的文件同樣是可以被處理的,因為spark并不是將數據一次性全部裝入內存,而是邊裝入邊計算。

從數據庫加載數據

spark中可以使用JdbcRDD從數據庫中加載數據。spark會將數據從數據庫中拷貝到集群各個節點,因此使用JdbcRDD會有初始的拷貝數據的開銷。也可以考慮使用sqoop將數據從數據庫中遷移到hdfs中,然后從hdfs中讀取數據。

將結果寫入文本文件

rdd.saveAsTextFile()用于將RDD寫入文本文件。spark會將傳入該函數的路徑參數作為目錄對待,默認情況下會在對應目錄輸出多個文件,這取決于并行度。如果要將結果寫入hdfs的一個文件中,可以這樣:

rdd.coalesce(1).saveAsTextFile("filename")

而不要使用repartition,因為repartition會引起shuffle,而coalesce在默認情況下會避免shuffle。

關于文件系統

spark支持讀寫很多文件系統,包括本地文件系統、HDFS、Amazon S3等等很多。

spark在本地文件系統中讀取文件時,它要求文件在集群中所有節點的相同路徑下都可以找到。我們可以通過sc.addFile()來將文件弄到所有節點同路徑下面,并在各計算節點中通過SparkFiles.get()來獲取對應文件在該節點上的絕對路徑。

sc.addFile()的輸入文件路徑不僅可以是本地文件系統的,還可以是HDFS等spark所支持的所有文件系統,甚至還可以是來自網絡的,如HTTP、HTTPS、FTP。

關于并行

慎用可變數據

當可變數據用于并發/并行/分布式程序時,都有可能出現問題,因此對于會并發執行的代碼段不要使用可變數據。

尤其要注意不要在scala的object中使用var變量!其實scala的object單例對象只是對java中靜態的一種封裝而已,在class文件層面,object單例對象就是用java中靜態(static)來實現的,而java靜態成員變量不會被序列化!在編寫并行計算程序時,不要在scala的object中使用var變量,如果確實需要使用var變量,請寫在class中。

另外,在分布式執行的spark代碼段中使用可變的閉包變量也可能會出現不同步問題,因此請謹慎使用。

閉包 vs 廣播變量

有兩種方式將你的數據從driver節點發送到worker節點:通過閉包和通過廣播變量。閉包是隨著task的組裝和分發自動進行的,而廣播變量則是需要程序猿手動操作的,具體地可以通過如下方式操作廣播變量(假設sc為SparkContext類型的對象,bc為Broadcast類型的對象):

  • 可通過sc.broadcast(xxx)創建廣播變量。
  • 可在各計算節點中(閉包代碼中)通過bc.value來引用廣播的數據。
  • bc.unpersist()可將各executor中緩存的廣播變量刪除,后續再使用時數據將被重新發送。
  • bc.destroy()可將廣播變量的數據和元數據一同銷毀,銷毀之后就不能再使用了。

任務閉包包含了任務所需要的代碼和數據,如果一個executor數量小于RDD partition的數量,那么每個executor就會得到多個同樣的任務閉包,這通常是低效的。而廣播變量則只會將數據發送到每個executor一次,并且可以在多個計算操作中共享該廣播變量,而且廣播變量使用了類似于p2p形式的非常高效的廣播算法,大大提高了效率。另外,廣播變量由spark存儲管理模塊進行管理,并以MEMORY_AND_DISK級別進行持久化存儲。

什么時候用閉包自動分發數據?情況有幾種:

  • 數據比較小的時候。
  • 數據已在driver程序中可用。典型用例是常量或者配置參數。

什么時候用廣播變量分發數據?情況有幾種:

  • 數據比較大的時候(實際上,spark支持非常大的廣播變量,甚至廣播變量中的元素數超過java/scala中Array的最大長度限制(2G,約21.5億)都是可以的)。
  • 數據是某種分布式計算結果。典型用例是訓練模型等中間計算結果。

當數據或者變量很小的時候,我們可以在Spark程序中直接使用它們,而無需使用廣播變量。

對于大的廣播變量,序列化優化可以大大提高網絡傳輸效率,參見本文序列化優化部分。

巧用累加器

累加器提供了將工作節點中的值聚合到驅動器程序中的簡單語法。累加器的一個常見用途是在調試時對作業執行過程中的事件進行計數。可以通過sc.accumulator(xxx)來創建一個累加器,并在各計算節點中(閉包代碼中)直接寫該累加器。

累加器只能在驅動程序中被讀取,對于計算節點(閉包代碼)是只寫的,這大大精簡了累加器的設計。

使用累加器時,我們要注意的是:對于在RDD轉化操作中使用的累加器,如果發生了重新計算(這可能在很多種情況下發生),那么累加器就會被重復更新,這會導致問題。而在行動操作(如foreach)中使用累加器卻不會出現這種情況。因此,在轉化操作中,累加器通常只用于調試目的。盡管將來版本的spark可能會改善這一問題,但在spark 1.2.0中確實存在這個問題。

關于shuffle

在經典的MapReduce中,shuffle(混洗)是連接map階段和reduce階段的橋梁(注意這里的術語跟spark的map和reduce操作沒有直接關系),它是將各個map的輸出結果重新組合作為下階段各個reduce的輸入這樣的一個過程,由于這一過程涉及各個節點相互之間的數據傳輸,故此而名“混洗”。下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle階段是介于map階段和reduce階段之間。

Spark的shuffle過程類似于經典的MapReduce,但是有所改進。spark中的shuffle在實現上主要分為shuffle writeshuffle fetch這兩個大的階段。如下圖所示,shuffle過程大致可以描述為:

  • 首先每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M×R,其中M是Map的個數,R是Reduce的個數。
  • 其次Mapper產生的結果會根據設置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中去。
  • 當Reducer啟動時,它會根據自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket作為Reducer的輸入進行處理。

spark的shuffle實現隨著spark版本的迭代正在逐步完善和成熟,這中間曾出現過多種優化實現,關于spark shuffle的演進過程和具體實現參見后面的參考鏈接。

shuffle(具體地是shuffle write階段)會引起數據緩存到本地磁盤文件,從spark 1.3開始,這些緩存的shuffle文件只有在相應RDD不再被使用時才會被清除,這樣在lineage重算的時候shuffle文件就不需要重新創建了,從而加快了重算效率(請注意這里的緩存并保留shuffle數據這一行為與RDD持久化和檢查點機制是不同的,緩存并保留shuffle數據只是省去了重算時重建shuffle文件的開銷,因此我們才有理由在shuffle(寬依賴)之后對形成的RDD進行持久化)。在standalone模式下,我們可以在spark-env.sh中通過環境變量SPARK_LOCAL_DIRS來設置shuffle數據的本地磁盤緩存目錄。為了優化效率,本地shuffle緩存目錄的設置都應該使用由單個逗號隔開的目錄列表,并且這些目錄分布在不同的磁盤上,寫操作會被均衡地分配到所有提供的目錄中,磁盤越多,可以提供的總吞吐量就越高。另外,SPARK_LOCAL_DIRS也是RDD持久化到磁盤的目錄。

參考鏈接及進一步閱讀:

  • 詳細探究Spark的shuffle實現
  • Spark Shuffle operations

序列化優化

在spark中,序列化通常出現在跨節點的數據傳輸(如廣播變量、shuffle等)和數據持久化過程中。序列化和反序列化的速度、序列化之后數據大小等都影響著集群的計算效率。

spark默認使用Java序列化庫,它對于除基本類型的數組以外的任何對象都比較低效。為了優化序列化效率,你可以在spark配置文件中通過spark.serializer屬性來設置你想使用的序列化庫,一般情況下,你可以使用這個序列化庫:org.apache.spark.serializer.KryoSerializer。

為了獲得最佳性能,你還應該向Kryo注冊你想要序列化的類,注冊類可以讓Kryo避免把每個對象的完整類名寫下來,成千上萬條記錄累計節省的空間相當可觀。如果你想強制要求這種注冊,可以把spark.kryo.registrationRequired設置為true,這樣Kryo會在遇到未注冊的類時拋出錯誤。使用Kryo序列化庫并注冊所需類的示例如下:

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

Spark調度

應用調度

應用是指用戶提交的spark應用程序。spark應用程序之間的調度關系,不一定由spark所管理。

在YARN和Mesos模式下,底層資源的調度策略由YARN和Mesos集群資源管理器所決定。

只有在standalone模式下,spark master按照當前集群資源是否滿足等待列表中的spark應用對資源的需求,而決定是否創建一個SparkContext對應的driver,進而完成spark應用的啟動過程,這個過程可以粗略地認為是一種粗顆粒度的有條件的FIFO(先進先出)調度策略。

作業調度

作業是指spark應用程序內部的由action算子觸發并提交的Job。在給定的spark應用中,不同線程的多個job可以并發執行,并且這個調度是線程安全的,這使得一個spark應用可以處理多個請求。

默認地,spark作業調度是FIFO的,在多線程的情況下,某些線程提交的job可能被大大推遲執行。

不過我們可以通過配置FAIR(公平)調度器來使spark在作業之間輪詢調度,這樣所有的作業都能得到一個大致公平的共享的集群資源。這就意味著即使有一個很長的作業在運行,較短的作業在提交之后也能夠得到不錯的響應。要啟用一個FAIR作業調度,需在創建SparkContext之前配置一下spark.scheduler.mode為FAIR:

// 假設conf是你的SparkConf變量
conf.set("spark.scheduler.mode", "FAIR")

公平調度還支持在池中將工作分組(這樣就形成兩級調度池),而不同的池可以設置不同的調度選項(如權重)。這種方式允許更重要的job配置在高優先級池中優先調度。如果沒有設置,新提交的job將進入默認池中,我們可以通過在對應線程中給SparkContext設置本地屬性spark.scheduler.pool來設置該線程對應的pool:

// 假設sc是你的SparkContext變量
sc.setLocalProperty("spark.scheduler.pool", "pool1")

在設置了本地屬性之后,所有在這個線程中提交的job都將會使用這個調度池的名字。如果你想清除該線程相關的pool,只需調用如下代碼:

sc.setLocalProperty("spark.scheduler.pool", null)

在默認情況下,每個調度池擁有相同的優先級來共享整個應用所分得的集群資源。同樣的,默認池中的每個job也擁有同樣的調度優先級,但是在用戶創建的每個池中,job是通過FIFO方式進行調度的

關于公平調度池的詳細配置,請參見官方文檔:Spark Job Scheduling。

如果你想閱讀相關實現代碼,可以觀看Schedulable.scala、SchedulingAlgorithm.scala以及SchedulableBuilder.scala等相關文件。

參考鏈接及進一步閱讀:

  • Spark Job Scheduling
  • Spark 作業調度–job執行方式介紹
  • spark internal - 作業調度

容錯機制與檢查點

spark容錯機制是粗粒度并且是輕量級的,主要依賴于RDD的依賴鏈(lineage)。spark能夠通過lineage獲取足夠的信息來重新計算和恢復丟失的數據分區。這樣的基于lineage的容錯機制可以理解為粗粒度的重做日志(redo log)。

鑒于spark的基于lineage的容錯機制,RDD DAG中寬窄依賴的劃分對容錯也有很重要的作用。如果一個節點宕機了,而且運算是窄依賴,那只要把丟失的父RDD分區重算即可,跟其他節點沒有依賴。而寬依賴需要父RDD的所有分區都存在,重算代價就很高了。可以這樣理解為什么窄依賴開銷小而寬依賴開銷大:在窄依賴中,在子RDD分區丟失、重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,并不存在冗余計算;而在寬依賴中,丟失一個子RDD分區將導致其每個父RDD的多個甚至所有分區的重算,而重算的結果并不都是給當前丟失的子RDD分區用的,這樣就存在了冗余計算。

不過我們可以通過檢查點(checkpoint)機制解決上述問題,通過在RDD上做檢查點可以將物理RDD數據存儲到持久層(HDFS、S3等)中。在RDD上做檢查點的方法是在調用action算子之前調用checkpoint(),并且RDD最好是緩存在內存中的,否則可能導致重算(參見API注釋)。示例如下:

// 假設rdd是你的RDD變量
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.checkpoint()
val count = rdd.count()

在RDD上做檢查點會切斷RDD依賴,具體地spark會清空該RDD的父RDD依賴列表。并且由于檢查點機制是將RDD存儲在外部存儲系統上,所以它可以被其他應用重用。

過長的lineage(如在pagerank、spark streaming等中)也將導致過大的重算代價,而且還會占用很多系統資源。因此,在遇到寬依賴或者lineage足夠長時,我們都應該考慮做檢查點

集群監控與運行日志

spark在應用執行時記錄詳細的進度信息和性能指標。這些內容可以在兩個地方找到:spark的網頁用戶界面以及driver進程和executor進程生成的日志文件中。

網頁用戶界面

在瀏覽器中打開?http://master:8080?頁面,你可以看到集群概況,包括:集群節點、可用的和已用的資源、已運行的和正在運行的應用等。

http://master:4040?頁面用來監控正在運行的應用(默認端口為4040,如果有多個應用在運行,那么端口順延,如4041、4042),包括其執行進度、構成Job的Stage的執行情況、Stage詳情、已緩存RDD的信息、各executor的信息、spark配置項以及應用依賴信息等,該頁面經常用來發現應用的效率瓶頸并輔助優化,不過該頁面只有在有spark應用運行時才可以被訪問到。

上述404x端口可用于查看正在運行的應用的執行詳情,但是應用運行結束之后該頁面就不可以訪問了。要想查看已經執行結束的應用的執行詳情,則需開啟事件日志機制,具體地設置如下兩個選項:

  • spark.eventLog.enabled: 設置為true時開啟事件日志機制。這樣已完成的spark作業就可以通過歷史服務器查看。

  • spark.eventLog.dir: 開啟事件日志機制時的事件日志文件存儲位置。如果要在歷史服務器中查看事件日志,需要將該值設置為一個全局可見的文件系統路徑,比如HDFS中。最后,請確保目錄以 ‘/‘ 結束,否則可能會出現如下錯誤:

    Application history not found ... No event logs found for application ...
    Did you specify the correct logging directory?

在配置好上述選項之后,我們就可以查看新提交的應用的詳細執行信息了。在不同的部署模式中,查看的方式不同。在standalone模式中,可以直接在master節點的UI界面(上述8080端口對應的頁面)中直接單擊已完成應用以查看詳細執行信息。在YARN/Mesos模式中,就要開啟歷史服務器了,此處略去。

Metrics系統

spark在其內部擁有一個可配置的度量系統(Metrics),它能夠將spark的內部狀態通過HTTP、JMX、CSV等多種不同形式呈現給用戶。同時,用戶也可以定義自己的數據源(Metrics Source)和數據輸出方式(Metrics Sink),從而獲取自己所需的數據。此處略去詳情,可參考下面的鏈接進一步閱讀。

參考鏈接及進一步閱讀:

  • Spark Monitoring and Instrumentation: Metrics

查看日志文件

spark日志文件的具體位置取決于具體的部署模式。在standalone模式中,日志默認存儲于各個工作節點的spark目錄下的work目錄中,此時所有日志還可以直接通過主節點的網頁用戶界面進行查看。

默認情況下,spark輸出的日志包含的信息量比較合適。我們可以自定義日志行為,改變日志等級或存儲位置。spark日志系統使用log4j實現,我們只需將conf目錄下的log4j.properties.template復制一個并命名為log4j.properties,然后自定義修改即可。

SparkConf與配置

spark中最主要的配置機制是通過SparkConf類對spark進行配置。當創建出一個SparkContext時,就需要創建出一個SparkConf的實例作為參數。

SparkConf實例包含用戶要重載的配置選項的鍵值對,spark中的每個配置選項都是基于字符串形式的鍵值對。你可以調用SparkConf的set()或者setXxx()來設置對應選項。

另外,spark-submit腳本可以動態設置配置項。當應用被spark-submit腳本啟動時,腳本會把這些配置項設置到運行環境中。當一個新的SparkConf被創建出來時,這些環境變量會被檢測出來并且自動配到SparkConf中。這樣在使用spark-submit時,用戶應用通常只需創建一個“空”的SparkConf,并直接傳遞給SparkContext的構造方法即可。

spark-submit為常用的spark配置選項提供了專用的標記,還有一個通用標記--conf來接收任意spark配置項的值,形如--conf 屬性名=屬性值。

spark-submit也支持從文件中讀取配置項的值。默認情況下,spark-submit會在spark安裝目錄中找到conf/spark-defaults.conf文件,讀取該文件中以空格隔開的鍵值對數據。你也可以通過spark-submit的--properties-File選項來自定義該文件的路徑。

spark-defaults.conf的作用范圍要搞清楚,編輯driver所在機器上的spark-defaults.conf,該文件會影響到driver所提交運行的application,及專門為該application提供計算資源的executor的啟動參數。

spark有特定的優先級順序來選擇實際配置。優先級最高的是在用戶代碼中顯式調用set()方法設置的選項。其次是通過spark-submit傳遞的參數。再次是寫在配置文件中的值。最后是系統默認值。如果你想知道應用中實際生效的配置,可以在應用的網頁用戶界面中查看。

下面列出一些常用的配置項,完整的配置項列表可以參見官方配置文檔。

選項默認值描述
spark.master(none)表示要連接的集群管理器。
spark.app.name(none)應用名,將出現在UI和日志中。
spark.driver.memory1g為driver進程分配的內存。注意:在客戶端模式中,不能在SparkConf中直接配置該項,因為driver JVM進程已經啟動了。
spark.executor.memory1g為每個executor進程分配的內存。
spark.executor.coresall/1每個executor可用的核心數。針對standalone和YARN模式。更多參見官方文檔。
spark.cores.max(not set)設置standalone和Mesos模式下應用程序的核心數上限。
spark.speculationfalse設置為true時開啟任務預測執行機制。當出現比較慢的任務時,這種機制會在另外的節點上也嘗試執行該任務的一個副本。打開此選項會幫助減少大規模集群中個別較慢的任務帶來的影響。
spark.driver.extraJavaOptions(none)設置driver節點的JVM啟動參數。
spark.executor.extraJavaOptions(none)設置executor節點的JVM啟動參數。
spark.serializerJavaSerializer指定用來進行序列化的類庫,包括通過網絡傳輸數據或緩存數據時的序列化。為了速度,推薦使用KryoSerializer。
spark.eventLog.enabledfalse設置為true時開啟事件日志機制。這樣已完成的spark作業就可以通過歷史服務器查看。
spark.eventLog.dirfile:///tmp/spark-events開啟事件日志機制時的事件日志文件存儲位置。如果要在歷史服務器中查看事件日志,需要將該值設置為一個全局可見的文件系統路徑,比如HDFS中。最后,請確保目錄以 ‘/‘ 結束,否則可能會出現錯誤,參見本文集群監控部分。

一些問題的解決辦法

/tmp目錄寫滿

由于Spark在計算的時候會將中間結果存儲到/tmp目錄,而目前linux又都支持tmpfs,其實說白了就是將/tmp目錄掛載到內存當中。那么這里就存在一個問題,中間結果過多導致/tmp目錄寫滿而出現如下錯誤:

No Space Left on the device

解決辦法就是針對tmp目錄不啟用tmpfs,修改/etc/fstab。

無法創建進程

有時可能會遇到如下錯誤,即無法創建進程:

java.lang.OutOfMemory, unable to create new native thread

導致這種錯誤的原因比較多。有一種情況并非真的是內存不足引起的,而是由于超出了允許的最大文件句柄數或最大進程數。

排查的步驟就是查看一下允許打開的文件句柄數和最大進程數,如果數值過低,使用ulimit將其調高之后,再試試問題是否已經解決。

不可序列化

Task not serializable: java.io.NotSerializableException

作為RDD操作算子參數的匿名函數使用外部變量從而形成閉包。為了效率,spark并不是將所有東東都序列化以分發到各個executor。spark會先對該匿名函數進行ClosureCleaner.clean()處理(將該匿名函數涉及到的$outer中的與閉包無關的變量移除),然后將該匿名函數對象及閉包涉及到的對象序列化并包裝成task分發到各個executor。

看到這里,你或許就發現了一個問題,那就是不管怎樣,spark需要序列化的對象必須都可以被序列化!Task not serializable: java.io.NotSerializableException錯誤就是由于相應的對象不能被序列化造成的!

為了解決這個問題,首先你可以使用?-Dsun.io.serialization.extendedDebugInfo=true?java選項來讓jvm打印出更多的關于序列化的信息,以便了解哪些對象不可以被序列化。然后就是使這些對象對應的類可序列化,或者將這些對象定義在RDD操作算子的參數(匿名函數)中以取消閉包。

缺少winutils.exe

在windows上進行spark程序測試時,你可能會碰到如下幾個問題:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)

原因就是缺少 hadoop 的?winutils.exe?這個文件。解決方法是:下載一個(注意是32位還是64位),新建一個文件夾 D:\hadoop\bin\ 并將 winutils.exe 放入其中,并保證winutils.exe雙擊運行沒有報*.dll缺失的錯誤,然后在程序中設置一下hadoop目錄即可,如下:

System.setProperty("hadoop.home.dir", "D:\hadoop\")

轉載于:https://www.cnblogs.com/xjh713/p/7301246.html

總結

以上是生活随笔為你收集整理的spark总结——转载的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

日韩一区正在播放 | 人人爽人人澡人人添人人人人 | 四虎成人精品 | 精品999| 国产69久久久欧美一级 | 国产三级av在线 | 综合久久精品 | mm1313亚洲精品国产 | 偷拍福利视频一区二区三区 | 国内精品视频在线播放 | 久久这里只有精品视频首页 | 91麻豆国产福利在线观看 | 国产精品久久99综合免费观看尤物 | 国产91精品一区二区 | 久久精品欧美一区 | 一区二区在线电影 | 亚洲免费成人av电影 | 国产区在线 | 草草草影院 | 激情av综合| 狠狠色噜噜狠狠狠狠2021天天 | 国产精品一区二区在线 | 久久夜色精品亚洲噜噜国4 午夜视频在线观看欧美 | 国产永久网站 | 色黄视频免费观看 | 中文字幕在线看视频国产中文版 | 中文乱码视频在线观看 | 最近更新好看的中文字幕 | 久久久久欧美精品999 | 免费国产黄线在线观看视频 | 日本三级在线观看中文字 | 激情电影影院 | 超碰在线官网 | 日韩精品在线看 | 欧美日韩国产二区 | 六月丁香在线视频 | 国产伦精品一区二区三区照片91 | 国产不卡精品视频 | 久久久久免费精品国产 | 99精品区 | 亚洲九九九在线观看 | 最近能播放的中文字幕 | 欧美另类xxx | 国产精品第一 | 黄色在线免费观看网址 | 欧美大香线蕉线伊人久久 | 999久久久精品视频 日韩高清www | 97超碰在线久草超碰在线观看 | 五月综合在线观看 | 久久国产精品成人免费浪潮 | 免费看v片 | 日韩网站在线免费观看 | 色婷五月 | 亚洲国产精品999 | 在线日本看片免费人成视久网 | 久久国产精品99精国产 | 麻豆视频在线观看免费 | 国产精品网红直播 | 玖草影院 | 亚洲精品乱码久久久久久蜜桃动漫 | 免费的国产精品 | 婷婷伊人五月 | 美女亚洲精品 | 一区二区三区国产欧美 | 婷婷丁香狠狠爱 | 婷婷福利影院 | 99久e精品热线免费 99国产精品久久久久久久久久 | 国产三级在线播放 | 欧美国产精品一区二区 | 98久久| 久久久伦理 | 婷婷六月天综合 | 欧美乱大交 | 欧美精品久久久久久久久久白贞 | 亚洲免费精品一区二区 | 国产剧情一区二区在线观看 | 精品日韩av| 91九色蝌蚪视频在线 | 在线观看日本高清mv视频 | 欧美韩日精品 | 欧美国产日韩一区二区三区 | 亚洲爱av | 在线免费观看黄网站 | 六月色播| 色婷婷成人网 | 超级碰碰碰免费视频 | 国产亚洲视频在线观看 | 亚洲视屏一区 | 日韩女同一区二区三区在线观看 | 日韩mv欧美mv国产精品 | 91女神的呻吟细腰翘臀美女 | 午夜10000| 国产又粗又猛又爽 | 人人爽人人爽人人爽人人爽 | 欧美韩国日本在线观看 | 色吊丝在线永久观看最新版本 | 国产手机在线 | 国产五月色婷婷六月丁香视频 | 色婷婷国产精品 | 久久网站最新地址 | 看片的网址 | 91网页版免费观看 | 欧美91精品久久久久国产性生爱 | 亚洲视屏在线播放 | 欧美a级在线免费观看 | 国产高清视频色在线www | 成人三级网址 | 国产福利精品一区二区 | 久久精品一区二区国产 | 99色在线播放 | 香蕉视频日本 | 免费三及片 | 国产成人三级在线 | 最新日本中文字幕 | 黄网站色视频免费观看 | 97视频在线 | 国产精品一区二区在线观看免费 | 亚洲经典视频在线观看 | 成人宗合网 | 成 人 黄 色视频免费播放 | av成年人电影 | 亚洲综合欧美日韩狠狠色 | 深爱五月网 | www.日日日.com | 久久精品2 | 久久中文网 | 久久成年人网站 | 久久狠狠干 | 在线视频国产区 | 久久精品8 | 国产成年免费视频 | 国产精品va在线 | 久久不色| 色999精品| aaa毛片视频| 欧美视频xxx | 在线中文字幕视频 | 国产91在线观看 | 黄色福利网站 | 六月丁香激情综合色啪小说 | 在线观看中文字幕一区二区 | 久久a热6 | 亚洲精品视频在线观看视频 | 久久国产91 | 国产精品毛片久久 | 成人亚洲免费 | 欧美亚洲一区二区在线 | 91人人人 | 蜜桃传媒一区二区 | 不卡av电影在线 | 永久免费的av电影 | 中文字幕电影一区 | 欧美大片在线看免费观看 | 久久国产手机看片 | 免费观看日韩av | 国产中文字幕91 | 黄色亚洲片 | 精品美女久久久久久免费 | a在线一区 | 狠狠色丁香婷婷综合久小说久 | 免费中文字幕在线观看 | 在线黄网站 | 一本—道久久a久久精品蜜桃 | 肉色欧美久久久久久久免费看 | 91精品亚洲影视在线观看 | 综合网伊人 | 久久久久久久久久网 | 欧美日韩一区二区三区视频 | 在线电影播放 | 天天操天天添天天吹 | 这里只有精品视频在线 | 四虎8848免费高清在线观看 | 中文字幕在线观看免费高清完整版 | 欧美a影视 | 五月天伊人网 | 色综合久久精品 | 色综合久久88色综合天天人守婷 | 五月天综合网站 | 国产精品久久久久一区二区三区 | 一区三区在线欧 | 在线天堂视频 | 一区 二区 精品 | 黄av资源 | 国产精品精 | a在线观看视频 | 午夜黄色一级片 | 国产亚洲精品v | 在线亚洲观看 | 婷婷五月色综合 | 日韩精品欧美专区 | www在线观看视频 | 69精品视频 | 久久夜夜爽 | 国产在线看一区 | 日韩视频在线不卡 | 一级免费观看 | 国产黄色片免费看 | 色婷婷伊人 | 色综合久久综合中文综合网 | 亚洲天堂自拍视频 | 午夜精品久久一牛影视 | 久久久久久久久久亚洲精品 | 久草免费福利在线观看 | 日韩电影中文,亚洲精品乱码 | 综合黄色网 | 国产一级在线播放 | 国产精品高潮在线观看 | 午夜精品久久久久久久久久久久久久 | 国产在线高清精品 | 免费视频久久久久久久 | 天天干天天操人体 | 婷婷综合视频 | 午夜精品电影 | av高清一区| 国产成人777777| 日本爱爱免费视频 | 精品一区 在线 | 久插视频 | 91精品色 | 亚洲国产资源 | 日韩动态视频 | 99精品黄色 | 夜夜夜 | 狠狠操狠狠干天天操 | 五月婷av| 久草在线视频资源 | 国产精品毛片网 | 国产一区二区成人 | 日韩丝袜在线观看 | 91麻豆精品国产自产在线游戏 | 国产一二三四在线视频 | 91完整视频 | 免费的国产精品 | 日本中文在线观看 | 久久专区| 中文字幕亚洲在线观看 | 国产视频资源 | 久久久免费精品国产一区二区 | 91av播放| 午夜影院三级 | 亚洲黄色精品 | 国产黑丝一区二区 | 麻豆视频免费播放 | 亚洲精品456在线播放第一页 | 久久亚洲欧美日韩精品专区 | 日韩啪视频 | 国产在线黄 | 精品视频在线免费 | 亚洲精品中文字幕在线观看 | 狠狠的干狠狠的操 | 亚洲国产精彩中文乱码av | 成人免费网视频 | 最新超碰在线 | 黄污污网站 | 黄在线| 色94色欧美| 精品在线视频一区 | 亚洲精品乱码久久久一二三 | 日本精品视频一区二区 | 国产69久久久 | 国产系列精品av | 最近能播放的中文字幕 | 91网在线观看 | 婷婷在线五月 | 亚洲精品国产精品久久99热 | 久久美女精品 | 在线观看一区 | 最近最新mv字幕免费观看 | 草久久久久久 | 亚洲一级片免费观看 | 国产亚洲观看 | 免费日韩一区二区三区 | 91完整版观看 | 蜜臀久久99精品久久久无需会员 | 99热在线免费观看 | 成人毛片网 | 91精品婷婷国产综合久久蝌蚪 | 91免费观看网站 | 高清av网| 久久久久久久久久久免费视频 | 91欧美国产 | 最近中文字幕久久 | 97精品国产97久久久久久春色 | 国产中文字幕视频在线观看 | 日韩精品在线观看av | 91视频大全 | 精品免费国产一区二区三区四区 | 成人小视频在线观看免费 | 午夜精品久久久久久久99 | 麻豆国产精品永久免费视频 | 国产精品乱码久久久 | 成人av一区二区三区 | 亚洲成人欧美 | www.国产精品 | 日本中文一级片 | 天天艹天天 | 在线亚洲人成电影网站色www | 69精品视频在线观看 | 色综合久久久久综合体 | 国产粉嫩在线 | 日韩av三区 | 懂色av一区二区在线播放 | 成人资源在线 | 日本狠狠干 | 人人插人人舔 | 日批视频在线 | 中文字幕在线观看视频网站 | 视频 天天草 | 一区二区三区视频 | 男女啪啪视屏 | 日韩精品久久久 | 天天操天天拍 | 国产区久久| 久草男人天堂 | 91夫妻自拍 | 国产精品白虎 | 99精品视频免费在线观看 | 欧美日韩国产精品一区二区三区 | 91爱爱免费观看 | 探花视频在线观看 | 91免费视频国产 | 97av在线视频免费播放 | www五月婷婷 | 日韩精品你懂的 | 操操色| 色欧美88888久久久久久影院 | 欧美日韩不卡一区二区三区 | 日韩精品 在线视频 | 中文字幕精品一区二区精品 | 成人免费看片网址 | 日韩高清久久 | 日韩免费成人av | 91精品视频一区二区三区 | 国产精品久久在线观看 | 国产拍在线 | 激情偷乱人伦小说视频在线观看 | 久久这里| 欧美在线a视频 | 日本激情视频中文字幕 | 91香蕉视频在线下载 | 99精品视频99 | 国产99久久精品一区二区永久免费 | 欧美亚洲国产日韩 | 亚洲精品国产精品国自产 | 精品久久久免费视频 | 伊人伊成久久人综合网小说 | 天天玩夜夜操 | 五月天综合在线 | 91精品国产三级a在线观看 | 麻豆传媒在线视频 | 中文字幕高清 | 久久久影片 | 欧美a级片免费看 | 久久精品一区二区国产 | 欧美成人tv | 亚洲欧洲精品久久 | 日韩xxxxxxxxx| 国产午夜精品一区二区三区嫩草 | 欧美在线91 | 日韩色高清 | 色全色在线资源网 | 亚洲精品动漫在线 | 久久a v电影 | 国产精品毛片一区视频播 | 91久久人澡人人添人人爽欧美 | 人人爽人人爽人人片av免 | 国内精品久久久久影院一蜜桃 | 亚洲精品系列 | 狠狠狠色丁香综合久久天下网 | 怡红院久久 | 91视频免费国产 | 日本激情动作片免费看 | 成人黄视频 | 91精品国产91久久久久久三级 | 九九视频这里只有精品 | 中文字幕在线国产精品 | 在线国产不卡 | 狠狠狠综合 | 中文字幕免费高 | 不卡av电影在线观看 | 国产成人一区二区三区久久精品 | 怡春院av | 免费看搞黄视频网站 | 久在线观看 | 婷婷激情网站 | 日韩在线观看中文 | 欧美成人黄 | 成人中文字幕+乱码+中文字幕 | 91麻豆产精品久久久久久 | 黄免费在线观看 | 免费能看的黄色片 | 亚洲女人天堂成人av在线 | 狠狠操综合网 | 久久精品网址 | 国产在线观看不卡 | 就操操久久 | 国产黄色片免费观看 | 五月婷婷丁香激情 | 91精品久久久久久久久久久久久 | av福利资源 | 国产精品亚洲综合久久 | 日日干美女 | 国产色在线,com | 91精品国产乱码久久桃 | 国产99黄 | 久久免费视频在线观看30 | 国产成人一区二区在线观看 | 丁香五月亚洲综合在线 | 欧美一级大片在线观看 | 97色在线观看 | 亚洲人人网 | 17videosex性欧美 | 狠狠干成人| 欧美激情精品久久久久久免费 | 97超视频免费观看 | 日日夜夜免费精品 | 亚洲精品久久久蜜臀下载官网 | 香蕉久久久久 | 日色在线视频 | 日韩av黄 | 久久99国产精品免费 | 激情五月婷婷丁香 | 欧美成人在线网站 | 日韩视频中文 | 亚洲国产午夜精品 | 欧美一区二区在线刺激视频 | 国产精品成人一区二区三区吃奶 | 亚洲区精品视频 | 精品九九久久 | 国产精品手机看片 | www.久久久.cum| www久久国产| 欧美福利视频一区 | 国产精品小视频网站 | 午夜精品一二三区 | 精品国产一区二 | 黄免费在线观看 | 99视频免费播放 | 国产精品黑丝在线观看 | 欧美资源在线观看 | 成在人线av| 国产成人精品综合久久久 | 91一区一区三区 | 五月婷婷六月丁香 | 亚洲精品午夜视频 | 福利视频一二区 | 日韩视频一区二区在线观看 | 三级黄色大片在线观看 | 久久午夜色播影院免费高清 | 51久久夜色精品国产麻豆 | 免费黄色av| 不卡的av电影 | 黄色免费网 | 久久精品99国产国产 | av在线网站观看 | 欧洲亚洲精品 | 深夜男人影院 | 成年人电影免费在线观看 | 色婷婷免费视频 | 特黄特色特刺激视频免费播放 | av短片在线| 国产精品国内免费一区二区三区 | 久香蕉| av在线免费网站 | 成人亚洲欧美 | 激情五月网站 | 久久人人爽爽人人爽人人片av | 最近最新中文字幕视频 | 久久91久久久久麻豆精品 | 在线观看中文字幕av | 免费人做人爱www的视 | 亚洲jizzjizz日本少妇 | 色在线视频 | 中文字幕av电影下载 | 久久成人18免费网站 | 欧美ⅹxxxxxx | 美女av电影| 色九九影院| 97在线免费 | 一区二区视 | 欧美性成人 | 精品99久久久久久 | 国产精品短视频 | 91精品一区在线观看 | 黄色一级影院 | 亚洲涩涩涩涩涩涩 | 狠狠狠色丁香综合久久天下网 | av成人在线网站 | 午夜精品一区二区三区在线视频 | 欧美国产日韩在线视频 | av高清一区二区三区 | 日韩三级成人 | 色噜噜在线观看 | 久久乱码卡一卡2卡三卡四 五月婷婷久 | 亚洲黄色小说网 | 麻豆系列在线观看 | 国产精品24小时在线观看 | 国产精品密入口果冻 | 成人手机在线视频 | 国产精品视频地址 | 久久av电影 | 欧美91成人网 | 久久国产精品视频 | 91在线播 | 久久看片网站 | 日韩99热| 久久大片 | 在线免费观看国产黄色 | 人人干人人干人人干 | 婷婷 中文字幕 | 视频国产一区二区三区 | 91精品导航 | 国产精品网在线观看 | 区一区二区三在线观看 | 97超碰人人模人人人爽人人爱 | www天天干 | 最近能播放的中文字幕 | 热久久视久久精品18亚洲精品 | 四虎影视成人永久免费观看亚洲欧美 | 在线观看91精品国产网站 | 欧美日韩不卡一区二区 | 国产真实在线 | 91日韩在线视频 | 九七视频在线 | 91精品对白一区国产伦 | 国产又黄又猛又粗 | 亚洲国产精品va在线看黑人动漫 | 日本bbbb摸bbbb | 一级免费片 | 日韩精品视频免费看 | 一区二区三区在线免费观看 | 色婷婷激情电影 | 欧美综合色在线图区 | 一区二区精 | 中文字幕之中文字幕 | 综合色亚洲 | 日韩欧美一区二区不卡 | 色视频在线 | 亚洲精品字幕 | 国产专区视频 | 午夜精品视频在线 | 国产分类视频 | 日韩三区在线观看 | 97国产精品视频 | avav99| 啪啪免费观看网站 | 日韩av免费一区二区 | 色94色欧美| 六月丁香色婷婷 | 国产 日韩 在线 亚洲 字幕 中文 | 久久99久久99精品免费看小说 | 久久在线观看 | 久久伊人91 | 日韩欧美视频一区二区三区 | 精品人人爽 | 久久综合狠狠综合久久激情 | 亚洲国产剧情 | 97人人人人 | 操处女逼 | 欧美激情一区不卡 | 97超碰人人模人人人爽人人爱 | 天天操天天添天天吹 | 久久综合狠狠狠色97 | 国产一级免费观看视频 | 毛片美女网站 | 久久天堂网站 | 久久久亚洲成人 | 免费看三级 | 国产成人精品国内自产拍免费看 | 99热这里只有精品1 av中文字幕日韩 | av午夜电影 | 国产精品一区二区av麻豆 | 国产欧美中文字幕 | 日韩av二区 | 日韩动漫免费观看高清完整版在线观看 | 国际av在线| 国产精久久久久久妇女av | 91精品久久香蕉国产线看观看 | 国产伦精品一区二区三区… | 综合久久久久 | 日韩欧美xxx | 日本性久久 | 又黄又爽又湿又无遮挡的在线视频 | 欧美日韩在线播放 | 亚洲永久精品在线观看 | 久久婷婷精品 | 亚洲人成免费网站 | 精品国产乱码久久久久久久 | 97在线观看免费高清 | 操处女逼 | 婷婷色伊人 | 免费观看黄色12片一级视频 | 成人wwwxxx视频 | 成人在线免费观看网站 | 精品国产美女在线 | 一区二区三区中文字幕在线 | 日本精品久久久久中文字幕 | 免费在线国产黄色 | 国产一区二区观看 | 国产99久久久国产精品成人免费 | 国产欧美在线一区 | 99草视频| 97精品国产一二三产区 | 91亚洲精品久久久蜜桃 | 国产黄色大片免费看 | 国产精品久久久久久久久久久久久久 | 人人舔人人舔 | 日韩理论电影在线观看 | 日韩免费一区二区在线观看 | 99热在| 欧美日韩午夜爽爽 | 天天天干天天射天天天操 | 日韩免费看 | 天天射天天操天天 | 日韩在线看片 | 久久一区国产 | 成人九九视频 | 超碰人人舔 | 中文字幕中文 | 又黄又刺激视频 | 亚洲欧洲久久久 | 97在线观看免费高清 | 在线观看91精品国产网站 | 视频二区在线视频 | 久久男人中文字幕资源站 | 亚洲一区二区三区精品在线观看 | 天天草网站 | 国产一级黄大片 | 日韩在线观看av | 免费视频一区二区 | 国产精品久久久久久久免费观看 | 精品视频久久 | 国产福利精品一区二区 | 日韩久久久久久久 | 四虎欧美| 欧美精品亚洲二区 | 国产资源精品在线观看 | 国产精品18久久久久久首页狼 | 在线看国产日韩 | www久久99| 国产在线精品区 | 国产精品 亚洲精品 | 欧美精品久久久久久久久久白贞 | 国产精品va在线观看入 | 亚洲一级特黄 | 欧美日韩视频网站 | 97超视频免费观看 | 亚洲精品视 | 国产精品igao视频网入口 | 在线高清| 国产成人精品一区二区三区网站观看 | 色先锋资源网 | 国产成人性色生活片 | 国产精品麻豆欧美日韩ww | 亚洲黄色免费网站 | 亚洲国产日本 | 天堂av在线中文在线 | 97爱| 亚洲人片在线观看 | 精品久久久久久久久久国产 | 中文字幕av有码 | 国产精品国产三级国产不产一地 | 国产精品视频免费在线观看 | 国产成人精品一区二 | 干干夜夜| 91看片在线观看 | 国产97av | 久久精品九色 | 色美女在线 | 激情伊人五月天 | 91成人网页版 | 国产在线高清精品 | 久久免费视频网站 | 日日操天天操夜夜操 | 色婷婷综合久色 | 丝袜制服综合网 | 久久久影视| 国内丰满少妇猛烈精品播放 | 在线观看视频你懂 | 91精品入口 | 国产色综合天天综合网 | 日韩成片| 亚洲精品456在线播放乱码 | 国产成人精品亚洲 | 色噜噜日韩精品一区二区三区视频 | 亚洲国产中文字幕在线 | 在线观看一二三区 | 免费人做人爱www的视 | 天天草天天插 | 免费特级黄色片 | 国产婷婷久久 | 国产精品久久久影视 | 在线视频 日韩 | 91在线看黄 | 国产精品毛片久久久久久 | 1区2区3区在线观看 三级动图 | 国产高清 不卡 | 在线精品视频免费播放 | 国产精品区在线观看 | 在线成人短视频 | 天天综合精品 | 夜夜躁狠狠躁日日躁视频黑人 | 欧美二区在线播放 | 激情五月亚洲 | 久草com| av中文国产 | 免费av网址在线观看 | 亚洲精品免费观看视频 | 91插插视频| 国产又粗又猛又黄视频 | 久久久久久久电影 | 在线观看aa| 天天射天天操天天干 | 亚洲一区日韩 | 天天射天天做 | 中文字幕九九 | 午夜男人影院 | 看国产黄色大片 | 99热这里只有精品8 久久综合毛片 | 亚洲国产成人高清精品 | 日本久久综合视频 | 久久综合久久综合久久综合 | 精品免费久久 | 国产亚洲激情视频在线 | 久久精品视频国产 | 久草在线视频资源 | 久久精品福利视频 | 丁香激情综合久久伊人久久 | 天天干天天天天 | av免费看电影 | 香蕉影视app | 精品在线小视频 | 免费a级大片 | 91精品国产麻豆国产自产影视 | 久久久久久亚洲精品 | 99精品免费久久久久久日本 | 亚洲欧美精品在线 | 91爱在线 | 国内三级在线观看 | av成人免费网站 | 涩涩成人在线 | 天天夜夜亚洲 | 伊人成人精品 | 日本视频高清 | 最近2019好看的中文字幕免费 | 欧美 日韩 性 | 天天操天天操天天操天天操天天操 | 超碰在线97观看 | 欧美最新另类人妖 | 欧美精品网站 | 日韩精品在线一区 | 99久久精品国产免费看不卡 | 欧美福利久久 | 最近中文字幕完整高清 | 国产视频综合在线 | 欧美aa级| 欧美性超爽 | 日日日操 | 色综合久久五月 | 免费久久99精品国产婷婷六月 | 久久综合久久综合这里只有精品 | 久久久免费高清视频 | 人人看人人草 | 黄色福利网站 | 久草在线视频看看 | 最新av在线免费观看 | 精品久久一区二区 | 亚洲视频一区二区三区在线观看 | 国产第一页在线播放 | 麻豆视频国产精品 | 97视频免费在线 | 国产成人精品一区二区三区网站观看 | 正在播放 国产精品 | 国产色视频123区 | 丝袜美腿在线 | 国产精品久久久久永久免费观看 | 狠狠躁夜夜a产精品视频 | 在线观看免费91 | 中文字幕在线观看免费高清完整版 | 日韩一区二区三区不卡 | 色综合天天在线 | 亚洲国产午夜视频 | 亚洲精品视频在线观看网站 | 亚洲一区二区麻豆 | 在线国产一区二区三区 | www天天操| 国产一卡二卡在线 | 玖玖国产精品视频 | www.黄色片网站 | av在线播放中文字幕 | 人人艹人人 | 日韩高清在线一区 | 九九免费观看视频 | 亚洲精品美女久久17c | 日本xxxx裸体xxxx17 | 成人久久国产 | 午夜久久网 | 色婷婷国产精品一区在线观看 | 日韩av一区二区在线影视 | 99热精品久久 | 亚洲成人av在线电影 | 中文字幕日本特黄aa毛片 | 国产精品白浆视频 | 日日夜夜婷婷 | 久久另类小说 | 色播五月激情五月 | 综合久久2023 | 久草视频精品 | av怡红院| 免费高清av在线看 | 国产精品美 | 免费在线中文字幕 | 久久国产影院 | 欧美精品久久久久久久久老牛影院 | 久久综合九色综合欧美就去吻 | 欧美国产一区在线 | 欧美一级电影在线观看 | 99精品一区二区三区 | 国产精品久久久久久久免费观看 | 九九精品视频在线观看 | 黄色片免费在线 | 国产精品久久99综合免费观看尤物 | 一区二区三区免费在线 | 精品国产乱码一区二 | 久久9999久久免费精品国产 | 麻豆视频免费入口 | 国产日韩视频在线观看 | 蜜桃视频在线视频 | 国产精品久久久久一区二区 | 久久午夜免费观看 | 免费日韩av电影 | 色综合天天做天天爱 | 中文字幕大全 | 精品电影一区 | 国产涩涩在线观看 | 在线日韩 | 精品国产免费人成在线观看 | 亚洲国产欧美在线人成大黄瓜 | 国产男女爽爽爽免费视频 | 国产亚洲婷婷免费 | 亚洲精品久久久久999中文字幕 | 日本黄色免费观看 | av中文字幕网 | www.av免费 | 97av视频| 久久狠狠干 | 狠狠狠狠狠色综合 | 国产在线综合视频 | 免费在线激情电影 | 色美女在线 | 一区二区三区电影 | 欧美精品久久久久久久久久白贞 | 国产1区2区 | 夜添久久精品亚洲国产精品 | 国产成人一区二区精品非洲 | 久久99久久99精品免观看软件 | 色综合天天综合 | 国内视频一区二区 | 中文字幕亚洲在线观看 | 国产正在播放 | 日韩理论影院 | 不卡av在线免费观看 | 亚洲国产大片 | 天天亚洲综合 | 色偷偷男人的天堂av | 亚洲精品 在线视频 | 国产午夜精品免费一区二区三区视频 | 日本最新一区二区三区 | 天天操天天干天天摸 | 中文字幕在线免费看 | 香蕉视频在线观看免费 | 又黄又爽又无遮挡免费的网站 | 五月婷婷久久丁香 | 中文字幕免费播放 | 亚洲精品成人网 | 国产在线观看黄 | www日韩视频 | 亚洲高清在线精品 | 99精品国产aⅴ | 91精品国产高清自在线观看 | 青草草在线 | 免费久久99精品国产婷婷六月 | 激情导航| www.五月天婷婷.com | 97精品视频在线 | 国产精品久久久久久久久久久久久 | 亚洲天堂在线观看完整版 | 亚洲精品在线网站 | 婷婷色av | 久久亚洲精品国产亚洲老地址 | 日韩专区在线观看 | 国产麻豆精品一区二区 | 国产视频不卡 | 日韩.com| 欧美综合在线视频 | 国产免费影院 | 99久热精品 | 精品一区二区久久久久久久网站 | 国产黄色片免费在线观看 | 啪嗒啪嗒免费观看完整版 | av在线短片| 深夜免费福利网站 | 亚州性色 | 国产成年免费视频 | 91资源在线观看 | 久久免费福利视频 | 亚洲激情网站免费观看 | 日韩电影中文字幕在线观看 | avhd高清在线谜片 | 国产精品久久久久久一二三四五 | 四虎国产精品成人免费影视 | 在线免费黄色毛片 | 9797在线看片亚洲精品 | 国产91区 | 亚洲成人黄色在线观看 | 国际精品久久久久 | 天天艹天天 | 日韩有码网站 | 狠狠操狠狠插 | 丝袜制服天堂 | 国产亚洲精品美女久久 | 午夜性生活片 | 99亚洲国产 | 亚洲成人av免费 | 九九热视频在线免费观看 | 色综合中文字幕 | 蜜臀av性久久久久蜜臀aⅴ涩爱 | 日韩电影中文,亚洲精品乱码 | 黄色一级动作片 | 国产91九色蝌蚪 | 91亚洲网| 久久视了 | 精品96久久久久久中文字幕无 | 国产精品久久一 | 天天干天天想 | 国产91免费在线观看 | 色中色亚洲| 天天曰天天干 | 精品国产一区二区三区在线 | www色片 | 欧美日韩国产高清视频 | 国产日本在线播放 | 中文字幕在线免费观看视频 | 久久久久久久久久久国产精品 | 国产精品久久久久久婷婷天堂 | 色吧久久| 一区二区三区精品在线视频 | 日韩中文字幕在线观看 | 精品一区二区影视 | 国产精品久久久久久久久久久久久久 | 日本高清免费中文字幕 | 国产精品午夜久久 | 免费在线观看中文字幕 | 国产精品久久久久久久久久久久 | 久久99亚洲精品久久久久 | 成人av电影在线 | 在线视频麻豆 | 久久99深爱久久99精品 | 国产人在线成免费视频 | 伊人久久国产 | 久久久精品欧美一区二区免费 | 亚洲无吗视频在线 | 国产一区免费视频 | 欧美一区二区三区免费看 | 中文字幕刺激在线 | jizz欧美性9 国产一区高清在线观看 | 亚洲精品综合欧美二区变态 | 日韩在线观看你懂得 | 最近2019中文免费高清视频观看www99 | 午夜久久精品 | 又黄又爽又刺激视频 | 日韩av电影免费在线观看 | 精品国产欧美 | 伊人久久在线观看 | 久久久999精品视频 国产美女免费观看 | 91免费的视频在线播放 | 国产免费久久久久 | 精品国产一二区 | 国产一区二区在线免费播放 | 91精品国产91热久久久做人人 | 日韩大片在线免费观看 | 亚洲成人黄色在线观看 | 日本中文字幕高清 | 久久夜色精品亚洲噜噜国4 午夜视频在线观看欧美 | 91桃色国产在线播放 | 日韩三级久久 | 精品国产成人av在线免 | 日本黄色免费大片 | 在线观看日韩精品 | 久久久久久国产精品999 | 色狠狠综合 | 免费一级片视频 | 日本天天操 | 中文字幕亚洲综合久久五月天色无吗'' | 91精品在线免费观看 | 啪啪免费视频网站 | 好看的国产精品视频 | 五月天网页 |