日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once

發(fā)布時(shí)間:2023/11/28 生活经验 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄

擴(kuò)展閱讀 End-to-End Exactly-Once

流處理的數(shù)據(jù)處理語義

At-most-once-最多一次

At-least-once-至少一次

?Exactly-once-精確一次

End-to-End Exactly-Once-端到端的精確一次

注意:精確一次? 有效一次!

補(bǔ)充:流計(jì)算系統(tǒng)如何支持一致性語義

???????End-to-End Exactly-Once的實(shí)現(xiàn)

???????Source

???????Transformation

???????Sink

???????Flink+Kafka的End-to-End Exactly-Once

???????版本說明

???????兩階段提交-API

???????兩階段提交-簡單流程

???????兩階段提交-詳細(xì)流程

代碼示例

Flink+Kafka實(shí)現(xiàn)End-to-End Exactly-Once

???????Flink+MySQL實(shí)現(xiàn)End-to-End Exactly-Once

深度總結(jié)

Exactly-Once

End-to-End Exactly-Once

Flink如何支持End-to-End Exactly-Once的?

兩階段事務(wù)提交協(xié)議


擴(kuò)展閱讀 End-to-End Exactly-Once

Flink 在1.4.0 版本引入『exactly-once』并號稱支持『End-to-End Exactly-Once』“端到端的精確一次”語義。

流處理數(shù)據(jù)處理語義

對于批處理,fault-tolerant(容錯(cuò)性)很容易做,失敗只需要replay,就可以完美做到容錯(cuò)。

對于流處理,數(shù)據(jù)流本身是動態(tài),沒有所謂的開始或結(jié)束,雖然可以replay buffer的部分?jǐn)?shù)據(jù),但fault-tolerant做起來會復(fù)雜的多

流處理(有時(shí)稱為事件處理)可以簡單地描述為是對無界數(shù)據(jù)或事件的連續(xù)處理。流或事件處理應(yīng)用程序可以或多或少地被描述為有向圖,并且通常被描述為有向無環(huán)圖(DAG)。在這樣的圖中,每個(gè)邊表示數(shù)據(jù)或事件流,每個(gè)頂點(diǎn)表示運(yùn)算符,會使用程序中定義的邏輯處理來自相鄰邊的數(shù)據(jù)或事件。有兩種特殊類型的頂點(diǎn),通常稱為 sources 和 sinks。sources讀取外部數(shù)據(jù)/事件到應(yīng)用程序中,而 sinks 通常會收集應(yīng)用程序生成的結(jié)果。下圖是流式應(yīng)用程序的示例。有如下特點(diǎn):

分布式情況下是由多個(gè)Source(讀取數(shù)據(jù))節(jié)點(diǎn)、多個(gè)Operator(數(shù)據(jù)處理)節(jié)點(diǎn)、多個(gè)Sink(輸出)節(jié)點(diǎn)構(gòu)成

每個(gè)節(jié)點(diǎn)的并行數(shù)可以有差異,且每個(gè)節(jié)點(diǎn)都有可能發(fā)生故障

對于數(shù)據(jù)正確性最重要的一點(diǎn),就是當(dāng)發(fā)生故障時(shí),是怎樣容錯(cuò)與恢復(fù)的。

?

流處理引擎通常為應(yīng)用程序提供了三種數(shù)據(jù)處理語義:最多一次、至少一次和精確一次。

如下是對這些不同處理語義的寬松定義(一致性由弱到強(qiáng)):

At most noce < At least once < Exactly once < End to End Exactly once

At-most-once-最多一次

有可能會有數(shù)據(jù)丟失

這本質(zhì)上是簡單的恢復(fù)方式,也就是直接從失敗處的下個(gè)數(shù)據(jù)開始恢復(fù)程序,之前的失敗數(shù)據(jù)處理就不管了。可以保證數(shù)據(jù)或事件最多由應(yīng)用程序中的所有算子處理一次。 這意味著如果數(shù)據(jù)在被流應(yīng)用程序完全處理之前發(fā)生丟失,則不會進(jìn)行其他重試或者重新發(fā)送。

?

?

?

?

At-least-once-至少一次

有可能重復(fù)處理數(shù)據(jù)

應(yīng)用程序中的所有算子都保證數(shù)據(jù)或事件至少被處理一次。這通常意味著如果事件在流應(yīng)用程序完全處理之前丟失,則將從源頭重放或重新傳輸事件。然而,由于事件是可以被重傳的,因此一個(gè)事件有時(shí)會被處理多次(至少一次),至于有沒有重復(fù)數(shù)據(jù),不會關(guān)心,所以這種場景需要人工干預(yù)自己處理重復(fù)數(shù)據(jù)

?Exactly-once-精確一次

?

Exactly-Once 是 Flink、Spark 等流處理系統(tǒng)的核心特性之一,這種語義會保證每一條消息只被流處理系統(tǒng)處理一次。即使是在各種故障的情況下,流應(yīng)用程序中的所有算子都保證事件只會被『精確一次』的處理。(也有文章將 Exactly-once 翻譯為:完全一次,恰好一次)

Flink實(shí)現(xiàn)『精確一次』的分布式快照/狀態(tài)檢查點(diǎn)方法受到 Chandy-Lamport 分布式快照算法的啟發(fā)。通過這種機(jī)制,流應(yīng)用程序中每個(gè)算子的所有狀態(tài)都會定期做 checkpoint。如果是在系統(tǒng)中的任何地方發(fā)生失敗,每個(gè)算子的所有狀態(tài)都回滾到最新的全局一致 checkpoint 點(diǎn)。在回滾期間,將暫停所有處理。源也會重置為與最近 checkpoint 相對應(yīng)的正確偏移量。整個(gè)流應(yīng)用程序基本上是回到最近一次的一致狀態(tài),然后程序可以從該狀態(tài)重新啟動。

End-to-End Exactly-Once-端到端的精確一次

Flink 在1.4.0 版本引入『exactly-once』并號稱支持『End-to-End Exactly-Once』“端到端的精確一次”語義。

它指的是 Flink 應(yīng)用從 Source 端開始到 Sink 端結(jié)束,數(shù)據(jù)必須經(jīng)過的起始點(diǎn)和結(jié)束點(diǎn)。

注意:

『exactly-once』和『End-to-End Exactly-Once』的區(qū)別:

?

?

?

?

???????注意:精確一次? 有效一次!

有些人可能認(rèn)為『精確一次』描述了事件處理的保證,其中流中的每個(gè)事件只被處理一次。實(shí)際上,沒有引擎能夠保證正好只處理一次。在面對任意故障時(shí),不可能保證每個(gè)算子中的用戶定義邏輯在每個(gè)事件中只執(zhí)行一次,因?yàn)橛脩舸a被部分執(zhí)行的可能性是永遠(yuǎn)存在的。

那么,當(dāng)引擎聲明『精確一次』處理語義時(shí),它們能保證什么呢?如果不能保證用戶邏輯只執(zhí)行一次,那么什么邏輯只執(zhí)行一次?當(dāng)引擎聲明『精確一次』處理語義時(shí),它們實(shí)際上是在說,它們可以保證引擎管理的狀態(tài)更新只提交一次到持久的后端存儲。

事件的處理可以發(fā)生多次,但是該處理的效果只在持久后端狀態(tài)存儲中反映一次。因此,我們認(rèn)為有效地描述這些處理語義最好的術(shù)語是『有效一次』(effectively once)

?

???????補(bǔ)充:流計(jì)算系統(tǒng)如何支持一致性語義

?

?

?

?

?

?

?

???????End-to-End Exactly-Once的實(shí)現(xiàn)

通過前面的學(xué)習(xí),我們了解到,Flink內(nèi)部借助分布式快照Checkpoint已經(jīng)實(shí)現(xiàn)了內(nèi)部的Exactly-Once,但是Flink 自身是無法保證外部其他系統(tǒng)“精確一次”語義的,所以 Flink 若要實(shí)現(xiàn)所謂“端到端(End to End)的精確一次”的要求,那么外部系統(tǒng)必須支持“精確一次”語義;然后借助一些其他手段才能實(shí)現(xiàn)。如下:

???????Source

發(fā)生故障時(shí)需要支持重設(shè)數(shù)據(jù)的讀取位置,如Kafka可以通過offset來實(shí)現(xiàn)(其他的沒有offset系統(tǒng),我們可以自己實(shí)現(xiàn)累加器計(jì)數(shù))

???????Transformation

也就是Flink內(nèi)部,已經(jīng)通過Checkpoint保證了,如果發(fā)生故障或出錯(cuò)時(shí),Flink應(yīng)用重啟后會從最新成功完成的checkpoint中恢復(fù)——重置應(yīng)用狀態(tài)并回滾狀態(tài)到checkpoint中輸入流的正確位置,之后再開始執(zhí)行數(shù)據(jù)處理,就好像該故障或崩潰從未發(fā)生過一般。

  • 分布式快照機(jī)制

