日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark数据倾斜解决方案(转)

發布時間:2023/12/2 编程问答 63 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark数据倾斜解决方案(转) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文轉發自技術世界,原文鏈接 http://www.jasongj.com/spark/skew/

Spark性能優化之道——解決Spark數據傾斜(Data Skew)的N種姿勢

本文結合實例詳細闡明了Spark數據傾斜的幾種場景以及對應的解決方案,包括避免數據源傾斜,調整并行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機前綴等。

摘要

本文結合實例詳細闡明了Spark數據傾斜的幾種場景以及對應的解決方案,包括避免數據源傾斜,調整并行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機前綴等。

為何要處理數據傾斜(Data Skew)

什么是數據傾斜

對Spark/Hadoop這樣的大數據系統來講,數據量大并不可怕,可怕的是數據傾斜。

何謂數據傾斜?數據傾斜指的是,并行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多于其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。

對于分布式系統而言,理想情況下,隨著系統規模(節點數量)的增加,應用整體耗時線性下降。如果一臺機器處理一批大量數據需要120分鐘,當機器數量增加到三時,理想的耗時為120 / 3 = 40分鐘,如下圖所示。

  
但是,上述情況只是理想情況,實際上將單機任務轉換成分布式任務后,會有overhead,使得總的任務量較之單機時有所增加,所以每臺機器的執行時間加起來比單臺機器時更大。這里暫不考慮這些overhead,假設單機任務轉換成分布式任務后,總任務量不變。
  
但即使如此,想做到分布式情況下每臺機器執行時間是單機時的1 / N,就必須保證每臺機器的任務量相等。不幸的是,很多時候,任務的分配是不均勻的,甚至不均勻到大部分任務被分配到個別機器上,其它大部分機器所分配的任務量只占總得的小部分。比如一臺機器負責處理80%的任務,另外兩臺機器各處理10%的任務,如下圖所示。

  
在上圖中,機器數據增加為三倍,但執行時間只降為原來的80%,遠低于理想值。   

數據傾斜的危害

從上圖可見,當出現數據傾斜時,小量任務耗時遠高于其它任務,從而使得整體耗時過大,未能充分發揮分布式系統的并行計算優勢。
  
另外,當發生數據傾斜時,部分任務處理的數據量過大,可能造成內存不足使得任務失敗,并進而引進整個應用失敗。   

數據傾斜是如何造成的

在Spark中,同一個Stage的不同Partition可以并行處理,而具有依賴關系的不同Stage之間是串行處理的。假設某個Spark Job分為Stage 0和Stage 1兩個Stage,且Stage 1依賴于Stage 0,那Stage 0完全處理結束之前不會處理Stage 1。而Stage 0可能包含N個Task,這N個Task可以并行進行。如果其中N-1個Task都在10秒內完成,而另外一個Task卻耗時1分鐘,那該Stage的總時間至少為1分鐘。換句話說,一個Stage所耗費的時間,主要由最慢的那個Task決定。

由于同一個Stage內的所有Task執行相同的計算,在排除不同計算節點計算能力差異的前提下,不同Task之間耗時的差異主要由該Task所處理的數據量決定。

Stage的數據來源主要分為如下兩類

  • 從數據源直接讀取。如讀取HDFS,Kafka
  • 讀取上一個Stage的Shuffle數據

如何緩解/消除數據傾斜

避免數據源的數據傾斜 ———— 讀Kafka

以Spark Stream通過DirectStream方式讀取Kafka數據為例。由于Kafka的每一個Partition對應Spark的一個Task(Partition),所以Kafka內相關Topic的各Partition之間數據是否平衡,直接決定Spark處理該數據時是否會產生數據傾斜。

如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Kafka某一Topic內消息在不同Partition之間的分布,主要由Producer端所使用的Partition實現類決定。如果使用隨機Partitioner,則每條消息會隨機發送到一個Partition中,從而從概率上來講,各Partition間的數據會達到平衡。此時源Stage(直接讀取Kafka數據的Stage)不會產生數據傾斜。

但很多時候,業務場景可能會要求將具備同一特征的數據順序消費,此時就需要將具有相同特征的數據放于同一個Partition中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個Partition中。此時,如果產生了數據傾斜,則需要通過其它方式處理。

避免數據源的數據傾斜 ———— 讀文件

原理

Spark以通過textFile(path, minPartitions)方法讀取文件時,使用TextFileFormat。

對于不可切分的文件,每個文件對應一個Split從而對應一個Partition。此時各文件大小是否一致,很大程度上決定了是否存在數據源側的數據傾斜。另外,對于不可切分的壓縮文件,即使壓縮后的文件大小一致,它所包含的實際數據量也可能差別很多,因為源文件數據重復度越高,壓縮比越高。反過來,即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數據量差距也可能很大。

此時可通過在數據生成端將不可切分文件存儲為可切分文件,或者保證各文件包含數據量相同的方式避免數據傾斜。

對于可切分的文件,每個Split大小由如下算法決定。其中goalSize等于所有文件總大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小決定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size決定。

1 protected long computeSplitSize(long goalSize, long minSize, long blockSize) { 2 return Math.max(minSize, Math.min(goalSize, blockSize)); 3 }


默認情況下各Split的大小不會太大,一般相當于一個Block大小(在Hadoop 2中,默認值為128MB),所以數據傾斜問題不明顯。如果出現了嚴重的數據傾斜,可通過上述參數調整。?

案例

現通過腳本生成一些文本文件,并通過如下代碼進行簡單的單詞計數。為避免Shuffle,只計單詞總個數,不須對單詞進行分組計數。

1 SparkConf sparkConf = new SparkConf() 2 .setAppName("ReadFileSkewDemo"); 3 JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 4 long count = javaSparkContext.textFile(inputFile, minPartitions) 5 .flatMap((String line) -> Arrays.asList(line.split(" ")).iterator()).count(); 6 System.out.printf("total words : %s", count); 7 javaSparkContext.stop();

總共生成如下11個csv文件,其中10個大小均為271.9MB,另外一個大小為8.5GB。?

之后將8.5GB大小的文件使用gzip壓縮,壓縮后大小僅為25.3MB。

使用如上代碼對未壓縮文件夾進行單詞計數操作。Split大小為 max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9?10+8.5?1024) / 1 MB, 128 MB) = 128MB。無明顯數據傾斜。

使用同樣代碼對包含壓縮文件的文件夾進行同樣的單詞計數操作。未壓縮文件的Split大小仍然為128MB,而壓縮文件(gzip壓縮)由于不可切分,且大小僅為25.3MB,因此該文件作為一個單獨的Split/Partition。雖然該文件相對較小,但是它由8.5GB文件壓縮而來,包含數據量是其它未壓縮文件的32倍,因此處理該Split/Partition/文件的Task耗時為4.4分鐘,遠高于其它Task的10秒。

由于上述gzip壓縮文件大小為25.3MB,小于128MB的Split大小,不能證明gzip壓縮文件不可切分。現將minPartitions從默認的1設置為229,從而目標Split大小為max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 * 10+25.3) / 229 MB, 128 MB) = 12 MB。如果gzip壓縮文件可切分,則所有Split/Partition大小都不會遠大于12。反之,如果仍然存在25.3MB的Partition,則說明gzip壓縮文件確實不可切分,在生成不可切分文件時需要如上文所述保證各文件數量大大致相同。

