日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark-Streaming基础

發布時間:2023/12/10 编程问答 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark-Streaming基础 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark課堂筆記

Spark生態圈:
Spark Core : RDD(彈性分布式數據集)
Spark SQL
Spark Streaming
Spark MLLib:協同過濾,ALS,邏輯回歸等等 --> 機器學習
Spark Graphx : 圖計算

重點在前三章

-----------------Spark Core------------------------
一、什么是Spark?特點?
https://spark.apache.org/
Apache Spark? is a unified analytics engine for large-scale data processing.

特點:快、易用、通用性、兼容性(完全兼容Hadoop)

快:快100倍(Hadoop 3 之前)
易用:支持多種語言開發
通用性:生態系統全。
易用性:兼容Hadoop

spark 取代 Hadoop

二、安裝和部署Spark、Spark 的 HA

1、spark體系結構
Spark的運行方式

Yarn

Standalone:本機調試(demo)

Worker:從節點。每個服務器上,資源和任務的管理者。只負責管理一個節點。

執行過程:
一個Worker 有多個 Executor。 Executor是任務的執行者,按階段(stage)劃分任務。————> RDD

客戶端:Driver Program 提交任務到集群中。

1、spark-submit
2、spark-shell

2、spark的搭建
(1)準備工作:JDK 配置主機名 免密碼登錄
(2)偽分布式模式
在一臺虛擬機上模擬分布式環境(Master和Worker在一個節點上)

export JAVA_HOME=/usr/java/jdk1.8.0_201
export SPARK_MASTER_HOST=node3
export SPARK_MASTER_PORT=7077

(3)全分布式環境
修改slave文件 拷貝到其他兩臺服務器 啟動

3、Spark的 HA
回顧HA;
(*)HDFS Yarn Hbase Spark 主從結構
(*)單點故障

(1)基于文件目錄的單點恢復
(*)本質:還是只有一個主節點Master,創建了一個恢復目錄,保存集群狀態和任務的信息。
當Master掛掉,重新啟動時,會從恢復目錄下讀取狀態信息,恢復出來原來的狀態

用途:用于開發和測試,生產用zookeeper
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM
-Dspark.deploy.recoveryDirectory=/usr/local/spark-2.1.0-bin-hadoop2.7/recovery"

(2)基于Zookeeper :和Hadoop類似

(*)復習一下zookeeper:
相當于一個數據庫,把一些信息存放在zookeeper中,比如集群的信息。
數據同步功能,選舉功能,分布式鎖功能

數據同步:給一個節點中寫入數據,可以同步到其他節點

選舉:Zookeeper中存在不同的角色,Leader Follower。如果Leader掛掉,重新選舉Leader

分布式鎖:秒殺。以目錄節點的方式來保存數據。

修改 spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node3:2181,node4:2181,node5:2181
-Dspark.deploy.zookeeper.dir=/spark"

同步到其他兩臺服務器。

在node3 start-all node3 master node4 Worker node5 Worker
在node4 start-master node3 master node4 master(standby) node4 Worker node5 Worker

在node3上kill master
node4 master(Active) node4 Worker node5 Worker

在網頁http://192.168.109.134:8080/ 可以看到相應信息

三、執行Spark的任務:兩個工具

1、spark-submit:用于提交Spark的任務
任務:jar。

舉例:蒙特卡洛求PI(圓周率)。

./spark-submit --master spark://node3:7077 --class

--class指明主程序的名字

/usr/local/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://node3:7077
--class org.apache.spark.examples.SparkPi
/usr/local/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 100

2、spark-shell 相當于REPL
作為一個獨立的Application運行

兩種模式:
(1)本地模式
spark-shell 后面不接任何參數,代表本地模式

Spark context available as 'sc' (master = local[*], app id = local-1554038459298).

sc 是 SparkContext 對象名。 local[*] 代表本地模式,不提交到集群中運行。

(2)集群模式
./spark-submit --master spark://node3:7077 提交到集群中運行

Spark context available as 'sc' (master = spark://node3:7077, app id = app-20190331212447-0000).

master = spark://node3:7077

Spark session available as 'spark'
Spark Session 是 2.0 以后提供的,利用 SparkSession 可以訪問spark所有組件。

示例:WordCount程序

(*)處理本地文件,把結果打印到屏幕上
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.collect

res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

(*)處理HDFS文件,結果保存在hdfs上
sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")

-rw-r--r-- 3 root supergroup 0 2019-03-31 21:43 /output/0331/test_WordCount/_SUCCESS
-rw-r--r-- 3 root supergroup 40 2019-03-31 21:43 /output/0331/test_WordCount/part-00000
-rw-r--r-- 3 root supergroup 31 2019-03-31 21:43 /output/0331/test_WordCount/part-00001

_SUCCESS 代表程序執行成功

part-00000 part-00001 結果文件,分區。里面內容不重復。

(*)單步運行WordCount ----> RDD

scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> 1+1
res2: Int = 2

scala> rdd1.collect
res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)

scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26

scala> rdd2.collect
res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)

scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28

scala> rdd3.collect
res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))

scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30

scala> rdd4.collect
res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

RDD 彈性分布式數據集
(1)依賴關系 : 寬依賴和窄依賴
(2)算子:
函數:
Transformation : 延時計算 map flatMap textFile
Action : 立即觸發計算 collect




說明:scala復習
(*)flatten:把嵌套的結果展開
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)


(*)flatmap : 相當于一個 map + flatten

scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))

scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)

myList.flatMap(x=>x.map(_*2))

執行過程:
1、將 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 調用 map(_*2) 方法。x 代表一個List
2、flatten
3、在IDE中開發scala版本和Java版本的WorkCount。

(1)scala版本的WordCount

新建一個工程,把jar引入到工程中。

export jar 點擊下一步下一步,不需要設置main class

把jar上傳到服務器上。

spark-submit --master spark://node3:7077
--class day1025.MyWordCount
/usr/local/tmp_files/Demo1.jar
hdfs://node2:8020/tmp_files/test_WordCount.txt
hdfs://node2:8020/output/1025/demo1

(2)java版本的WordCount

./spark-submit --master spark://node3:7077 --class day0330.JavaWordCount /usr/local/tmp_files/Demo2.jar

四、分析Spark的任務流程

1、分析WordCount程序處理過程
見圖片

2、Spark調度任務的過程

提交到及群眾運行任務時,spark執行任務調度。

見圖片

五、RDD和RDD特性、RDD的算子

1、RDD:彈性分布式數據集
(*)Spark中最基本的數據抽象。
(*)RDD的特性
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions

1、是一組分區。
RDD由分區組成,每個分區運行在不同的Worker上,通過這種方式來實現分布式計算。

* - A function for computing each split
在RDD中,提供算子處理每個分區中的數據

* - A list of dependencies on other RDDs

RDD存在依賴關系:寬依賴和窄依賴。

* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

可以自定義分區規則來創建RDD

* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)

優先選擇離文件位置近的節點來執行


如何創建RDD?

(1)通過SparkContext.parallelize方法來創建
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29

scala> rdd1.partitions.length
res35: Int = 3

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29

scala> rdd1.partitions.length
res36: Int = 2

(2)通過外部數據源來創建
sc.textFile()

scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
rdd2: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29


2、 算子
(1)Transformation

map(func):相當于for循環,返回一個新的RDD

filter(func):過濾
flatMap(func):flat+map 壓平


mapPartitions(func):對RDD中的每個分區進行操作
mapPartitionsWithIndex(func):對RDD中的每個分區進行操作,可以取到分區號。

sample(withReplacement, fraction, seed):采樣

集合運算
union(otherDataset)
intersection(otherDataset)

distinct([numTasks])):去重

聚合操作:group by
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

排序
sortByKey([ascending], [numTasks])
sortBy(func,[ascending], [numTasks])

join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)

重分區:
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