我們在之前的課程中講解過 Flink 的容錯(cuò)機(jī)制,Flink?提供了失敗恢復(fù)的容錯(cuò)機(jī)制,而這個(gè)容錯(cuò)機(jī)制的核心就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流的快照來實(shí)現(xiàn)。

?

同 Spark 相比,Spark 僅僅是針對 Driver 的故障恢復(fù) Checkpoint。而 Flink 的快照可以到算子級別,并且對全局?jǐn)?shù)據(jù)也可以做快照。Flink 的分布式快照受到 ?Chandy-Lamport?分布式快照算法啟發(fā),同時(shí)進(jìn)行了量身定做。

?

  • Barrier

Flink 分布式快照的核心元素之一是 Barrier(數(shù)據(jù)柵欄),我們也可以把 Barrier 簡單地理解成一個(gè)標(biāo)記,該標(biāo)記是嚴(yán)格有序的,并且隨著數(shù)據(jù)流往下流動。每個(gè) Barrier 都帶有自己的 ID,Barrier 極其輕量,并不會干擾正常的數(shù)據(jù)處理。

?

如上圖所示,假如我們有一個(gè)從左向右流動的數(shù)據(jù)流,Flink 會依次生成 snapshot 1、 snapshot 2、snapshot 3……Flink 中有一個(gè)專門的“協(xié)調(diào)者”負(fù)責(zé)收集每個(gè) snapshot 的位置信息,這個(gè)“協(xié)調(diào)者”也是高可用的。

?

Barrier 會隨著正常數(shù)據(jù)繼續(xù)往下流動,每當(dāng)遇到一個(gè)算子,算子會插入一個(gè)標(biāo)識,這個(gè)標(biāo)識的插入時(shí)間是上游所有的輸入流都接收到 snapshot n。與此同時(shí),當(dāng)我們的 sink 算子接收到所有上游流發(fā)送的 Barrier 時(shí),那么就表明這一批數(shù)據(jù)處理完畢,Flink 會向“協(xié)調(diào)者”發(fā)送確認(rèn)消息,表明當(dāng)前的 snapshot n 完成了。當(dāng)所有的 sink 算子都確認(rèn)這批數(shù)據(jù)成功處理后,那么本次的 snapshot 被標(biāo)識為完成。

?

這里就會有一個(gè)問題,因?yàn)?Flink 運(yùn)行在分布式環(huán)境中,一個(gè) operator 的上游會有很多流,每個(gè)流的 barrier n 到達(dá)的時(shí)間不一致怎么辦?這里 Flink 采取的措施是:快流等慢流。

拿上圖的 barrier n 來說,其中一個(gè)流到的早,其他的流到的比較晚。當(dāng)?shù)谝粋€(gè) barrier n到來后,當(dāng)前的 operator 會繼續(xù)等待其他流的 barrier n。直到所有的barrier n 到來后,operator 才會把所有的數(shù)據(jù)向下發(fā)送。

  • 異步和增量

按照上面我們介紹的機(jī)制,每次在把快照存儲到我們的狀態(tài)后端時(shí),如果是同步進(jìn)行就會阻塞正常任務(wù),從而引入延遲。因此 Flink 在做快照存儲時(shí),可采用異步方式。

此外,由于 checkpoint 是一個(gè)全局狀態(tài),用戶保存的狀態(tài)可能非常大,多數(shù)達(dá) G 或者 T 級別。在這種情況下,checkpoint 的創(chuàng)建會非常慢,而且執(zhí)行時(shí)占用的資源也比較多,因此 Flink 提出了增量快照的概念。也就是說,每次都是進(jìn)行的全量 checkpoint,是基于上次進(jìn)行更新的。

?

???????Sink

需要支持冪等寫入或事務(wù)寫入(Flink的兩階段提交需要事務(wù)支持)

?

???????冪等寫入(Idempotent Writes)

冪等寫操作是指:任意多次向一個(gè)系統(tǒng)寫入數(shù)據(jù),只對目標(biāo)系統(tǒng)產(chǎn)生一次結(jié)果影響。

例如,重復(fù)向一個(gè)HashMap里插入同一個(gè)Key-Value二元對,第一次插入時(shí)這個(gè)HashMap發(fā)生變化,后續(xù)的插入操作不會改變HashMap的結(jié)果,這就是一個(gè)冪等寫操作。

HBase、Redis和Cassandra這樣的KV數(shù)據(jù)庫一般經(jīng)常用來作為Sink,用以實(shí)現(xiàn)端到端的Exactly-Once。

需要注意的是,并不是說一個(gè)KV數(shù)據(jù)庫就百分百支持冪等寫。冪等寫對KV對有要求,那就是Key-Value必須是可確定性(Deterministic)計(jì)算的。假如我們設(shè)計(jì)的Key是:name + curTimestamp,每次執(zhí)行數(shù)據(jù)重發(fā)時(shí),生成的Key都不相同,會產(chǎn)生多次結(jié)果,整個(gè)操作不是冪等的。因此,為了追求端到端的Exactly-Once,我們設(shè)計(jì)業(yè)務(wù)邏輯時(shí)要盡量使用確定性的計(jì)算邏輯和數(shù)據(jù)模型。

?

???????事務(wù)寫入(Transactional Writes)

Flink借鑒了數(shù)據(jù)庫中的事務(wù)處理技術(shù),同時(shí)結(jié)合自身的Checkpoint機(jī)制來保證Sink只對外部輸出產(chǎn)生一次影響。大致的流程如下:

Flink先將待輸出的數(shù)據(jù)保存下來暫時(shí)不向外部系統(tǒng)提交,等到Checkpoint結(jié)束時(shí),Flink上下游所有算子的數(shù)據(jù)都是一致的時(shí)候,Flink將之前保存的數(shù)據(jù)全部提交(Commit)到外部系統(tǒng)。換句話說,只有經(jīng)過Checkpoint確認(rèn)的數(shù)據(jù)才向外部系統(tǒng)寫入。

如下圖所示,如果使用事務(wù)寫,那只把時(shí)間戳3之前的輸出提交到外部系統(tǒng),時(shí)間戳3以后的數(shù)據(jù)(例如時(shí)間戳5和8生成的數(shù)據(jù))暫時(shí)保存下來,等待下次Checkpoint時(shí)一起寫入到外部系統(tǒng)。這就避免了時(shí)間戳5這個(gè)數(shù)據(jù)產(chǎn)生多次結(jié)果,多次寫入到外部系統(tǒng)。

?

在事務(wù)寫的具體實(shí)現(xiàn)上,Flink目前提供了兩種方式:

1.預(yù)寫日志(Write-Ahead-Log,WAL)

2.兩階段提交(Two-Phase-Commit,2PC)

這兩種方式區(qū)別主要在于:

1.WAL方式通用性更強(qiáng),適合幾乎所有外部系統(tǒng),但也不能提供百分百端到端的Exactly-Once,因?yàn)閃AL預(yù)習(xí)日志會先寫內(nèi)存,而內(nèi)存是易失介質(zhì)。

2.如果外部系統(tǒng)自身就支持事務(wù)(比如MySQL、Kafka),可以使用2PC方式,可以提供百分百端到端的Exactly-Once。

事務(wù)寫的方式能提供端到端的Exactly-Once一致性,它的代價(jià)也是非常明顯的,就是犧牲了延遲。輸出數(shù)據(jù)不再是實(shí)時(shí)寫入到外部系統(tǒng),而是分批次地提交。目前來說,沒有完美的故障恢復(fù)和Exactly-Once保障機(jī)制,對于開發(fā)者來說,需要在不同需求之間權(quán)衡。

?

???????Flink+Kafka的End-to-End Exactly-Once

在上一小節(jié)我們了解到Flink的 End-to-End Exactly-Once需要Checkpoint+事務(wù)的提交/回滾操作,在分布式系統(tǒng)中協(xié)調(diào)提交和回滾的一個(gè)常見方法就是使用兩階段提交協(xié)議。接下來我們了解下Flink的TwoPhaseCommitSinkFunction是如何支持End-to-End Exactly-Once的

?

???????版本說明

Flink 1.4版本之前,支持Exactly Once語義,僅限于應(yīng)用內(nèi)部。

Flink 1.4版本之后,通過兩階段提交(TwoPhaseCommitSinkFunction)支持End-To-End Exactly Once,而且要求Kafka 0.11+。

利用TwoPhaseCommitSinkFunction是通用的管理方案,只要實(shí)現(xiàn)對應(yīng)的接口,而且Sink的存儲支持變亂提交,即可實(shí)現(xiàn)端到端的劃一性語義。

?

???????兩階段提交-API

在 Flink 中的Two-Phase-Commit-2PC兩階段提交的實(shí)現(xiàn)方法被封裝到了 TwoPhaseCommitSinkFunction 這個(gè)抽象類中,只需要實(shí)現(xiàn)其中的beginTransaction、preCommit、commit、abort 四個(gè)方法就可以實(shí)現(xiàn)“精確一次”的處理語義,如FlinkKafkaProducer就實(shí)現(xiàn)了該類并實(shí)現(xiàn)了這些方法