如下圖所示,gzip壓縮文件對應的Split/Partition大小為25.3MB,其它Split大小均為12MB左右。而該Task耗時4.7分鐘,遠大于其它Task的4秒。

總結

適用場景?
數據源側存在不可切分文件,且文件內包含的數據量相差較大。

解決方案?
盡量使用可切分的格式代替不可切分的格式,或者保證各文件實際包含數據量大致相同。

優勢?
可撤底消除數據源側數據傾斜,效果顯著。

劣勢?
數據源一般來源于外部系統,需要外部系統的支持。

調整并行度分散同一個Task的不同Key

原理

Spark在做Shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。如果并行度設置的不合適,可能造成大量不相同的Key對應的數據被分配到了同一個Task上,造成該Task所處理的數據遠大于其它Task,從而造成數據傾斜。

如果調整Shuffle時的并行度,使得原本被分配到同一Task的不同Key發配到不同Task上處理,則可降低原Task所需處理的數據量,從而緩解數據傾斜問題造成的短板效應。

案例

現有一張測試表,名為student_external,內有10.5億條數據,每條數據有一個唯一的id值。現從中取出id取值為9億到10.5億的共1.5億條數據,并通過一些處理,使得id為9億到9.4億間的所有數據對12取模后余數為8(即在Shuffle并行度為12時該數據集全部被HashPartition分配到第8個Task),其它數據集對其id除以100取整,從而使得id大于9.4億的數據在Shuffle時可被均勻分配到所有Task中,而id小于9.4億的數據全部分配到同一個Task中。處理過程如下

1 INSERT OVERWRITE TABLE test 2 SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 ) 3 ELSE CAST(id/100 AS INTEGER) 4 END, 5 name 6 FROM student_external 7 WHERE id BETWEEN 900000000 AND 1050000000;

通過上述處理,一份可能造成后續數據傾斜的測試數據即以準備好。接下來,使用Spark讀取該測試數據,并通過groupByKey(12)對id分組處理,且Shuffle并行度為12。代碼如下

?

1 public class SparkDataSkew { 2 public static void main(String[] args) { 3 SparkSession sparkSession = SparkSession.builder() 4 .appName("SparkDataSkewTunning") 5 .config("hive.metastore.uris", "thrift://hadoop1:9083") 6 .enableHiveSupport() 7 .getOrCreate(); 8 9 Dataset<Row> dataframe = sparkSession.sql( "select * from test"); 10 dataframe.toJavaRDD() 11 .mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1))) 12 .groupByKey(12) 13 .mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> { 14 int id = tuple._1(); 15 AtomicInteger atomicInteger = new AtomicInteger(0); 16 tuple._2().forEach((String name) -> atomicInteger.incrementAndGet()); 17 return new Tuple2<Integer, Integer>(id, atomicInteger.get()); 18 }).count(); 19 20 sparkSession.stop(); 21 sparkSession.close(); 22 } 23 24 }

?

?

?

本次實驗所使用集群節點數為4,每個節點可被Yarn使用的CPU核數為16,內存為16GB。使用如下方式提交上述應用,將啟動4個Executor,每個Executor可使用核數為12(該配置并非生產環境下的最優配置,僅用于本文實驗),可用內存為12GB。

1 spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

?

GroupBy Stage的Task狀態如下圖所示,Task 8處理的記錄數為4500萬,遠大于(9倍于)其它11個Task處理的500萬記錄。而Task 8所耗費的時間為38秒,遠高于其它11個Task的平均時間(16秒)。整個Stage的時間也為38秒,該時間主要由最慢的Task 8決定。

在這種情況下,可以通過調整Shuffle并行度,使得原來被分配到同一個Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的數據量,緩解數據傾斜。

通過groupByKey(48)將Shuffle并行度調整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態如下圖所示。

從上圖可知,記錄數最多的Task 20處理的記錄數約為1125萬,相比于并行度為12時Task 8的4500萬,降低了75%左右,而其耗時從原來Task 8的38秒降到了24秒。

在這種場景下,調整并行度,并不意味著一定要增加并行度,也可能是減小并行度。如果通過groupByKey(11)將Shuffle并行度調整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態如下圖所示。

從上圖可見,處理記錄數最多的Task 6所處理的記錄數約為1045萬,耗時為23秒。處理記錄數最少的Task 1處理的記錄數約為545萬,耗時12秒。

總結

適用場景
大量不同的Key被分配到了相同的Task造成該Task數據量過大。

解決方案
調整并行度。一般是增大并行度,但有時如本例減小并行度也可達到效果。

優勢
實現簡單,可在需要Shuffle的操作算子上直接設置并行度或者使用spark.default.parallelism設置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設置并行度。可用最小的代價解決問題。一般如果出現數據傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。

劣勢
適用場景少,只能將分配到同一Task的不同Key分散開,但對于同一Key傾斜嚴重的情況該方法并不適用。并且該方法一般只能緩解數據傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。

自定義Partitioner

原理

使用自定義的Partitioner(默認為HashPartitioner),將原本被分配到同一個Task的不同Key分配到不同Task。

案例

以上述數據集為例,繼續將并發度設置為12,但是在groupByKey算子上,使用自定義的Partitioner(實現如下)

1 .groupByKey(new Partitioner() { 2 @Override 3 public int numPartitions() { 4 return 12; 5 } 6 7 @Override 8 public int getPartition(Object key) { 9 int id = Integer.parseInt(key.toString()); 10 if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) { 11 return (id - 9500000) / 12; 12 } else { 13 return id % 12; 14 } 15 } 16 })

由下圖可見,使用自定義Partition后,耗時最長的Task 6處理約1000萬條數據,用時15秒。并且各Task所處理的數據集大小相當。?

總結

適用場景
大量不同的Key被分配到了相同的Task造成該Task數據量過大。

解決方案
使用自定義的Partitioner實現類代替默認的HashPartitioner,盡量將所有不同的Key均勻分配到不同的Task中。

優勢
不影響原有的并行度設計。如果改變并行度,后續Stage的并行度也會默認改變,可能會影響后續Stage。

劣勢
適用場景有限,只能將不同Key分散開,對于同一Key對應數據集非常大的場景不適用。效果與調整并行度類似,只能緩解數據傾斜而不能完全消除數據傾斜。而且需要根據數據特點自定義專用的Partitioner,不夠靈活。

將Reduce side Join轉變為Map side Join

原理

通過Spark的Broadcast機制,將Reduce側Join轉化為Map側Join,避免Shuffle從而完全消除Shuffle帶來的數據傾斜。

案例

通過如下SQL創建一張具有傾斜Key且總記錄數為1.5億的大表test。

1 INSERT OVERWRITE TABLE test 2 SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 ) 3 ELSE CAST(id/10 AS INT) END AS STRING), 4 name 5 FROM student_external 6 WHERE id BETWEEN 900000000 AND 1050000000;

使用如下SQL創建一張數據分布均勻且總記錄數為50萬的小表test_new。?

1 INSERT OVERWRITE TABLE test_new 2 SELECT CAST(CAST(id/10 AS INT) AS STRING), 3 name 4 FROM student_delta_external 5 WHERE id BETWEEN 950000000 AND 950500000;


