日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SparkCore基础

發布時間:2024/7/5 编程问答 64 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkCore基础 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Spark簡介

1?什么是Spark

2?Spark特點

3?Spark分布式環境安裝

3.1?Spark HA的環境安裝

3.2?動態增刪一個worker節點到集群

4?Spark核心概念

5 Spark案例

5.2? Master URL

5.3?spark日志的管理

5.4?WordCount案例程序的執行過程

6 Spark作業運行架構圖(standalone模式)

7?RDD操作

7.1?RDD初始化

7.2?RDD操作

7.3?transformation轉換算子

7.3?action行動算子

8?高級排序

8.1?普通的排序

8.2 二次排序

8.3?分組TopN

8.4?優化分組TopN

9?持久化操作

9.1 為什要持久化

9.2?如何進行持久化

9.3?持久化策略

9.4?如何選擇持久化策略

10 共享變量

10.1 概述

10.2?broadcast廣播變量

10.3?accumulator累加器

10.4?自定義累加器


? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?SparkCore基礎

1?什么是Spark

Spark是一個通用的可擴展的處理海量數據集的計算引擎。

Spark集成離線計算,實時計算,SQL查詢,機器學習,圖計算為一體的通用的計算框架。

2?Spark特點

(1)快:相比給予MR,官方表明基于內存計算spark要快mr100倍,基于磁盤計算spark要快mr10倍

快的原因:①基于內存計算,②計算和數據的分離 ③基于DAGScheduler的計算劃分 ④只有一次的Shuffle輸出操作

(2)易用:Spark提供超過80多個高階算子函數,來支持對數據集的各種各樣的計算,使用的時候,可以使用java、scala、python、R,非常靈活易用。

(3)通用:在一個項目中,既可以使用離線計算,也可以使用其他比如,SQL查詢,機器學習,圖計算等等,而這時Spark最強大的優勢

(4)到處運行

3?Spark分布式環境安裝

(1)下載解壓,添加環境變量

(2)修改配置文件

spark的配置文件,在$SPARK_HOME/conf目錄下

①拷貝slaves和spark-env.sh文件 :cp slaves.template slaves和cp spark-env.sh.template spark-env.sh

②修改slaves配置,配置spark的從節點的主機名,spark中的從節點叫做worker,主節點叫做Master。vim slaves

bigdata02 bigdata03

③修改spark-env.sh文件,添加如下內容

export JAVA_HOME=/opt/jdk export SCALA_HOME=/home/refuel/opt/mouldle/scala export SPARK_MASTER_IP=bigdata01 export SPARK_MASTER_PORT=7077 ##rpc通信端口,類似hdfs的9000端口,不是50070 export SPARK_WORKER_CORES=2 export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_MEMORY=1g export HADOOP_CONF_DIR=/home/refuel/opt/mouldle/hadoop/etc/hadoop

(3)同步spark到其它節點中

3.1?Spark HA的環境安裝

有兩種方式解決單點故障,一種基于文件系統FileSystem(生產中不用),還有一種基于Zookeeper(使用)。 配置基于Zookeeper的一個ha是非常簡單的,只需要在spark-env.sh中添加一句話即可。

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"

spark.deploy.recoveryMode設置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢復狀態的目錄,缺省為 /spark。因為ha不確定master在bigdata01上面啟動,所以將export SPARK_MASTER_IP=bigdata01和export SPARK_MASTER_PORT=7077注釋掉

3.2?動態增刪一個worker節點到集群

(1)上線一個節點:不需要在現有集群的配置上做任何修改,只需要準備一臺worker機器即可,可和之前的worker的配置相同。

(2)下線一個節點:kill或者stop-slave.sh都可以

4?Spark核心概念

ClusterManager:在Standalone(依托于spark集群本身)模式中即為Master(主節點),控制整個集群,監控Worker。在YARN模式中為資源管理器ResourceManager。

Worker:從節點,負責控制計算節點,啟動Executor。在YARN模式中為NodeManager,負責計算節點的控制,啟動的進程叫Container。

Driver:運行Application的main()函數并創建SparkContext(是spark中最重要的一個概念,是spark編程的入口,作用相當于mr中的Job)。

Executor:執行器,在worker node上執行任務的組件、用于啟動線程池運行任務。每個Application擁有獨立的一組Executors。

SparkContext:整個應用的上下文,控制應用的生命周期,是spark編程的入口。

RDD:彈性式分布式數據集。Spark的基本計算單元,一組RDD可形成執行的有向無環圖RDD Graph。

DAGScheduler:實現將Spark作業分解成一到多個Stage,每個Stage根據RDD的Partition個數決定Task的個數,然后生成相應的Task set放到TaskScheduler中。 DAGScheduler就是Spark的大腦,中樞神經

TaskScheduler:將任務(Task)分發給Executor執行。

Stage:一個Spark作業一般包含一到多個Stage。

Task :一個Stage包含一到多個Task,通過多個Task實現并行運行的功能。 task的個數由rdd的partition分區決定

Transformations:轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。