?

1.beginTransaction,在開啟事務(wù)之前,我們在目標(biāo)文件系統(tǒng)的臨時(shí)目錄中創(chuàng)建一個(gè)臨時(shí)文件,后面在處理數(shù)據(jù)時(shí)將數(shù)據(jù)寫入此文件;

2.preCommit,在預(yù)提交階段,刷寫(flush)文件,然后關(guān)閉文件,之后就不能寫入到文件了,我們還將為屬于下一個(gè)檢查點(diǎn)的任何后續(xù)寫入啟動新事務(wù);

3.commit,在提交階段,我們將預(yù)提交的文件原子性移動到真正的目標(biāo)目錄中,請注意,這會增加輸出數(shù)據(jù)可見性的延遲;

4.abort,在中止階段,我們刪除臨時(shí)文件。

?

???????兩階段提交-簡單流程

?

整個(gè)過程可以總結(jié)為下面四個(gè)階段:

1.一旦 Flink 開始做 checkpoint 操作,那么就會進(jìn)入 pre-commit “預(yù)提交”階段,同時(shí)JobManager的Coordinator?會將?Barrier 注入數(shù)據(jù)流中 ;

2.當(dāng)所有的 barrier 在算子中成功進(jìn)行一遍傳遞(就是Checkpoint完成),并完成快照后,則“預(yù)提交”階段完成;

3.等所有的算子完成“預(yù)提交”,就會發(fā)起一個(gè)commit“提交”動作,但是任何一個(gè)“預(yù)提交”失敗都會導(dǎo)致 Flink 回滾到最近的 checkpoint;

?

???????兩階段提交-詳細(xì)流程

  • 需求

接下來將介紹兩階段提交協(xié)議,以及它如何在一個(gè)讀寫Kafka的Flink程序中實(shí)現(xiàn)端到端的Exactly-Once語義。Kafka經(jīng)常與Flink一起使用,且Kafka在最近的0.11版本中添加了對事務(wù)的支持。這意味著現(xiàn)在通過Flink讀寫Kafaka,并提供端到端的Exactly-Once語義有了必要的支持

?

在上圖中,我們有:

– 從Kafka讀取的數(shù)據(jù)源(Flink內(nèi)置的KafkaConsumer

– 窗口聚合

– 將數(shù)據(jù)寫回Kafka的數(shù)據(jù)輸出端(Flink內(nèi)置的KafkaProducer

要使數(shù)據(jù)輸出端提供Exactly-Once保證,它必須將所有數(shù)據(jù)通過一個(gè)事務(wù)提交給Kafka。提交捆綁了兩個(gè)checkpoint之間的所有要寫入的數(shù)據(jù)。這可確保在發(fā)生故障時(shí)能回滾寫入的數(shù)據(jù)。

但是在分布式系統(tǒng)中,通常會有多個(gè)并發(fā)運(yùn)行的寫入任務(wù)的,簡單的提交或回滾是不夠的,因?yàn)樗薪M件必須在提交或回滾時(shí)“一致”才能確保一致的結(jié)果。

Flink使用兩階段提交協(xié)議及預(yù)提交階段來解決這個(gè)問題。

?

  • 預(yù)提交-內(nèi)部狀態(tài)

在checkpoint開始的時(shí)候,即兩階段提交協(xié)議的“預(yù)提交”階段。當(dāng)checkpoint開始時(shí),Flink的JobManager會將checkpoint barrier(將數(shù)據(jù)流中的記錄分為進(jìn)入當(dāng)前checkpoint與進(jìn)入下一個(gè)checkpoint)注入數(shù)據(jù)流。

brarrier在operator之間傳遞。對于每一個(gè)operator,它觸發(fā)operator的狀態(tài)快照寫入到state backend。

?

數(shù)據(jù)源保存了消費(fèi)Kafka的偏移量(offset),之后將checkpoint barrier傳遞給下一個(gè)operator。

這種方式僅適用于operator具有『內(nèi)部』狀態(tài)。所謂內(nèi)部狀態(tài),是指Flink state backend保存和管理的 -例如,第二個(gè)operator中window聚合算出來的sum值。當(dāng)一個(gè)進(jìn)程有它的內(nèi)部狀態(tài)的時(shí)候,除了在checkpoint之前需要將數(shù)據(jù)變更寫入到state backend,不需要在預(yù)提交階段執(zhí)行任何其他操作。Flink負(fù)責(zé)在checkpoint成功的情況下正確提交這些寫入,或者在出現(xiàn)故障時(shí)中止這些寫入。

?

  • 預(yù)提交-外部狀態(tài)

但是,當(dāng)進(jìn)程具有『外部』狀態(tài)時(shí),需要作些額外的處理。外部狀態(tài)通常以寫入外部系統(tǒng)(如Kafka)的形式出現(xiàn)。在這種情況下,為了提供Exactly-Once保證,外部系統(tǒng)必須支持事務(wù),這樣才能和兩階段提交協(xié)議集成。

在該示例中的數(shù)據(jù)需要寫入Kafka,因此數(shù)據(jù)輸出端(Data Sink)有外部狀態(tài)。在這種情況下,在預(yù)提交階段,除了將其狀態(tài)寫入state backend之外,數(shù)據(jù)輸出端還必須預(yù)先提交其外部事務(wù)。

?

當(dāng)checkpoint barrier在所有operator都傳遞了一遍,并且觸發(fā)的checkpoint回調(diào)成功完成時(shí),預(yù)提交階段就結(jié)束了。所有觸發(fā)的狀態(tài)快照都被視為該checkpoint的一部分。checkpoint是整個(gè)應(yīng)用程序狀態(tài)的快照,包括預(yù)先提交的外部狀態(tài)。如果發(fā)生故障,我們可以回滾到上次成功完成快照的時(shí)間點(diǎn)。

  • 提交階段

下一步是通知所有operator,checkpoint已經(jīng)成功了。這是兩階段提交協(xié)議的提交階段,JobManager為應(yīng)用程序中的每個(gè)operator發(fā)出checkpoint已完成的回調(diào)。

數(shù)據(jù)源和widnow operator沒有外部狀態(tài),因此在提交階段,這些operator不必執(zhí)行任何操作。但是,數(shù)據(jù)輸出端(Data Sink)擁有外部狀態(tài),此時(shí)應(yīng)該提交外部事務(wù)。

?

  • 總結(jié)

我們對上述知識點(diǎn)總結(jié)下:

1.一旦所有operator完成預(yù)提交,就提交一個(gè)commit。

2.如果只要有一個(gè)預(yù)提交失敗,則所有其他提交都將中止,我們將回滾到上一個(gè)成功完成的checkpoint。

3.在預(yù)提交成功之后,提交的commit需要保證最終成功 – operator和外部系統(tǒng)都需要保障這點(diǎn)。如果commit失敗(例如,由于間歇性網(wǎng)絡(luò)問題),整個(gè)Flink應(yīng)用程序?qū)⑹?#xff0c;應(yīng)用程序?qū)⒏鶕?jù)用戶的重啟策略重新啟動,還會嘗試再提交。這個(gè)過程至關(guān)重要,因?yàn)槿绻鹀ommit最終沒有成功,將會導(dǎo)致數(shù)據(jù)丟失。

4.完整的實(shí)現(xiàn)兩階段提交協(xié)議可能有點(diǎn)復(fù)雜,這就是為什么Flink將它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的原因。

?

代碼示例

Flink+Kafka實(shí)現(xiàn)End-to-End Exactly-Once

https://ververica.cn/developers/flink-kafka-end-to-end-exactly-once-analysis/