直接通過Spark Thrift Server提交如下SQL將表test與表test_new進行Join并將Join結果存于表test_join中。
?

1 INSERT OVERWRITE TABLE test_join 2 SELECT test_new.id, test_new.name 3 FROM test 4 JOIN test_new 5 ON test.id = test_new.id;

該SQL對應的DAG如下圖所示。從該圖可見,該執行過程總共分為三個Stage,前兩個用于從Hive中讀取數據,同時二者進行Shuffle,通過最后一個Stage進行Join并將結果寫入表test_join中。?

從下圖可見,Join Stage各Task處理的數據傾斜嚴重,處理數據量最大的Task耗時7.1分鐘,遠高于其它無數據傾斜的Task約2秒的耗時。

接下來,嘗試通過Broadcast實現Map側Join。實現Map側Join的方法,并非直接通過CACHE TABLE test_new將小表test_new進行cache。現通過如下SQL進行Join。

1 CACHE TABLE test_new; 2 INSERT OVERWRITE TABLE test_join 3 SELECT test_new.id, test_new.name 4 FROM test 5 JOIN test_new 6 ON test.id = test_new.id;


通過如下DAG圖可見,該操作仍分為三個Stage,且仍然有Shuffle存在,唯一不同的是,小表的讀取不再直接掃描Hive表,而是掃描內存中緩存的表。
?

并且數據傾斜仍然存在。如下圖所示,最慢的Task耗時為7.1分鐘,遠高于其它Task的約2秒。

正確的使用Broadcast實現Map側Join的方式是,通過SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設置得足夠大。

再次通過如下SQL進行Join。

1 SET spark.sql.autoBroadcastJoinThreshold=104857600; 2 INSERT OVERWRITE TABLE test_join 3 SELECT test_new.id, test_new.name 4 FROM test 5 JOIN test_new 6 ON test.id = test_new.id;

通過如下DAG圖可見,該方案只包含一個Stage。?

并且從下圖可見,各Task耗時相當,無明顯數據傾斜現象。并且總耗時為1.5分鐘,遠低于Reduce側Join的7.3分鐘。

總結

適用場景
參與Join的一邊數據集足夠小,可被加載進Driver并通過Broadcast方法廣播到各個Executor中。

解決方案
在Java/Scala代碼中將小數據集數據拉取到Driver,然后通過Broadcast方案將小數據集的數據廣播到各Executor。或者在使用SQL前,將Broadcast的閾值調整得足夠大,從而使用Broadcast生效。進而將Reduce側Join替換為Map側Join。

優勢
避免了Shuffle,徹底消除了數據傾斜產生的條件,可極大提升性能。

劣勢
要求參與Join的一側數據集足夠小,并且主要適用于Join的場景,不適合聚合的場景,適用條件有限。

為skew的key增加隨機前/后綴

原理

為數據量特別大的Key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的Task中,徹底解決數據傾斜問題。Join另一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。

案例

通過如下SQL,將id為9億到9.08億共800萬條數據的id轉為9500048或者9500096,其它數據的id除以100取整。從而該數據集中,id為9500048和9500096的數據各400萬,其它id對應的數據記錄數均為100條。這些數據存于名為test的表中。

對于另外一張小表test_new,取出50萬條數據,并將id(遞增且唯一)除以100取整,使得所有id都對應100條數據。

1 INSERT OVERWRITE TABLE test 2 SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) 3 ELSE CAST(id/100 AS INT) END AS STRING), 4 name 5 FROM student_external 6 WHERE id BETWEEN 900000000 AND 1050000000; 7 8 INSERT OVERWRITE TABLE test_new 9 SELECT CAST(CAST(id/100 AS INT) AS STRING), 10 name 11 FROM student_delta_external 12 WHERE id BETWEEN 950000000 AND 950500000;


?
通過如下代碼,讀取test表對應的文件夾內的數據并轉換為JavaPairRDD存于leftRDD中,同樣讀取test表對應的數據存于rightRDD中。通過RDD的join算子對leftRDD與rightRDD進行Join,并指定并行度為48。

1 public class SparkDataSkew{ 2 public static void main(String[] args) { 3 SparkConf sparkConf = new SparkConf(); 4 sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect"); 5 sparkConf.set("spark.default.parallelism", String.valueOf(parallelism)); 6 JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 7 8 JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") 9 .mapToPair((String row) -> { 10 String[] str = row.split(","); 11 return new Tuple2<String, String>(str[0], str[1]); 12 }); 13 14 JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") 15 .mapToPair((String row) -> { 16 String[] str = row.split(","); 17 return new Tuple2<String, String>(str[0], str[1]); 18 }); 19 20 leftRDD.join(rightRDD, parallelism) 21 .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2())) 22 .foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { 23 AtomicInteger atomicInteger = new AtomicInteger(); 24 iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); 25 }); 26 27 javaSparkContext.stop(); 28 javaSparkContext.close(); 29 } 30 }

從下圖可看出,整個Join耗時1分54秒,其中Join Stage耗時1.7分鐘。

通過分析Join Stage的所有Task可知,在其它Task所處理記錄數為192.71萬的同時Task 32的處理的記錄數為992.72萬,故它耗時為1.7分鐘,遠高于其它Task的約10秒。這與上文準備數據集時,將id為9500048為9500096對應的數據量設置非常大,其它id對應的數據集非常均勻相符合。

現通過如下操作,實現傾斜Key的分散處理

  • 將leftRDD中傾斜的key(即9500048與9500096)對應的數據單獨過濾出來,且加上1到24的隨機前綴,并將前綴與原數據用逗號分隔(以方便之后去掉前綴)形成單獨的leftSkewRDD
  • 將rightRDD中傾斜key對應的數據抽取出來,并通過flatMap操作將該數據集中每條數據均轉換為24條數據(每條分別加上1到24的隨機前綴),形成單獨的rightSkewRDD
  • 將leftSkewRDD與rightSkewRDD進行Join,并將并行度設置為48,且在Join過程中將隨機前綴去掉,得到傾斜數據集的Join結果skewedJoinRDD
  • 將leftRDD中不包含傾斜Key的數據抽取出來作為單獨的leftUnSkewRDD
  • 對leftUnSkewRDD與原始的rightRDD進行Join,并行度也設置為48,得到Join結果unskewedJoinRDD
  • 通過union算子將skewedJoinRDD與unskewedJoinRDD進行合并,從而得到完整的Join結果集

具體實現代碼如下

1 public class SparkDataSkew{ 2 public static void main(String[] args) { 3 int parallelism = 48; 4 SparkConf sparkConf = new SparkConf(); 5 sparkConf.setAppName("SolveDataSkewWithRandomPrefix"); 6 sparkConf.set("spark.default.parallelism", parallelism + ""); 7 JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 8 9 JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") 10 .mapToPair((String row) -> { 11 String[] str = row.split(","); 12 return new Tuple2<String, String>(str[0], str[1]); 13 }); 14 15 JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") 16 .mapToPair((String row) -> { 17 String[] str = row.split(","); 18 return new Tuple2<String, String>(str[0], str[1]); 19 }); 20 21 String[] skewedKeyArray = new String[]{"9500048", "9500096"}; 22 Set<String> skewedKeySet = new HashSet<String>(); 23 List<String> addList = new ArrayList<String>(); 24 for(int i = 1; i <=24; i++) { 25 addList.add(i + ""); 26 } 27 for(String key : skewedKeyArray) { 28 skewedKeySet.add(key); 29 } 30 31 Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet); 32 Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList); 33 34 JavaPairRDD<String, String> leftSkewRDD = leftRDD 35 .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())) 36 .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2())); 37 38 JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())) 39 .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream() 40 .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())) 41 .collect(Collectors.toList()) 42 .iterator() 43 ); 44 45 JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD 46 .join(rightSkewRDD, parallelism) 47 .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2())); 48 49 JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1())); 50 JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2())); 51 52 skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { 53 AtomicInteger atomicInteger = new AtomicInteger(); 54 iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); 55 }); 56 57 javaSparkContext.stop(); 58 javaSparkContext.close(); 59 } 60 }