舉例:
1、創建一個RDD,每個元素乘以2,再排序
scala> val rdd1 = sc.parallelize(Array(3,4,5,100,79,81,6,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:29

scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at map at <console>:31

scala> rdd2.collect
res37: Array[Int] = Array(6, 8, 10, 200, 158, 162, 12, 16)

scala> rdd2.sortBy(x=>x,true).collect
res39: Array[Int] = Array(6, 8, 10, 12, 16, 158, 162, 200)

scala> rdd2.sortBy(x=>x,false).collect
res40: Array[Int] = Array(200, 162, 158, 16, 12, 10, 8, 6)

def sortBy[K](f: (T) ? K, ascending: Boolean = true)

過濾出大于20的元素:

scala> val rdd3 = rdd2.filter(_>20)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[53] at filter at <console>:33

scala> rdd3.collect
res41: Array[Int] = Array(200, 158, 162)

2、字符串(字符)類型的RDD

scala> val rdd4 = sc.parallelize(Array("a b c","d e f","g h i"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[54] at parallelize at <console>:29

scala> rdd4.flatMap(_.split(" ")).collect
res42: Array[String] = Array(a, b, c, d, e, f, g, h, i)

3、RDD的集合運算:

scala> val rdd6 = sc.parallelize(List(1,2,3,6,7,8,9,100))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:29

scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at parallelize at <console>:29

scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[58] at union at <console>:33

scala> rdd8.collect
res43: Array[Int] = Array(1, 2, 3, 6, 7, 8, 9, 100, 1, 2, 3, 4)

scala> rdd8.distinct.collect
res44: Array[Int] = Array(100, 4, 8, 1, 9, 6, 2, 3, 7)


4、分組操作:reduceByKey

<key value>
scala> val rdd1 = sc.parallelize(List(("Tom",1000),("Andy",2000),("Lily",1500)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[62] at parallelize at <console>:29

scala> val rdd2 = sc.parallelize(List(("Andy",1000),("Tom",2000),("Mike",500)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[63] at parallelize at <console>:29

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[64] at union at <console>:33

scala> rdd3.collect
res45: Array[(String, Int)] = Array((Tom,1000), (Andy,2000), (Lily,1500), (Andy,1000), (Tom,2000), (Mike,500))

scala> val rdd4= rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[65] at groupByKey at <console>:35

scala> rdd4.collect
res46: Array[(String, Iterable[Int])] = Array(
(Tom,CompactBuffer(1000, 2000)),
(Andy,CompactBuffer(2000, 1000)),
(Mike,CompactBuffer(500)), (
Lily,CompactBuffer(1500)))

scala> rdd3.reduceByKey(_+_).collect
res47: Array[(String, Int)] = Array((Tom,3000), (Andy,3000), (Mike,500), (Lily,1500))

reduceByKey will provide much better performance.

官方不推薦使用 groupByKey 推薦使用 reduceByKey

5、cogroup

scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at parallelize at <console>:29

scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[68] at parallelize at <console>:29

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[70] at cogroup at <console>:33

scala> rdd3.collect
res48: Array[(String, (Iterable[Int], Iterable[Int]))] = Array(
(tom,(CompactBuffer(1, 2),CompactBuffer(1))),
(jerry,(CompactBuffer(3),CompactBuffer(2))),
(shuke,(CompactBuffer(),CompactBuffer(2))),
(kitty,(CompactBuffer(2),CompactBuffer())))

6、reduce操作(Action)

聚合操作
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:29

scala> rdd1.reduce(_+_)
res49: Int = 15


7、需求:按照value排序。
做法:
1、交換,把key 和 value交換,然后調用sortByKey方法
2、再次交換

scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",3),("ketty",2),("shuke",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[72] at parallelize at <console>:29

scala> val rdd2 = sc.parallelize(List(("jerry",1),("tom",3),("shuke",5),("ketty",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[73] at parallelize at <console>:29

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[74] at union at <console>:33

scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[75] at reduceByKey at <console>:35

scala> rdd4.collect
res50: Array[(String, Int)] = Array((tom,4), (jerry,4), (shuke,7), (ketty,3))

scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[80] at map at <console>:37

scala> rdd5.collect
res51: Array[(String, Int)] = Array((shuke,7), (tom,4), (jerry,4), (ketty,3))


(2)Action

reduce(func)

collect()
count()
first()
take(n)
takeSample(withReplacement,num, [seed])
takeOrdered(n,?[ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)?
saveAsObjectFile(path)?
countByKey()

foreach(func):與map類似,沒有返回值。

3、特性:
(1)RDD的緩存機制
(*)作用:提高性能
(*)使用:標識RDD可以被緩存 persist cache

(*)可以緩存的位置:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()


舉例:測試數據,92萬條

scala> val rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[82] at textFile at <console>:29

scala> rdd1.count --> 直接出發計算
res52: Long = 923452

scala> rdd1.cache --> 標識RDD可以被緩存,不會觸發計算
res53: rdd1.type = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[82] at textFile at <console>:29

scala> rdd1.count --> 和第一步一樣,觸發計算,但是,把結果進行緩存
res54: Long = 923452

scala> rdd1.count --> 從緩存中直接讀出結果
res55: Long = 923452

(2)RDD的容錯機制:通過檢查點來實現。

/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
(*)復習檢查點:HDFS中的檢查點:有SecondaryNamenode來實現日志的合并。

(*)RDD的檢查點:容錯
概念:血統 Lineage
理解:表示任務執行的生命周期。
WordCount textFile ---> redceByKey

如果血統越長,越容易出錯。

假如有檢查點,可以從最近的一個檢查點開始,往后面計算。不用重頭計算。

(*)RDD檢查點的類型:

(1)基于本地目錄:需要將Spark shell 或者任務運行在本地模式上(setMaster("local"))

開發和測試

(2)HDFS目錄:用于生產。

sc.setCheckPointDir(目錄)

舉例:設置檢查點
scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24

設置檢查點目錄:
scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")

標識rdd1可以執行檢查點操作
scala> rdd1.checkpoint

scala> rdd1.count
res2: Long = 923452

(3)依賴關系:寬依賴,窄依賴。
劃分任務執行的stage

見講義。

六、RDD的高級算子
1、mapPartitionsWithIndex:對RDD中的每個分區(帶有下標)進行操作,下標用index表示
通過這個算子,我們可以獲取分區號。

def mapPartitionsWithIndex[U](
f: (Int, Iterator[T]) ? Iterator[U],
preservesPartitioning: Boolean = false)(
implicit arg0: ClassTag[U]): RDD[U]

參數:f是個函數參數 f 中第一個參數是Int,代表分區號,第二個Iterator[T]代表分區中的元素

舉例:把分區中的元素,包括分區號,都打印出來。

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitions
mapPartitions mapPartitionsWithIndex

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ],
[partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])

2、aggregate:聚合操作。類似于分組。

(*)先對局部進行聚合操作,再對全局進行聚合操作。

調用聚合操作
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])

scala> import scala.math._
import scala.math._

scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7

說明:aggregate
(0) 初始值是 0
(max(_,_) 局部操作的函數
, _+_ 全局操作的函數
)

scala> rdd2.aggregate(100)(max(_,_),_+_)
res8: Int = 300

分析結果:初始值是100,代表每個分區多了一個100
全局操作,也多了一個100
100+100+100 = 300


對RDD中的元素進行求和
1、RDD.map
2、聚合操作

scala> rdd2.aggregate(0)(_+_,_+_)
res9: Int = 15

MapReduce Combiner

scala> rdd2.aggregate(10)(_+_,_+_)
res10: Int = 45

(*)對字符串操作
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef

scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc

結果分析:
1、*abc *def
2、**def*abc

(*)復雜的例子:
1、
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }

scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
[partId : 0 , value = 12 ], [partId : 0 , value = 23 ],
[partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])

scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42

執行過程:
第一個分區:
第一次比較: "" "12" 長度最大值 2 2-->"2"
第二次比較: “2” “23” 長度最大值 2 2-->"2"

第二個分區:
第一次比較: "" "345" 長度最大值 3 3-->"3"
第二次比較: “3” “4567” 長度最大值 4 4-->"4"


2、
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)

scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11

執行過程:
第一個分區:
第一次比較: "" "12" 長度最小值 0 0-->"0"
第二次比較: “0” “23” 長度最小值 1 1-->"1"

第二個分區:
第一次比較: "" "345" 長度最小值 0 0-->"0"
第二次比較: “0” “4567” 長度最小值 1 1-->"1"

val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)

scala> val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res19: String = 10

scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res20: String = 01

3、aggregateByKey:類似于aggregate,區別:操作的是 key value 的數據類型。

scala> def fun3(index:Int, iter:Iterator[(String,Int)]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }

scala> pairRDD.mapPartitionsWithIndex(fun3).collect
res22: Array[String] = Array(

[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],


[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])

(1)將每個動物園(分區)中,動物數最多的動物,進行求和

動物園0
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],

動物園1
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])

pairRDD.aggregateByKey(0)(math.max(_,_),_+_)

scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res24: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

(2)將所有動物求和
pairRDD.aggregateByKey(0)(_+_,_+_).collect

scala> pairRDD.reduceByKey(_+_).collect
res27: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

aggregateByKey效率更高。

4、 coalesce與repartition
與分區有關
都是對RDD進行重分區。

區別:
coalesce 默認不會進行Shuffle 默認 false 如需修改分區,需置為true
repartition 會進行Shuffle

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val rdd2 = rdd1.repartition(3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at repartition at <console>:29

scala> rdd2.partitions.length
res28: Int = 3

scala> val rdd3 = rdd1.coalescse(3,true)
<console>:29: error: value coalescse is not a member of org.apache.spark.rdd.RDD[Int]
val rdd3 = rdd1.coalescse(3,true)
^

scala> val rdd3 = rdd1.coalesce(3,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at coalesce at <console>:29

scala> rdd3.partitions.length
res29: Int = 3

scala> val rdd4 = rdd1.coalesce(4)
rdd4: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[26] at coalesce at <console>:29

scala> rdd4.partitions.length
res30: Int = 2

5、其他高級算子
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html


七、編程案例

(1)分析日志
需求:找到訪問量最高的兩個網頁
(*)第一步:對網頁的訪問量求和
(*)第二步:排序,降序

(2)創建自定義分區

(3)使用JDBCRDD 操作數據庫

(4)操作數據庫:把結果存放到數據庫中

IDE

-----------------Spark SQL------------------------
類似于Hive

一、Spark SQL 基礎

1、什么是Spark SQL
Spark SQL is Apache Spark's module for working with structured data.
Spark SQL 是spark 的一個模塊。來處理 結構化 的數據
不能處理非結構化的數據

特點:
1、容易集成
不需要單獨安裝。

2、統一的數據訪問方式
結構化數據的類型:JDBC JSon Hive parquer文件 都可以作為Spark SQL 的數據源
對接多種數據源,且使用方式類似

3、完全兼容hive
把Hive中的數據,讀取到Spark SQL中運行。

4、支持標準的數據連接
JDBC

2、為什么學習Spark SQL
執行效率比Hive高

hive 2.x 執行引擎可以使用 Spark

3、核心概念:表(DataFrame DataSet)
mysql中的表:表結構、數據
DataFrame:Schema、RDD(數據)

DataSet 在spark1.6以后,對DataFrame做了一個封裝。


4、創建DataFrame
(*)測試數據:員工表、部門表


第一種方式:使用case class

1、定義Schema
樣本類來定義Schema。
case class 特點:
可以支持模式匹配,使用case class建立表結構

7521, WARD, SALESMAN,7698, 1981/2/22, 1250, 500, 30

case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)

2、讀取文件
val lines = sc.textFile("/usr/local/tmp_files/emp.csv").map(_.split(","))

3、把每行數據,映射到Emp上。
val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

4、生成DataFrame
val df1 = allEmp.toDF

df1.show


第二種方式 使用Spark Session

(1)什么是Spark Session
Spark session available as 'spark'.
2.0以后引入的統一訪問方式。可以訪問所有的Spark組件。

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

(2)使用StructType來創建Schema

val struct =
StructType(
StructField("a", IntegerType, true) ::
StructField("b", LongType, false) ::
StructField("c", BooleanType, false) :: Nil)


case class Emp(empno:Int,
ename:String,
job:String,
mgr:Int,
hiredate:String,
sal:Int,
comm:Int,
deptno:Int)

val myschema = StructType(
List(
StructField("empno",DataTypes.IntegerType),
StructField("ename",DataTypes.StringType),
StructField("job",DataTypes.StringType),
StructField("mgr",DataTypes.IntegerType),
StructField("hiredate",DataTypes.StringType),
StructField("sal",DataTypes.IntegerType),
StructField("comm",DataTypes.IntegerType),
StructField("deptno",DataTypes.IntegerType),
))

import org.apache.spark.sql.types._

準備數據 RDD[Row]
val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

import org.apache.spark.sql.Row

val df2 = spark.createDataFrame(allEmp,myschema)

第三種方式
直接讀取一個帶格式的文件。

val df3 = spark.read 讀文件,默認是Parquet文件
val df3 = spark.read.json("/usr/local/tmp_files/people.json")

df3.show

val df4 = spark.read.format("json").load("/usr/local/tmp_files/people.json")


5、操作DataFrame

(1)DSL語句
mybatis Hibernate

df1.select($"ename",$"sal",$"sal"+100).show

$"sal" 可以看做是一個變量。

查詢薪水大于2000的員工
df1.filter($"sal" > 2000).show

求每個部門的員工人數
df1.groupBy($"deptno").count.show

select deptno,count(1) from emp group by deptno

(2)SQL語句

注意:不能直接執行SQL,需要生成一個視圖,再執行sql。

scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView

一般用到 createOrReplaceTempView createTempView
視圖:類似于表,但不保存數據。

df1.createOrReplaceTempView("emp")

操作:
spark.sql("select * from emp").show

查詢薪水大于2000的員工
spark.sql("select * from emp where sal > 2000").show

求每個部門的員工人數
spark.sql("select deptno,count(1) from emp group by deptno").show

(3)多表查詢
10,ACCOUNTING,NEW YORK

case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

df5.createOrReplaceTempView("dept")

spark.sql("select dname,ename from emp,dept where emp.deptno=dept.deptno").show

6、操作DataSet
跟DataFrame類似,是一套新的接口。高級的Dataframe

舉例:
(1)創建DataSet
1、使用序列來創建DataSet。

定義一個case class
case class MyData(a:Int,b:String)

生成序列,并創建DataSet
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS


.toDS 生成DataSet

ds.show


2、使用JSON數據來創建DataSet

定義case class
case class Person(name:String,age:BigInt)

通過Json數據來生成DataFrame
val df = spark.read.format("json").load("/usr/local/tmp_files/people.json")

將DataFrame轉換成DataSet
df.as[Person].show

df.as[Person] 就是一個DataSet

3、使用其他數據

RDD操作和DataFrame操作相結合 ---> DataSet

讀取數據,創建DataSet
val linesDS = spark.read.text("/usr/local/tmp_files/test_WordCount.txt").as[String]

對DataSet進行操作:
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

words.show
words.collect

執行一個WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey( x => x._1).count
result.show

排序:
result.orderBy($"value").show
result.orderBy($"count(1)").show


(2)DataSet操作案例

使用emp.json 生成一個DataFrame
val empDF = spark.read.json("/usr/local/tmp_files/emp.json")

查詢工資大于3000的員工
empDF.where($"sal" >= 3000).show

創建case class

case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)

生成DataSet
val empDS = empDF.as[Emp]

查詢工資大于3000的員工
empDS.filter(_.sal > 3000).show

查詢10號部門的員工
empDS.filter(_.deptno == 10).show


(3)多表查詢

1、創建部門表
val deptRDD = sc.textFile("/usr/local/tmp_files/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map( x=> Dept(x(0).toInt,x(1),x(2))).toDS

2、創建員工表
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
val empRDD = sc.textFile("/usr/local/tmp_files/emp.csv").map(_.split(","))

7369,SMITH,CLERK,7902,1980/12/17,800,0,20
val empDS = empRDD.map(x=> Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt)).toDS

3、執行多表查詢:等值連接

val result = deptDS.join(empDS,"deptno")

val result1 = deptDS.joinWith(empDS, deptDS("deptno") === empDS("deptno") )

join 和 joinWith 區別:連接后schema不同

4、多表連接后再篩選
deptDS.join(empDS,"deptno").where("deptno == 10").show

7、Spark SQL 中的視圖

視圖是一個虛表,不存儲數據。

兩種類型:

1、普通視圖(本地視圖):只在當前Session中有效。createOrReplaceTempView createTempView

2、全局視圖: createGlobalTempView
在不同的Session中都有用 把全局視圖創建在命名空間中:global_temp中。類似于一個庫。

scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView


舉例:
創建一個新session,讀取不到emp視圖
spark.newSession.sql("select * from emp")

以下兩種方式均可讀到 全局視圖 中的數據。
df1.createGlobalTempView("emp1")
spark.newSession.sql("select * from global_temp.emp1").show

spark.sql("select * from global_temp.emp1").show


二、使用數據源
在Spark SQL中,可以使用各種各樣的數據源來操作。 結構化

1、使用load函數、save函數
load函數是加載數據,save是存儲數據。

注意:使用load 或 save時,默認是Parquet文件。列式存儲文件。

舉例:
讀取 users.parquet 文件
val userDF = spark.read.load("/usr/local/tmp_files/users.parquet")

userDF.printSchema
userDF.show

val userDF = spark.read.load("/usr/local/tmp_files/emp.json")


保存parquet文件
userDF.select($"name",$"favorite_color").write.save("/usr/local/tmp_files/parquet")

讀取剛剛寫入的文件:
val userDF1 = spark.read.load("/usr/local/tmp_files/parquet/part-00000-1ab4e661-32c6-441a-b320-79d")---> 不推薦

生產:
val userDF2 = spark.read.load("/usr/local/tmp_files/parquet")


讀json文件 必須format
val userDF = spark.read.format("json").load("/usr/local/tmp_files/emp.json")
val userDF3 = spark.read.json("/usr/local/tmp_files/emp.json")

關于save函數:
調用save函數的時候,可以指定存儲模式,追加、覆蓋等等

userDF2.write.save("/usr/local/tmp_files/parquet")

userDF2.write.save("/usr/local/tmp_files/parquet")
org.apache.spark.sql.AnalysisException: path file:/usr/local/tmp_files/parquet already exists.;

save的時候覆蓋
userDF2.write.mode("overwrite").save("/usr/local/tmp_files/parquet")

將結果保存成表
userDF2.select($"name").write.saveAsTable("table1")

scala> userDF.select($"name").write.saveAsTable("table2")

scala> spark.sql("select * from table2").show
+------+
| name|
+------+
|Alyssa|
| Ben|
+------+

2、Parquet文件:列式存儲文件,是Spark SQL 默認的數據源

就是一個普通的文件

舉例:

1、把其他文件,轉換成Parquet文件
調用save函數

把數據讀進來,再寫出去,就是Parquet文件。

val empDF = spark.read.json("/usr/local/tmp_files/emp.json")
empDF.write.mode("overwrite").save("/usr/local/tmp_files/parquet")
empDF.write.mode("overwrite").parquet("/usr/local/tmp_files/parquet")

val emp1 = spark.read.parquet("/usr/local/tmp_files/parquet")
emp1.createOrReplaceTempView("emp1")
spark.sql("select * from emp1")

2、支持Schema的合并
項目開始 表結構簡單 schema簡單
項目越來越大 schema越來越復雜

舉例:
通過RDD來創建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double")
"single","double" 是表結構
df1.show

df1.write.mode("overwrite").save("/usr/local/tmp_files/test_table/key=1")

val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
df2.show
df2.write.mode("overwrite").save("/usr/local/tmp_files/test_table/key=2")

合并兩個部分
val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")

val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")


通過RDD來創建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double")
"single","double" 是表結構
df1.show

df1.write.mode("overwrite").save("/usr/local/tmp_files/test_table/tzkt=1")

val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
df2.show
df2.write.mode("overwrite").save("/usr/local/tmp_files/test_table/key=2")

合并兩個部分
val df3 = spark.read.parquet("/usr/local/tmp_files/test_table")

val df3 = spark.read.option("mergeSchema",true).parquet("/usr/local/tmp_files/test_table")

3、json文件

讀取Json文件,生成DataFrame
val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")

peopleDF.printSchema

peopleDF.createOrReplaceTempView("peopleView")

spark.sql("select * from peopleView").show

Spark SQL 支持統一的訪問接口。對于不同的數據源,讀取進來,生成DataFrame后,操作完全一樣。


4、JDBC

使用JDBC操作關系型數據庫,加載到Spark中進行分析和處理。

方式一:
val mysqlDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8")
.option("user","root")
.option("password","123456")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","emp").load

mysqlDF.show

方式二:
定義一個Properties類

import java.util.Properties
val mysqlProps = new Properties()
mysqlProps.setProperty("user","root")
mysqlProps.setProperty("password","123456")

val mysqlDF1 = spark.read.jdbc("jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)

mysqlDF1.show


5、使用Hive

比較常見
(*)spark SQL 完全兼容hive
(*)需要進行配置
拷貝一下文件到spark/conf目錄下:
Hive 配置文件: hive-site.xml
Hadoop 配置文件:core-site.xml hdfs-site.xml

配置好后,重啟spark

啟動Hadoop 與 hive

spark.sql("create table comany.emp_0410(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ','")

三、在IDE中開發Spark SQL

四、性能優化
與RDD類似

1、把內存中緩存表的數據
直接讀取內存的值,來提高性能。

RDD中如何緩存:
rdd.cache 或者 rdd.persist

在Spark SQL中,使用SparkSession.sqlContext.cacheTable

spark中所有context對象
1、sparkContext : SparkCore
2、sql Context : SparkSQL
3、Streaming Context :SparkStreaming

統一起來:SparkSession

操作mysql,啟動spark shell 時,需要:
./bin/spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar

val mysqlDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","emp").load

mysqlDF.show
mysqlDF.createOrReplaceTempView("emp")

spark.sqlContext.cacheTable("emp") ----> 標識這張表可以被緩存,數據還沒有真正被緩存
spark.sql("select * from emp").show ----> 依然讀取mysql
spark.sql("select * from emp").show ----> 從緩存中讀取數據

spark.sqlContext.clearCache

清空緩存后,執行查詢,會觸發查詢mysql數據庫。

2、了解性能優化的相關參數:參考講義

-----------------Spark Streaming------------------------
流式計算框架,類似于Storm

常用的實時計算引擎(流式計算)
1、Apache Storm:真正的流式計算

2、Spark Streaming :嚴格上來說,不是真正的流式計算(實時計算)
把連續的流式數據,當成不連續的RDD
本質:是一個離散計算(不連續)

3、Apache Flink:真正的流式計算。與Spark Streaming相反。
把離散的數據,當成流式數據來處理

4、JStorm

一、Spark Streaming基礎

1、什么是 Spark Streaming。

Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
易于構建靈活的、高容錯的流式系統。

特點:
1、易用,已經集成到Spark中
2、容錯性:底層RDD,RDD本身具有容錯機制
3、支持多種語言:Java Scala Python

2、演示官方的Demo
往Spark Streaming中發送字符串,Spark 接收到以后,進行計數
使用消息服務器 netcat Linux自帶
yum install nc.x86_64

nc -l 1234

注意:總核心數 大于等于2。一個核心用于接收數據,另一個用于處理數據

在netcat中寫入數據 Spark Streaming可以取到

3、開發自己的NetWorkWordCount程序
和Spark Core類似

問題:Hello Hello
Hello World

現在現象:(Hello,2)
(Hello , 1) (World , 1)

能不能累加起來?保存記錄下以前的狀態?

通過Spark Streaming提供的算子來實現

二、高級特性

1、什么是DStream?離散流
把連續的數據變成不連續的RDD
因為DStream的特性,導致,Spark Streaming不是真正的流式計算

2、重點算子講解

(1)updateStateByKey
默認情況下,Spark Streaming不記錄之前的狀態,每次發數據,都會從0開始
現在使用本算子,實現累加操作。

(2)transform

3、窗口操作
窗口:對落在窗口內的數據進行處理,也是一個DStream,RDD

舉例:每10秒鐘把過去30秒的數據采集過來

注意:先啟動nc 再啟動程序 local[2]

4、集成Spark SQL : 使用SQL語句來處理流式數據



5、緩存和持久化:和RDD一樣

6、支持檢查點:和RDD一樣

三、數據源
Spark Streaming是一個流式計算引擎,就需要從外部數據源來接收數據

1、基本的數據源
文件流:監控文件系統的變化,如果文件有增加,讀取文件中的內容

希望Spark Streaming監控一個文件夾,如果有變化,則把變化采集過來

RDD隊列流:可以從隊列中獲取數據


套接字流:socketTextStream

2、高級數據源
(1)Flume

Spark SQL 對接flume有多種方式:
push方式:flume將數據推送給Spark Streaming
























(2)Kafka
在講Kafka時,舉例。









?

?

?




?

轉載于:https://www.cnblogs.com/jareny/p/10799752.html

總結

以上是生活随笔為你收集整理的Spark-Streaming基础的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

av久久久 | 天天干天天干天天干天天干天天干天天干 | 99自拍视频在线观看 | 天天天干天天射天天天操 | 综合网婷婷 | 日日麻批40分钟视频免费观看 | 天天摸日日操 | 国产91精品看黄网站在线观看动漫 | 一二三区在线 | 色资源在线| av大全在线看 | 丁香六月婷婷开心 | 欧美少妇xxxxxx | 亚洲三级在线免费观看 | 亚洲国产成人在线观看 | 日韩欧美在线观看一区二区三区 | 97超碰免费在线观看 | 亚洲精品国产欧美在线观看 | 欧美精品在线免费 | 久久精品3 | 欧美精品一区二区在线播放 | 激情丁香在线 | 色a在线观看| 九九久久精品视频 | 亚洲免费小视频 | 国产又粗又长的视频 | 九精品 | 手机在线永久免费观看av片 | 久草免费福利在线观看 | 国产日产精品一区二区三区四区 | 欧美激情视频一区二区三区 | 亚洲视屏 | 国产精品乱码久久久 | 国产成人精品999 | 日韩网站一区 | 日本精品在线视频 | 亚洲综合在线观看视频 | 日韩精品久久久 | 啪啪小视频网站 | 久久人人爽人人 | 中文字幕在线久一本久 | 久久久精品二区 | 日韩欧美在线综合网 | 国产亚洲视频中文字幕视频 | 啪嗒啪嗒免费观看完整版 | 成人黄色片在线播放 | 国产网站色| 99热在线精品观看 | 狠狠色丁香婷婷综合久小说久 | 精品国产人成亚洲区 | 97精品电影院 | 久久这里| 日韩 在线| 亚洲欧洲美洲av | 婷婷五天天在线视频 | 波多野结依在线观看 | 色网站在线观看 | 在线免费黄色毛片 | 五月天网页 | 九九亚洲视频 | 91人人视频在线观看 | 亚洲视频h| 99免费在线视频 | 一级免费av| 中文字幕婷婷 | 亚洲精品福利在线 | www.久久久.com| 国产精品美女免费视频 | 美女国产网站 | 啪啪精品 | 国产亚洲精品福利 | 久久蜜桃av| av免费看在线 | 国产麻豆视频 | www色网站 | 国产午夜精品一区二区三区在线观看 | 三级在线视频播放 | 黄色在线视频网址 | 亚洲国产精品电影在线观看 | 欧美极品久久 | 久久99亚洲热视 | 日本免费一二三区 | 亚洲免费高清视频 | 亚洲免费一级 | 999视频网站 | 五月婷婷六月丁香在线观看 | 成人欧美日韩国产 | 91视频高清免费 | 久久久私人影院 | 午夜三级影院 | 夜夜躁狠狠燥 | av高清免费| 右手影院亚洲欧美 | 黄色一级大片在线免费看国产一 | 免费视频一二三区 | 国产 中文 日韩 欧美 | 日韩中文字幕视频在线观看 | 亚洲禁18久人片 | 亚洲狠狠丁香婷婷综合久久久 | 日韩成人av在线 | 91av在线视频免费观看 | 国产视频一区二区在线播放 | 天天摸天天操天天爽 | 97在线观视频免费观看 | 久久这里只有精品9 | 国产中文字幕在线观看 | 日韩电影黄色 | 国产日本在线 | 精品91视频 | 在线免费中文字幕 | 成人国产精品免费 | 狠狠狠色丁香婷婷综合激情 | 奇米网网址 | 日本在线观看中文字幕 | 中文字幕亚洲欧美日韩2019 | 五月婷婷六月丁香在线观看 | 久久免费在线观看 | 香蕉视频日本 | 日韩欧美国产激情在线播放 | 2019免费中文字幕 | 一区二区中文字幕在线观看 | 亚洲精品在线一区二区三区 | 五月综合激情婷婷 | 成人四虎影院 | 黄色一级在线观看 | 99视频偷窥在线精品国自产拍 | 992tv在线观看网站 | aa一级片| 国产精品99久久久精品 | 亚洲精品字幕在线 | 久久久久区| 国产亚洲精品成人av久久影院 | 午夜 在线| 91麻豆操 | 亚洲国产精品成人综合 | 国产精品乱码一区二三区 | 久久激情电影 | 欧美成人黄色片 | 色综合久久久久综合99 | 成人影视免费 | 国产99re | 在线观看的a站 | 亚洲精品美女久久久久 | 福利视频入口 | 亚洲作爱视频 | zzijzzij日本成熟少妇 | 日韩成人精品 | 五月激情婷婷丁香 | 日韩精品一区二区三区免费观看 | 激情网站免费观看 | 干天天 | 国产精品久久99综合免费观看尤物 | 亚洲成av人片在线观看香蕉 | 色综合久久66| 精品久久久久久久久久久久 | 国产在线精品播放 | 黄a网站 | 国产a级片免费观看 | 精品久久网 | 丁香五月缴情综合网 | 国产中文字幕在线看 | 精品日韩中文字幕 | 国产精品一区二区av | 欧美日韩精品在线观看 | 免费黄色小网站 | 啪啪激情网 | 日韩高清免费无专码区 | 亚洲精品tv久久久久久久久久 | 婷婷色中文字幕 | 麻豆视频一区二区 | 天天天综合网 | 久久香蕉国产 | 在线黄色av电影 | 日韩二区在线播放 | 日本久久综合网 | 69精品久久久 | 国产婷婷视频在线 | 天天激情 | 天天爱av导航 | 97国产超碰 | 婷婷狠狠操| 激情综合网天天干 | 五月婷婷丁香在线观看 | 337p西西人体大胆瓣开下部 | 久久色在线观看 | www.日日日.com | 黄色av电影在线 | 亚洲 综合 国产 精品 | 九九久久精品视频 | 国产成人亚洲在线电影 | 久久精品中文字幕免费mv | 国产青青青 | 日韩视频在线观看视频 | 91av视频网站 | 伊在线视频 | 午夜精品久久久久 | 国产91丝袜在线播放动漫 | 日本精品久久久久中文字幕5 | 欧美夫妻生活视频 | 欧美一级小视频 | 夜色资源站wwwcom | 狠狠狠色丁香综合久久天下网 | 欧美成人亚洲成人 | 91丨九色丨蝌蚪丨对白 | 国产精品原创av片国产免费 | 成人午夜黄色影院 | 色五婷婷| 一区二区三区在线免费观看 | 91精品国产福利在线观看 | 狠狠狠狠狠狠操 | 国产精品久久久久久久av大片 | av一区二区在线观看中文字幕 | 亚洲精品激情 | 夜夜高潮夜夜爽国产伦精品 | 91在线看免费 | 色综合天天爱 | 亚洲激情六月 | 视频福利在线观看 | 99久久精品日本一区二区免费 | 日本在线观看一区 | 免费观看一级视频 | 69久久99精品久久久久婷婷 | 亚洲视频精品在线 | 久久精品三级 | 日本女人的性生活视频 | 精品色综合 | 国产一区二区在线精品 | 国产亚洲va综合人人澡精品 | 国产九九九精品视频 | 亚洲理论电影 | 九九爱免费视频在线观看 | 国产一级片视频 | 国产视频在 | 国产亚洲激情视频在线 | 色婷婷狠 | 在线影视 一区 二区 三区 | 综合久久精品 | 成人免费视频在线观看 | 中文字幕在线观看亚洲 | 狠狠干我| 国产资源在线观看 | 久久久综合精品 | а中文在线天堂 | 国产精品福利久久久 | 麻豆 91 在线 | 在线观看免费黄视频 | 久久免费视频播放 | 五月花婷婷 | 欧美成人中文字幕 | 91成版人在线观看入口 | 激情欧美xxxx | 久久免费视频5 | 97国产小视频 | 操操爽| 欧美激情精品久久久 | 97超碰在线久草超碰在线观看 | 91在线看免费 | 热久久免费国产视频 | 日韩精品一区二 | 99久久久久久久 | 久久精彩 | 久久影院午夜论 | 天天操天天操天天操天天操天天操天天操 | 亚洲国产三级在线观看 | 特黄特色特刺激视频免费播放 | 国产精品久久久久久久久免费 | 午夜精品一区二区三区免费 | 欧美日韩一区二区三区在线观看视频 | 国产中文字幕在线免费观看 | 91av在线看 | 国产又粗又硬又长又爽的视频 | 国产在线观看a | 91中文字幕在线播放 | 2023天天干 | 久久久久久亚洲精品 | 一级片免费观看 | 91精品国产综合久久福利 | 国产精品av电影 | 国产色黄网站 | 国产99久久久国产 | 亚洲精品视频在线观看免费 | 免费在线黄色av | 久久人人爽人人爽人人片 | 久久看片网 | 在线观看日韩精品 | 国产日韩欧美精品在线观看 | 国产一级片免费观看 | 久久久九九 | 久久黄色免费观看 | 亚洲更新最快 | 久久久色| 日韩精品免费在线 | 国产一区二区在线免费播放 | 在线亚洲播放 | 激情av五月婷婷 | 超碰久热| 五月婷婷影院 | 美女性爽视频国产免费app | 久草精品视频在线观看 | 久久不卡av| 亚洲激情在线视频 | 日日添夜夜添 | 久久久久久久免费观看 | 日韩理论视频 | 97精品国产91久久久久久 | www.xxxx变态.com | 色综合久久久久网 | 狠狠躁18三区二区一区ai明星 | 欧美少妇的秘密 | 国产高清在线a视频大全 | 久久理论电影网 | 欧美一级免费片 | 日本公妇色中文字幕 | 99热最新地址 | www91在线观看 | 91av在线免费视频 | 日韩欧美国产精品 | 国内三级在线观看 | 国产91亚洲 | 成人教育av | 国产精品第一视频 | 欧美成人h版| 又黄又爽的视频在线观看网站 | 成人黄色小视频 | 国产精品免费观看网站 | 在线观看免费高清视频大全追剧 | 美国av大片| 成人午夜在线电影 | av在线亚洲天堂 | a黄色| 日本91在线| 狠狠操精品| 日韩在线观看免费 | 午夜在线免费视频 | 欧美不卡视频在线 | av最新资源| 黄色在线观看污 | 麻豆高清免费国产一区 | 激情xxxx | 天天操偷偷干 | 日韩在线观看视频在线 | 国产精品午夜久久 | 在线视频欧美日韩 | 久久免费在线视频 | 久久久久国产精品午夜一区 | 精品欧美在线视频 | 日韩综合在线观看 | 午夜精品久久久久久中宇69 | 91免费看黄 | 国产高清黄 | 成人a视频 | 日韩精品久久久久久中文字幕8 | 精品视频免费播放 | a级国产乱理论片在线观看 伊人宗合网 | 亚洲久草在线视频 | 91视频传媒 | 欧美在线视频日韩 | 中文字幕亚洲欧美日韩2019 | 久久论理 | www.天天色 | 亚洲第一成网站 | 日一日干一干 | 免费黄色小网站 | 免费三及片 | 日本最大色倩网站www | 激情五月婷婷 | 天天草天天 | 亚洲二区精品 | 欧美狠狠色| 久久99网| 天堂av官网 | 国产精品久久久久久久久久妇女 | 久久在线观看 | 色一级片 | 久久精品国产第一区二区三区 | 欧美精品国产综合久久 | 五月婷婷综 | 五月婷婷在线播放 | 日韩精品在线免费播放 | 成人免费观看视频网站 | 97超碰香蕉| 日韩av片在线| 久久精品视频4 | 韩国视频一区二区三区 | 最近中文国产在线视频 | 久久天天躁夜夜躁狠狠85麻豆 | 99视频在线免费看 | 丁香婷婷在线 | 日日操夜夜操狠狠操 | 成人午夜电影久久影院 | www国产亚洲精品久久麻豆 | 美腿丝袜一区二区三区 | 96亚洲精品久久 | 久久少妇av | 麻豆成人在线观看 | av线上免费观看 | 一区二区三区国产欧美 | 久草爱| 天天曰视频 | 亚洲japanese制服美女 | 狠狠五月天 | 精品欧美日韩 | 亚洲成年人免费网站 | 久操久 | 久久精品三级 | 国产老太婆免费交性大片 | 日韩美女久久 | 亚洲精品mv在线观看 | 免费看在线看www777 | 亚洲精品男女 | 亚洲精品在线播放视频 | 深夜激情影院 | 久久天天躁狠狠躁亚洲综合公司 | 国产精品自在欧美一区 | 免费在线观看黄色网 | 久久成人资源 | 亚洲乱码中文字幕综合 | 国产色资源 | 欧美嫩草影院 | 中文av日韩 | 手机看片午夜 | 成人欧美一区二区三区黑人麻豆 | 国产精品s色 | 国产视频一区在线播放 | 国产高清在线永久 | 成人av电影在线播放 | 国产免费观看高清完整版 | 精品中文字幕视频 | 日韩精品免费在线视频 | 日韩精品黄 | 又黄又刺激的网站 | 久久精品99北条麻妃 | 美州a亚洲一视本频v色道 | 色先锋av资源中文字幕 | 97人人模人人爽人人少妇 | 久久久久www | www国产亚洲| 在线免费视频一区 | 亚洲精品国产精品国自产在线 | 手机看片99| 欧洲亚洲激情 | 91亚洲免费| 美女视频黄是免费的 | 国产资源在线视频 | 超碰在线网 | 91成人免费看 | 久久草在线精品 | 1000部国产精品成人观看 | 国产一线二线三线在线观看 | 成人欧美一区二区三区在线观看 | 伊人天天干| 国产99久久精品一区二区300 | 97在线公开视频 | 我要看黄色一级片 | 91成版人在线观看入口 | 黄色成人在线网站 | 欧美日韩精品免费观看 | 成人av网页 | 日本在线h | 久久久久国产成人免费精品免费 | 成人黄色大片在线免费观看 | av观看久久久 | 精品美女国产在线 | 日韩欧美高清免费 | 中文字幕高清视频 | 成人香蕉视频 | 日韩免费视频观看 | 亚洲国产成人在线 | 亚洲精品国产精品国自产在线 | 欧美日韩三区二区 | 精品产品国产在线不卡 | 日韩午夜电影院 | 国产尤物一区二区三区 | 亚洲一二三在线 | 黄色小网站免费看 | 97色免费视频 | 免费av视屏 | 色噜噜日韩精品欧美一区二区 | 国产欧美精品一区aⅴ影院 99视频国产精品免费观看 | 国产精品激情在线观看 | 麻豆传媒视频在线 | 最新日韩精品 | 成人免费电影 | 国产视频一级 | 国产做aⅴ在线视频播放 | 欧亚日韩精品一区二区在线 | 久99视频 | 在线中文字幕网站 | 日本精品一区二区 | 视频在线观看入口黄最新永久免费国产 | 日韩精品中文字幕在线观看 | 97在线观看免费观看 | 成年人天堂com | 在线国产欧美 | 91视频在线国产 | 久久久久久久久免费 | 国产在线观看你懂得 | 久久成人一区二区 | 日日摸日日添日日躁av | 久青草视频在线观看 | 国产专区精品 | 女人高潮特级毛片 | 中文字幕中文字幕中文字幕 | www在线观看视频 | 日韩激情小视频 | 久久er99热精品一区二区 | 99久久综合狠狠综合久久 | av 一区 二区 久久 | 亚洲成人精品影院 | 国产成人一二三 | 日韩激情片在线观看 | a视频在线播放 | 国产中文在线视频 | 一级黄色片在线观看 | 国产99久久精品一区二区300 | www.91av在线 | 国产成人高清在线 | 日本成址在线观看 | 69国产盗摄一区二区三区五区 | 色老板在线视频 | 超碰官网| 五月天国产精品 | 欧美片一区二区三区 | 国产成人免费观看 | 月下香电影 | 亚洲草视频 | 国产另类av| 色诱亚洲精品久久久久久 | 99久久久久成人国产免费 | 国产一二三在线视频 | 免费www视频 | 在线观看免费黄视频 | 国产无限资源在线观看 | 99久久国产免费免费 | 青青河边草观看完整版高清 | 91人人澡人人爽人人精品 | 午夜精品久久一牛影视 | 久久首页| 四虎影视成人永久免费观看亚洲欧美 | 在线观看成年人 | 婷婷激情5月天 | 日韩精品免费在线观看视频 | 在线不卡中文字幕播放 | 五月婷婷操 | 天天色天天射综合网 | 欧美日韩一区久久 | 欧美久久99 | av中文字幕在线电影 | 色婷婷狠 | 欧美资源| 国产成人精品日本亚洲999 | 天天做日日做天天爽视频免费 | 国产91丝袜在线播放动漫 | 欧美综合国产 | 国产精品色婷婷 | 99久久久成人国产精品 | 亚洲成人免费在线 | 日韩精品免费一区二区三区 | 婷婷在线五月 | 婷婷六月综合网 | 黄色大片视频网站 | 国产精品自产拍在线观看桃花 | 91pony九色丨交换 | 天天操夜夜曰 | 成人影视免费看 | 亚洲香蕉视频 | 美女免费黄视频网站 | 国产精品入口麻豆 | 免费久久99精品国产 | 国产精品乱码一区二三区 | 亚洲精品免费观看视频 | 亚洲一级片免费观看 | 色网站免费在线看 | 人人爱夜夜操 | 日韩高清在线一区二区三区 | av一区二区三区在线观看 | 丁香六月欧美 | 视频二区 | 黄色视屏在线免费观看 | 色婷婷啪啪免费在线电影观看 | 国产色久| 国产日韩欧美中文 | 成人av电影在线 | 午夜国产一区二区三区四区 | 日韩av在线高清 | 激情深爱| 在线观看av的网站 | 91视频91自拍 | 日本中文字幕视频 | 亚洲春色综合另类校园电影 | 国产精品久久一卡二卡 | 亚洲精品黄色片 | 欧美精品亚洲精品日韩精品 | 久久精品国产一区二区三区 | 国产99re | 久久久久久久99 | 国产一级黄色av | 玖玖在线观看视频 | 在线观看视频亚洲 | 国产成人精品一区二三区 | 97在线精品视频 | 很黄很色很污的网站 | 四虎在线免费观看 | 欧美一级欧美一级 | 精品一二| 国产成人99av超碰超爽 | 成人av电影在线 | 黄色av免费电影 | www免费在线观看 | 国产精品高潮久久av | 欧美男同网站 | av大全免费在线观看 | 一区二区三区在线观看免费 | 国产成人福利片 | 人人搞人人干 | 欧美精品免费一区二区 | 日韩在线免费高清视频 | 麻花豆传媒mv在线观看 | 久久黄色片子 | 婷婷亚洲五月色综合 | 日韩免费看片 | 极品嫩模被强到高潮呻吟91 | 亚洲欧美成aⅴ人在线观看 四虎在线观看 | 人成免费网站 | 成人午夜网址 | 亚洲欧美国产精品va在线观看 | 久久999久久| 97精产国品一二三产区在线 | 色婷婷视频| 国产精品视频永久免费播放 | 久久在线一区 | 4438全国亚洲精品在线观看视频 | 亚洲精品大片www | 久久午夜电影网 | 中文字幕一区av | 成人动漫精品一区二区 | 成人在线播放视频 | 字幕网av| 涩涩网站在线播放 | 永久精品视频 | 免费看国产一级片 | 超碰公开在线观看 | 久久理论电影 | 午夜免费在线观看 | 国产精品久久久久一区二区 | 日本爽妇网 | 国产精品久久免费看 | 69精品人人人人 | 精品91| 8x成人免费视频 | 99热手机在线 | 91网免费观看 | 91av亚洲 | 亚洲国产网站 | 色av色av色av| 免费看国产一级片 | 国产日韩视频在线播放 | 91久久久久久久一区二区 | 91精品国产乱码在线观看 | 大荫蒂欧美视频另类xxxx | 国产精品99爱 | 国产精品久久久久一区二区三区 | 看av在线| 人人爽人人干 | 蜜桃视频成人在线观看 | 国产精品久久久久久久电影 | 国产一区二区在线免费 | 国产中文字幕在线免费观看 | 国产麻豆精品传媒av国产下载 | 国产精品一区二区三区视频免费 | 国产成人精品网站 | h网站免费在线观看 | 中文字幕在线免费观看视频 | 国产夫妻性生活自拍 | 婷婷激情久久 | 高清精品视频 | 精品国产伦一区二区三区免费 | 亚州精品一二三区 | 成人av.com | 丁香综合av | 人人艹人人 | 日本公乱妇视频 | 国产手机视频在线观看 | 欧美日韩久久不卡 | 高清一区二区三区 | www.国产在线观看 | 国产精品久免费的黄网站 | 黄色网在线播放 | 在线黄色免费av | 久久久影片 | 亚洲欧美少妇 | 欧美日韩国产综合一区二区 | 久久国产一区二区三区 | 一级片视频在线 | 在线免费观看视频 | h网站免费在线观看 | 五月激情站 | 久久精品亚洲国产 | 欧美日韩视频在线一区 | 亚洲精品 在线视频 | 精品色999| 免费精品人在线二线三线 | 国产精品一区二区三区四区在线观看 | 国产精品精品久久久久久 | 丁香久久五月 | 丝袜护士aⅴ在线白丝护士 天天综合精品 | 国产不卡一 | 亚洲国产网站 | 一本色道久久综合亚洲二区三区 | 四虎永久网站 | 中文字幕免费一区 | 久久精品这里都是精品 | 亚洲精品黄色在线观看 | 亚洲国产69| 成人黄大片 | 91av蜜桃| 国产一级大片免费看 | 人人插人人做 | 欧美日本中文字幕 | 国产精品中文字幕在线 | 亚洲资源在线观看 | 免费在线国产精品 | 免费在线观看av电影 | 久久精品99国产精品日本 | 久久久国际精品 | 在线免费观看羞羞视频 | 欧美性做爰猛烈叫床潮 | 99久久精品国产亚洲 | 五月综合在线观看 | 国内精自线一二区永久 | 国产精品视频线看 | 亚洲精品九九 | 国产成人一区二 | 91粉色视频| 日本激情动作片免费看 | 欧美日韩视频免费 | 91香蕉视频色版 | 色插综合| 波多野结衣精品 | 91在线观看欧美日韩 | 欧美99精品 | 久久视频精品 | 美女黄网站视频免费 | 久热久草在线 | 国产理论在线 | 日本天天操 | 国产免费又爽又刺激在线观看 | 中文字幕中文字幕在线中文字幕三区 | 欧美日韩国产一二三区 | 一本一本久久a久久 | av中文电影 | 国产精品va最新国产精品视频 | 视频成人永久免费视频 | 麻豆国产在线播放 | 又黄又爽又色无遮挡免费 | 国产精品亚 | 精品国产一区二区三区噜噜噜 | 精品一区二区综合 | 亚洲日本欧美 | 天天操操| 亚洲影院色 | 午夜精品一区二区三区在线视频 | 日日夜夜网站 | 久久精品视频4 | 国产视频在线观看一区二区 | 亚洲丝袜一区 | 99久久精品费精品 | 久久这里有精品 | 最新av观看| 在线99| 久久久人人爽 | 久久综合欧美 | 亚洲精品久久久久中文字幕二区 | 欧美日韩一区二区视频在线观看 | 天天操夜夜看 | 亚洲欧美乱综合图片区小说区 | 久久久久免费精品国产 | 国产精品久久99精品毛片三a | 深爱激情五月网 | 青春草视频 | 日本精品一区二区 | 一区二区三区四区精品 | 国内精品久久久久影院优 | 午夜精品福利一区二区三区蜜桃 | 欧美日韩国产三级 | 亚洲91av| 在线免费观看麻豆 | 欧美日本啪啪无遮挡网站 | 黄色av成人在线 | 天天人人综合 | 91精品国产92久久久久 | 91网免费观看 | 五月激情综合婷婷 | 国产精品入口a级 | av千婊在线免费观看 | av成人在线电影 | 国产va精品免费观看 | 日日操日日 | 成全在线视频免费观看 | 国产精品欧美久久久久天天影视 | 激情综合一区 | 超碰在线97免费 | 日批视频在线观看免费 | 99精品视频在线播放观看 | 国产精品久久久久永久免费 | 91福利视频在线 | 成年人看片 | a级免费观看 | 亚洲区精品视频 | 久久久久日本精品一区二区三区 | 四虎在线观看 | 欧美在线日韩在线 | 久久国产精品免费 | 国产亚洲在 | 国产精品一区二区免费在线观看 | 欧美精品视 | 国产精品国产三级国产aⅴ无密码 | 天天操夜夜操天天射 | 色在线最新 | 午夜国产福利在线 | 国产黑丝一区二区三区 | 91黄在线看| 337p日本大胆噜噜噜噜 | 国产精品久久久久久久久久直播 | 成人v| 日韩性久久 | 国产精品麻豆果冻传媒在线播放 | 中文字幕电影一区 | 日韩高清av在线 | 国产精品高潮呻吟久久久久 | 日本精品一区二区三区在线观看 | 日韩一二三在线 | 国产免费又粗又猛又爽 | 日韩电影中文字幕在线 | 午夜在线看片 | 国产精品九九九九九九 | 在线日韩一区 | av中文资源在线 | 欧美日韩视频一区二区三区 | 激情自拍av | 日韩av电影国产 | 色综合天天视频在线观看 | 欧美日韩破处 | 国产视频一二三 | 日韩欧美高清在线 | 91亚洲精品国产 | 狠狠插狠狠操 | 四虎亚洲精品 | 青青河边草免费观看完整版高清 | 8x成人在线 | 一区二区三区动漫 | 欧美激情亚洲综合 | 欧美日韩国产二区 | 毛片网在线观看 | 日韩成人不卡 | 高清精品在线 | 九九国产精品视频 | 国内精品久久久久影院一蜜桃 | 黄色小说视频网站 | 精品1区2区3区| 97色涩 | 久久国产精品久久精品国产演员表 | 中文字幕亚洲综合久久五月天色无吗'' | 久久精品网站免费观看 | 免费看的毛片 | 欧美激情综合五月 | 日日夜夜天天久久 | 日韩在线电影一区二区 | 91高清完整版在线观看 | 欧美日韩在线播放一区 | av解说在线 | 91激情 | 人人射人人插 | 久久人人艹 | 色吊丝在线永久观看最新版本 | 亚洲视频精品在线 | 97视频免费观看2区 亚洲视屏 | 欧美吞精 | 香蕉在线观看 | www.久久久com | 91精品视频免费看 | 日本精品久久久久影院 | 欧美日韩在线免费观看视频 | 国产成人精品一区二区三区网站观看 | 在线观看网站av | 成人免费观看a | 麻豆国产露脸在线观看 | 欧美在线观看视频一区二区三区 | 国产不卡在线观看视频 | 狠狠色噜噜狠狠 | 日韩精品无码一区二区三区 | 最新超碰 | 99精品福利视频 | 日韩黄色大片在线观看 | 国产3p视频 | 在线精品观看国产 | 国产精品女主播一区二区三区 | 亚洲精品男人天堂 | 亚洲国产精品电影 | 国产区在线视频 | 中文字幕一区二区在线观看 | 国产一级一片免费播放放a 一区二区三区国产欧美 | 91麻豆精品国产自产在线游戏 | 在线观看的黄色 | 91九色porny在线 | 在线免费高清 | 91探花在线| 亚洲精品视频网站在线观看 | 综合久久久久久 | 91夜夜夜 | 五月开心婷婷网 | 久久久久久久久久久高潮一区二区 | 亚洲精品女人 | 亚洲一区免费在线 | 国产黄色在线 | 精品国产欧美一区二区三区不卡 | 中文字幕 在线看 | 久久久国产精品麻豆 | 日本中文在线观看 | 精品综合久久久 | 国产美女网站视频 | 四虎5151久久欧美毛片 | 婷婷国产v亚洲v欧美久久 | 欧美另类调教 | 免费一级日韩欧美性大片 | 在线免费黄色 | 在线观看免费黄视频 | 亚洲国产精品一区二区久久hs | 国产精品久久久久免费 | www.亚洲激情.com | 国产超碰在线观看 | www99久久 | 日韩色一区二区三区 | jizzjizzjizz亚洲 | 黄色成人在线 | 麻豆一精品传二传媒短视频 | 国产精品午夜av | 亚洲成人免费在线观看 | 亚洲精品国产拍在线 | 亚洲美女视频在线观看 | 国产91学生| 91插插影库| aⅴ视频在线 | 精品久久久久久久久中文字幕 | 亚洲欧洲精品视频 | 国产精品久久久久久久久久尿 | 成人午夜影视 | 久久国产精品影视 | www.久久精品视频 | 六月激情婷婷 | 国产精品18久久久 | 激情五月色播五月 | 久久久精品网站 | 国产成人99av超碰超爽 | 成人久久影院 | 国产又粗又硬又长又爽的视频 | 久久伊人国产精品 | 国产一级片视频 | 免费av在线网站 | 黄色国产区 | 久久婷五月 | 手机看片国产日韩 | 色噜噜在线观看视频 | 激情大尺度视频 | 日韩免费一级电影 | 婷婷中文在线 | 久久久www免费电影网 | 日躁夜躁狠狠躁2001 | 91在线日韩 | 在线成人性视频 | 激情影院在线观看 | 日本中文字幕在线播放 | 最新午夜 | av网站免费线看精品 | 成人影片在线免费观看 | 91人人爽久久涩噜噜噜 | 你操综合 | 香蕉视频国产在线观看 | av电影免费在线看 | 婷婷综合亚洲 | 婷婷色资源| 女女av在线 | 欧美极品少妇xxxxⅹ欧美极品少妇xxxx亚洲精品 | 91人人人 | 天天干.com| 亚洲 欧美 另类人妖 | 黄色的网站在线 | 国产91在线播放 | a级国产乱理论片在线观看 特级毛片在线观看 | 国产视频在线一区二区 | 欧美一级片免费在线观看 | 97福利在线观看 | av片在线看| 欧美日本一区 | 日韩在线短视频 | 天天射天天操天天色 | 成人黄在线 | 欧美大片mv免费 |