package cn.lanson.extend;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** Author lanson* Desc* Kafka --> Flink-->Kafka ?的End-To-End-Exactly-once* 直接使用* FlinkKafkaConsumer ?+ ?Flink的Checkpoint ?+ ?FlinkKafkaProducer*/
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數(shù)設(shè)置====//===========類型1:必須參數(shù)=============//設(shè)置Checkpoint的時(shí)間間隔為1000ms做一次Checkpoint/其實(shí)就是每隔1000ms發(fā)一次Barrier!env.enableCheckpointing(1000);//設(shè)置State狀態(tài)存儲介質(zhì)if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數(shù)===========//設(shè)置兩個(gè)Checkpoint 之間最少等待時(shí)間,如設(shè)置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時(shí)候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關(guān)口放行一輛車,但是規(guī)定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認(rèn)是0//設(shè)置如果在做Checkpoint過程中出現(xiàn)錯(cuò)誤,是否讓整體任務(wù)失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認(rèn)是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認(rèn)值為0,表示不容忍任何檢查點(diǎn)失敗//設(shè)置是否清理檢查點(diǎn),表示?Cancel 時(shí)是否需要保留當(dāng)前的?Checkpoint,默認(rèn)?Checkpoint會在作業(yè)被Cancel時(shí)被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當(dāng)作業(yè)被取消時(shí),刪除外部的checkpoint(默認(rèn)值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當(dāng)作業(yè)被取消時(shí),保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認(rèn)的即可===============//設(shè)置checkpoint的執(zhí)行模式為EXACTLY_ONCE(默認(rèn))env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設(shè)置checkpoint的超時(shí)時(shí)間,如果?Checkpoint在?60s內(nèi)尚未完成說明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認(rèn)10分鐘//設(shè)置同一時(shí)間有多少個(gè)checkpoint可以同時(shí)執(zhí)行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認(rèn)為1//=============重啟策略===========env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2.SourceProperties props_source = new Properties();props_source.setProperty("bootstrap.servers", "node1:9092");props_source.setProperty("group.id", "flink");props_source.setProperty("auto.offset.reset", "latest");props_source.setProperty("flink.partition-discovery.interval-millis", "5000");//會開啟一個(gè)后臺線程每隔5s檢測一下Kafka的分區(qū)情況//props_source.setProperty("enable.auto.commit", "true");//沒有Checkpoint的時(shí)候使用自動提交偏移量到默認(rèn)主題:__consumer_offsets中//props_source.setProperty("auto.commit.interval.ms", "2000");//kafkaSource就是KafkaConsumerFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props_source);kafkaSource.setStartFromLatest();//kafkaSource.setStartFromGroupOffsets();//設(shè)置從記錄的offset開始消費(fèi),如果沒有記錄從auto.offset.reset配置開始消費(fèi)//kafkaSource.setStartFromEarliest();//設(shè)置直接從Earliest消費(fèi),和auto.offset.reset配置無關(guān)kafkaSource.setCommitOffsetsOnCheckpoints(true);//執(zhí)行Checkpoint的時(shí)候提交offset到Checkpoint(Flink用),并且提交一份到默認(rèn)主題:__consumer_offsets(外部其他系統(tǒng)想用的話也可以獲取到)DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//3.Transformation//3.1切割出每個(gè)單詞并直接記為1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {Random random = new Random();int i = random.nextInt(5);if (i > 3) {System.out.println("出bug了...");throw new RuntimeException("出bug了...");}out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);//3.4將聚合結(jié)果轉(zhuǎn)為自定義的字符串格式SingleOutputStreamOperator<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + ":::" + value.f1;}});//4.sink//result.print();Properties props_sink = new Properties();props_sink.setProperty("bootstrap.servers", "node1:9092");props_sink.setProperty("transaction.timeout.ms", 1000 * 5 + "");//設(shè)置事務(wù)超時(shí)時(shí)間,也可在kafka配置中設(shè)置/*FlinkKafkaProducer<String> kafkaSink0 = new FlinkKafkaProducer<>("flink_kafka",new SimpleStringSchema(),props_sink);*/FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2",new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),props_sink,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);result.addSink(kafkaSink);//5.executeenv.execute();//測試://1.創(chuàng)建主題?/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka2//2.開啟控制臺生產(chǎn)者?/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka//3.開啟控制臺消費(fèi)者?/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2}
}

?

???????Flink+MySQL實(shí)現(xiàn)End-to-End Exactly-Once

https://www.jianshu.com/p/5bdd9a0d7d02

  • 需求

1.checkpoint每10s進(jìn)行一次,此時(shí)用FlinkKafkaConsumer實(shí)時(shí)消費(fèi)kafka中的消息

2.消費(fèi)并處理完消息后,進(jìn)行一次預(yù)提交數(shù)據(jù)庫的操作

3.如果預(yù)提交沒有問題,10s后進(jìn)行真正的插入數(shù)據(jù)庫操作,如果插入成功,進(jìn)行一次checkpoint,flink會自動記錄消費(fèi)的offset,可以將checkpoint保存的數(shù)據(jù)放到hdfs中

4.如果預(yù)提交出錯(cuò),比如在5s的時(shí)候出錯(cuò)了,此時(shí)Flink程序就會進(jìn)入不斷的重啟中,重啟的策略可以在配置中設(shè)置,checkpoint記錄的還是上一次成功消費(fèi)的offset,因?yàn)楸敬蜗M(fèi)的數(shù)據(jù)在checkpoint期間,消費(fèi)成功,但是預(yù)提交過程中失敗了

5.注意此時(shí)數(shù)據(jù)并沒有真正的執(zhí)行插入操作,因?yàn)轭A(yù)提交(preCommit)失敗,提交(commit)過程也不會發(fā)生。等將異常數(shù)據(jù)處理完成之后,再重新啟動這個(gè)Flink程序,它會自動從上一次成功的checkpoint中繼續(xù)消費(fèi)數(shù)據(jù),以此來達(dá)到Kafka到Mysql的Exactly-Once。

?

  • 代碼1
    package cn.lanson.extend;import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
    import org.apache.kafka.clients.CommonClientConfigs;import java.sql.*;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;public class Kafka_Flink_MySQL_EndToEnd_ExactlyOnce {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//方便測試env.enableCheckpointing(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setStateBackend(new FsStateBackend("file:///D:/ckp"));//2.SourceString topic = "flink_kafka";Properties props = new Properties();props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");props.setProperty("group.id","flink");props.setProperty("auto.offset.reset","latest");//如果有記錄偏移量從記錄的位置開始消費(fèi),如果沒有從最新的數(shù)據(jù)開始消費(fèi)props.setProperty("flink.partition-discovery.interval-millis","5000");//開一個(gè)后臺線程每隔5s檢查Kafka的分區(qū)狀態(tài)FlinkKafkaConsumer<ObjectNode> kafkaSource = new FlinkKafkaConsumer<>("topic_in", new JSONKeyValueDeserializationSchema(true), props);kafkaSource.setStartFromGroupOffsets();//從group offset記錄的位置位置開始消費(fèi),如果kafka broker 端沒有該group信息,會根據(jù)"auto.offset.reset"的設(shè)置來決定從哪開始消費(fèi)kafkaSource.setCommitOffsetsOnCheckpoints(true);//Flink執(zhí)行Checkpoint的時(shí)候提交偏移量(一份在Checkpoint中,一份在Kafka的默認(rèn)主題中__comsumer_offsets(方便外部監(jiān)控工具去看))DataStreamSource<ObjectNode> kafkaDS = env.addSource(kafkaSource);//3.transformation//4.SinkkafkaDS.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");//5.executeenv.execute();}
    }/**自定義kafka to mysql,繼承TwoPhaseCommitSinkFunction,實(shí)現(xiàn)兩階段提交。功能:保證kafak to mysql 的Exactly-OnceCREATE TABLE `t_test` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`value` varchar(255) DEFAULT NULL,`insert_time` datetime DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4*/
    class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {public MySqlTwoPhaseCommitSink() {super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 執(zhí)行數(shù)據(jù)入庫操作*/@Overrideprotected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {System.err.println("start invoke.......");String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.err.println("===>date:" + date + " " + objectNode);String value = objectNode.get("value").toString();String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";PreparedStatement ps = connection.prepareStatement(sql);ps.setString(1, value);ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));//執(zhí)行insert語句ps.execute();//手動制造異常if(Integer.parseInt(value) == 15) System.out.println(1/0);}/*** 獲取連接,開啟手動提交事務(wù)(getConnection方法中)*/@Overrideprotected Connection beginTransaction() throws Exception {String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";Connection connection = DBConnectUtil.getConnection(url, "root", "root");System.err.println("start beginTransaction......."+connection);return connection;}/*** 預(yù)提交,這里預(yù)提交的邏輯在invoke方法中*/@Overrideprotected void preCommit(Connection connection) throws Exception {System.err.println("start preCommit......."+connection);}/*** 如果invoke執(zhí)行正常則提交事務(wù)*/@Overrideprotected void commit(Connection connection) {System.err.println("start commit......."+connection);DBConnectUtil.commit(connection);}@Overrideprotected void recoverAndCommit(Connection connection) {System.err.println("start recoverAndCommit......."+connection);}@Overrideprotected void recoverAndAbort(Connection connection) {System.err.println("start abort recoverAndAbort......."+connection);}/*** 如果invoke執(zhí)行異常則回滾事務(wù),下一次的checkpoint操作也不會執(zhí)行*/@Overrideprotected void abort(Connection connection) {System.err.println("start abort rollback......."+connection);DBConnectUtil.rollback(connection);}
    }class DBConnectUtil {/*** 獲取連接*/public static Connection getConnection(String url, String user, String password) throws SQLException {Connection conn = null;conn = DriverManager.getConnection(url, user, password);//設(shè)置手動提交conn.setAutoCommit(false);return conn;}/*** 提交事務(wù)*/public static void commit(Connection conn) {if (conn != null) {try {conn.commit();} catch (SQLException e) {e.printStackTrace();} finally {close(conn);}}}/*** 事務(wù)回滾*/public static void rollback(Connection conn) {if (conn != null) {try {conn.rollback();} catch (SQLException e) {e.printStackTrace();} finally {close(conn);}}}/*** 關(guān)閉連接*/public static void close(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}
    }

    ?

?

  • 代碼2
package cn.lanson.extend;import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class DataProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "node1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);try {for (int i = 1; i <= 20; i++) {DataBean data = new DataBean(String.valueOf(i));ProducerRecord record = new ProducerRecord<String, String>("flink_kafka", null, null, JSON.toJSONString(data));producer.send(record);System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(data));Thread.sleep(1000);}}catch (Exception e){System.out.println(e);}producer.flush();}
}@Data
@NoArgsConstructor
@AllArgsConstructor
class DataBean {private String value;
}

深度總結(jié)

Exactly-Once

流數(shù)據(jù)處理的容錯(cuò)語義:

At most once --最多一次, 也就是說數(shù)據(jù)最多被處理一次,有可能會丟失

At least once --至少一次, 也就是說數(shù)據(jù)至少會被處理一次,有可能會重復(fù)

Exactly-Once --精準(zhǔn)一次, 也就是說數(shù)據(jù)只會被處理一次,不會丟也不會重復(fù),注意: ==更準(zhǔn)確的理解應(yīng)該是只會被正確處理一次而不是僅一次==

End-to-End Exactly-Once

Flink不僅僅支持Exactly-Once,而且還支持End-to-End Exactly-Once

End-to-End Exactly-Once : 端到端的Exactly-Once, 也就是說, Flink不光光內(nèi)部處理數(shù)據(jù)的時(shí)候支持Exactly-Once, 在從Source消費(fèi), 到Transformation處理, 再到Sink,整個(gè)流數(shù)據(jù)處理,從一端到另一端 整個(gè)流程都支持Exactly-Once !

Flink如何支持End-to-End Exactly-Once的?

  • Source: offset+Checkpoint: 需要數(shù)據(jù)源支持offset維護(hù),如Kafka(offset) + Flink(使用Checkpoint維護(hù)offset) 就是支持

    FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase
    FlinkKafkaConsumerBase<T>  implements CheckpointedFunction
    源碼中就記錄了主題分區(qū)和offset信息
    ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates
    initializeState方法和snapshotState方法
  • Transformation:Checkpoint

    Flink官方介紹說的就是支持?jǐn)?shù)據(jù)流上的有狀態(tài)計(jì)算! Flink中的有狀態(tài)的Transformation-API都是自動維護(hù)狀態(tài)到的(到Checkpoint中),如sum/reduce/maxBy.....
  • Sink:兩階段事務(wù)提交+Checkpoint

    Flink+Kafka, Kafka是支持事務(wù)的,所以可以使用兩階段事務(wù)提交來實(shí)現(xiàn)
    FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction
    beginTransaction
    preCommit
    commit
    abort
    