通過分析Join Stage的所有Task可知從下圖可看出,整個Join耗時58秒,其中Join Stage耗時33秒。

  • 由于Join分傾斜數據集Join和非傾斜數據集Join,而各Join的并行度均為48,故總的并行度為96
  • 由于提交任務時,設置的Executor個數為4,每個Executor的core數為12,故可用Core數為48,所以前48個Task同時啟動(其Launch時間相同),后48個Task的啟動時間各不相同(等待前面的Task結束才開始)
  • 由于傾斜Key被加上隨機前綴,原本相同的Key變為不同的Key,被分散到不同的Task處理,故在所有Task中,未發現所處理數據集明顯高于其它Task的情況

實際上,由于傾斜Key與非傾斜Key的操作完全獨立,可并行進行。而本實驗受限于可用總核數為48,可同時運行的總Task數為48,故而該方案只是將總耗時減少一半(效率提升一倍)。如果資源充足,可并發執行Task數增多,該方案的優勢將更為明顯。在實際項目中,該方案往往可提升數倍至10倍的效率。

總結

適用場景
兩張表都比較大,無法使用Map則Join。其中一個RDD有少數幾個Key的數據量過大,另外一個RDD的Key分布較為均勻。

解決方案
將有數據傾斜的RDD中傾斜Key對應的數據集單獨抽取出來加上隨機前綴,另外一個RDD每條數據分別與隨機前綴結合形成新的RDD(相當于將其數據增到到原來的N倍,N即為隨機前綴的總個數),然后將二者Join并去掉前綴。然后將不包含傾斜Key的剩余數據進行Join。最后將兩次Join的結果集通過union合并,即可得到全部Join結果。

優勢
相對于Map則Join,更能適應大數據集的Join。如果資源充足,傾斜部分數據集與非傾斜部分數據集可并行進行,效率提升明顯。且只針對傾斜部分的數據做數據擴展,增加的資源消耗有限。

劣勢
如果傾斜Key非常多,則另一側數據膨脹非常大,此方案不適用。而且此時對傾斜Key與非傾斜Key分開處理,需要掃描數據集兩遍,增加了開銷。

大表隨機添加N種隨機前綴,小表擴大N倍

原理

如果出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集全部加上隨機前綴,然后對另外一個不存在嚴重數據傾斜的數據集整體與隨機前綴集作笛卡爾乘積(即將數據量擴大N倍)。

案例

這里給出示例代碼,讀者可參考上文中分拆出少數傾斜Key添加隨機前綴的方法,自行測試。

1 public class SparkDataSkew { 2 public static void main(String[] args) { 3 SparkConf sparkConf = new SparkConf(); 4 sparkConf.setAppName("ResolveDataSkewWithNAndRandom"); 5 sparkConf.set("spark.default.parallelism", parallelism + ""); 6 JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 7 8 JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") 9 .mapToPair((String row) -> { 10 String[] str = row.split(","); 11 return new Tuple2<String, String>(str[0], str[1]); 12 }); 13 14 JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") 15 .mapToPair((String row) -> { 16 String[] str = row.split(","); 17 return new Tuple2<String, String>(str[0], str[1]); 18 }); 19 20 List<String> addList = new ArrayList<String>(); 21 for(int i = 1; i <=48; i++) { 22 addList.add(i + ""); 23 } 24 25 Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList); 26 27 JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2())); 28 29 JavaPairRDD<String, String> rightNewRDD = rightRDD 30 .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream() 31 .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())) 32 .collect(Collectors.toList()) 33 .iterator() 34 ); 35 36 JavaPairRDD<String, String> joinRDD = leftRandomRDD 37 .join(rightNewRDD, parallelism) 38 .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2())); 39 40 joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { 41 AtomicInteger atomicInteger = new AtomicInteger(); 42 iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); 43 }); 44 45 javaSparkContext.stop(); 46 javaSparkContext.close(); 47 } 48 }

總結?

適用場景
一個數據集存在的傾斜Key比較多,另外一個數據集數據分布比較均勻。

優勢
對大部分場景都適用,效果不錯。

劣勢
需要將一個數據集整體擴大N倍,會增加資源消耗。

總結

對于數據傾斜,并無一個統一的一勞永逸的方法。更多的時候,是結合數據特點(數據集大小,傾斜Key的多少等)綜合使用上文所述的多種方法。

轉載于:https://www.cnblogs.com/zuizui1204/p/7920692.html

總結

以上是生活随笔為你收集整理的Spark数据倾斜解决方案(转)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