Actions:操作/行動(Actions)算子 (如:count, collect, foreach等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。

SparkEnv:線程級別的上下文,存儲運行時的重要組件的引用。SparkEnv內創建并包含如下一些重要組件的引用。

MapOutPutTracker:負責Shuffle元信息的存儲

BroadcastManager:負責廣播變量的控制與元信息的存儲。

BlockManager:負責存儲管理、創建和查找塊。

MetricsSystem:監控運行時性能指標信息。

SparkConf:負責存儲配置信息。作用相當于hadoop中的Configuration。

5 Spark案例

pom文件的依賴配置如下

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- scala去除,因為spark-core包里有了scala的依賴<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency> --><!-- sparkcore --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.2</version></dependency><!-- sparksql --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.2</version></dependency><!-- sparkstreaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.2</version></dependency></dependencies>

注意:入口類為SparkContext,java版本的是JavaSparkContext,scala的版本就是SparkContext;SparkSQL的入口有SQLContext、HiveContext;SparkStreaming的入口又是StreamingContext。

java版本

public class JavaSparkWordCountOps {public static void main(String[] args) {//step 1、創建編程入口類SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName(JavaSparkWordCountOps.class.getSimpleName());JavaSparkContext jsc = new JavaSparkContext(conf);//step 2、加載外部數據 形成spark中的計算的編程模型RDDJavaRDD<String> linesRDD = jsc.textFile("E:/hello.txt");// step 3、對加載的數據進行各種業務邏輯操作---轉換操作transformationJavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {public Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split("\\s+")).iterator();}});//JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());System.out.println("-----經過拆分之后的rdd數據----");wordsRDD.foreach(new VoidFunction<String>() {public void call(String s) throws Exception {System.out.println(s);}});System.out.println("-----word拼裝成鍵值對----");JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});//JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));pairsRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});System.out.println("------按照相同的key,統計value--------------");JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {int i = 1 / 0; //印證出這些轉換的transformation算子是懶加載的,需要action的觸發return v1 + v2;}});//JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey((v1, v2) -> v1 + v2);retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});} }

scala版本

object SparkWordCountOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCount")val sc = new SparkContext(conf)//load data from fileval linesRDD:RDD[String] = sc.textFile("E:/hello.txt")val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split("\\s+"))val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))val ret = pairsRDD.reduceByKey((v1, v2) => v1 + v2)ret.foreach(t => println(t._1 + "---" + t._2))sc.stop()} }

5.2? Master URL

master-url通過sparkConf.setMaster來完成。代表的是spark作業的執行方式,或者指定的spark程序的cluster-manager的類型。

master含義
local程序在本地運行,同時為本地程序提供一個線程來處理
local[M]程序在本地運行,同時為本地程序分配M個工作線程來處理
local[*]程序在本地運行,同時為本地程序分配機器可用的CPU core的個數工作線程來處理
local[M, N]程序在本地運行,同時為本地程序分配M個工作線程來處理,如果提交程序失敗,會進行最多N次的重試
spark://ip:port基于standalone的模式運行,提交撐到ip對應的master上運行
spark://ip1:port1,ip2:port2基于standalone的ha模式運行,提交撐到ip對應的master上運行
yarn/啟動腳本中的deploy-mode配置為cluster基于yarn模式的cluster方式運行,SparkContext的創建在NodeManager上面,在yarn集群中
yarn/啟動腳本中的deploy-mode配置為client基于yarn模式的client方式運行,SparkContext的創建在提交程序的那臺機器上面,不在yarn集群中

5.3?spark日志的管理

(1)全局管理:項目classpath下面引入log4j.properties配置文件進行管理

# 基本日志輸出級別為INFO,輸出目的地為console log4j.rootCategory=INFO, consolelog4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 輸出配置的是spark提供的日志級別 log4j.logger.org.spark_project.jetty=INFO log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR

(2)局部管理 :就是在當前類中進行日志的管理。

import org.apache.log4j.{Level, Logger} Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.spark_project").setLevel(Level.WARN)

5.4?WordCount案例程序的執行過程

當deploy-mode為client模式的時候,driver就在我們提交作業的本機,而spark的作業對應的executor在spark集群中運行。

在上圖中可以發現相鄰兩個rdd之間有依賴關系,依賴分為寬依賴和窄依賴。

窄依賴:rdd中的partition中的數據只依賴于父rdd中的一個partition或者常數個partition。常見的窄依賴操作有:flatMap,map,filter,coalesce等

寬依賴:rdd中的partition中的數據只依賴于父rdd中的所有partition。常見的寬依賴操作有reduceByKey,groupByKey,join,sortByKey,repartition等

rdd和rdd之間的依賴關系構成了一個鏈條,這個鏈條稱之為lineage(血緣)

6 Spark作業運行架構圖(standalone模式)

①啟動spark集群:通過spark的start-all.sh腳本啟動spark集群,啟動了對應的Master進程和Worker進程

②Worker啟動之后向Master進程發送注冊信息

③Worker向Master注冊成功之后,worker要不斷的向master發送心跳包,去監聽主節點是否存在

④Driver向Spark集群提交作業,就是向Master提交作業,申請運行資源

⑤Master收到Driver的提交請求,向Worker節點指派相應的作業任務,就是在對應的Worker節點上啟動對應的executor進程

⑥Worker節點接收到Master節點啟動executor任務之后,就啟動對應的executor進程,向master匯報成功啟動,可以接收任務

⑦executor進程啟動之后,就像Driver進程進行反向注冊,告訴Driver誰可以執行spark任務

⑧Driver接收到注冊之后,就知道向誰發送spark作業,那么這樣在spark集群中就有一組獨立的executor進程為該Driver服務

⑨DAGScheduler根據編寫的spark作業邏輯,將spark作業分成若干個階段Stage(基于Spark的transformation里是否有shuffle Dependency),然后為每一個階段組裝一批task組成taskSet(task里面包含了序列化之后的我們編寫的spark transformation),然后將這些DAGScheduler組裝好的taskSet,交給taskScheduler,由taskScheduler將這些任務發給對應的executor

⑩executor進程接收到了Driver發送過來的taskSet之后,進行反序列化,然后將這些task封裝進一個叫tasksunner的線程中,然后放到本地線程池中調度我們的作業的執行。

7?RDD操作

7.1?RDD初始化

RDD的初始化,原生api提供的2中創建方式:

①是讀取文件textFile

②加載一個scala集合parallelize。

當然,也可以通過transformation算子來創建的RDD。

7.2?RDD操作

RDD操作算子的分類,基本上分為兩類:transformation和action,當然更加細致的分,可以分為輸入算子,轉換算子,緩存算子,行動算子。

輸入:在Spark程序運行中,數據從外部數據空間(如分布式存儲:textFile讀取HDFS等,parallelize方法輸入Scala集合或數據)輸入Spark,數據進入Spark運行時數據空間,轉化為Spark中的數據塊,通過BlockManager進行管理。

運行:在Spark數據輸入形成RDD后便可以通過變換算子,如filter等,對數據進行操作并將RDD轉化為新的RDD,通過Action算子,觸發Spark提交作業。 如果數據需要復用,可以通過Cache算子,將數據緩存到內存。

輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS),或Scala數據或集合中(collect輸出到Scala集合,count返回Scala int型數據)。

7.3?transformation轉換算子

(1)map

rdd.map(func):RDD,對rdd集合中的每一個元素,都作用一次該func函數,之后返回值為生成元素構成的一個新的RDD。

(2)flatMap

rdd.flatMap(func):RDD ==>rdd集合中的每一個元素,都要作用func函數,返回0到多個新的元素,這些新的元素共同構成一個新的RDD。

map操作是一個一到一的操作,flatMap操作是一個1到多的操作

(3)filter

rdd.filter(func):RDD ==> 對rdd中的每一個元素操作func函數,該函數的返回值為Boolean類型,保留返回值為true的元素,共同構成一個新的RDD,過濾掉哪些返回值為false的元素。

(4)sample

rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD ===> 抽樣,sample抽樣不是一個精確的抽樣。一個非常重要的作用,就是來看rdd中數據的分布情況,根據數據分布的情況,進行各種調優與優化,防止數據傾斜。

withReplacement:抽樣的方式,true有放回抽樣, false為無返回抽樣

fraction: 抽樣比例,取值范圍就是0~1

seed: 抽樣的隨機數種子,有默認值,通常也不需要傳值

(5)union

rdd1.union(rdd2),聯合rdd1和rdd2中的數據,形成一個新的rdd,其作用相當于sql中的union all。

(6)join

join就是sql中的inner join。

注意:要想兩個RDD進行連接,那么這兩個rdd的數據格式,必須是k-v鍵值對的,其中的k就是關聯的條件,也就是sql中的on連接條件。

RDD1的類型[K, V], RDD2的類型[K, W]

內連接 :val joinedRDD:RDD[(K, (V, W))] = rdd1.join(rdd2)

左外連接 :val leftJoinedRDD:RDD[(K, (V, Option[W]))] = rdd1.leftOuterJoin(rdd2)

右外連接 :val rightJoinedRDD:RDD[(K, (Option[V], W))] = rdd1.rightOuterJoin(rdd2)

全連接 :val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] = rdd1.fullOuterJoin(rdd2)

(7)groupByKey

rdd.groupByKey(),按照key進行分組,如果原始rdd的類型時[(K, V)] ,那必然其結果就肯定[(K, Iterable[V])],是一個shuffle dependency寬依賴shuffle操作,但是這個groupByKey不建議在工作過程中使用,除非非要用,因為groupByKey沒有本地預聚合,性能較差,一般我們能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就盡量代替。

(8)reduceByKey

rdd.reduceByKey(func:(V, V) => V):RDD[(K, V)] :在scala集合中學習過一個reduce(func:(W, W) => W)操作,是一個聚合操作,這里的reduceByKey按照就理解為在groupByKey(按照key進行分組[(K, Iterable[V])])的基礎上,對每一個key對應的Iterable[V]執行reduce操作。

同時reduceByKey操作會有一個本地預聚合的操作,所以是一個shuffle dependency寬依賴shuffle操作。

(9)sortByKey

按照key進行排序

(10)combineByKey

這是spark最底層的聚合算子之一,按照key進行各種各樣的聚合操作,spark提供的很多高階算子,都是基于該算子實現的。

def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {... }

createCombiner: V => C, 相同的Key在分區中會調用一次該函數,用于創建聚合之后的類型,為了和后續Key相同的數據進行聚合;mergeValue: (C, V) => C, 在相同分區中基于上述createCombiner基礎之上的局部聚合;mergeCombiners: (C, C) => C) 將每個分區中相同key聚合的結果在分區間進行全局聚合