兩階段事務(wù)提交協(xié)議

?

整個(gè)過程可以總結(jié)為下面階段:

1.一旦 Flink 開始做 checkpoint 操作,那么就beginTransaction開啟事務(wù)會進(jìn)入 pre-commit “預(yù)提交”階段,同時(shí)JobManager的Coordinator 會將 Barrier 注入數(shù)據(jù)流中 ;

2.當(dāng)所有的 barrier 在算子中成功進(jìn)行一遍傳遞(就是Checkpoint完成),并完成快照后,則“預(yù)提交”階段完成;

3.等所有的算子完成“預(yù)提交”,就會發(fā)起一個(gè)commit“提交”動作,但是任何一個(gè)“預(yù)提交”失敗都會導(dǎo)致 Flink 回滾到最近的 checkpoint(abort終止事務(wù));

注意: 兩階段事務(wù)提交本身的實(shí)現(xiàn)流程較為固定(主要就是4個(gè)方法的實(shí)現(xiàn)beginTransaction/preCommit/commit/abort), 但是代碼實(shí)現(xiàn)細(xì)節(jié)較為復(fù)雜,所以Flink提供了abstract class TwoPhaseCommitSinkFunction抽象類,并提供了Sink到Kafka的具體實(shí)現(xiàn):FlinkKafkaProducer extends TwoPhaseCommitSinkFunction里面已經(jīng)實(shí)現(xiàn)好了相應(yīng)的方法

注意: 小技巧:

面試時(shí)被問到. 需要說出幾個(gè)核心名詞: FlinkKafkaProducer 兩階段事務(wù)提交/TwoPhaseCommitSinkFunction/beginTransaction/preCommit/commit/abort

總結(jié)

以上是生活随笔為你收集整理的2021年大数据Flink(四十四):​​​​​​扩展阅读 End-to-End Exactly-Once的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