日韩av快播电影网 | 欧美极度另类性三渗透 | 亚洲精品www久久久 www国产精品com | 日日操天天操狠狠操 | 韩国一区视频 | 可以免费观看的av片 | 日本三级香港三级人妇99 | 久久艹国产视频 | av日韩不卡 | 日韩有码欧美 | 麻豆免费看片 | 日韩精品在线观看视频 | 精品免费观看视频 | 日韩精品一区二区三区电影 | 激情综合一区 | 久久综合婷婷 | 99福利片 | 婷婷开心久久网 | 综合激情| 免费人成在线观看网站 | 亚洲伊人色 | 99在线精品视频观看 | 狠狠地日 | 久久精品首页 | 人人舔人人插 | 中文字幕在线观看日本 | 欧美激情视频免费看 | 久久爱导航 | 97超碰在线资源 | 精品一区二区免费视频 | 91精品在线观看视频 | 黄色片网站 | 久久久久国产a免费观看rela | 激情婷婷在线 | 免费看一级黄色 | 国产成人精品亚洲a | 久久综合久久综合久久综合 | 93久久精品日日躁夜夜躁欧美 | 国产精品理论片 | 中文字幕一区二区三区四区在线视频 | 五月天亚洲精品 | 91精品国产91热久久久做人人 | 久草在线视频网站 | 午夜a区| 国产色婷婷精品综合在线手机播放 | 97色se | 91人人澡人人爽 | 96在线| 亚洲成人精品影院 | 久久激情视频 久久 | 2024国产在线 | 久久精品96 | 久青草国产在线 | 久草资源免费 | 日韩欧美一区视频 | 日韩色中色 | 天天干天天操人体 | 欧美日韩不卡一区二区 | 国产精品国产精品 | av天天澡天天爽天天av | 日本女人的性生活视频 | 亚洲一区二区三区四区在线视频 | 国产免费又粗又猛又爽 | 国产露脸91国语对白 | 狠狠的日日 | 国内精自线一二区永久 | 在线看国产一区 | 中文字幕精品在线 | 在线免费中文字幕 | 国产成人一区二区三区 | 成人影视片 | 97人人超碰在线 | 丰满少妇在线观看资源站 | 精品国模一区二区 | 五月天婷亚洲天综合网鲁鲁鲁 | 国产午夜精品免费一区二区三区视频 | 一区二区中文字幕在线播放 | 国产小视频国产精品 | 在线中文字幕播放 | 草久在线 | 国产91精品一区二区麻豆网站 | 亚洲天堂精品视频在线观看 | 欧美日韩亚洲第一页 | 成人久久电影 | 久久久www成人免费精品张筱雨 | 国产免费作爱视频 | aaa毛片视频 | 久草网视频在线观看 | 亚洲视频在线观看网站 | www.eeuss影院av撸 | 精品一区二区在线免费观看 | 蜜臀久久99精品久久久无需会员 | 91精品视频免费在线观看 | 国产精品欧美日韩在线观看 | 成人av电影免费在线观看 | 精品一区二区免费 | 在线观看一级视频 | 男女视频久久久 | 亚洲h视频在线 | 亚洲精品tv | 日韩欧美精品免费 | 亚洲精品乱码久久久久v最新版 | 精品一区二区免费视频 | 久久免费视频在线观看 | 丁香5月婷婷久久 | 天天爽天天爽 | 欧美老少交 | 亚洲综合导航 | 精品国产欧美一区二区 | 欧美日韩在线播放 | 中文字幕在线免费看 | 91在线入口 | 日韩久久久久 | 国产成人精品一区二区三区福利 | 精品国产一区二区三区四区vr | 欧美aaa视频 | 五月天婷婷在线播放 | 国产成人av电影在线 | 亚洲精品视频中文字幕 | 国产精品免费观看久久 | 国产 在线 日韩 | 亚洲区色 | 国产三级国产精品国产专区50 | 中文字幕在线视频免费播放 | 青青河边草免费直播 | 国产女教师精品久久av | 999久久 | 日本久久久影视 | 日韩一区在线播放 | 久久99国产精品久久99 | 中文字幕美女免费在线 | 国产一区免费观看 | 久久久综合 | 色99在线 | 91av资源网 | 天天操天天摸天天射 | 成人h动漫在线看 | 91亚洲精品久久久蜜桃网站 | 国产在线一区二区 | 一区二区三区影院 | 久久综合偷偷噜噜噜色 | 美女久久久 | 99精品视频免费 | 91香蕉视频在线下载 | 国产一区高清在线观看 | 伊人官网 | 国产精品嫩草69影院 | 亚洲精品黄色在线观看 | 国产视频亚洲视频 | 欧美午夜视频在线 | 色综合天天综合网国产成人网 | 在线观看免费色 | 精品福利视频在线 | 色香天天| 久久久99精品免费观看 | 色激情五月 | 好看av在线| 中文字幕一区二区在线播放 | 国产精品av免费在线观看 | 911香蕉视频 | 亚洲欧美国内爽妇网 | 色天天综合网 | 99成人免费视频 | 草久久精品 | 色播六月天| 国产字幕在线看 | 国产精品一区二区美女视频免费看 | av在线进入 | 午夜国产在线 | 亚洲国产精品久久久久 | 免费观看91 | www.香蕉视频在线观看 | 九九热av| 又粗又长又大又爽又黄少妇毛片 | 成年免费在线视频 | 日韩视频免费 | 综合网天天色 | 91高清一区| 黄色成年 | 日韩精品一区二区三区电影 | 日韩欧美在线综合网 | 国产精品久久久久久久久久久久午夜 | 亚洲精品在线视频播放 | 亚洲国产精品小视频 | 日日夜夜精品网站 | 色吊丝av中文字幕 | 日韩中文字幕亚洲一区二区va在线 | 91人人射| 国产精品刺激对白麻豆99 | 天天干天天干天天色 | 日韩丝袜在线观看 | 精品日韩在线 | 日本中文在线观看 | 韩日三级av | 狠狠操操操 | 日韩av二区 | 91精品天码美女少妇 | 99久久精品免费视频 | 丁香六月激情 | 在线免费观看视频a | 一级片视频在线 | 九色porny真实丨国产18 | 视频在线观看入口黄最新永久免费国产 | 九九九九色 | 在线视频日韩 | 91精品婷婷国产综合久久蝌蚪 | 91福利社在线观看 | 综合中文字幕 | 国产白浆在线观看 | 免费日韩一区 | 亚洲精品视频免费在线观看 | 五月天久久综合 | 天天插天天色 | 手机看片久久 | 国产精品久久久久久高潮 | 一级做a视频 | 97人人澡人人爽人人模亚洲 | 69亚洲乱| 欧美人体xx | 久久99在线观看 | 日韩欧美视频二区 | 黄色特级片 | 国产黄色大片 | 国产高清精 | 在线观看黄色 | 久久人人精 | 久久久久久久av麻豆果冻 | 人人干在线 | 免费av在 | 人人干人人艹 | 精品美女在线视频 | 91精品在线视频 | 玖玖爱国产在线 | 国产97色在线 | 色婷婷婷 | 久久精品视频播放 | 狠狠狠狠狠狠狠狠干 | 欧美巨乳网 | 久久久久久久久久久久国产精品 | 久久综合狠狠综合久久狠狠色综合 | 国产午夜精品av一区二区 | 91精彩在线视频 | 四虎永久国产精品 | 国产日产精品一区二区三区四区 | 日日爱av | 久久精品2 | 欧美精品久久久久久久 | 亚洲理论在线观看电影 | 亚洲国产精品成人精品 | 国产女教师精品久久av | 在线观看av黄色 | 国产人成在线视频 | 一本大道久久精品懂色aⅴ 五月婷社区 | 精品国产一区二区三区久久久 | 国产一级久久 | 国产精品久久久99 | 免费一级片在线 | 欧美性成人 | 日韩综合一区二区三区 | 草莓视频在线观看免费观看 | 欧美美女视频在线观看 | av福利在线播放 | 人人看人人做人人澡 | 久久网站免费 | 在线岛国av | 国产xxxxx在线观看 | 精品国产免费久久 | 中文字幕大全 | 国产成人一区二区三区免费看 | 国产一级黄色电影 | 69国产成人综合久久精品欧美 | 成人在线视频你懂的 | 国产亚洲精品久久久久5区 成人h电影在线观看 | 一区二区三区免费 | 黄色官网在线观看 | 久久99精品国产91久久来源 | 美女福利视频 | 久草视频在线资源站 | 亚洲三级国产 | 人人澡人人草 | 91伊人久久大香线蕉蜜芽人口 | 国产精品一区二区在线 | 色网av | 96亚洲精品久久 | 99国产情侣在线播放 | 国产一级电影 | 天天操 夜夜操 | 51久久夜色精品国产麻豆 | 婷婷午夜天 | 九九精品久久 | 中文字幕中文字幕中文字幕 | 欧美精品亚州精品 | 免费视频久久久久久久 | 国产91欧美 | 日日操夜夜操狠狠操 | 久久歪歪 | 国产自产高清不卡 | 青青网视频 | 超碰97国产精品人人cao | 亚洲精品久久久蜜臀下载官网 | 天天干天天操天天拍 | 成人小视频在线观看免费 | 最近中文字幕大全 | 毛片黄色一级 | www.天天操.com | 深爱激情五月网 | 五月天欧美精品 | 国产日韩中文字幕 | 91豆麻精品91久久久久久 | 91亚州 | 亚洲五月| 99视频导航 | 亚洲另类视频在线观看 | 深爱婷婷久久综合 | 日b黄色片| 亚洲视频久久久久 | 亚洲精品99 | 中文字幕在线观看国产 | 91黄色小网站 | 欧美激情视频在线观看免费 | 日本黄色大片儿 | 丁香六月激情婷婷 | 中文字幕日韩国产 | 青青河边草免费观看完整版高清 | 日韩亚洲欧美中文字幕 | 欧美日韩免费在线观看视频 | 久久久.com | 久草网视频在线观看 | 免费在线成人av | 国产色视频网站 | 日韩激情在线视频 | 九九热1| 亚洲精品久久久久中文字幕二区 | 国产四虎在线 | 欧美久久99 | 午夜成人免费电影 | 成人国产精品免费 | 国产高清不卡一区二区三区 | 国产精品初高中精品久久 | 欧美网址在线观看 | 午夜av色| 久久天| 国产一级a毛片视频爆浆 | 久草免费资源 | 黄色网址a| 在线观看爱爱视频 | 午夜在线观看影院 | 亚洲 av网站 | 中文字幕有码在线播放 | 国产女人18毛片水真多18精品 | 国产一级片观看 | 精品福利国产 | 麻豆mv在线观看 | 国产日韩精品在线 | 欧美日韩一区二区三区免费视频 | 国产日韩欧美视频 | 一区二区三区免费在线观看 | 日日干夜夜草 | 欧美激情视频三区 | 久久久久久久久久久久亚洲 | 亚洲欧洲国产视频 | www日日 | 激情片av| 日韩v在线 | 99国产精品久久久久久久久久 | 久久久精品 一区二区三区 国产99视频在线观看 | 午夜成人免费影院 | 蜜臀av性久久久久蜜臀aⅴ流畅 | 日韩欧美黄色网址 | 91大神在线看 | 97看片网 | 国产精品视频99 | 视频高清 | 免费三级黄色 | 在线播放av网址 | 天天操人人干 | 在线观看欧美成人 | 国产.精品.日韩.另类.中文.在线.播放 | 色的网站在线观看 | 香蕉国产91 | 成人黄色小说在线观看 | 精品一区二区三区在线播放 | 国产精品18久久久久vr手机版特色 | 国产精品麻豆视频 | 精品一区二区在线免费观看 | 人人爽人人爽人人爽人人爽 | 日韩精品一区二区三区丰满 | 国产精品国产三级国产不产一地 | 亚洲精品麻豆视频 | 99 久久久久 | 欧美色噜噜 | 国产人在线成免费视频 | 亚洲精品看片 | 六月丁香六月婷婷 | 久久66热这里只有精品 | 国产精品久久久久久久久蜜臀 | 亚洲免费在线播放视频 | 2019中文字幕第一页 | 97超在线 | 丁香六月中文字幕 | www.激情五月.com | 亚洲五月婷 | 亚洲成人免费观看 | 欧美 日韩 国产 成人 在线 | 久热av在线 | 中文永久字幕 | 正在播放一区二区 | 高清av免费看 | 亚洲精品456在线播放乱码 | 人人爽人人射 | 国产小视频福利在线 | 欧美在线视频免费 | 黄色片网站av | 日本丶国产丶欧美色综合 | 狠狠色丁香久久婷婷综合五月 | 在线观看91 | 国产高清不卡在线 | 国产一区免费视频 | 国产一区二区三区免费在线 | 黄色大片入口 | 久久天天躁狠狠躁夜夜不卡公司 | 狠狠久久 | 亚洲一级久久 | 久久怡红院 | 免费亚洲黄色 | 五月天色婷婷丁香 | 成年人网站免费在线观看 | av 在线观看 | 深爱五月激情五月 | 色多多视频在线观看 | 又湿又紧又大又爽a视频国产 | www.色com | av丁香| 丰满少妇对白在线偷拍 | 亚洲色综合| 免费中文字幕在线观看 | 久久国产精品久久国产精品 | 国产精品不卡视频 | 人人爽人人射 | 精品久久影院 | 久久狠狠一本精品综合网 | 久久不见久久见免费影院 | 久久久不卡影院 | 久色网 | 日日干网址 | 久久久久伊人 | 91麻豆精品国产91久久久久久久久 | 国产午夜精品一区二区三区嫩草 | 精品女同一区二区三区在线观看 | 一区三区在线欧 | 在线电影av | 日韩欧美有码在线 | 日韩一级片网址 | 精品国产一区二区三区噜噜噜 | 色偷偷人人澡久久超碰69 | 黄色a视频免费 | 欧美黑人性爽 | 91精品在线免费观看 | 精品国产乱码久久久久久天美 | 欧美日韩免费一区二区三区 | 久久精品区 | 在线观看视频 | 91福利视频久久久久 | 天天操天天干天天摸 | 在线免费观看涩涩 | 99久e精品热线免费 99国产精品久久久久久久久久 | 欧美性色黄 | 综合久久久久久久久 | 色天天中文 | 国产福利免费在线观看 | 国产一区二区影院 | 国产视频亚洲视频 | 国产小视频你懂的在线 | 国产精品 日本 | 国产精品久久麻豆 | 国产成人一区二区三区久久精品 | 国产99久久精品 | 韩国av免费观看 | 久久艹久久 | 欧美日本一区 | www.超碰97.com| 国产精品永久 | 黄在线免费观看 | 久久免费看视频 | 成人黄色小视频 | 午夜精品一区二区三区视频免费看 | 在线观看一级片 | 91成熟丰满女人少妇 | 热久久国产精品 | 九九九电影免费看 | 毛片网在线播放 | 午夜精品一区二区三区在线观看 | 国产精品24小时在线观看 | 久久99精品久久只有精品 | 狠狠干我 | 中文字幕欧美三区 | 久久久国产电影 | 一区二区三区四区精品视频 | av在线短片 | 亚洲天堂网在线播放 | 亚洲 欧洲av | 深夜免费福利在线 | 精品国产免费人成在线观看 | 亚洲最快最全在线视频 | 国产品久精国精产拍 | 超碰人人国产 | 国产精品麻豆一区二区三区 | 99精品国产一区二区三区麻豆 | 免费影视大全推荐 | a v在线视频 | 中文字幕乱码日本亚洲一区二区 | 日韩欧美久久 | 一区二区中文字幕在线观看 | 国产小视频在线播放 | 成人av播放 | 天天干天天做天天爱 | 国产精品久久久久久久久久久久 | 久久久天堂 | www.久久免费视频 | 欧美另类亚洲 | 亚洲精品国精品久久99热 | 337p日本大胆噜噜噜噜 | 99精品视频在线观看免费 | 亚洲黄色av一区 | 日韩欧美xxxx | 色综合久久久久 | 国产精品嫩草影院9 | 亚洲精品一区二区在线观看 | 免费欧美精品 | 日韩最新av在线 | 中文字幕一区在线 | av3级在线 | 日韩视频一二三区 | 超碰在线中文字幕 | 蜜桃视频成人在线观看 | 亚洲精品777 | 中文字幕亚洲欧美日韩2019 | 亚洲精品自拍 | 久久成人精品电影 | 国产精品精 | 69国产盗摄一区二区三区五区 | 亚洲国产欧洲综合997久久, | 波多野结衣日韩 | 午夜视频免费 | 国产成a人亚洲精v品在线观看 | 亚洲欧洲日韩在线观看 | 久久理论电影网 | 亚洲黄网站 | 黄色国产大片 | 国产第一页精品 | 中文字幕久久精品 | v片在线看 | 91中文字幕永久在线 | 草久视频在线观看 | 亚洲国产视频a | 国产视频精品网 | 精品国产成人av | 狠狠躁夜夜av | 久久这里有| 97在线观看免费观看 | 精品麻豆入口免费 | 久章草在线 | 超碰在线网 | 97操操操 | 国产精品美女久久久久久 | 国产精品久久久久久久久久久免费看 | 国产精品毛片久久久 | 9久久精品 | 色综合色综合色综合 | 麻豆视频在线免费 | 久久久久久久18 | 亚洲天天做 | 中文字幕免费一区二区 | 国产成人一区二区在线观看 | 国产一二区在线观看 | 黄色网址av | 丁香综合 | 18国产精品白浆在线观看免费 | 五月婷婷综合激情网 | 色视频网站免费观看 | av中文字幕在线观看网站 | 免费电影一区二区三区 | 日韩中文字幕免费 | 婷婷成人亚洲综合国产xv88 | 最近日本中文字幕a | 欧产日产国产69 | 九九视频一区 | 亚洲一级黄色 | av短片在线 | 在线观看一级 | 亚洲人成免费网站 | 久久久久久国产精品久久 | 色综合天天综合网国产成人网 | 在线视频欧美精品 | 91成人免费视频 | 久久福利国产 | 国产免费观看视频 | 亚洲国产精品视频在线观看 | 黄色www在线观看 | 综合激情网 | av在线日韩 | 成人国产网址 | 久草在线手机视频 | 国产精品福利在线播放 | 日韩免费观看一区二区 | 成人精品国产 | 中字幕视频在线永久在线观看免费 | 中文字幕av最新更新 | 日韩午夜电影网 | 亚洲伊人婷婷 | 麻豆国产精品永久免费视频 | 一区二区视频在线观看免费 | 深爱婷婷激情 | 中文日韩在线 | 国产精品高潮在线观看 | 久久久黄色免费网站 | 成人av中文字幕在线观看 | 久久久成人精品 | 1024手机看片国产 | 亚洲欧美成aⅴ人在线观看 四虎在线观看 | 久久不射网站 | 欧美日韩中字 | 4hu视频 | 国产流白浆高潮在线观看 | 免费激情网 | 久久久久日本精品一区二区三区 | 日韩性片 | 亚洲成 人精品 | 日韩一区二区三区免费电影 | 亚洲精品视频在线观看视频 | 在线视频 亚洲 | 日韩在线资源 | 在线三级中文 | www.久久婷婷| 91在线视频免费 | 国产 在线 日韩 | 国产91精品看黄网站 | 狠狠色噜噜狠狠狠狠 | 免费久久99精品国产婷婷六月 | 国产 在线 高清 精品 | 午夜丰满寂寞少妇精品 | 激情文学综合丁香 | 亚洲精品国 | 成年人黄色大全 | 国产精品mv在线观看 | 中文有码在线 | 日韩免费在线视频观看 | 欧美另类一二三四区 | 99精品免费 | 黄色官网在线观看 | 日日操日日插 | 国产精品一区二区av日韩在线 | 日韩欧美成人网 | 欧美 另类 交 | 91在线视频免费播放 | 色婷婷在线观看视频 | 91网免费观看 | 在线观看一区二区视频 | 国产夫妻av在线 | 亚洲精品九九 | 色播亚洲婷婷 | 欧美日韩精品免费观看视频 | 最新日韩视频在线观看 | 97超碰在| 一区二区免费不卡在线 | 激情久久综合网 | 国产精品久久久久久一区二区三区 | 亚州中文av | 日韩高清二区 | 成人免费大片黄在线播放 | 在线观看免费一区 | 婷婷激情五月综合 | 狠狠色狠狠色 | 精品在线看 | 国产福利91精品一区 | 成人午夜久久 | 久久久精品国产免费观看一区二区 | 久久精品一区二区三区四区 | 深爱激情开心 | 蜜臀av性久久久久av蜜臀妖精 | 久草干| 在线草 | 成人免费观看视频网站 | 天天综合91 | 香蕉国产91 | 国产精品久久久久久久午夜片 | 99精品视频99| 一二三区高清 | 国产午夜精品久久久久久久久久 | 美女激情影院 | 在线视频 91 | 五月开心色 | 国产精品专区一 | 国产一二三四在线观看视频 | 亚洲国产中文字幕在线 | 精品视频9999 | 97精品超碰一区二区三区 | 涩涩爱夜夜爱 | 欧美日韩国产在线一区 | 人人插人人舔 | 久久激情婷婷 | 射久久| 国产精品婷婷午夜在线观看 | 天天艹天天干天天 | 亚a在线| 日韩av一区在线观看 | 日韩精品不卡在线观看 | 精品国产亚洲一区二区麻豆 | 久草在线最新视频 | 成人在线免费小视频 | 欧美成人手机版 | 一级片视频在线 | 色综合久久88色综合天天人守婷 | 国产精品久久久久三级 | 亚洲精品国产片 | 人人玩人人添人人澡97 | 人成午夜视频 | 日韩欧美视频一区 | 免费手机黄色网址 | av线上看 | 国产婷婷久久 | 精品国产乱码一区二区三区在线 | 亚洲精品在线视频播放 | 久久综合精品国产一区二区三区 | 久久欧美视频 | 97精品国产97久久久久久久久久久久 | 日韩在线精品一区 | av中文天堂 | 久久69精品久久久久久久电影好 | 日韩高清网站 | 国产精品亚| 亚洲片在线观看 | 91精品对白一区国产伦 | 国产午夜麻豆影院在线观看 | 久久不射网站 | 国产精品久久久久久久久久 | 婷婷久久婷婷 | 亚洲国产中文在线 | 韩国中文三级 | 91在线色 | 国产69精品久久久久99 | 狠狠狠色丁香综合久久天下网 | 亚洲 欧洲 国产 精品 | 青草视频在线免费 | 在线观看视频在线 | aaa日本高清在线播放免费观看 | 91麻豆精品国产自产 | 精品九九久久 | 国产精成人品免费观看 | 久久伊人热 | 免费在线激情视频 | 天天在线视频色 | 国产精品完整版 | 国产精品麻豆果冻传媒在线播放 | 日韩欧美高清在线观看 | 国产男女爽爽爽免费视频 | 91女子私密保健养生少妇 | ,午夜性刺激免费看视频 | 久久中文字幕导航 | 国产精品网站 | 亚洲精品免费观看 | 夜夜操网 | 日韩精品视频久久 | 亚洲精品国产精品国自产在线 | 国产精彩在线视频 | 婷婷四房综合激情五月 | 日韩va亚洲va欧美va久久 | 亚洲视频每日更新 | 亚洲精品五月 | 婷婷六月天丁香 | 亚洲精品国精品久久99热 | 天堂资源在线观看视频 | 安徽妇搡bbbb搡bbbb | 国产一区高清在线观看 | 99热在线观看 | av亚洲产国偷v产偷v自拍小说 | 久久蜜臀一区二区三区av | 精品久久久久久久久久久久久久久久 | 国产亚洲精品久久久久久网站 | 开心激情五月婷婷 | 亚洲永久精品在线 | 精品爱爱 | 中文资源在线观看 | 精品亚洲免a| 一区二区三区在线免费观看视频 | 中文字幕免费国产精品 | 亚欧日韩av | 看片的网址 | 亚洲精品女人久久久 | 福利视频一区二区 | 国内精品久久久久久中文字幕 | 天天综合网久久 | 久久久穴 | av黄网站| 久久久久亚洲国产 | 成人网中文字幕 | 日韩精品欧美专区 | 国产精品视频地址 | 日韩av成人免费看 | 日韩理论电影在线观看 | 欧美日韩精品在线 | 国产黑丝一区二区 | 色瓜| 福利视频导航网址 | 高清不卡一区二区在线 | av成人资源 | 国产精品一区二区久久 | 精品一区 在线 | 99视频这里只有 | 色婷婷婷 | 在线观看视频中文字幕 | 国产日产高清dvd碟片 | zzijzzij日本成熟少妇 | 九九九热视频 | 二区精品视频 | 国产成人三级一区二区在线观看一 | 中文字幕国语官网在线视频 | 成人av高清在线观看 | 又黄又爽又刺激 | av动态图片 | 亚洲精品一区二区网址 | 91精品成人 | 日韩试看 | 亚洲午夜精品一区 | 97色噜噜 | 国产精品久久久久久久久久久久冷 | 麻豆视频在线观看免费 | 久精品视频免费观看2 | 中文字幕的 | 网站免费黄色 | 永久免费精品视频网站 | 干亚洲少妇| 欧美久久久久久久久久久 | 欧美日韩在线网站 | 久久久久久久久国产 | 日韩精品一区二区久久 | 肉色欧美久久久久久久免费看 | 6080yy精品一区二区三区 | 日本视频网 | 国产精品成人免费精品自在线观看 | 一本一本久久aa综合精品 | a级国产乱理伦片在线播放 久久久久国产精品一区 | 免费观看性生交大片3 | 2021国产视频 | 中文字幕乱码一区二区 | 久久精品免费 | 欧美一级片 | 久久久精品99 | 欧美99热 | 黄色成年网站 | 波多野结衣精品在线 | 四虎影视久久久 | 97人人澡人人添人人爽超碰 | av中文字幕在线看 | 97视频免费在线观看 | 久久久久久久久毛片 | 制服丝袜在线91 | 在线观看国产永久免费视频 | 五月天网站在线 | 国产精品美女久久久久久免费 | 精品国产伦一区二区三区观看方式 | 亚洲成人家庭影院 | 久久国产精品99国产精 | 亚洲欧美国产精品 | 黄色天堂在线观看 | 亚洲国产最新 | 成年人视频在线免费 | 91麻豆精品国产91久久久无限制版 | 国产伦精品一区二区三区高清 | 色成人亚洲 | av网站在线免费观看 | 成人精品一区二区三区中文字幕 | 国产美女精品久久久 | 性色av免费观看 | 91探花在线视频 | 久久视频在线观看 | 亚洲精品国内 | 国产女人40精品一区毛片视频 | 人人艹视频| 久久久久久久久久福利 | 在线观看成人国产 | 亚洲最大成人网4388xx | 最近中文字幕在线播放 | 欧美热久久 | 亚洲伊人天堂 | 中文字幕制服丝袜av久久 | 国产.精品.日韩.另类.中文.在线.播放 | av在线等 | 欧美综合国产 | 精品国产伦一区二区三区 | 国产精品不卡av | 精品视频专区 | 91尤物在线播放 | 天堂av免费在线 | 免费合欢视频成人app | 精品久久在线 | 国产精品一区久久久久 | 国产免费中文字幕 | 日韩欧美国产免费播放 | 麻豆影视在线免费观看 | 经典三级一区 | 丝袜美腿一区 | 亚洲精品1区2区3区 超碰成人网 | 久久精品这里都是精品 | 狠狠激情中文字幕 | 久草av在线播放 | 最新日韩视频在线观看 | 国产一区二区在线观看视频 | 在线中文字幕视频 | 国产破处在线播放 | av中文字幕日韩 | 色欧美88888久久久久久影院 | 国产日本三级 | 92国产精品久久久久首页 | 81国产精品久久久久久久久久 | 婷婷色在线资源 | 人人舔人人干 | 免费看污的网站 | 香蕉久久久久久av成人 | 开心色激情网 | 亚洲精品一区二区在线观看 | 亚洲日本成人网 | 欧美在线91| 91精品国自产在线观看欧美 | 日韩电影在线观看一区二区三区 | 日韩在线小视频 | 国产网红在线 | 日本精品视频网站 | 久久综合国产伦精品免费 | 日韩剧| 成人午夜免费福利 | 国产精品入口麻豆www | www.久久视频 | 在线亚洲欧美日韩 | 亚洲精品综合一区二区 | 亚洲成人在线免费 | 91视频免费 | 久久久久亚洲国产 | www.超碰97.com| 丁香五香天综合情 | 中文乱幕日产无线码1区 | zzijzzij日本成熟少妇 | 欧美成亚洲 | 亚洲人成免费 | 黄色91免费观看 | 黄色一区二区在线观看 | 亚洲成a人片在线www | 日本性高潮视频 | 日韩网站在线观看 | 久久久精品欧美一区二区免费 | 国产视频一区二区在线 | 激情视频一区二区三区 | 久久伊99综合婷婷久久伊 | 国产精品aⅴ | 午夜精品区 | 欧美久久综合 | 日韩三级中文字幕 | 久久久久久久国产精品 | 麻豆系列在线观看 | 国产流白浆高潮在线观看 | 美女av在线免费 | 国产99爱 | 亚洲二区精品 | 精品成人在线 | 久艹视频在线免费观看 | 精品免费久久久久久 | 四虎国产 | 国产粉嫩在线观看 | 欧美日韩有码 | 91中文字幕一区 | 国产99一区视频免费 | 欧美久久久久 | 国产美女免费 | 在线观看成人国产 | 国产视频在线免费 | 国产在线观看黄 | 色九九视频 | 久久久久久国产精品久久 | 久久在草| 亚洲深夜影院 | 成人在线观看日韩 | 国产偷v国产偷∨精品视频 在线草 | 国产精品久久久一区二区 | 摸bbb搡bbb搡bbbb | 中文字幕在线观看一区二区三区 | 人人射人人插 | 欧美福利久久 | 免费国产在线观看 | 久久婷婷国产色一区二区三区 | 亚洲激色| 国产精品6999成人免费视频 | 在线观看黄 | 亚洲精品乱码久久久久久高潮 | 久久99精品久久久久蜜臀 | 黄色在线观看www | 国产精品你懂的在线观看 | 久久大视频 | 在线之家免费在线观看电影 |