(11)aggregateByKey

aggregateByKey[U:?ClassTag](zeroValue:?U)(seqOp:?(U, V)?=> U, combOp:?(U, U)?=> U):?RDD[(K, U)]和combineByKey都是一個相對底層的聚合算子,可以完成系統沒有提供的其它操作,相當于自定義算子。aggregateByKey底層使用combineByKeyWithClassTag來實現,所以本質上二者沒啥區別,區別就在于使用時的選擇而已。

aggregateByKey更為簡單,但是如果聚合前后數據類型不一致,建議使用combineByKey;同時如果初始化操作較為復雜,也建議使用combineByKey。

7.3?action行動算子

這些算子都是在rdd上的分區partition上面執行的,不是在driver本地執行。

(1)foreach

用于遍歷RDD,將函數f應用于每一個元素,無返回值(action算子)

(2)count

統計該rdd中元素的個數

(3)take(n)

返回該rdd中的前N個元素,如果該rdd的數據是有序的,那么take(n)就是Top N

(4)first

take(n)中比較特殊的一個take(1)(0)

(5)collect

將分布在集群中的各個partition中的數據拉回到driver中,進行統一的處理;但是這個算子有很大的風險存在,第一,driver內存壓力很大,第二數據在網絡中大規模的傳輸,效率很低;所以一般不建議使用,如果非要用,請先執行filter。

(6)reduce

reduce是一個action操作,reduceByKey是一個transformation。reduce對一個rdd執行聚合操作,并返回結果,結果是一個值。

(7)countByKey

統計key出現的次數

(8)saveAsTextFile

保存到文件,本質上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]

(9)saveAsObjectFile和saveAsSequenceFile

saveAsObjectFile本質上是saveAsSequenceFile

(10)saveAsHadoopFile和saveAsNewAPIHadoopFile

這二者的主要區別就是OutputFormat的區別,接口org.apache.hadoop.mapred.OutputFormat,

抽象類org.apache.hadoop.mapreduce.OutputFormat? ?所以saveAshadoopFile使用的是接口OutputFormat,saveAsNewAPIHadoopFile使用的抽象類OutputFormat,建議使用后者。

8?高級排序

8.1?普通的排序

(1)sortByKey

object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 數據類型為k-v,且是按照key進行排序val stuRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))//按照學生身高進行降序排序val height2Stu = stuRDD.map(stu => (stu.height, stu))//注意:sortByKey是局部排序,不是全局排序,如果要進行全局排序,// 必須將所有的數據都拉取到一臺機器上面才可以val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)sorted.foreach{case (height, stu) => println(stu)}sc.stop()} }case class Student(id:Int, name:String, age:Int, height:Double)

(2)sortBy

這個sortBy其實使用sortByKey來實現,但是比sortByKey更加靈活,因為sortByKey只能應用在k-v數據格式上,而這個sortBy可以應在非k-v鍵值對的數據格式上面。

val sortedBy = stuRDD.sortBy(stu => stu.height,ascending = true,numPartitions = 1)(new Ordering[Double](){override def compare(x: Double, y: Double) = y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]]) sortedBy.foreach(println)

sortedBy的操作,除了正常的升序,分區個數以外,還需需要傳遞一個將原始數據類型,提取其中用于排序的字段;并且提供用于比較的方式,以及在運行時的數據類型ClassTag標記型trait。

(3)takeOrdered

takeOrdered也是對rdd進行排序,但是和上述的sortByKey和sortBy相比較,takeOrdered是一個action操作,返回值為一個集合,而前兩者為transformation,返回值為rdd。如果我們想在driver中獲取排序之后的結果,那么建議使用takeOrdered,因為該操作邊排序邊返回。其實是take和sortBy的一個結合體。

takeOrdered(n),獲取排序之后的n條記錄

//先按照身高降序排序,身高相對按照年齡升序排 ---> 二次排序 stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) = {var ret = y.height.compareTo(x.height)if(ret == 0) {ret = x.age.compareTo(y.age)}ret} }).foreach(println)

8.2 二次排序

所謂二次排序,指的是排序字段不唯一,有多個,共同排序

object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 數據類型為k-v,且是按照key進行排序val personRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()} }case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//對學生的身高和年齡依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret} }

8.3?分組TopN

在分組的情況之下,獲取每個組內的TopN數據

object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key進行分組//分組內的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()} }

8.4?優化分組TopN

上述在編碼過程當中使用groupByKey,我們說著這個算子的性能很差,因為沒有本地預聚合,所以應該在開發過程當中盡量避免使用,能用其它代替就代替。

(1)使用combineByKey優化1

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val course2Infos= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分組內的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] = {val ab = new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] = {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] = {ab.++:(ab1)} }

此時這種寫法和上面的groupByKey性能一模一樣,沒有任何的優化。

(2)使用combineByKey的優化2

object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}} }

?

9?持久化操作

9.1 為什要持久化

一個RDD如果被多次操作,為了提交后續的執行效率,我們建議對該RDD進行持久化操作。

9.2?如何進行持久化

rdd.persist()/cache()就完成了rdd的持久化操作,我們可以將該rdd的數據持久化到內存,磁盤,等等。

如果我們已經不再對該rdd進行一個操作,而此時程序并沒有終止,可以卸載已經持久化的該rdd數據,rdd.unPersist()。

9.3?持久化策略

可以通過persist(StoreageLevle的對象)來指定持久化策略,eg:StorageLevel.MEMORY_ONLY。