国产少妇在线观看 | 国产精品不卡在线观看 | 久久精品资源 | 四虎成人在线 | 精品自拍网 | 日本一区二区三区免费观看 | 在线观看精品视频 | 成 人 黄 色 视频 免费观看 | 日韩欧美99 | 美女网站视频免费黄 | 久久人人97超碰com | 国产99久久 | 免费看国产黄色 | 亚洲国产精品一区二区久久,亚洲午夜 | 成人毛片久久 | 国产精品国产三级在线专区 | 91在线一区二区 | 色资源在线 | 天天射天天射天天射 | 亚洲精品裸体 | 日韩午夜在线 | 亚洲精品国产精品乱码不99热 | 欧美日韩高清在线一区 | 精品天堂av| 最近高清中文字幕在线国语5 | 嫩小bbbb摸bbb摸bbb | 97福利| 日韩欧美高清在线 | 中文字幕av最新 | 久久久久国产精品免费网站 | 2023亚洲精品国偷拍自产在线 | 国产看片 色 | 91高清完整版在线观看 | 亚洲精品国产第一综合99久久 | 亚洲国产中文字幕在线观看 | 国产99久久久国产精品 | 高清不卡一区二区三区 | av电影中文字幕 | 国产二区电影 | 91激情小视频 | 亚洲免费高清视频 | 精品一区二区三区久久久 | 亚洲精品麻豆视频 | 国产亚洲精品综合一区91 | 最新国产精品亚洲 | 日日夜夜操av | 操老逼免费视频 | 国产美女在线免费观看 | 婷婷激情综合网 | av免费在线看网站 | 天天爽天天爽 | 免费日韩精品 | 在线视频免费观看 | a天堂在线看 | 久久久99久久 | 日韩av成人在线观看 | 综合婷婷久久 | 五月天婷婷在线观看视频 | 婷婷国产v亚洲v欧美久久 | 在线 成人 | 九九九九九九精品 | 国产丝袜一区二区三区 | 福利视频第一页 | 日韩一级片观看 | 美女视频黄是免费的 | 麻豆视频免费观看 | 欧美精品中文在线免费观看 | 日韩免费电影 | 精品在线一区二区三区 | 久久国产经典 | 夜色资源网 | 深夜免费福利视频 | 日本久久久久久科技有限公司 | 日韩欧美在线高清 | 成人av片免费观看app下载 | 六月丁香在线视频 | 国产不卡在线看 | sesese图片| 搡bbbb搡bbb视频 | 在线国产91 | 天天色天天操天天爽 | 婷婷精品国产欧美精品亚洲人人爽 | 精品视频久久 | 亚洲精品小区久久久久久 | 免费看片在线观看 | 黄在线免费看 | 99精品色 | 亚洲视频免费在线观看 | 五月天网页 | 亚州日韩中文字幕 | 狠狠色丁香九九婷婷综合五月 | 免费三级黄色片 | 午夜精品电影一区二区在线 | 狠狠色噜噜狠狠 | 免费无遮挡动漫网站 | 狠狠色丁香婷婷 | 国产免费观看久久黄 | 99久久婷婷国产精品综合 | 成人av在线直播 | 国产精品久久久久久久久久久不卡 | 国产一区二区在线播放视频 | 欧美日韩中文字幕综合视频 | 999一区二区三区 | 伊人日日干 | 亚洲视频,欧洲视频 | 日韩高清一二三区 | 99热这里只有精品在线观看 | 国产精品久久久久久五月尺 | 久久看毛片 | 夜夜干夜夜 | 天天操夜夜看 | 国产一卡二卡四卡国 | 黄色字幕网 | 少妇bbbb搡bbbb桶 | av免费看av | 在线观看日韩视频 | www欧美日韩 | 日韩视频免费在线观看 | 亚洲少妇xxxx | 日韩v在线 | 午夜成人免费电影 | 天天干,夜夜爽 | 国产91精品在线播放 | 日韩在线国产精品 | 久久这里只有精品首页 | 久久久久亚洲天堂 | 视频一区久久 | 蜜臀av夜夜澡人人爽人人桃色 | 黄色亚洲在线 | 成人国产精品 | 91亚洲精品久久久蜜桃网站 | 国产最新在线视频 | 欧美日韩另类在线 | 久久久免费看视频 | 天天操天天曰 | 精品国产一区二区三区久久久蜜月 | 成人免费观看大片 | 亚洲国产影院av久久久久 | 91久久影院 | 国产伦精品一区二区三区… | 欧美一级片在线免费观看 | 黄色资源在线观看 | 中文字幕资源在线 | 亚洲成人欧美 | 国产精品成久久久久 | 久久免费精品国产 | 国产一线天在线观看 | 91九色视频在线播放 | 日韩午夜精品福利 | 超碰.com| 久久久91精品国产一区二区精品 | 五月天亚洲综合小说网 | 国产一级片不卡 | 四虎成人精品永久免费av九九 | 国产一区播放 | 超碰在线观看97 | 色免费在线 | 久久久综合香蕉尹人综合网 | 中文字幕在 | 99精品欧美一区二区三区 | 免费看久久久 | av免费网 | www色| 激情婷婷在线 | 99国产成+人+综合+亚洲 欧美 | 99色国产 | 91亚洲影院 | 亚洲精品国产精品乱码不99热 | 久久一区二区三区日韩 | 中文在线8资源库 | 亚洲综合色视频在线观看 | 久久久久久综合 | 97人人模人人爽人人喊网 | 丁香婷婷自拍 | 久久精品网站视频 | 天天干天天操 | 久av电影 | 国产在线v | 91精品1区2区 | 亚洲一区二区黄色 | 黄色毛片网站在线观看 | 日韩视频在线播放 | 欧美人zozo| 免费午夜视频在线观看 | 综合久久网 | 99久久这里有精品 | 国产精品久久人 | 日日躁夜夜躁xxxxaaaa | 国产美女精品视频 | 精品一区二区在线观看 | 91精品第一页 | 九九九热精品免费视频观看 | 91国内在线 | 一区二区三区av在线 | 亚洲精品mv在线观看 | 中文字幕一区二区三区精华液 | 韩国av电影网 | 欧美a视频在线观看 | 在线看黄色av | 五月开心综合 | 国产精品成人在线 | 欧美色婷婷 | 婷婷深爱五月 | 欧美日韩中文字幕视频 | 国产欧美精品一区二区三区四区 | 91av99| 一本一道久久a久久精品 | 最新成人av| 国产99在线| 五月天九九| 在线不卡a | 亚洲欧美日本一区二区三区 | 精品999国产| 九九热av| 91丨九色丨91啦蝌蚪老版 | 97超碰在线人人 | 99性视频| 成人av免费看 | 黄色精品网站 | 在线免费观看羞羞视频 | 天天操天天舔天天干 | 国产精品a久久久久 | 天天操天天操天天操天天操天天操 | 最新av观看| 国产男女免费完整视频 | 丁香九月激情综合 | 国产美腿白丝袜足在线av | 国产亚洲精品美女久久 | 欧美激情视频免费看 | 精品国产一区二区三区在线观看 | 国语精品免费视频 | 成人一级在线观看 | 国产亚洲情侣一区二区无 | 狠狠色伊人亚洲综合成人 | 亚洲手机天堂 | 粉嫩aⅴ一区二区三区 | 久久久久久久久久电影 | 韩国一区二区三区视频 | 亚洲国产精品999 | 久久精品4 | 日韩av看片| 久久视影| 黄色三级在线看 | 国产视频久久久久 | 天天操天天射天天舔 | av久久久 | 天天射色综合 | 日韩美女久久 | 日本bbbb摸bbbb | 中文字幕乱码在线播放 | 国产精品igao视频网网址 | 在线观看网站黄 | 欧美激情视频三区 | 97超碰在线资源 | 久久久久久久久久亚洲精品 | 天天射天天干天天插 | 国产又粗又猛又色又黄视频 | av网址aaa | 国产成人精品久久亚洲高清不卡 | 亚洲黄色免费在线看 | 97av.com | 久久这里只有精品23 | 国产一区国产二区在线观看 | 去看片 | 又黄又爽的免费高潮视频 | 色综合天天干 | 亚洲午夜av久久乱码 | 久久精品99久久 | 欧美精品在线视频观看 | 午夜电影久久 | 久久精品免费观看 | 亚洲视频一区二区三区在线观看 | 91精品久久久久久久久久入口 | 色噜噜在线观看 | 婷婷五情天综123 | 午夜国产成人 | 国产一级二级三级视频 | 国产手机在线视频 | 久久久久99999 | 91看片成人 | 亚洲aⅴ在线观看 | 麻豆一区在线观看 | 超碰97在线资源 | 久久久久久久久久久影院 | 亚洲精品av在线 | 久艹视频免费观看 | av免费网 | 久久久久国产一区二区三区 | 97精品国自产拍在线观看 | 国产精品欧美久久久久久 | 九色在线视频 | 美女黄频在线观看 | www黄色大片 | 精品一区 在线 | 在线看毛片网站 | 99日韩精品 | 久久激情日本aⅴ | 亚洲精品视频在线播放 | 久久久久五月 | 国产精品国产亚洲精品看不卡15 | 99久久er热在这里只有精品15 | 97超碰影视 | 手机av永久免费 | 国产精品久久久久久久久久久免费 | 中文字幕第一页av | 亚洲一区精品二人人爽久久 | 亚洲精品激情 | 免费在线观看日韩 | 日韩毛片精品 | 欧美日韩三区二区 | 色婷婷在线播放 | 欧美特一级 | 欧美激情视频一区二区三区 | 激情五月婷婷网 | 日韩有码中文字幕在线 | 97超碰色| japanesexxxhd奶水| 在线观看av片| 波多野结衣视频一区二区 | 亚洲va天堂va欧美ⅴa在线 | 精品久久五月天 | 91精品在线免费观看视频 | 韩国av一区 | 国产专区免费 | 亚洲精品99| 在线国产中文 | 欧美一区中文字幕 | 免费人做人爱www的视 | 成人免费观看大片 | 日本不卡一区二区三区在线观看 | 国产精品一区电影 | 成人免费一区二区三区在线观看 | 中文字幕在线乱 | 国产精品18久久久久久不卡孕妇 | 国产高清精 | 在线a人v观看视频 | 91麻豆精品91久久久久同性 | 久青草国产在线 | 婷婷丁香激情综合 | 蜜桃av久久久亚洲精品 | 最新av电影网站 | 色综合五月天 | 黄色成人av | 亚洲黄色一级视频 | 亚洲精品久久久蜜桃直播 | 久久99欧美| 国产精品免费成人 | 碰超在线 | 亚洲老妇xxxxxx | 中文字幕视频免费观看 | 精品一区在线 | 韩日精品中文字幕 | 四虎4hu永久免费 | 国产丝袜制服在线 | 99精品国产成人一区二区 | 久久96国产精品久久99漫画 | 伊人色**天天综合婷婷 | 成 人 黄 色视频免费播放 | 欧美在线观看小视频 | 香蕉久久久久久久 | 国产一区免费 | 久久免费看毛片 | 波多野结衣在线中文字幕 | 中文字幕在线播放一区二区 | 亚洲视频免费在线看 | 黄色毛片视频免费观看中文 | 欧美激情视频一二区 | 在线视频成人 | 成年人视频在线免费播放 | 免费亚洲一区二区 | 国产美女在线免费观看 | 国产视频在线观看一区 | 日本中文字幕网 | 国产精品久久久久久久久久尿 | 久久综合九色 | 蜜臀aⅴ精品一区二区三区 久久视屏网 | 免费国产在线观看 | 午夜av网站| 免费能看的av | 国偷自产中文字幕亚洲手机在线 | 欧美不卡在线 | 亚洲精品国偷自产在线91正片 | 久久一二三四 | 亚在线播放中文视频 | 亚洲乱码久久久 | 免费大片黄在线 | 午夜免费福利片 | 天天爽夜夜爽人人爽曰av | 精品国产视频在线 | 欧美美女激情18p | 激情文学丁香 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 91精品久久久久久久91蜜桃 | 深爱激情五月综合 | 亚洲开心色 | 波多野结衣视频一区 | 91精品看片 | 亚洲另类xxxx | 91精品对白一区国产伦 | 91精品国产麻豆 | 不卡av在线 | 国产精品视频一二三 | 在线精品一区二区 | 日韩精品五月天 | 99热亚洲精品 | 激情电影影院 | 在线观看中文字幕亚洲 | 精品人妖videos欧美人妖 | 亚洲性xxxx| 亚洲午夜精品一区 | 国产小视频在线观看 | 五月天综合色 | 人人干人人超 | 91高清免费在线观看 | 欧美精品免费一区二区 | 国产精品黄网站在线观看 | 久久国产精品电影 | 国产亚洲永久域名 | 亚洲成av人片在线观看香蕉 | 国产精品国产三级国产aⅴ无密码 | 一区视频在线 | 一本一本久久a久久精品综合小说 | 亚洲女欲精品久久久久久久18 | 久久精品一二三区白丝高潮 | 国产精品久久久久久婷婷天堂 | av一本久道久久波多野结衣 | 麻豆成人在线观看 | 中文字幕区 | 欧美有色| 国产一区二区久久久 | 亚州精品天堂中文字幕 | 在线观看成人av | 精品国产自在精品国产精野外直播 | 91大神精品视频在线观看 | 久热久草在线 | 99久久精品国产免费看不卡 | 999久久久久久久久 69av视频在线观看 | 五月综合激情婷婷 | 免费视频你懂得 | 国产成人久久精品77777综合 | 日韩免费小视频 | 久久久久久久久久久久av | 亚洲精品国偷拍自产在线观看蜜桃 | 91日韩在线播放 | 啪啪免费试看 | 国产精品 日韩 欧美 | 九九热精 | 欧美日韩高清一区二区 国产亚洲免费看 | 欧美最新大片在线看 | 午夜精品一二区 | 在线免费观看视频a | 波多野结衣电影一区二区三区 | 日韩一区二区三区免费视频 | 91精品国产欧美一区二区成人 | 久久亚洲视频 | 天天综合色天天综合 | 91精品毛片 | 国产色网| 天天看天天干 | 最新日韩中文字幕 | 国内精品久久久久久久影视麻豆 | 久久高清国产视频 | 视频91在线| 最近高清中文在线字幕在线观看 | 色婷婷www| 欧美激情第一区 | 中文字幕在线视频一区 | 国内精品久久久久久久久久久 | 又粗又长又大又爽又黄少妇毛片 | 久久精品站 | 国产精品黑丝在线观看 | 久久久久久免费网 | 国产精品美女久久久久久免费 | 亚洲涩涩一区 | 欧美xxxxx在线视频 | 国产精品欧美一区二区 | 国产一区二区三区免费视频 | 日本中文乱码卡一卡二新区 | 五月激情姐姐 | 成人av.com| 99r精品视频在线观看 | 日韩电影在线一区二区 | 欧美日韩在线免费观看视频 | 免费一级特黄录像 | 欧美精品久久久久久久久老牛影院 | 美女网站色| 精品在线一区二区三区 | 99精品久久只有精品 | 成人中文字幕在线观看 | 国产精品中文久久久久久久 | 免费黄a| 久久成人人人人精品欧 | av免费在线观看1 | 亚洲资源一区 | 亚洲欧美视频一区二区三区 | 在线韩国电影免费观影完整版 | 国产91欧美| 国产一区在线播放 | 久精品视频在线 | a黄色片 | 精品一区久久 | 午夜影视一区 | 99视频精品免费观看, | 欧美日韩免费一区 | 欧美一区二区三区不卡 | 国产精品成人免费精品自在线观看 | 亚洲精品一区二区三区新线路 | 黄色一级性片 | 国产99久久久国产精品成人免费 | 麻豆视频在线播放 | 中文亚洲欧美日韩 | 黄色毛片在线观看 | 亚洲va天堂va欧美ⅴa在线 | 制服丝袜在线91 | 成人精品视频 | 亚洲人成免费网站 | 午夜视频在线瓜伦 | 香蕉视频啪啪 | 亚洲国产精品va在线 | 久久久久久免费网 | 成人av电影免费在线观看 | 麻豆国产精品va在线观看不卡 | 欧美日韩视频在线观看免费 | 欧美analxxxx| 96精品视频 | 亚洲一区免费在线 | 成年人黄色大片在线 | 在线观看国产一区 | 国内外激情视频 | 看全黄大色黄大片 | 五月婷婷一区二区三区 | 成人网中文字幕 | 一级片视频在线 | 久久免费国产视频 | 久久免费a | 久久综合九色九九 | 色射爱| 97碰在线| 91毛片在线观看 | 一区二区三区在线不卡 | 一二三区av | 狠狠色狠狠综合久久 | 日韩理论在线 | 在线观看视频一区二区三区 | 国产色久 | 精品久久视频 | 亚洲伦理一区二区 | 久久久美女 | 欧美资源在线观看 | 亚洲免费a | 国产人在线成免费视频 | 欧美午夜a| 国产精品一区二区三区四区在线观看 | 成人在线观看免费 | www国产在线| 国产裸体bbb视频 | 国产视频亚洲精品 | 狠狠躁夜夜躁人人爽超碰91 | 国产一级做a爱片久久毛片a | 日日夜精品 | 欧美日韩中文字幕在线视频 | 国产精品av免费在线观看 | 国产精品久久久久久久久久东京 | 91在线蜜桃臀 | 黄色com| 国产一区二区三精品久久久无广告 | 99热在线这里只有精品 | 一区二区三区视频在线 | 丁香av | 亚洲人久久 | 97视频总站 | 99综合电影在线视频 | 亚洲精品美女久久 | 久久天堂影院 | 婷婷久久婷婷 | www.黄色片网站 | 日韩中文在线播放 | 在线观看亚洲精品视频 | 色婷婷综合久久久久中文字幕1 | 999久久久精品视频 日韩高清www | 日本91在线 | 欧美日韩性视频在线 | 欧美日韩精品免费观看视频 | 精品国产一区二区三区四 | 人人讲下载| 97超碰人 | 91精品久久久久久久久 | 韩国精品视频在线观看 | 亚洲第一区在线观看 | 免费看短| 亚洲va欧美va人人爽春色影视 | 免费涩涩网站 | 日韩精品一区二区三区高清免费 | 国产精品一区二区视频 | 日本精品久久久久久 | 日韩中文字幕国产 | 欧美成年黄网站色视频 | 日韩欧美在线影院 | 国产91综合一区在线观看 | 丁香久久综合 | 色噜噜狠狠狠狠色综合 | 中国一区二区视频 | 黄色一级片视频 | 美女在线国产 | 国产一区二区三区免费在线 | 麻豆视频在线观看免费 | 午夜 在线 | 免费久久片| 激情黄色一级片 | 亚州av免费| 在线视频免费观看 | 91九色在线 | 天天综合网入口 | 国内久久看 | 999久久久久 | 国产手机在线观看视频 | 久久久久激情视频 | 亚洲欧美激情精品一区二区 | www.五月婷 | 欧洲高潮三级做爰 | 精品一区二区久久久久久久网站 | 黄色小网站在线观看 | 人人爽人人av | 97精品视频在线 | 精品久久久久一区二区国产 | av资源免费在线观看 | 精品国产精品久久一区免费式 | 精品福利在线视频 | 精品视频国产一区 | 在线观看v片 | 成人av一区二区兰花在线播放 | 激情电影影院 | 在线观看av黄色 | a视频免费| 五月婷婷丁香在线观看 | 久久精品日产第一区二区三区乱码 | www.eeuss影院av撸 | 免费在线观看av网站 | 黄色的视频网站 | 欧美日韩视频观看 | 久久免费看 | 久久精品国产一区二区三区 | 日韩免费久久 | 国产色拍| 久久精品国产亚洲a | 日本特黄特色aaa大片免费 | 天天爱综合 | 亚洲成a人片77777潘金莲 | 亚洲精品456在线播放 | 91亚洲网 | 国产亚洲欧洲 | 午夜少妇 | 欧美激情精品久久久久久 | 色九九在线 | 日韩精品一区二区三区免费观看 | 欧美日韩在线免费观看视频 | 午夜精品久久久久久久99 | 超碰97人人在线 | 99精品视频免费看 | 亚洲国产精品影院 | 日韩有码中文字幕在线 | 亚洲精品美女久久久久 | 国产99久久久国产精品免费看 | 五月开心激情网 | 日韩高清 一区 | 欧美精品二 | 国产日韩精品在线观看 | 九九久久婷婷 | 国产午夜一区二区 | 色综合天天综合 | 久草在线手机观看 | 看国产黄色大片 | 在线观看精品国产 | 97色综合 | 91av久久| 在线黄av | 国产精品短视频 | 97人人精品 | 欧美在线观看禁18 | 国产成人久久 | 亚洲欧洲一级 | 欧美激情视频在线观看免费 | 亚洲国产剧情av | av黄网站 | 亚洲精品xxxx | 日日干,天天干 | 欧美日韩高清免费 | 一区二区三区精品在线 | 亚洲最新精品 | 最近中文字幕在线中文高清版 | 欧美 日韩 视频 | 欧美一区免费观看 | 久久精品国产免费观看 | 国产亚州av| 久草视频在线免费看 | 又爽又黄又无遮挡网站动态图 | 欧美色久 | 色综合婷婷 | 99精品国产福利在线观看免费 | 亚洲成a人片77777kkkk1在线观看 | 99精品视频在线 | 免费黄色在线网址 | 在线免费观看av网站 | 久久高清免费视频 | 三级黄色免费 | 色.www| 精品国产一区二区三区久久久蜜月 | 成人在线观看网址 | 五月婷婷视频 | 久久天天综合网 | 久久精品爱爱视频 | 久久特级毛片 | 伊人手机在线 | 二区三区精品 | 超碰大片| 免费视频久久久久久久 | 最近中文国产在线视频 | 97人人精品 | 亚洲一区二区高潮无套美女 | 国产免费人人看 | 成人a在线观看高清电影 | 国产99久久九九精品 | 91丨九色丨91啦蝌蚪老版 | 午夜影院一区 | 欧美做受高潮1 | 免费看搞黄视频网站 | 国产一级二级三级视频 | 99久久99久久精品国产片 | 欧美综合在线视频 | 麻豆94tv免费版 | 探花在线观看 | 精品国产一区二区久久 | 久久久久久欧美二区电影网 | 天天视频亚洲 | 在线99热| 蜜桃视频在线观看一区 | 狠狠色香婷婷久久亚洲精品 | 亚洲区二区 | 日韩一区二区久久 | 国产精品久久久久久久久久尿 | 亚洲影院色 | 久久亚洲综合国产精品99麻豆的功能介绍 | www.夜色.com | 992tv在线成人免费观看 | 国产专区欧美专区 | 91高清免费看 | 黄色在线看网站 | 久久99精品久久久久婷婷 | 999久久久精品视频 日韩高清www | 日韩在线观看视频在线 | 激情网综合| 青春草免费在线视频 | 精品国产人成亚洲区 | 国产一区二区手机在线观看 | 日韩理论片在线 | 国产免费av一区二区三区 | 蜜臀av性久久久久蜜臀aⅴ涩爱 | 国产无区一区二区三麻豆 | 夜夜高潮夜夜爽国产伦精品 | 综合婷婷久久 | 久久精品综合网 | 五月婷婷,六月丁香 | 国产日韩欧美综合在线 | av在线短片 | 中文字幕在线播放视频 | 精品国产一区二区三区久久影院 | 97人人模人人爽人人喊中文字 | 国产成人精品一区二区三区免费 | 91在线精品秘密一区二区 | 狠狠狠狠狠狠天天爱 | 91av免费在线观看 | av片子在线观看 | 午夜18视频在线观看 | 国产特级毛片 | 人交video另类hd | 又黄又爽又刺激视频 | 久久久久久久久爱 | 中文字幕资源在线观看 | av播放在线 | aaa日本高清在线播放免费观看 | 综合成人在线 | 国产精品原创在线 | av在线最新 | 日本久久中文字幕 | 韩国av一区二区三区 | 天天干天天操 | 五月天久久激情 | 国产视频资源在线观看 | 久久不见久久见免费影院 | 日日干夜夜干 | 久久综合九色九九 | 中文字幕在线一区二区三区 | 国产一区二区精品在线 | 欧美日韩精品在线免费观看 | 98超碰在线 | 一区二区三区四区不卡 | 久久久久久网站 | 午夜黄色 | 狠狠狠狠狠狠狠 | 国产精品一区二区三区在线看 | 国产亚洲精品久久久久久无几年桃 | 久久久久久国产精品亚洲78 | 日韩经典一区二区三区 | 欧美夫妻性生活电影 | 欧美日韩国产精品久久 | 国产精品123 | 日韩免费播放 | 色九九在线 | 亚洲欧美日韩在线看 | 久久久精品视频成人 | 久久精品—区二区三区 | 久草剧场| 黄av免费在线观看 | 日日操日日插 | 又黄又爽又无遮挡的视频 | 激情影音 | 激情影院在线 | 91人人澡人人爽 | 久久久免费在线观看 | 免费看成人 | 亚洲精品乱码久久久久久高潮 | 国内视频在线观看 | 成人国产一区二区 | 中文字幕在线观看不卡 | 97精品免费视频 | 最近免费中文字幕mv在线视频3 | 在线你懂的视频 | 美女网站色在线观看 | 五月在线 | 亚洲午夜小视频 | 久久久999精品视频 国产美女免费观看 | 伊人国产在线播放 | 能在线看的av | 国产中文字幕精品 | 久久精品视频在线观看 | 日本大尺码专区mv | 五月婷婷视频 | 精品在线播放视频 | 久久免费精品国产 | 亚洲精品福利在线 | 国内久久看 | 美女网站免费福利视频 | 国产专区免费 | 毛片网站在线观看 | 天天射网 | www在线免费观看 | 日韩欧美一区二区在线播放 | 日韩欧美一区二区三区视频 | 日韩精品一区二区三区在线视频 | 欧美性久久久久久 | 免费看片色 | 天天操夜夜爱 | 99精品国产一区二区三区不卡 | 在线观看亚洲免费视频 | 成人av影视 | 色多多视频在线观看 | 国产一级视频在线观看 | 蜜臀精品久久久久久蜜臀 | 99视频在线 | 国内精品久久久久国产 | 久久久久久久电影 | 国产一区二区视频在线 | 在线观看国产高清视频 | 91高清免费在线观看 | 91精品啪啪 | 国产在线综合视频 | 99视频在线精品免费观看2 | 国产中文字幕在线看 | 欧美91精品久久久久国产性生爱 | 黄色三级网站在线观看 | 国产成人精品一区二区三区在线 | bbbbb女女女女女bbbbb国产 | 丁香激情综合国产 | 99色在线视频 | 国产午夜精品一区二区三区四区 | 在线视频国产区 | 国产日韩欧美综合在线 | 国产亚洲欧洲 | 日韩欧美专区 | 在线 你懂 | 激情五月***国产精品 | 在线欧美中文字幕 | 午夜精选视频 | 免费在线观看日韩欧美 | 亚洲天堂毛片 | 国产精品日韩久久久久 | 在线观看www. | 黄色影院在线免费观看 | 在线观看一区二区精品 | 亚洲 欧美 精品 | 久久久久久久国产精品视频 | 日韩av手机在线看 | 99热在线免费观看 | 免费观看一区二区三区视频 | 色婷婷激情网 | 欧洲精品视频一区 | 成人免费视频a | 一区二区三区四区五区六区 | 亚洲精品高清视频在线观看 | 日韩高清在线看 | 九月婷婷综合网 | 国产精品69久久久久 | 午夜精品久久久久久久99 | 麻豆精品在线 | 日日干av| 久久久午夜电影 | 国产免费又爽又刺激在线观看 | 色综合www | 久久综合中文字幕 | 日b视频在线观看网址 | 婷婷视频在线播放 | 97国产精品免费 | 久久av免费 | 网站在线观看你们懂的 | 一区二区三区久久 | 在线免费日韩 | 国产精品国产亚洲精品看不卡15 | 久久成 | 91免费版在线观看 | 免费观看xxxx9999片 | 中文字幕国语官网在线视频 | 日韩在线观看你懂得 | 久久精品久久精品 | 亚洲 中文 欧美 日韩vr 在线 | 久草在线99 | 久久成人精品 | 日韩欧美综合在线视频 | 亚洲欧洲久久久 | 天天操月月操 | 日韩字幕在线 | 国产一区影院 | 色中色综合 | 国产黄色一级片在线 | 999视频网站 | 国产国语在线 | 亚洲欧洲日韩 | 性色av一区二区三区在线观看 | 国产在线播放一区二区三区 | 精品国产伦一区二区三区观看方式 | 97国产情侣爱久久免费观看 | 丁香视频全集免费观看 | 六月久久婷婷 | 亚洲成人资源网 | 国产精品久久网 | 97超碰福利久久精品 | www.xxx.性狂虐 | 久久久国产精品免费 | 伊人网综合在线观看 | 日韩av视屏| 99九九免费视频 | 人人射人人插 | 久久精品9 | 免费又黄又爽视频 | 91c网站色版视频 | 六月丁香色婷婷 | 香蕉视频最新网址 | 亚洲日韩精品欧美一区二区 | 欧美国产日韩久久 | 国产精品女 | 久久99国产精品免费网站 | 亚洲精品网址在线观看 | 国产在线观看黄 | 在线观看视频在线观看 | 91精品国产成人www | 久久综合给合久久狠狠色 | 国产视频2区 | 国产亚洲日 | 午夜精品久久久久久中宇69 | 色av男人的天堂免费在线 | 亚洲欧美日韩国产一区二区 | 免费av影视 | 波多野结衣一区二区 | 热久久免费视频精品 | 91人人人 | 国产精品免费在线播放 | 欧美孕妇视频 | 黄色av电影网 | 国内精品久久久久久久久久久 | 国产福利在线免费 | 婷婷丁香九月 | 天天爽天天搞 | 91在线资源 | 亚洲 欧美 另类人妖 | 免费久久网站 | 波多野结衣在线播放视频 | 69亚洲精品 | av中文电影 | 国产精品1区2区3区在线观看 | 日韩中文字幕免费在线播放 | 久久天天躁夜夜躁狠狠85麻豆 | 亚洲第一香蕉视频 | 色婷婷国产 | 99久久综合精品五月天 | 九九99视频 | 日韩欧美在线中文字幕 | 在线观看黄网站 | 国产精品www| 日韩欧美在线观看一区二区三区 |