持久化策略含義
MEMORY_ONLY(默認)rdd中的數據以未經序列化的java對象格式,存儲在內存中。如果內存不足,剩余的部分不持久化,使用的時候,沒有持久化的那一部分數據重新加載。這種效率是最高,但是是對內存要求最高的。
MEMORY_ONLY_SER就比MEMORY_ONLY多了一個SER序列化,保存在內存中的數據是經過序列化之后的字節數組,同時每一個partition此時就是一個比較大的字節數組。
MEMORY_AND_DISK和MEMORY_ONLY相比就多了一個,內存存不下的數據存儲在磁盤中。
MEMEORY_AND_DISK_SER比MEMORY_AND_DISK多了個序列化。
DISK_ONLY就是MEMORY_ONLY對應,都保存在磁盤,效率太差,一般不用。
xxx_2就是上述多個策略后面加了一個_2,比如MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等等,就多了一個replicate而已,備份,所以性能會下降,但是容錯或者高可用加強了。所以需要在二者直接做權衡。如果說要求數據具備高可用,同時容錯的時間花費比從新計算花費時間少,此時便可以使用,否則一般不用。
HEAP_OFF(experimental)使用非Spark的內存,也即堆外內存,比如Tachyon,HBase、Redis等等內存來補充spark數據的緩存。

9.4?如何選擇持久化策略

(1)如果要持久化的數據是可以在內存中進行保存,那么毫無疑問,選擇MEMEORY_ONLY,因為這種方式的效率是最高的,但是在生成中往往要進行緩存的數據量還是蠻大的,而且因為數據都是未經序列化的java對象,所以很容易引起頻繁的gc。

(2)如果上述滿足不了,就退而求其次,MEMORY_ONLY_SER,這種方式增加的額外的性能開銷就是序列化和反序列化,經過反序列化之后的對象就是純java對象,因此性能還是蠻高的。

(3)如果還是扛不住,再退而求其次,MEMOEY_AND_DISK_SER,因為到這一步的話,那說明對象體積確實很多,為了提交執行效率,應該盡可能的將數據保存在內存,所以就對數據進行序列化,其次在序列化到磁盤。

(4)一般情況下DISK_ONLY,DISK_SER不用,效率太低,有時候真的不容從源頭計算一遍。

(5)一般情況下我們都不用XXX_2,代備份的種種持久化策略,除非程序對數據的安全性要求非常高,或者說備份的對性能的消耗低于從頭再算一遍,我們可以使用這種xxx_2以外,基本不用。

10 共享變量

10.1 概述

如果transformation使用到Driver中的變量,在executor中執行的時候,就需要通過網絡傳輸到對應的executor,如果該變量很大,那么網絡傳輸一定會成為性能的瓶頸。Spark就提供了兩種有限類型的共享變量:累加器和廣播變量

10.2?broadcast廣播變量

廣播變量:為每個task都拷貝一份變量,將變量包裝成為一個廣播變量(broadcast),只需要在executor中拷貝一份,在task運行的時候,直接從executor調用即可,相當于局部變量變成成員變量,性能就得到了提升。

val num:Any = xxxval numBC:Broadcast[Any] = sc.broadcast(num)調用:val n = numBC.value注意:該num需要進行序列化。

10.3?accumulator累加器

累加器的一個好處是,不需要修改程序的業務邏輯來完成數據累加,同時也不需要額外的觸發一個action job來完成累加,反之必須要添加新的業務邏輯,必須要觸發一個新的action job來完成,顯然這個accumulator的操作性能更佳!

構建一個累加器val accu = sc.longAccumuator()累加的操作accu.add(參數)獲取累加器的結果val ret = accu.value val conf = new SparkConf() .setAppName("AccumulatorOps") .setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/data.txt") val words = lines.flatMap(_.split("\\s+"))//統計每個單詞出現的次數 val accumulator = sc.longAccumulatorval rbk = words.map(word => {if(word == "is")accumulator.add(1)(word, 1) }).reduceByKey(_+_) rbk.foreach(println) println("================使用累加器===================") println("is: " + accumulator.value)Thread.sleep(1000000) sc.stop()

注意:累加器的調用,在action之后被調用,也就是說累加器必須在action觸發之后;多次使用同一個累加器,應該盡量做到用完即重置;盡量給累加器指定name,方便我們在web-ui上面進行查看

10.4?自定義累加器

自定義一個類繼承AccumulatorV2,重寫方法

/*自定義累加器IN 指的是accmulator.add(sth.)中sth的數據類型OUT 指的是accmulator.value返回值的數據類型*/ class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map = mutable.Map[String, Long]()/*** 當前累加器是否有初始化值* 如果為一個long的值,0就是初始化值,如果為list,Nil就是初始化值,是map,Map()就是初始化值*/override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] = {val accu = new MyAccumulatoraccu.map = this.mapaccu}override def reset(): Unit = map.clear()//分區內的數據累加 is: 5, of:4override def add(word: String): Unit = {if(map.contains(word)) {val newCount = map(word) + 1map.put(word, newCount)} else {map.put(word, 1)} // map.put(word, map.getOrElse(word, 0) + 1)}//多個分區間的數據累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {other.value.foreach{case (word, count) => {if(map.contains(word)) {val newCount = map(word) + countmap.put(word, newCount)} else {map.put(word, count)} // map.put(word, map.getOrElse(word, 0) + count)}}}override def value: Map[String, Long] = map.toMap }

注冊使用:

object _08AccumulatorOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("$AccumulatorOps").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data.txt")val words = lines.flatMap(_.split("\\s+"))//注冊val myAccu = new MyAccumulator()sc.register(myAccu, "myAccu")//統計每個單詞出現的次數val pairs = words.map(word => {if(word == "is" || word == "of" || word == "a")myAccu.add(word)(word, 1)})val rbk = pairs.reduceByKey(_+_)rbk.foreach(println)println("=============累加器==========")myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()} }

?

總結

以上是生活随笔為你收集整理的SparkCore基础的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

韩国av免费在线观看 | 日韩黄色在线 | av在线免费播放网站 | 久久国产精品小视频 | 4hu视频| 人人射人人澡 | 久久免费视频8 | 国产电影一区二区三区四区 | 亚洲精品网址在线观看 | 黄色成人av| 国产视频在线一区二区 | 精品自拍sae8—视频 | 日韩电影在线观看一区二区三区 | 免费日韩一区二区三区 | 亚洲激情国产精品 | 午夜精品在线看 | 九九九视频在线 | 成人黄色片在线播放 | 国产视频精选在线 | 天天操夜夜操国产精品 | 国产成人精品亚洲精品 | 1024手机基地在线观看 | 欧美日韩不卡一区二区 | 激情欧美一区二区三区免费看 | 久草网站 | 欧美成人手机版 | 欧美乱码精品一区 | 亚洲国产片色 | 综合网在线视频 | 久久免费成人精品视频 | 激情五月在线视频 | 激情综合五月婷婷 | 9草在线| 国产精品久久久久久久av电影 | 国产 日韩 欧美 中文 在线播放 | 色婷婷综合久色 | www.久久精品视频 | 欧美日韩在线观看视频 | 人人干狠狠干 | 四川bbb搡bbb爽爽视频 | 国产福利a | 国产精品欧美一区二区三区不卡 | 国产一级性生活视频 | 久久久久女教师免费一区 | 亚洲精品国产区 | 久久国内精品99久久6app | 超碰人人舔 | 国产精品一区二区av日韩在线 | 亚洲国产精品视频 | 在线国产中文字幕 | 久久久男人的天堂 | 久久精品79国产精品 | 天天爽夜夜爽人人爽曰av | 在线色吧| 美女亚洲精品 | 精品三级av| 婷婷激情综合网 | 日韩精品大片 | 91mv.cool在线观看 | 午夜丰满寂寞少妇精品 | 五月婷婷操 | 操操综合| 亚洲欧洲精品久久 | 日韩精品视频久久 | 日韩三级视频在线观看 | 欧美一级免费在线 | 国产精品乱码一区二区视频 | 久久久久久免费毛片精品 | 探花视频免费在线观看 | 中文字幕一区二区三区久久蜜桃 | 精品视频免费观看 | 又黄又爽又色无遮挡免费 | 国产高清视频免费最新在线 | 国产精品成久久久久 | 爱色婷婷| 99视频这里有精品 | 国产美女黄网站免费 | 国产在线视频资源 | 欧美伦理一区二区三区 | 久久国产精品久久久久 | 黄色三级久久 | av电影中文 | 日韩av在线免费看 | 99精品久久精品一区二区 | 中文av网站 | 在线免费av电影 | 亚洲天堂网视频在线观看 | 精品久久久久久久久久久久久久久久久久 | 久久久久久草 | 国产精品福利小视频 | 天天做天天爱天天综合网 | 国产日韩欧美在线影视 | 国产女人免费看a级丨片 | 日韩成人在线免费观看 | 亚洲欧美日韩国产一区二区三区 | 色福利网站 | 欧美日韩不卡一区二区 | 91精品久久久久久粉嫩 | 欧美日韩免费观看一区=区三区 | 国产在线不卡 | 婷婷色网 | 九九九九精品 | www.色的| 成人av中文字幕在线观看 | 97人人视频 | 99精品在线观看 | 2019中文最近的2019中文在线 | 99精品国产一区二区三区麻豆 | 久久综合一本 | 免费看污污视频的网站 | 国产精品一区二区三区电影 | www.91国产| www激情久久 | 日日麻批40分钟视频免费观看 | 在线观看免费国产小视频 | 亚洲国产精品电影 | 国产不卡精品视频 | 日韩国产欧美在线播放 | 欧美激情视频一区 | 午夜国产影院 | www.黄色片网站| 欧美最新另类人妖 | 亚洲91视频 | 九九免费精品视频在线观看 | 精品一区二区在线观看 | 在线观看一 | 99热在线观看 | 丁香婷婷久久久综合精品国产 | 69精品视频在线观看 | 一级a性色生活片久久毛片波多野 | 亚洲一区日韩精品 | 人人看人人做人人澡 | 日韩亚洲在线 | 国产高清免费在线播放 | 国产电影黄色av | 韩国在线视频一区 | 91热爆视频 | 97网在线观看 | 婷婷午夜天 | 午夜精品久久久久久久99水蜜桃 | 精品久久久久久久久久久院品网 | 国内外成人在线 | 韩国av免费在线 | 中文字幕在线观看日本 | 国产视频一区在线 | 国产成人免费网站 | 国产精品久久久久三级 | 国产亚洲精品久久久久久移动网络 | 久久久穴 | 成人毛片100免费观看 | 欧美性黑人 | 超碰免费观看 | 欧美 亚洲 另类 激情 另类 | 欧美日韩色婷婷 | 91av官网| 精品96久久久久久中文字幕无 | 国内精品久久久久影院优 | 午夜的福利 | av一级二级 | 日韩中文在线电影 | 视频三区在线 | 天天干,夜夜操 | 欧洲亚洲国产视频 | 国产麻豆成人传媒免费观看 | 中文字幕在线观看完整 | 91精品国自产在线观看欧美 | 69精品视频 | 久久免费视频4 | 久久久91精品国产 | 狠狠撸电影 | 久久夜色精品国产欧美乱极品 | 成人国产在线 | 久久精品国产亚洲精品2020 | 伊人色播 | 日韩中文字幕免费在线观看 | 黄色aaa级片 | 四虎在线视频免费观看 | 免费看一级特黄a大片 | 人人爽人人爽人人片av免 | 狠狠色丁香婷婷综合视频 | 国产色小视频 | 日韩高清观看 | 久久久久久久久久久免费 | 久久私人影院 | 成年人app网址| 日韩高清黄色 | 精品一区二区久久久久久久网站 | 91桃色在线免费观看 | 人人射网站 | 手机在线看片日韩 | av丝袜天堂 | 激情综合网五月婷婷 | 国产精品久久久久久久久软件 | 国产精品美 | 国产专区精品视频 | 亚洲国产精品资源 | 处女av在线| 日韩精品视频在线观看免费 | 亚洲做受高潮欧美裸体 | 不卡av电影在线观看 | 国产黄色片久久久 | 国产精品免费在线视频 | 国产精品日韩在线观看 | 国产在线播放观看 | 日本中文字幕影院 | 亚洲国产日韩精品 | 天天操狠狠干 | 久久久官网 | aaawww| 日韩一三区 | 在线观看日本高清mv视频 | 日韩中文字幕一区 | 国产在线观看av | 在线 国产 亚洲 欧美 | 免费看污片 | 亚洲精品看片 | 色久五月| 亚洲精品国偷拍自产在线观看蜜桃 | 久久久久99999| 毛片久久久 | 免费观看黄 | 久久精品视 | 日本黄色免费在线 | 狠狠色伊人亚洲综合网站野外 | 成人免费看电影 | 又黄又爽的视频在线观看网站 | 天天综合日日夜夜 | 久久精品欧美一区二区三区麻豆 | 四虎影视成人永久免费观看视频 | 国产精品 国内视频 | 香蕉免费在线 | 欧美亚洲免费在线一区 | 97人人澡人人爽人人模亚洲 | 久久免费的精品国产v∧ | 奇米四色影狠狠爱7777 | 欧美久久久久久久 | 99久久婷婷国产综合亚洲 | 99视频久久 | 伊人婷婷久久 | 久久免费a | 在线天堂v| 99久热在线精品视频成人一区 | 91亚·色| 久久精品2 | 欧美有色 | 亚洲综合色视频在线观看 | 精品国产91亚洲一区二区三区www | 日韩av中文字幕在线 | 99九九免费视频 | 亚洲爽爽网 | 成人一区二区在线 | 国产一级淫片免费看 | 99色在线播放| 99热最新 | 狠狠久久婷婷 | 911国产| 日韩精品一区二区三区高清免费 | 成人a级黄色片 | 成年人免费在线观看网站 | 中文字幕在线视频一区 | 国产区精品视频 | 欧美一级性生活 | 欧美色图亚洲图片 | 欧美精选一区二区三区 | 欧美日韩色婷婷 | 免费午夜在线视频 | 久久99久久99久久 | 91桃花视频| 在线观看aaa | 色窝资源 | 日韩久久精品一区二区 | 国产无限资源在线观看 | 免费视频三区 | 91在线视频网址 | 亚洲专区欧美专区 | 亚洲精品中文字幕在线观看 | 在线观看香蕉视频 | 久久污视频 | 久久久久久久网 | 国产成人三级在线播放 | 丁香婷婷激情五月 | 亚洲国产免费看 | 九九热av| 黄色视屏在线免费观看 | 成人高清在线 | 精品视频123区在线观看 | 国产大陆亚洲精品国产 | 最近中文字幕在线中文高清版 | av电影不卡| 国产成人一区二区三区 | 99精品亚洲 | 亚洲最大免费成人网 | 99精品国产视频 | 深爱激情五月网 | 97成人精品视频在线播放 | 国产一级性生活视频 | 久草视频看看 | 国产视频在线播放 | 日本九九视频 | 欧美一区日韩一区 | 一本一本久久a久久精品综合 | 中文字幕免费高清 | 极品久久久久 | 97香蕉超级碰碰久久免费软件 | 国产不卡在线看 | 性色大片在线观看 | 国产精品亚洲片夜色在线 | 日韩在线观看第一页 | 亚洲欧洲成人精品av97 | 国产在线观看你懂得 | 久久久这里有精品 | 麻豆久久久| 国产网站在线免费观看 | 亚洲第一伊人 | 久久国产精品久久精品国产演员表 | 中文字幕日韩伦理 | 国产精品毛片久久久久久久久久99999999 | 性色av免费观看 | 91精品国产乱码久久 | 丁香五月缴情综合网 | 999久久国产精品免费观看网站 | 91精品在线免费 | 久久久久激情电影 | 91在线看黄| 日韩在线免费高清视频 | 国产明星视频三级a三级点| 最近中文字幕第一页 | 狠狠干网址 | 国产精品免费观看在线 | 国产免费观看高清完整版 | 免费久久99精品国产 | 日韩在线高清免费视频 | 久久精品美女 | 久久五月激情 | 99精品观看 | 色激情在线 | 成人av免费| 欧美久久久久久久久久久久久 | 波多野结衣电影一区 | 久久99亚洲精品 | 天天射天天拍 | 中文十次啦 | 成人免费网站视频 | 97综合视频| 日本久久久影视 | www.com.黄 | 黄色亚洲免费 | 日日操操 | 久久国产精品影片 | 久久资源在线 | 国产亚洲一区二区三区 | 精品嫩模福利一区二区蜜臀 | 国产精品久久久久久久99 | av 一区二区三区四区 | 香蕉久草 | 久草视频一区 | 久久成人国产精品入口 | 久日精品| 国产精品免费在线 | 一区二区亚洲精品 | 日本一区二区免费在线观看 | 日本激情视频中文字幕 | 九九热在线精品 | 国产精品一区二区三区电影 | 国产日韩三级 | 久久,天天综合 | 亚洲精品乱码久久久久久久久久 | 免费色视频网站 | 在线国产一区二区 | av电影免费| 91精品爽啪蜜夜国产在线播放 | 四虎在线视频免费观看 | 国产一区二区高清视频 | 久久人91精品久久久久久不卡 | 最新日韩视频 | 久久久久免费视频 | 欧美成人精品在线 | 久久久久久久久久久高潮一区二区 | 国产真实精品久久二三区 | 一区二区三区www | 日韩高清免费电影 | 久久免费视频3 | 婷婷丁香激情五月 | 97电影手机版 | 免费在线观看亚洲视频 | 亚洲一区二区视频 | 97电影在线观看 | 久久精品视频免费 | 国产精品网站一区二区三区 | 亚洲自拍自偷 | 国产精品乱码高清在线看 | 久久精品99久久久久久 | 亚洲欧美国内爽妇网 | 国产福利在线免费 | 美女久久久久久久久久 | 深夜激情影院 | 国产精品理论片 | 久操视频在线播放 | 91视频国产免费 | 国产韩国精品一区二区三区 | 69国产成人综合久久精品欧美 | 日韩亚洲国产中文字幕 | 激情欧美一区二区三区免费看 | www.狠狠 | 一区二区精品视频 | 中文av网 | 九九久久电影 | 中文字幕有码在线 | 国产成人99av超碰超爽 | 久草在线免费看视频 | 国产免费小视频 | 在线观看色视频 | 天天操天天干天天 | 国产精品1区2区3区在线观看 | aaa日本高清在线播放免费观看 | 欧美一区二区在线免费观看 | 久久免费视频8 | 日韩区欠美精品av视频 | 国产精品露脸在线 | 91插插插免费视频 | 91九色视频在线观看 | 毛片3 | 国产中文字幕精品 | 69成人在线| 国内精品久久久久久 | 欧美精品中文字幕亚洲专区 | 中文久草 | 天天干夜夜想 | 激情综合网婷婷 | 91精品对白一区国产伦 | 99精品欧美一区二区三区 | 99精品久久久久久久久久综合 | 久黄色| 国产午夜精品一区 | 在线 高清 中文字幕 | 欧美日韩中文另类 | 色亚洲激情 | 久久香蕉国产精品麻豆粉嫩av | 久久精国产| 狠色在线| 成人一区二区三区在线 | 91九色最新 | 欧美一级性生活片 | 人人爱在线视频 | 天天操夜| 在线亚洲播放 | 久爱综合 | 精品久久久久免费极品大片 | 国产精品成人久久久 | 国产中文字幕在线 | 久久久精品视频网站 | 国产一区在线视频 | 激情综合网婷婷 | 欧美一级久久久久 | 免费aa大片 | 欧美孕妇与黑人孕交 | av免费看在线 | 国产日韩精品久久 | 在线国产一区二区 | 国产精品久久久久久久久久久免费看 | 国产精品久久久久久久久久久不卡 | 久久久久国产精品视频 | 偷拍视频一区 | www.狠狠色| 日日夜精品 | 免费在线黄色av | 日日射天天射 | 狠狠色丁香婷婷 | 日韩精品视频久久 | 国产原创在线视频 | 奇米四色影狠狠爱7777 | 中文字幕色综合网 | 国产高清免费在线播放 | 久久天堂精品视频 | 久久激情综合 | 色五月色开心色婷婷色丁香 | 国产色视频一区二区三区qq号 | 国产午夜视频在线观看 | 久久一区国产 | 欧美日韩性生活 | 日韩在线观看小视频 | 天天干天天搞天天射 | 天天爽天天碰狠狠添 | 欧美无极色 | 91 在线视频播放 | 国产精品v欧美精品 | 国产福利在线免费观看 | 一区二区三区在线观看 | 国内外成人在线 | 在线观看www91 | 黄色av电影 | 免费高清无人区完整版 | 国产真实精品久久二三区 | 亚洲午夜精品一区二区三区电影院 | 日韩久久久久 | 日韩激情在线 | 日韩精品一区二区三区中文字幕 | 精品亚洲免费 | 国产黄色精品在线观看 | www久| 亚洲理论在线观看电影 | 日本一区二区三区视频在线播放 | 久久免费视频在线观看6 | 日日摸日日添夜夜爽97 | 五月婷婷六月丁香在线观看 | 91视频88av | 一区二区在线不卡 | 精品免费| 亚洲精品1234区 | 国产亚洲在线视频 | 黄色软件网站在线观看 | 国产麻豆果冻传媒在线观看 | 久久久这里有精品 | 在线性视频日韩欧美 | 久草精品视频 | 日韩av看片| 欧美动漫一区二区三区 | 亚洲 欧洲 国产 日本 综合 | 日本黄色大片免费看 | 一级理论片在线观看 | 国产精品va在线观看入 | 深爱激情站 | 精品资源在线 | av大全在线播放 | 免费观看成年人视频 | 久久免费影院 | 日韩欧美网站 | 中文字幕中文字幕在线中文字幕三区 | 欧美精品在线观看免费 | 天天操天天操天天 | 91桃色在线播放 | 蜜臀av性久久久久蜜臀aⅴ流畅 | 99国产一区| 日韩av视屏| 成年人免费电影在线观看 | 欧美激情亚洲综合 | 麻花豆传媒mv在线观看网站 | 密桃av在线 | 中文字幕精品一区二区三区电影 | 久久人人爽人人爽人人片av免费 | www.久久免费视频 | 深爱激情五月综合 | 黄色av大片| 正在播放国产91 | 国产二级视频 | 欧美精品做受xxx性少妇 | 日日干美女 | 国产中文在线字幕 | 亚洲3级| 99久久精品国产一区二区成人 | 九九交易行官网 | 97视频亚洲 | 狠狠综合久久 | 区一区二区三在线观看 | 超碰97人人爱 | 国产精品久久久久久久久久久久 | 麻豆视频免费版 | 高清不卡一区二区在线 | 人人干人人添 | 国产超碰在线 | 韩国av一区二区三区在线观看 | 激情黄色av | 91成人精品一区在线播放69 | 99视频在线精品免费观看2 | 日本丰满少妇免费一区 | 激情综合啪 | 夜夜躁日日躁狠狠躁 | 91精品国自产在线观看欧美 | 97视频免费在线观看 | 欧美激情在线网站 | 黄色性av | 最近中文字幕久久 | 精品黄色在线观看 | 国产日韩欧美中文 | 日本不卡123 | 国产在线精品二区 | 亚洲成a人片77777潘金莲 | 黄色特级片 | 久久久五月天 | 久久精品aaa | 成人av资源网站 | 一区二区三区www | 精品一区二区免费视频 | 91av视频在线观看免费 | a资源在线 | 久久小视频| 国产精彩在线视频 | 久久av高清 | 成人黄色片免费看 | 99热官网 | 日韩区欧美久久久无人区 | 性日韩欧美在线视频 | www.在线观看视频 | 毛片二区| 午夜久久成人 | 中文字幕中文字幕在线一区 | 九月婷婷人人澡人人添人人爽 | 久久久视屏 | 国产只有精品 | 人人射| 狠狠操综合网 | 色综合在 | 久久99视频免费 | 青青草国产精品 | 久章草在线 | 国产综合在线视频 | 四虎海外影库www4hu | 四虎影视4hu4虎成人 | 中文字幕在线中文 | 免费网站在线观看人 | 操夜夜操 | 黄色成人av在线 | 在线免费高清视频 | 九九导航 | 亚洲人成人在线 | 日韩av成人免费看 | 91 在线视频播放 | 久热电影| 国产99亚洲 | 国产中文字幕91 | 亚洲乱码在线 | 精品福利在线视频 | 国产成人精品999在线观看 | 亚洲精品黄色片 | 日韩欧美精品一区 | 国产精品一区二区三区观看 | 五月的婷婷 | 国产麻豆成人传媒免费观看 | 五月天婷婷免费视频 | 亚洲久在线 | 久久精品视频4 | 日本成人中文字幕在线观看 | 免费av网站观看 | 日本一区二区三区视频在线播放 | 日本黄色免费在线观看 | 国产 日韩 欧美 在线 | 色狠狠一区二区 | 国产私拍在线 | 久草在线观 | 月丁香婷婷 | 97免费视频在线播放 | 久久再线视频 | 久久久综合香蕉尹人综合网 | 国内精品视频一区二区三区八戒 | 综合色影院 | 免费高清在线一区 | 日韩国产欧美在线播放 | 五月天丁香 | 亚洲一级电影在线观看 | 国产精品一区二区在线播放 | 国产三级av在线 | 国产高清一 | 99久免费精品视频在线观看 | 99久久精品免费看国产一区二区三区 | 久久国产亚洲精品 | 五月在线| 中文不卡视频 | 超碰97在线资源站 | 一级欧美日韩 | 91精品在线观看视频 | 夜色资源网| 69精品人人人人 | 精品成人a区在线观看 | 人人艹视频 | 久久视频在线观看 | 日韩有码在线播放 | 久久久久黄 | 亚洲最大av网站 | 日韩久久久久久久久久久久 | 亚洲精品视频第一页 | 免费裸体视频网 | 成人资源在线播放 | 奇米影视777影音先锋 | 亚洲精品1区2区3区 超碰成人网 | 欧美怡红院| 在线国产视频 | 中文字幕a在线 | 偷拍福利视频一区二区三区 | 综合久久2023 | 国产亚洲精品女人久久久久久 | 免费黄色av.| 97人人添人澡人人爽超碰动图 | 一区二区不卡高清 | 女人18片毛片90分钟 | 91视频高清 | 日本视频高清 | 久久国产女人 | 久久久久激情电影 | 黄色三级av | 国产成人99av超碰超爽 | 一本—道久久a久久精品蜜桃 | 精品国产乱码久久久久久久 | 免费中文字幕在线观看 | 成人中文字幕av | 亚洲成人资源网 | 99在线精品免费视频九九视 | 日本公乱妇视频 | 久久狠狠一本精品综合网 | 综合黄色网 | 精品国产一区二区三区久久久蜜月 | 亚洲不卡在线 | 国产精品私人影院 | 久久综合中文字幕 | 永久免费观看视频 | 国产亚洲视频在线观看 | 精品主播网红福利资源观看 | 免费看污片 | 婷婷综合影院 | 91麻豆精品国产91久久久使用方法 | 五月婷婷.com | 午夜av免费看 | 美女久久视频 | 992tv成人免费看片 | 99久久精品国产观看 | 九九免费精品视频在线观看 | 日韩久久电影 | 中文字幕 影院 | 欧美一性一交一乱 | 久久综合婷婷 | 成人app在线播放 | 一区二区 不卡 | 久久亚洲精品电影 | 国产精品成人a免费观看 | 久久久久女人精品毛片九一 | 欧美日韩国产一区二区三区在线观看 | 欧美亚洲国产精品久久高清浪潮 | 欧美日韩视频精品 | 国产一区免费观看 | 黄在线 | 伊人伊成久久人综合网站 | 亚洲欧美视频 | 在线色视频小说 | 女女av在线 | 69精品| 丁香婷婷综合色啪 | 精品1区2区3区 | 欧美成人h版 | 国产精品一区二区62 | 美女黄频网站 | 色狠狠狠| av在线播放快速免费阴 | 五月婷婷av在线 | 亚洲乱码国产乱码精品天美传媒 | 天天综合日日夜夜 | 手机在线看永久av片免费 | av中文字幕在线观看网站 | 日本一区二区三区视频在线播放 | 成人午夜电影在线观看 | 国产成人精品亚洲日本在线观看 | 亚洲精品久久久久58 | a黄色| 日韩av电影免费在线观看 | 精品国产一区二区三区四区vr | 手机色在线 | 99精品免费久久久久久日本 | 亚洲成人影音 | 午夜久久网站 | 一区二区三区视频 | 在线免费观看亚洲视频 | 午夜精品成人一区二区三区 | 性色大片在线观看 | 在线观看v片 | 国产福利久久 | 日韩精品视频在线免费观看 | 成人av片免费观看app下载 | 成人免费看黄 | 超碰人人在线观看 | 亚洲一区二区三区miaa149 | 香蕉久草 | 玖玖在线播放 | 色狠狠操| 丝袜美腿一区 | 国产精品国产三级国产aⅴ9色 | 日韩av电影免费观看 | 亚洲精品影视在线观看 | 久久久久亚洲国产精品 | 国产高清免费观看 | 中文十次啦| 国产91精品一区二区绿帽 | 色综合婷婷 | 国产在线高清精品 | 免费看黄色毛片 | 国产一级三级 | 在线观看精品黄av片免费 | 人人擦 | 国产视频不卡 | 成人久久毛片 | 99精品视频在线 | 久精品视频免费观看2 | 久久av一区二区三区亚洲 | 四虎伊人 | 欧美婷婷综合 | 色丁香综合 | 激情视频区 | 五月婷婷六月丁香在线观看 | 九九久久久久久久久激情 | 国产成人av电影在线观看 | 久久字幕| 天堂av网址 | 人人爽夜夜爽 | 免费av观看网站 | 中文字幕亚洲欧美日韩 | 2018精品视频| 五月婷婷中文网 | 日韩欧美aaa | 久久人网| 欧美男女爱爱视频 | 亚洲涩涩网 | 九九久久视频 | 99久久毛片 | 99精品视频99 | 国产一区二区在线影院 | 六月激情久久 | 亚洲精品在线视频播放 | 成人免费观看完整版电影 | 亚洲精品国产精品国自产 | 欧美乱熟臀69xxxxxx | 日韩在线视频免费看 | 手机av看片 | 亚洲精品色 | 国产精品99久久久精品 | 黄色成人免费电影 | 久久一区国产 | 亚洲精品免费在线播放 | 日日夜夜人人精品 | 国色天香在线 | 色偷偷人人澡久久超碰69 | 色妞色视频一区二区三区四区 | 激情黄色av | 日本在线观看中文字幕无线观看 | 免费视频一二三 | 亚洲人成精品久久久久 | 黄网站色 | 91在线视频免费 | 国产精品一区二区免费视频 | 91av国产视频 | 亚洲综合婷婷 | 精品国产乱码久久 | 欧美午夜久久久 | 成人国产精品久久久春色 | 久久久黄色av| 狠狠色综合欧美激情 | 久久久国产精品人人片99精片欧美一 | 欧美性色黄 | 国产精品va在线观看入 | 麻豆国产露脸在线观看 | 91麻豆高清视频 | 久久精品站 | 亚洲男男gaygay无套同网址 | 国产xxxx性hd极品 | 福利精品在线 | 亚洲精品乱码久久久久v最新版 | 免费视频区 | 成人av影院在线观看 | 91视视频在线直接观看在线看网页在线看 | 天天干天天做 | 久久精品高清视频 | 国产中文字幕久久 | 午夜10000| 国产在线观看高清视频 | 91亚洲在线 | 国产精品99页| 亚洲第一色 | 午夜性生活片 | 精品国产一区二区三区不卡 | 亚洲五月婷婷 | 成年人黄色大全 | 国产精品不卡在线播放 | 91看片淫黄大片一级在线观看 | 欧美aa级 | 国产一区二区网址 | 最近的中文字幕大全免费版 | 日韩在线免费 | 日本精品久久久久影院 | 91麻豆精品国产自产在线 | 日韩在线视频一区二区三区 | 91在线视频免费播放 | 国产欧美最新羞羞视频在线观看 | 成人黄色一级视频 | 最新真实国产在线视频 | 亚洲九九爱| 免费在线观看成年人视频 | 日韩欧美高清 | 国产一区免费 | 天天曰天天干 | 97精品超碰一区二区三区 | 人人射人人澡 | 国产精品久久久久久久午夜片 | 国产精品99久久久久久久久久久久 | av在线日韩 | 十八岁以下禁止观看的1000个网站 | 又黄又爽又色无遮挡免费 | 五月婷婷丁香激情 | 在线观看日韩精品视频 | 亚洲永久精品视频 | av免费看在线| 91最新网址在线观看 | 国产无套精品久久久久久 | 深爱婷婷网 | 国产一区视频免费在线观看 | 国产亚洲小视频 | 欧美日韩在线精品一区二区 | 日韩在线观看第一页 | 久久久久亚洲国产精品 | 天堂av观看 | 婷婷中文字幕 | 综合网伊人 | 超薄丝袜一二三区 | av中文字幕免费在线观看 | 亚洲国产精品影院 | 久草在线视频资源 | 超碰在线1 | 又黄又刺激视频 | 麻豆传媒一区二区 | 日本特黄特色aaa大片免费 | 成人h在线观看 | 美女视频久久久 | 欧美91精品国产自产 | 国产拍揄自揄精品视频麻豆 | 日韩精品一区二区在线观看视频 | 成人四虎 | 国产亚洲精品久久久久动 | 婷婷六月天丁香 | 一区二区精品视频 | 欧美日韩高清在线观看 | 国内精品免费 | av天天干| 日本黄色免费播放 | 国产九色视频在线观看 | 最近中文字幕久久 | 啪啪资源 | 国产高清在线观看 | 日韩欧美不卡 | 精品在线视频播放 | 97成人在线 | 99re中文字幕 | 国产区精品视频 | 成年人视频在线免费 | 午夜精品视频一区二区三区在线看 | 亚洲综合在线五月天 | 精品久久电影 | av手机在线播放 | 九九热精品视频在线观看 | av永久网址| 69国产盗摄一区二区三区五区 | 日韩视频中文 | 久久久亚洲国产精品麻豆综合天堂 | 久久久综合 | 视频1区2区 | 麻豆传媒视频在线 | 亚洲h在线播放在线观看h | 午夜电影中文字幕 | 欧美日韩高清在线 | 91精品国自产拍天天拍 | 久久中文字幕在线视频 | 免费情趣视频 | 日韩高清激情 | 综合精品久久久 | 在线电影 一区 | 久久久网 | 日韩高清免费电影 | av中文在线 | 欧美日韩国产三级 | 久久久久国产精品免费 | 国产亚洲精品久久久久久移动网络 | 亚洲天堂网在线视频观看 | 国产在线91在线电影 | 国产精品第7页 | 国产精品久久久久一区 | 国产a免费| 韩国精品视频在线观看 | 亚洲精品国偷拍自产在线观看蜜桃 | 亚洲aⅴ在线观看 | 亚洲精品久 | 韩日视频在线 | 亚洲 欧美日韩 国产 中文 | 国产一区二区久久精品 | 日日夜夜综合 | 日本精品视频在线观看 | 四虎精品成人免费网站 | 国产91精品久久久久久 | 欧美性色黄大片在线观看 | 国产精品1024 | 69国产精品视频免费观看 | 中文字幕日韩无 | 麻豆精品传媒视频 | 亚洲最新av在线网址 | 精品亚洲一区二区三区 | 欧美精品亚洲精品日韩精品 | 成人sm另类专区 | 国产精品综合久久 | 91精品国自产在线偷拍蜜桃 | 久久久黄色 | 久久99国产精品视频 | 久久久久久免费视频 | 日韩电影中文 | 97成人精品视频在线播放 | 91麻豆精品国产91久久久更新时间 |