日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

10分钟带你逆袭kafka之路

發布時間:2024/8/23 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 10分钟带你逆袭kafka之路 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作者:故事凌

1. kafka概述

##1.1 kafka簡介

Apache Kafka 是一個快速、可擴展的、高吞吐的、可容錯的分布式“發布-訂閱”消息系統, 使用 Scala 與 Java 語言編寫,能夠將消息從一個端點傳遞到另一個端點,較之傳統的消息中 間件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、內置分區、支持消息副本和高容 錯的特性,非常適合大規模消息處理應用程序。

Kafka 官網: http://kafka.apache.org/

Kafka主要設計目標如下:

  • 以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。
  • 支持Kafka Server間的消息分區,及分布式消費,同時保證每個partition內的消息順序傳輸。
  • 同時支持離線數據處理和實時數據處理。
  • 支持在線水平擴展。

Kafka通常用于兩大類應用程序:

  • 建立實時流數據管道,以可靠地在系統或應用程序之間獲取數據
  • 構建實時流應用程序,以轉換或響應數據流

要了解Kafka如何執行這些操作,讓我們從頭開始深入研究Kafka的功能。

首先幾個概念:

  • Kafka在一個或多個可以跨越多個數據中心的服務器上作為集群運行。
  • Kafka集群將記錄流存儲在稱為主題的類別中。
  • 每個記錄由一個鍵,一個值和一個時間戳組成

1.2 kafka架構體系

1.3 kafka的應用場景

kafka的應用場景非常多, 下面我們就來舉幾個我們最常見的場景

1.3.1 用戶的活動跟蹤

用戶在網站的不同活動消息發布到不同的主題中心,然后可以對這些消息進行實時監測、實時處理。當然,也可以加載到Hadoop或離線處理數據倉庫,對用戶進行畫像。像淘寶、天貓、京東這些大型電商平臺,用戶的所有活動都要進行追蹤的。

1.3.2 日志收集

1.3.3 限流削峰

1.3.4 高吞吐率實現

Kafka與其他MQ相比,最大的特點就是高吞吐率。為了增加存儲能力,Kafka將所有的消息都寫入到了低速大容量的硬盤。按理說,這將導致性能損失,但實際上,Kafka仍然可以保持超高的吞吐率,并且其性能并未受到影響。其主要采用如下方式實現了高吞吐率。

  • 順序讀寫:Kafka將消息寫入到了分區partition中,而分區中的消息又是順序讀寫的。順序讀寫要快于隨機讀寫。
  • 零拷貝:生產者、消費者對于Kafka中的消息是采用零拷貝實現的。
  • 批量發送:Kafka允許批量發送模式。
  • 消息壓縮:Kafka允許對消息集合進行壓縮。
  • 1.4 kafka的優點

    1. 解耦:

    在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基于數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

    2. 冗余:(副本)

    有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

    3. 擴展性

    因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。

    4. 靈活性&峰值處理能力

    在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

    5. 可恢復性

    系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

    6. 順序保證

    在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。

    7. 緩沖

    在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優化數據流經過系統的速度。

    8. 異步通信

    很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

    1.5 kafka于其他MQ對比

    1. RabbitMQ

    RabbitMQ是使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更適合于企業級的開發。同時實現了Broker構架,這意味著消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。

    2. Redis

    Redis是一個基于Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它本身支持MQ功能,所以完全可以當做一個輕量級的隊列服務來使用。對于RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試數據分為128Bytes、512Bytes、1K和10K四個不同大小的數據。實驗表明:入隊時,當數據比較小時Redis的性能要高于RabbitMQ,而如果數據大小超過了10K,Redis則慢的無法忍受;出隊時,無論數據大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低于Redis。

    3. ZeroMQ

    ZeroMQ號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。ZeroMQ能夠實現RabbitMQ不擅長的高級/復雜的隊列,但是開發人員需要自己組合多種技術框架,技術上的復雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息服務器或中間件,因為你的應用程序將扮演這個服務器角色。你只需要簡單的引用ZeroMQ程序庫,可以使用NuGet安裝,然后你就可以愉快的在應用程序之間發送消息了。但是ZeroMQ僅提供非持久性的隊列,也就是說如果宕機,數據將會丟失。其中,Twitter的Storm 0.9.0以前的版本中默認使用ZeroMQ作為數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty作為傳輸模塊)。

    4. ActiveMQ

    ActiveMQ是Apache下的一個子項目。 類似于ZeroMQ,它能夠以代理人和點對點的技術實現隊列。同時類似于RabbitMQ,它少量代碼就可以高效地實現高級應用場景。

    5. Kafka/Jafka

    Kafka是Apache下的一個子項目,是一個高性能跨語言分布式發布/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既可以達到10W/s的吞吐速率;完全的分布式系統,Broker、Producer、Consumer都原生自動支持分布式,自動實現負載均衡;支持Hadoop數據并行加載,對于像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的并行加載機制統一了在線和離線的消息處理。Apache Kafka相對于ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。

    1.6 kafka的幾種重要角色

    1.6.1 kafka作為存儲系統

    任何允許發布與使用無關的消息發布的消息隊列都有效地充當了運行中消息的存儲系統。Kafka的不同之處在于它是一個非常好的存儲系統。

    寫入Kafka的數據將寫入磁盤并進行復制以實現容錯功能。Kafka允許生產者等待確認,以便直到完全復制并確保即使寫入服務器失敗的情況下寫入也不會完成。

    Kafka的磁盤結構可以很好地擴展使用-無論服務器上有50 KB還是50 TB的持久數據,Kafka都將執行相同的操作。

    由于認真對待存儲并允許客戶端控制其讀取位置,因此您可以將Kafka視為一種專用于高性能,低延遲提交日志存儲,復制和傳播的專用分布式文件系統。

    1.6.2 kafka作為消息傳遞系統

    Kafka的流概念與傳統的企業消息傳遞系統相比如何?

    傳統上,消息傳遞具有兩種模型:排隊和發布-訂閱。在隊列中,一組使用者可以從服務器中讀取內容,并且每條記錄都將轉到其中一個。在發布-訂閱記錄中廣播給所有消費者。這兩個模型中的每一個都有優點和缺點。排隊的優勢在于,它允許您將數據處理劃分到多個使用者實例上,從而擴展處理量。不幸的是,隊列不是多用戶的—一次進程讀取了丟失的數據。發布-訂閱允許您將數據廣播到多個進程,但是由于每條消息都傳遞給每個訂閱者,因此無法擴展處理。

    Kfka的消費者群體概念概括了這兩個概念。與隊列一樣,使用者組允許您將處理劃分為一組進程(使用者組的成員)。與發布訂閱一樣,Kafka允許您將消息廣播到多個消費者組。

    Kafka模型的優點在于,每個主題都具有這些屬性-可以擴展處理范圍,并且是多訂閱者-無需選擇其中一個。

    與傳統的消息傳遞系統相比,Kafka還具有更強的訂購保證。

    傳統隊列將記錄按順序保留在服務器上,如果多個使用者從隊列中消費,則服務器將按記錄的存儲順序分發記錄。但是,盡管服務器按順序分發記錄,但是這些記錄是異步傳遞給使用者的,因此它們可能在不同的使用者上亂序到達。這實際上意味著在并行使用的情況下會丟失記錄的順序。消息傳遞系統通常通過“專有使用者”的概念來解決此問題,該概念僅允許一個進程從隊列中使用,但是,這當然意味著在處理中沒有并行性。

    Kafka做得更好。通過在主題內具有并行性(即分區)的概念,Kafka能夠在用戶進程池中提供排序保證和負載均衡。這是通過將主題中的分區分配給消費者組中的消費者來實現的,以便每個分區都由組中的一個消費者完全消費。通過這樣做,我們確保使用者是該分區的唯一讀取器,并按順序使用數據。由于存在許多分區,因此仍然可以平衡許多使用者實例上的負載。但是請注意,使用者組中的使用者實例不能超過分區。

    1.6.3 kafka用作流處理

    僅讀取,寫入和存儲數據流是不夠的,目的是實現對流的實時處理。

    在Kafka中,流處理器是指從輸入主題中獲取連續數據流,對該輸入進行一些處理并生成連續數據流以輸出主題的任何東西。

    例如,零售應用程序可以接受銷售和裝運的輸入流,并輸出根據此數據計算出的重新訂購和價格調整流。

    可以直接使用生產者和消費者API進行簡單處理。但是,對于更復雜的轉換,Kafka提供了完全集成的Streams API。這允許構建執行非重要處理的應用程序,這些應用程序計算流的聚合或將流連接在一起。

    該功能有助于解決此類應用程序所面臨的難題:處理無序數據,在代碼更改時重新處理輸入,執行狀態計算等。

    流API建立在Kafka提供的核心原語之上:它使用生產者和使用者API進行輸入,使用Kafka進行狀態存儲,并使用相同的組機制來實現流處理器實例之間的容錯。

    2. kafka中的關鍵術語解釋

    2.1 Topic

    主題。在 Kafka 中,使用一個類別屬性來劃分消息的所屬類,劃分消息的這個類稱為 topic。 topic 相當于消息的分類標簽,是一個邏輯概念

    物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處

    2.2 Partition

    分區。topic 中的消息被分割為一個或多個 partition,其是一個物理概念,對應到系統上 就是一個或若干個目錄。partition 內部的消息是有序的,但 partition 間的消息是無序的。

    2.3 Segment

    段。將 partition 進一步細分為了若干的 segment,每個 segment 文件的大小相等。

    2.4 Broker

    Kafka 集群包含一個或多個服務器,每個服務器節點稱為一個 broker。

    broker存儲topic的數據。如果某topic有N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。

    如果某topic有N個partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個broker不存儲該topic的partition數據。

    如果某topic有N個partition,集群中broker數目少于N個,那么一個broker存儲該topic的一個或多個partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群數據不均衡。

    2.5 Producer

    生產者, 即消息的發布者. 生產者將數據發布到他們選擇的主題。生產者負責選擇將哪個記錄分配給主題中的哪個分區。即: 生產者生產的一條消息,會被寫入到某一個 partition。

    2.6 Consumer

    消費者。可以從 broker 中讀取消息。

    一個消費者可以消費多個 topic 的消息

    一個消費者可以消費同一個 topic 中的多個 partition 中的消息

    一個 partiton 允許多個 consumer 同時消費

    2.7 Consumer Group

    consumer group 是 kafka 提供的可擴展且具有容錯性的消費者機制。組內可以有多個消 費者,它們共享一個公共的 ID,即 group ID。組內的所有消費者協調在一起來消費訂閱主題 的所有分區。

    Kafka 保證同一個 consumer group 中只有一個 consumer 會消費某條消息,實際上,Kafka 保證的是穩定狀態下每一個 consumer 實例只會消費某一個或多個特定的 partition,而某個 partition 的數據只會被某一個特定的 consumer 實例所消費。

    下面我們用官網的一張圖, 來標識consumer數量和partition數量的對應關系

    由兩臺服務器組成的Kafka群集,其中包含四個帶有兩個使用者組的分區(P0-P3)。消費者組A有兩個消費者實例,組B有四個。

    其實對于這個消費組, 以前一直搞不明白, 我自己的總結是:

    topic中的partitoin到group是發布訂閱的通信方式,即一條topic的partition的消息會被所有的group消費,屬于一對多模式;group到consumer是點對點通信方式,屬于一對一模式。

    舉個例子: 不使用group的話,啟動10個consumer消費一個topic,這10個consumer都能得到topic的所有數據,相當于這個topic中的任一條消息被消費10次。

    使用group的話,連接時帶上groupid,topic的消息會分發到10個consumer上,每條消息只被消費1次

    2.8 Replizcas of partition

    分區副本。副本是一個分區的備份,是為了防止消息丟失而創建的分區的備份。

    2.9 Partition Leader

    每個 partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當前負責消息讀寫 的 partition。即所有讀寫操作只能發生于 Leader 分區上。

    2.10 Partition Follower

    所有Follower都需要從Leader同步消息,Follower與Leader始終保持消息同步。Leader 與 Follower 的關系是主備關系,而非主從關系。

    2.11 ISR

    • ISR,In-Sync Replicas,是指副本同步列表。 ISR列表是由Leader負責維護。
    • AR,Assigned Replicas,指某個 partition 的所有副本, 即已分配的副本列表。
    • OSR,Outof-Sync Replicas, 即非同步的副本列表。
    • AR = ISR + OSR

    2. 12 offset

    偏移量。每條消息都有一個當前Partition下唯一的64字節的offset,它是相當于當前分區第一條消息的偏移量。

    2.13 Broker Controller

    Kafka集群的多個broker中,有一個會被選舉controller,負責管理整個集群中partition和replicas的狀態。

    只有 Broker Controller 會向 zookeeper 中注冊 Watcher,其他 broker 及分區無需注冊。即 zookeeper 僅需監聽 Broker Controller 的狀態變化即可。

    2.14 HW與LEO

    • HW,HighWatermark,高水位,表示 Consumer 可以消費到的最高 partition 偏移量。HW 保證了 Kafka 集群中消息的一致性。確切地說,是保證了 partition 的 Follower 與 Leader 間數 據的一致性。
    • LEO,Log End Offset,日志最后消息的偏移量。消息是被寫入到 Kafka 的日志文件中的, 這是當前最后一個寫入的消息在 Partition 中的偏移量。
    • 對于 leader 新寫入的消息,consumer 是不能立刻消費的。leader 會等待該消息被所有 ISR 中的 partition follower 同步后才會更新 HW,此時消息才能被 consumer 消費。

    我相信你看完上面的概念還是懵逼的, 好吧, 下面我們就用圖來形象話的表示兩者的關系吧。

    2.15 zookeeper

    Zookeeper 負責維護和協調 broker,負責 Broker Controller 的選舉。

    在 kafka0.9 之前版本,offset 是由 zk 負責管理的。

    總結:zk 負責 Controller 的選舉,Controller 負責 leader 的選舉。

    2.16 Coordinator

    Coordinator一般指的是運行在每個broker上的group Coordinator進程,用于管理Consumer Group中的各個成員,主要用于offset位移管理和Rebalance。一個Coordinator可以同時管理多個消費者組。

    2. 17 Rebalance

    當消費者組中的數量發生變化,或者topic中的partition數量發生了變化時,partition的所有權會在消費者間轉移,即partition會重新分配,這個過程稱為再均衡Rebalance。

    再均衡能夠給消費者組及broker帶來高性能、高可用性和伸縮,但在再均衡期間消費者是無法讀取消息的,即整個broker集群有小一段時間是不可用的。因此要避免不必要的再均衡。

    2.18 offset commit

    Consumer從broker中取一批消息寫入buffer進行消費,在規定的時間內消費完消息后,會自動將其消費消息的offset提交給broker,以記錄下哪些消息是消費過的。當然,若在時限內沒有消費完畢,其是不會提交offset的。

    3. kafka的工作原理和過程

    3.1 消息寫入算法

    消息發送者將消息發送給broker, 并形成最終的可供消費者消費的log, 是已給比較復雜的過程:

    • producer先從zookeeper中找到該partition的leader
    • producer將消息發送給該leader
    • leader將消息接入本地的log, 并通知ISR的followers
    • ISR中的followers從leader中pull消息, 寫入本地log后向leader發送ack
    • leader收到所有ISR中的followers的ack后, 增加HW并向producer發送ack, 表示消息寫入成功

    3.2 消息路由策略

    在通過 API 方式發布消息時,生產者是以 Record 為消息進行發布的。Record 中包含 key 與 value,value 才是我們真正的消息本身,而 key 用于路由消息所要存放的 Partition。消息 要寫入到哪個 Partition 并不是隨機的,而是有路由策略的。

    • 若指定了 partition,則直接寫入到指定的 partition;
    • 若未指定 partition 但指定了 key,則通過對 key 的 hash 值與 partition 數量取模,該取模
    • 結果就是要選出的 partition 索引;
    • 若 partition 和 key 都未指定,則使用輪詢算法選出一個 partition。

    3.3 HW截斷機制

    如果 partition leader 接收到了新的消息, ISR 中其它 Follower 正在同步過程中,還未同 步完畢時 leader 宕機。此時就需要選舉出新的 leader。若沒有 HW 截斷機制,將會導致 partition 中 leader 與 follower 數據的不一致。

    當原 Leader 宕機后又恢復時,將其 LEO 回退到其宕機時的 HW,然后再與新的 Leader進行數據同步,這樣就可以保證老 Leader 與新 Leader 中數據一致了,這種機制稱為 HW 截斷機制。

    3.4 消息發送的可靠性

    生產者向 kafka 發送消息時,可以選擇需要的可靠性級別。通過 request.required.acks參數的值進行設置。

    1、0值

    異步發送。生產者向 kafka 發送消息而不需要 kafka 反饋成功 ack。該方式效率最高,但可靠性最低。其可能會存在消息丟失的情況。

    • 在傳輸過程中會出現消息丟失。
    • 在broker內部會出現消息丟失。
    • 會出現寫入到kafka中的消息的順序與生產順序不一致的情況。

    2、1值

    同步發送。生產者發送消息給 kafka,broker 的 partition leader 在收到消息后馬上發送 成功 ack(無需等等 ISR 中的 Follower 同步),生產者收到后知道消息發送成功,然后會再發送消息。如果一直未收到 kafka 的 ack,則生產者會認為消息發送失敗,會重發消息。

    該方式對于 Producer 來說,若沒有收到 ACK,一定可以確認消息發送失敗了,然后可以 重發;但是,即使收到了 ACK,也不能保證消息一定就發送成功了。故,這種情況,也可能 會發生消息丟失的情況。

    3、-1值

    同步發送。生產者發送消息給 kafka,kafka 收到消息后要等到 ISR 列表中的所有副本都 同步消息完成后,才向生產者發送成功 ack。如果一直未收到 kafka 的 ack,則認為消息發送 失敗,會自動重發消息。該方式會出現消息重復接收的情況。

    3.5 消費者消費過程解析

    生產者將消息發送到topitc中, 消費者即可對其進行消費, 其消費過程如下:

  • consumer向broker提交連接請求,其所連接上的broker都會向其發送broker controller的通信URL,即配置文件中的listeners地址;
  • 當consumer指定了要消費的topic后,會向broker controller發送消費請求;
  • broker controller會為consumer分配一個或幾個partition leader,并將該partition的當前offset發送給consumer;
  • consumer會按照broker controller分配的partition對其中的消息進行消費;
  • 當consumer消費完該條消息后,consumer會向broker發送一個消息已經被消費反饋,即該消息的offset;
  • 在broker接收到consumer的offset后,會更新相應的__consumer_offset中;
  • 以上過程會一直重復,知道消費者停止請求消費;

    Consumer可以重置offset,從而可以靈活消費存儲在broker上的消息。

    3.6 Partition Leader選舉范圍

    當leader宕機后,broker controller會從ISR中挑選一個follower成為新的leader。如果ISR中沒有其他副本怎么辦?可以通過unclean.leader.election.enable的值來設置leader選舉范圍。

    1、false

    必須等到ISR列表中所有的副本都活過來才進行新的選舉。該策略可靠性有保證,但可用性低。

    2、true

    在ISR列表中沒有副本的情況下,可以選擇任意一個沒有宕機的主機作為新的leader,該策略可用性高,但可靠性沒有保證。

    3.7 重復消費問題的解決方案

    1、同一個consumer重復消費

    當Consumer由于消費能力低而引發了消費超時,則可能會形成重復消費。

    在某數據剛好消費完畢,但是正準備提交offset時候,消費時間超時,則broker認為這條消息未消費成功。這時就會產生重復消費問題。

    **其解決方案:**延長offset提交時間。

    2、不同的consumer重復消費

    當Consumer消費了消息,但還沒有提交offset時宕機,則這些已經被消費過的消息會被重復消費。

    **其解決方案:**將自動提交改為手動提交。

    3.8 從架構設計上解決kafka重復消費的問題

    其實在開發的時候, 我們在設計程序的時候, 比如考慮到網絡故障等一些異常的情況, 我們都會設置消息的重試次數,

    可能還有其他可能出現消息重復, 那我們應該如何解決呢?

    下面提供三個方案:

    3.8.1 方案一: 保存并查詢

    給每個消息都設置一個獨一無二的uuid, 所有的消息, 我們都要存一個uuid, 我們在消費消息的時候, 首先去持久化系統中查詢一下, 看這個看是否以前消費過, 如沒有消費過, 在進行消費, 如果已經消費過, 丟棄就好了, 下圖, 表明了這種方案:

    3.8.2 方案二: 利用冪等

    冪等(Idempotence)在數學上是這樣定義的,如果一個函數 f(x) 滿足:f(f(x)) = f(x),則函數 f(x) 滿足冪等性。

    這個概念被拓展到計算機領域,被用來描述一個操作、方法或者服務。一個冪等操作的特點是,其任意多次執行所產生的影響均與一次執行的影響相同。一個冪等的方法,使用同樣的參數,對它進行多次調用和一次調用,對系統產生的影響是一樣的。所以,對于冪等的方法,不用擔心重復執行會對系統造成任何改變。

    我們舉個例子來說明一下。在不考慮并發的情況下,“將 X 老師的賬戶余額設置為 100 萬元”,執行一次后對系統的影響是,X 老師的賬戶余額變成了 100 萬元。只要提供的參數 100萬元不變,那即使再執行多少次,X 老師的賬戶余額始終都是 100萬元,不會變化,這個操作就是一個冪等的操作。

    再舉一個例子,“將 X 老師的余額加 100 萬元”,這個操作它就不是冪等的,每執行一次,賬戶余額就會增加 100 萬元,執行多次和執行一次對系統的影響(也就是賬戶的余額)是不一樣的。

    所以,通過這兩個例子,我們可以想到如果系統消費消息的業務邏輯具備冪等性,那就不用擔心消息重復的問題了,因為同一條消息,消費一次和消費多次對系統的影響是完全一樣的。也就可以認為,消費多次等于消費一次。

    那么,如何實現冪等操作呢?最好的方式就是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。但是,不是所有的業務都能設計成天然冪等的,這里就需要一些方法和技巧來實現冪等。

    下面我們介紹一種常用的方法:利用數據庫的唯一約束實現冪等。

    例如,我們剛剛提到的那個不具備冪等特性的轉賬的例子:將 X 老師的賬戶余額加 100 萬元。在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。

    首先,我們可以限定,對于每個轉賬單每個賬戶只可以執行一次變更操作,在分布式系統中,這個限制實現的方法非常多,最簡單的是我們在數據庫中建一張轉賬流水表,這個表有三個字段:轉賬單 ID、賬戶 ID 和變更金額,然后給轉賬單 ID 和賬戶 ID 這兩個字段聯合起來創建一個唯一約束,這樣對于相同的轉賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。

    這樣,我們消費消息的邏輯可以變為:“在轉賬流水表中增加一條轉賬記錄,然后再根據轉賬記錄,異步操作更新用戶余額即可。”在轉賬流水表增加一條轉賬記錄這個操作中,由于我們在這個表中預先定義了“賬戶 ID 轉賬單 ID”的唯一約束,對于同一個轉賬單同一個賬戶只能插入一條記錄,后續重復的插入操作都會失敗,這樣就實現了一個冪等的操作。

    3.8.3 方案三: 設置前提條件

    為更新的數據設置前置條件另外一種實現冪等的思路是,給數據變更設置一個前置條件,如果滿足條件就更新數據,否則拒絕更新數據,在更新數據的時候,同時變更前置條件中需要判斷的數據。

    這樣,重復執行這個操作時,由于第一次更新數據的時候已經變更了前置條件中需要判斷的數據,不滿足前置條件,則不會重復執行更新數據操作。

    比如,剛剛我們說過,“將 X 老師的賬戶的余額增加 100 萬元”這個操作并不滿足冪等性,我們可以把這個操作加上一個前置條件,變為:“如果X老師的賬戶當前的余額為 500萬元,將余額加 100萬元”,這個操作就具備了冪等性。

    對應到消息隊列中的使用時,可以在發消息時在消息體中帶上當前的余額,在消費的時候進行判斷數據庫中,當前余額是否與消息中的余額相等,只有相等才執行變更操作。

    但是,如果我們要更新的數據不是數值,或者我們要做一個比較復雜的更新操作怎么辦?用什么作為前置判斷條件呢?更加通用的方法是,給你的數據增加一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數據,更新數據的同時將版本號 +1,一樣可以實現冪等。

    4 . kafka集群搭建

    我們在工作中, 為了保證環境的高可用, 防止單點, kafka都是以集群的方式出現的, 下面就帶領大家一起搭建一套kafka集群環境

    我們在官網下載kafka, 下載地址為: http://kafka.apache.org/downloads, 下載我們需要的版本, 推薦使用穩定的版本

    4.1 搭建集群

    1、下載并解壓

    cd /usr/local/src wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz mkdir /data/servers tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/ cd /data/servers/kafka_2.11-2.4.0

    2、修改配置文件

    kafka的配置文件$KAFKA_HOME/config/server.properties, 主要修改一下下面幾項:

    # 確保每個機器上的id不一樣broker.id=0# 配置服務端的監控地址listeners=PLAINTEXT://192.168.51.128:9092# kafka 日志目錄log.dirs=/data/servers/kafka_2.11-2.4.0/logs# kafka設置的partitons的個數num.partitions=1# zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群zookeeper.connect=192.168.51.128:2181

    因為我自己是本機做實驗, 所有使用的是一個主機的不同端口,在線上,,就是不同的機器,大家參考即可。

    我們這里使用kafka的zookeeper,只啟動一個節點,?但是正真的生產過程中,是需要zookeeper集群,自己搭建就好,后期我們也會出zookeeper的教程,大家請關注就好了。

    3、拷貝3份配置文件

    # 創建對應的日志目錄 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094# 拷貝三份配置文件 cp server.properties server_9092.properties cp server.properties server_9093.properties cp server.properties server_9094.properties

    修改不同端口對應的文件:

    # 9092的id為0, 9093的id為1, 9094的id為2broker.id=0# 配置服務端的監控地址, 分別在不通的配置文件中寫入不同的端口listeners=PLAINTEXT://192.168.51.128:9092# kafka 日志目錄, 目錄也是對應不同的端口log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092# kafka設置的partitons的個數num.partitions=1# zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群zookeeper.connect=192.168.51.128:2181

    4、修改zookeeper的配置文件

    dataDir=/data/servers/zookeeper server.1=192.168.51.128:2888:3888

    然后創建zookeeper的myid文件

    echo "1"> /data/servers/zookeeper/myid

    5、啟動zookeeper

    使用kafka內置的zookeeper

    cd /data/servers/kafka_2.11-2.4.0/bin zookeeper-server-start.sh -daemon ../config/zookeeper.properties netstat -anp |grep 2181

    6、啟動kafka

    ./kafka-server-start.sh -daemon ../config/server_9092.properties ./kafka-server-start.sh -daemon ../config/server_9093.properties ./kafka-server-start.sh -daemon ../config/server_9094.properties

    4.2 kafka的操作

    1. topic

    我們先來看一下創建topic常用的參數吧

    –create 創建topic

    –delete 刪除topic

    –alter 修改topic的名字或者partition個數

    –list 查看topic

    –describe 查看topic的詳細信息

    –topic <String: topic> 指定topic的名字

    –zookeeper <String: hosts> 指定zookeeper的連接地址,

    ? 參數提示并不贊成這樣使用

    ? DEPRECATED, The connection string for
    ? the zookeeper connection in the form
    ? host:port. Multiple hosts can be
    ? given to allow fail-over.

    –bootstrap-server <String: server to connect to>: 指定kafka的連接地址, 推薦使用這個,

    ? 參數的提示信息顯示

    ? REQUIRED: The Kafka server to connect

    to. In case of providing this, a direct Zookeeper connection won't be required.

    –replication-factor <Integer: replication factor> : 對于每個partiton的備份個數

    ? The replication factor for each
    ? partition in the topic being
    ? created. If not supplied, defaults
    ? to the cluster default.

    –partitions <Integer: # of partitions>: 指定該topic的分區的個數

    示例:

    cd /data/servers/kafka_2.11-2.4.0/bin # 創建topic test1 kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1 # 創建topic test2 kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2 # 查看topic kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094

    2、自動創建topic

    我們在工作中, 如果我們不想去管理topic, 可以通過kafka的配置文件來管理, 我們可以讓kafka自動創建topic, 需要在我們的kafka配置文件中加入如下配置文件

    auto.create.topics.enable=true

    如果刪除topic想達到物理刪除的目的, 也是需要配置的

    delete.topic.enable=true

    3、發送消息

    他們可以通過客戶端的命令生產消息

    先來看看kafka-console-producer.sh常用的幾個參數吧

    –topic <String: topic> 指定topic

    –timeout <Integer: timeout_ms> 超時時間

    –sync 異步發送消息

    –broker-list <String: broker-list> 官網提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2. 這個參數是必須的

    kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1

    4、消費消息

    我們也還是先來看看kafka-console-consumer.sh的參數吧

    –topic <String: topic> 指定topic

    –group <String: consumer group id> 指定消費者組

    –from-beginning : 指定從開始進行消費, 如果不指定, 就從當前進行消費

    –bootstrap-server : kafka的連接地址

    kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning

    4.3 kafka的日志

    kafka的日志分兩種:

    第一種日志: 是我們的kafka的啟動日志, 就是我們排查問題, 查看報錯信息的日志,

    第二種日志:就是我們的數據日志, kafka是我們的數據是以日志的形式存在存盤中的, 我們第二種所說的日志就是我們的partiton與segment

    那我們就來說說備份和分區吧

    我們創建一個分區, 一個備份, 那么test就應該在三臺機器上或者三個數據目錄只有一個test-0, (分區的下標是從0開始的)

    如果我們創建N個分區, 我們就會在三個服務器上發現, test_0-n

    如果我們創建M個備份, 我們就會在發現, test_0 到test_n 每一個都是M個

    5. kafaka API

    5.1 使用kafaka原生的api

    1.消費者自動提交:

    定義自己的生產者

    import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;/*** @ClassName MyKafkaProducer* @Description TODO* @Author lingxiangxiang* @Date 3:37 PM* @Version 1.0**/ public class MyKafkaProducer {private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer;public MyKafkaProducer() {Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 設置批量發送properties.put("batch.size", 16384);// 批量發送的等待時間50ms, 超過50ms, 不足批量大小也發送properties.put("linger.ms", 50);this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties);}public boolean sendMsg() {boolean result = true;try {// 正常發送, test2是topic, 0代表的是分區, 1代表的是key, hello world是發送的消息內容final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world");producer.send(record);// 有回調函數的調用producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}});// 自己定義一個類producer.send(record, new MyCallback(record));} catch (Exception e) {result = false;}return result;} }

    生產者測試類:

    在生產者測試類中,自己遇到一個坑, 就是最后自己沒有加sleep, 就是怎么檢查自己的代碼都沒有問題, 但是最后就是沒法發送成功消息, 最后加了一個sleep就可以了, 因為主函數main已經執行完退出, 但是消息并沒有發送完成, 需要進行等待一下.當然, 你在生產環境中可能不會遇到這樣問題, 呵呵, 代碼如下:

    import static java.lang.Thread.sleep;/*** @ClassName MyKafkaProducerTest* @Description TODO* @Author lingxiangxiang* @Date 3:46 PM* @Version 1.0**/ public class MyKafkaProducerTest {public static void main(String[] args) throws InterruptedException {MyKafkaProducer producer = new MyKafkaProducer();boolean result = producer.sendMsg();System.out.println("send msg " + result);sleep(1000);} }

    消費者類:

    import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer<Integer, String> consumer;public MyKafkaConsumer() {super("KafkaConsumerTest", false);Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("group.id", "mygroup");properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("heartbeat.interval.ms", "10000");properties.put("auto.offset.reset", "earliest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");this.consumer = new KafkaConsumer<Integer, String>(properties);}@Overridepublic void doWork() {consumer.subscribe(Arrays.asList("test2"));ConsumerRecords<Integer, String>records = consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println("topic = " + record.topic());System.out.println("partition = " + record.partition());System.out.println("key = " + record.key());System.out.println("value = " + record.value());}} }

    消費者的測試類:

    /*** @ClassName MyConsumerTest* @Description TODO* @Author lingxiangxiang* @Date 4:23 PM* @Version 1.0**/ public class MyConsumerTest {public static void main(String[] args) {MyKafkaConsumer consumer = new MyKafkaConsumer();consumer.start();System.out.println("==================");} }

    2. 消費者同步手動提交

    前面的消費者都是以自動提交 offset 的方式對 broker 中的消息進行消費的,但自動提交 可能會出現消息重復消費的情況。所以在生產環境下,很多時候需要對 offset 進行手動提交, 以解決重復消費的問題。

    手動提交又可以劃分為同步提交、異步提交,同異步聯合提交。這些提交方式僅僅是 doWork()方法不相同,其構造器是相同的。所以下面首先在前面消費者類的基礎上進行構造 器的修改,然后再分別實現三種不同的提交方式。

    同步提交方式是,消費者向 broker 提交 offset 后等待 broker 成功響應。若沒有收到響 應,則會重新提交,直到獲取到響應。而在這個等待過程中,消費者是阻塞的。其嚴重影響了消費者的吞吐量。

    修改前面的MyKafkaConsumer.java, 主要修改下面的配置:

    import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer<Integer, String> consumer;public MyKafkaConsumer() {super("KafkaConsumerTest", false);Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("group.id", "mygroup");// 這里要修改成手動提交properties.put("enable.auto.commit", "false");// properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("heartbeat.interval.ms", "10000");properties.put("auto.offset.reset", "earliest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");this.consumer = new KafkaConsumer<Integer, String>(properties);}@Overridepublic void doWork() {consumer.subscribe(Arrays.asList("test2"));ConsumerRecords<Integer, String>records = consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println("topic = " + record.topic());System.out.println("partition = " + record.partition());System.out.println("key = " + record.key());System.out.println("value = " + record.value());//手動同步提交consumer.commitSync();}} }

    3、消費者異步手工提交

    手動同步提交方式需要等待 broker 的成功響應,效率太低,影響消費者的吞吐量。異步提交方式是,消費者向 broker 提交 offset 后不用等待成功響應,所以其增加了消費者的吞吐量。

    import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer<Integer, String> consumer;public MyKafkaConsumer() {super("KafkaConsumerTest", false);Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("group.id", "mygroup");// 這里要修改成手動提交properties.put("enable.auto.commit", "false");// properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("heartbeat.interval.ms", "10000");properties.put("auto.offset.reset", "earliest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");this.consumer = new KafkaConsumer<Integer, String>(properties);}@Overridepublic void doWork() {consumer.subscribe(Arrays.asList("test2"));ConsumerRecords<Integer, String>records = consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println("topic = " + record.topic());System.out.println("partition = " + record.partition());System.out.println("key = " + record.key());System.out.println("value = " + record.value());//手動同步提交// consumer.commitSync();//手動異步提交// consumer.commitAsync();// 帶回調公共的手動異步提交consumer.commitAsync((offsets, e) -> {if(e != null) {System.out.println("提交次數, offsets = " + offsets);System.out.println("exception = " + e);}});}} }

    5.2 springboot使用kafka

    現在大家的開發過程中, 很多都用的是springboot的項目, 直接啟動了, 如果還是用原生的API, 就是有點low了啊, 那kafka是如何和springboot進行聯合的呢?

    1. maven配置

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.1.1</version></dependency>

    2. 添加配置文件

    在application.properties中加入如下配置信息:

    # kafka 連接地址 spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094# 生產者 spring.kafka.producer.acks = 0 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.retries = 3 spring.kafka.producer.batch-size = 4096 spring.kafka.producer.buffer-memory = 33554432 spring.kafka.producer.compression-type = gzip# 消費者 spring.kafka.consumer.group-id = mygroup spring.kafka.consumer.auto-commit-interval = 5000 spring.kafka.consumer.heartbeat-interval = 3000 spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.auto-offset-reset = earliest spring.kafka.consumer.enable-auto-commit = true # listenner, 標識消費者監聽的個數 spring.kafka.listener.concurrency = 8 # topic的名字 kafka.topic1 = topic1

    3. 生產者

    import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate;@Service @Slf4j public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;// 讀取配置文件@Value("${kafka.topic1}")private String topic;@Overridepublic void sendKafka() {kafkaTemplate.send(topic, "hell world");} }

    4. 消費者

    @Component @Slf4j public class MyKafkaConsumer {@KafkaListener(topics = "${kafka.topic1}")public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {log.info("----------------- record =" + record);log.info("------------------ message =" + kafkaMessage.get()); }

    總結

    以上是生活随笔為你收集整理的10分钟带你逆袭kafka之路的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    久久婷婷色 | 国内精品久久久久 | 免费久久99精品国产 | 最近高清中文字幕在线国语5 | 射九九 | 亚洲成人二区 | 欧美地下肉体性派对 | 激情综合啪 | 免费看污黄网站 | 国产成人亚洲精品自产在线 | 亚洲激情视频在线 | 久久精品一区二区三区中文字幕 | 最近能播放的中文字幕 | www.久久久 | 久久激情日本aⅴ | 色多多在线观看 | 波多野结衣视频一区 | 欧美一级乱黄 | 久久久影院官网 | 99精品免费在线 | 手机av在线不卡 | 亚洲欧洲日韩 | 成人一级电影在线观看 | 在线av资源 | 亚洲国产剧情 | 黄色精品一区二区 | 午夜视频在线观看一区二区三区 | av资源免费在线观看 | 日韩精品一区二区在线观看视频 | 亚洲欧美成人综合 | 波多野结衣电影一区 | 在线播放视频一区 | 正在播放一区二区 | 久久草av | 久久看片网站 | 欧美日韩精品在线 | 热久久国产精品 | 夜色.com | 一二区电影 | 看av免费网站 | 成人av免费在线 | 视频国产区| 欧美一区二区视频97 | 国产亚洲婷婷免费 | 97超碰国产精品女人人人爽 | 97精品久久人人爽人人爽 | 性色av一区二区三区在线观看 | 久久草在线视频国产 | 亚洲国产黄色片 | 国产精品久久久久国产精品日日 | 日韩三区在线观看 | 97视频免费看 | 日韩精品久久久久久 | 欧美激情精品久久久久久免费 | 欧美va天堂va视频va在线 | 国产一在线精品一区在线观看 | 日韩系列在线 | 伊甸园永久入口www 99热 精品在线 | 久久人人爽人人片av | 黄色aaa级片 | 免费在线观看成人 | 丰满少妇一级片 | 亚洲精品欧美视频 | 欧美一区日韩精品 | 精品久久久久亚洲 | 91亚洲夫妻 | 中文字幕在线观看免费高清电影 | 国产群p视频 | 草久久久| 日韩中文字幕免费看 | 欧美日韩中文另类 | 日韩videos高潮hd | 欧美精品v国产精品 | 久久久久久久影院 | 国产美女视频免费观看的网站 | 草久草久 | 国产91粉嫩白浆在线观看 | 日韩成人免费观看 | 亚av在线 | 在线黄网站 | 国产专区精品视频 | 一本一本久久a久久精品综合妖精 | 亚洲情感电影大片 | 日韩一区二区三 | 96精品视频 | 久久久久久久久久久久影院 | 久久亚洲欧美 | 在线观看一区 | 99久久精品免费 | 精品一二三区视频 | 天天干天天干天天操 | 国产高清在线永久 | 欧美国产精品久久久久久免费 | www欧美日韩 | 2024av| 福利一区在线 | 欧美在线1区 | 亚洲婷婷免费 | 日韩最新在线视频 | 玖玖玖影院 | 毛片视频网址 | 久久狠狠一本精品综合网 | 国产特级毛片aaaaaa高清 | 在线精品亚洲一区二区 | 亚洲精品黄色 | 日韩中文免费视频 | 国产高清视频免费 | 午夜在线日韩 | 狠狠狠狠狠狠狠狠干 | 九9热这里真品2 | 制服丝袜一区二区 | 日韩欧美在线中文字幕 | 夜夜躁日日躁狠狠久久88av | 天天碰天天操 | 国产99在线| 欧美日韩视频一区二区三区 | 最近中文国产在线视频 | 热久久精品在线 | 天天夜夜操 | 国产精品一区二区av日韩在线 | 婷婷成人亚洲综合国产xv88 | 婷五月激情| 日韩美女一级片 | 亚洲天堂网视频在线观看 | 亚洲成色777777在线观看影院 | 国产高清小视频 | 97在线播放视频 | 久久私人影院 | 欧美三级在线播放 | 一级黄色片在线免费观看 | 久久久久免费 | 超碰国产在线观看 | 久久爱992xxoo | 婷婷在线资源 | 国产精品麻豆三级一区视频 | 日韩av网页| 欧美 亚洲 另类 激情 另类 | 国产1级毛片 | 最新中文字幕在线资源 | 91精品国产92久久久久 | 国产免费小视频 | 久草在线视频首页 | 九九热免费精品视频 | www视频免费在线观看 | 久久久一本精品99久久精品 | 日本中文乱码卡一卡二新区 | 制服丝袜成人在线 | 日日夜夜天天射 | 国产免费叼嘿网站免费 | 在线观看视频一区二区三区 | 成人免费看黄 | 在线观看爱爱视频 | 欧美视频在线二区 | 五月婷婷,六月丁香 | 玖玖玖在线观看 | 亚洲少妇久久 | 婷婷伊人五月 | 天天天天天操 | 久久综合久久久久88 | 国产 在线观看 | 婷婷亚洲最大 | 在线观看视频你懂的 | 日日日操| 欧美资源在线观看 | 亚洲成人蜜桃 | 国产亚洲精品美女久久 | 69精品人人人人 | 狠狠干我 | 中文字幕日韩高清 | 国产大陆亚洲精品国产 | 欧美va天堂在线电影 | 国产又粗又猛又黄又爽 | 日韩高清不卡一区二区三区 | 日本 在线 视频 中文 有码 | 中文字幕成人一区 | 在线黄av | 亚洲精品国精品久久99热 | 99久久er热在这里只有精品15 | 国产成人a v电影 | 国产在线最新 | 美女网站视频免费都是黄 | 国产一区二区不卡视频 | 婷婷丁香激情五月 | 最近的中文字幕大全免费版 | www操操操| 欧美成人aa | 9999精品视频 | 人人澡超碰碰97碰碰碰软件 | 人人澡人 | 精品视频在线播放 | 色视频网站在线观看一=区 a视频免费在线观看 | 久久久久久蜜桃一区二区 | 久久久久亚洲精品男人的天堂 | 青草草在线 | 国产精品久久久久久久免费大片 | 久久久久久久久久久免费视频 | 欧美福利网站 | 五月天伊人网 | 日韩网站在线播放 | 在线免费观看麻豆视频 | 欧美性猛片 | 91香蕉视频 mp4| 欧美一级在线观看视频 | 一区二区三区中文字幕在线观看 | 在线观看www视频 | 精品国内自产拍在线观看视频 | 狠狠干美女 | 国产精品女教师 | 色在线亚洲 | 人人澡人人添人人爽一区二区 | 日本一区二区三区免费看 | 久久综合偷偷噜噜噜色 | 久久久久久久久久影视 | 精品视频国产 | 日本中出在线观看 | 天天操天天拍 | 在线免费观看国产 | 中文字幕在线视频国产 | 91精品国产欧美一区二区成人 | 久久成年人 | av中文字幕网 | 91精品国自产在线 | 亚洲精品国产综合99久久夜夜嗨 | 欧美日韩精品在线观看视频 | 激情亚洲综合在线 | 亚洲欧美日韩国产一区二区 | 中文字幕在线观看三区 | 国产又黄又爽又猛视频日本 | 免费久久久久久久 | 日韩国产欧美在线视频 | 欧美伦理一区二区三区 | 欧美午夜精品久久久久久浪潮 | 9999精品免费视频 | 国产一区二区三区免费视频 | 国产精品国产三级国产aⅴ9色 | 精品人人人| 999久久| 丁香高清视频在线看看 | 亚洲日本三级 | 亚洲一级理论片 | 91视频com| 精品国产一区二区久久 | 波多野结衣一区二区三区中文字幕 | 香蕉网站在线观看 | 天天操天天射天天插 | 中文在线字幕免费观看 | 免费观看国产精品视频 | 久久这里只有精品视频99 | www.久久爱.cn | 在线免费观看黄色大片 | 国产精品一区二区美女视频免费看 | 亚洲资源片 | 最近中文字幕高清字幕免费mv | 国产裸体视频网站 | 九九九九精品 | 国产成人精品日本亚洲999 | 深夜免费小视频 | 中文字幕在线观看日本 | 亚洲视频在线播放 | 欧美日韩午夜爽爽 | 狠狠色丁香婷婷综合久久片 | 不卡的一区二区三区 | 成人av片免费观看app下载 | 日韩在线免费观看视频 | 在线看小早川怜子av | 日韩欧美在线观看 | 在线观看网站黄 | 精品一区在线 | 亚洲电影免费 | 五月天欧美精品 | 99资源网 | 91喷水 | 天天插天天干天天操 | 国产成人久久av免费高清密臂 | 人人网人人爽 | 99久久99视频 | 成人久久久精品国产乱码一区二区 | 97国产精品视频 | 曰本免费av | 日韩免费观看一区二区三区 | 麻豆 91 在线| 在线观看国产www | 超碰电影在线观看 | 在线看黄色av | 日本久久高清视频 | 午夜国产一区二区三区四区 | 欧美一进一出抽搐大尺度视频 | 成人午夜剧场在线观看 | 天天射天天 | 成人黄色一级视频 | 久久综合五月天婷婷伊人 | 久久综合九色综合网站 | 97久久精品午夜一区二区 | 日本精品久久久久影院 | 中文在线a√在线 | 久久久久久久国产精品 | 深夜免费福利网站 | 亚洲一区二区三区毛片 | 亚洲日本精品 | 国产无遮挡又黄又爽在线观看 | 国产精品一区二区三区电影 | 国产精久久久久久妇女av | 欧美色婷婷| 久久麻豆精品 | 国产日韩精品视频 | 国产在线观看高清视频 | 美女黄濒 | 午夜成人免费电影 | 狠狠色丁香久久婷婷综合丁香 | 最近2019好看的中文字幕免费 | 亚洲精品一区二区三区新线路 | 中文字幕在线免费观看视频 | 久久夜色精品国产欧美一区麻豆 | 一区二区三区四区五区在线视频 | 久久97久久97精品免视看 | 日日夜夜爱 | 亚洲精品在线免费看 | 久久你懂得| 97超碰资源| 亚洲永久精品视频 | 开心激情五月婷婷 | 在线小视频 | 在线黄av | 91麻豆精品一区二区三区 | 91在线资源 | 在线免费视频a | 亚洲国产日韩一区 | 亚洲精品成人av在线 | 69久久99精品久久久久婷婷 | 国产色小视频 | 国产精品18久久久久vr手机版特色 | 91av在线电影| 免费www视频| 特及黄色片 | 18网站在线观看 | 欧美坐爱视频 | 特黄一级毛片 | 97色se| 成人黄色免费观看 | 国产精品视频永久免费播放 | 天天色天天操天天爽 | 国产精品婷婷午夜在线观看 | 亚洲区精品视频 | 午夜av在线电影 | 国产原厂视频在线观看 | 国产91精品看黄网站 | 久久久天天操 | 91视频a| 日韩国产高清在线 | 亚洲精品午夜国产va久久成人 | 五月婷婷视频 | 国产999视频在线观看 | 黄色avwww | 久久99视频| www.久久久 | 亚洲最新av | 综合色亚洲 | 欧美黑吊大战白妞欧美 | 久99热| 欧美日韩观看 | 五月婷综合 | 青春草视频在线播放 | 久久精品视频中文字幕 | 国产精品毛片久久久久久 | 一区二区三区国产欧美 | 国产在线观看a | 最近日本中文字幕 | 色婷婷久久久 | 一级黄色大片 | 中文字幕韩在线第一页 | 日韩黄色免费电影 | 国产日产av | 美女福利视频一区二区 | 国产小视频免费在线观看 | 天天操天 | 日韩一级电影网站 | 91亚瑟视频| 久久国产麻豆 | 偷拍精偷拍精品欧洲亚洲网站 | 欧美日韩国产欧美 | 久久激情五月婷婷 | 2021国产在线视频 | 久久免费的精品国产v∧ | 国产女人免费看a级丨片 | 国产福利电影网址 | 国产精品久久久久高潮 | 日韩中文字幕视频在线 | 草久中文字幕 | 久久99婷婷 | 男女免费av | 97精品国自产拍在线观看 | 天天天干天天射天天天操 | 91干干干| 97在线观看免费高清完整版在线观看 | 一区二区视频电影在线观看 | 黄色亚洲 | 天海冀一区二区三区 | 91av官网| 成人国产精品免费 | 国产精品免费观看网站 | 亚洲一区二区高潮无套美女 | 九九视频这里只有精品 | 在线免费观看的av | 精品国产视频在线 | 天天干 天天摸 天天操 | 婷婷视频| 精品国产免费一区二区三区五区 | 免费在线观看污网站 | 青青网视频 | av在线播放不卡 | 久久免费视频在线 | 91成人精品在线 | 久久久久欠精品国产毛片国产毛生 | 欧美一级片在线免费观看 | 色吊丝在线永久观看最新版本 | av免费网站 | 日韩在线视频网址 | 日韩免费| 久久久久五月天 | 久久综合干| 中文字幕久久久精品 | 韩国在线一区 | 狠狠色丁香婷婷综合欧美 | 91精品国产成人观看 | 黄网站a| 天天天天天操 | 中文字幕日本在线观看 | 国产打女人屁股调教97 | 天天色天天射天天综合网 | 99久久99视频只有精品 | av高清在线观看 | 中文字幕日韩高清 | 天天插天天干 | 激情影音 | 国产五月婷婷 | 国产成人精品一区二区在线 | 久久99偷拍视频 | 91亚洲夫妻 | 在线亚洲人成电影网站色www | 日韩一级成人av | 狠狠躁18三区二区一区ai明星 | 国产人成精品一区二区三 | 日本午夜在线观看 | 国产无遮挡又黄又爽在线观看 | 日韩精品欧美精品 | 黄色av一区二区 | 国产九九热视频 | av解说在线 | 中文av日韩 | 人人爽人人看 | 色网影音先锋 | 日韩在线中文字幕 | 国产精品久久久久一区二区 | 久草视频免费看 | 国产精品成人一区二区三区吃奶 | 国产aaa毛片 | 97色资源| av在线播放观看 | 亚洲高清av在线 | 亚洲美女在线一区 | 在线中文字幕视频 | 中文字幕中文 | 激情视频一区二区三区 | 麻豆国产网站 | 激情视频免费观看 | 亚洲美女精品 | 国产精品成久久久久三级 | 国产精品综合久久久 | 日本在线中文在线 | 亚洲永久精品国产 | 狠狠色婷婷丁香六月 | av在线收看 | 亚洲精品黄色 | 亚洲精品在线视频网站 | 中文av影院| 韩日成人av | 人人插人人爱 | 国产精品美女www爽爽爽视频 | 色偷偷97 | 五月黄色 | 九九久久婷婷 | 日韩久久精品一区 | 亚洲乱亚洲乱亚洲 | 色网站视频 | 日韩欧美电影在线 | 精品国偷自产国产一区 | 激情久久久久久久久久久久久久久久 | 久久视频一区 | 高潮久久久久久 | 日本h视频在线观看 | 国产精品久久亚洲 | 日韩精品欧美精品 | 欧美日韩中文在线观看 | 国产区网址 | 97视频资源| 欧美色噜噜 | 丁香激情综合 | 干亚洲少妇| 免费黄色一区 | 亚洲国产最新 | 啪啪小视频网站 | 婷五月天激情 | 又紧又大又爽精品一区二区 | 日韩av三区| 日韩精品在线观看视频 | 欧美激情精品久久久久久免费 | 97精品国产91久久久久久久 | 一级黄色毛片 | 91最新地址永久入口 | 激情综合五月网 | 中文字幕超清在线免费 | 永久av免费在线观看 | 99视频精品免费观看, | 国产麻豆果冻传媒在线观看 | 免费看v片| 亚洲伦理一区二区 | 亚洲国产电影在线观看 | 国内精品久久久久久久久久清纯 | 久久久久久久99 | 日日骑 | 999精品网| 日韩在线色视频 | 深夜免费福利视频 | 激情综合网色播五月 | 欧美成年人在线观看 | 国产成人精品av在线观 | 97福利在线 | 国内精品在线看 | a黄色一级片| 五月激情综合婷婷 | www.国产高清 | 久久不卡电影 | 人人藻人人澡人人爽 | 五月天综合在线 | 欧美嫩草影院 | 国产精品久久人 | 成人a在线 | 久久国产乱 | 婷婷综合 | 夜夜操天天 | 国产91在线免费视频 | 中文字幕有码在线 | 精品毛片久久久久久 | 日韩素人在线观看 | 全黄网站| 综合网天天 | 国产精品久久一区二区三区, | 亚洲综合激情网 | 久久精品日产第一区二区三区乱码 | 天堂av官网 | 在线电影a| 成年人黄色大片在线 | 91热| 久久久2o19精品 | 天天插夜夜操 | 免费h在线观看 | 中文字幕电影在线 | 国产录像在线观看 | 久久久久国 | 欧美人交a欧美精品 | 精品视频www | 午夜久久美女 | 亚洲视频在线视频 | 日韩欧美一区二区三区免费观看 | 精品福利片| av成人在线观看 | 在线观看黄色免费视频 | 国产精品ssss在线亚洲 | 91在线你懂的 | 99免费视频 | 国产真实精品久久二三区 | 久久综合之合合综合久久 | 高清不卡一区二区三区 | 国产亚洲精品久久久久久久久久 | 日韩在线免费高清视频 | 亚洲视频电影在线 | 中文字幕在线看视频国产中文版 | 久久国产区 | 五月婷在线 | av电影免费在线 | 欧美成人基地 | 欧美午夜精品久久久久久浪潮 | 国产午夜av | 一级黄色电影网站 | 能在线观看的日韩av | 中文字幕成人网 | 久青草视频 | 亚洲精品xxxx | 午夜在线看 | 91精品国产综合久久福利 | 人人干天天干 | 91免费网站在线观看 | 色插综合 | 亚洲午夜久久久综合37日本 | av.com在线| 日本黄色黄网站 | 国产一级片免费播放 | 国产区精品在线观看 | 免费人人干| 亚洲精品乱码久久久久久高潮 | 91麻豆免费视频 | 日韩在线不卡av | 精品视频在线看 | 国产一区二区三区 在线 | 婷婷深爱| 久久av网址 | 国产一区二区不卡视频 | 在线观看91视频 | 成人a级黄色片 | 久久视频免费 | 97色se | 国产日韩在线视频 | 91看片在线播放 | 玖玖在线免费视频 | 91资源在线免费观看 | 91热视频在线观看 | 精品国产一区二区三区四区在线观看 | 国产精品高清在线 | 怡红院久久 | 天天色天天骑天天射 | 国产精品你懂的在线观看 | 2018亚洲男人天堂 | 精品欧美一区二区三区久久久 | 97超碰在线视| 日韩中文字幕电影 | 久久久国产影院 | 五月天激情综合 | 久久99久久精品国产 | 亚洲婷婷在线 | 夜夜视频欧洲 | 亚洲精品美女久久 | 国产精品免费在线观看视频 | 国产精品亚洲a | 国产又黄又爽又猛视频日本 | 日本最大色倩网站www | 久久首页 | 99久久精| 日韩精品第1页 | 日日干网址 | 色在线网| 日韩av有码在线 | 亚洲第一久久久 | 激情欧美xxxx | 国产在线91在线电影 | 免费视频一级片 | 97在线看片 | 日韩在线影视 | 日韩高清精品免费观看 | 色婷婷www| 国产区高清在线 | 欧洲亚洲女同hd | 美女天天操 | 开心色婷婷 | 欧美少妇影院 | 丰满少妇在线观看网站 | 久久视频精品在线观看 | 国内精品亚洲 | 国产高清区 | 国产91在线 | 美洲 | 久久视频6 | 制服丝袜在线 | 69视频国产 | 免费精品在线视频 | 在线中文字幕观看 | 国产二区免费视频 | 欧美另类老妇 | 久久久精品视频成人 | 亚洲高清av | 国产婷婷在线观看 | av免费电影在线 | 久久 地址| 久久精品毛片 | 美女视频黄是免费的 | 亚州欧美视频 | www.久久视频 | 成人免费观看a | 国产成a人亚洲精v品在线观看 | 国产精品免费在线播放 | 国产欧美精品一区二区三区四区 | 国产精品一区二区久久精品爱微奶 | 国产成人一区二 | 人人澡澡人人 | 在线播放视频一区 | 久久人人射| 日韩av不卡在线 | 亚洲精欧美一区二区精品 | 成人欧美在线 | www夜夜操com | 欧美日韩另类在线 | 日本不卡久久 | 在线播放精品一区二区三区 | 精品在线观看免费 | 国产区网址 | 亚洲成人高清在线 | 色婷婷狠狠18 | 2019免费中文字幕 | 国产精品视频最多的网站 | 午夜美女福利直播 | 成人国产电影在线观看 | 国产最新福利 | 午夜免费福利片 | 97香蕉视频 | 在线免费观看黄色 | 四川妇女搡bbbb搡bbbb搡 | 亚洲视频axxx | 国产精品久久久久久电影 | 成人免费观看视频大全 | 亚洲电影黄色 | 亚洲午夜久久久综合37日本 | 国产精品 日韩 | 在线观看免费视频你懂的 | 日夜夜精品视频 | 日韩久久片| 在线看免费 | 久久人人爽av | 色网址99 | 久久久久久久久久免费 | 国产淫片免费看 | 日韩欧美视频在线观看免费 | 亚洲欧美一区二区三区孕妇写真 | 伊人狠狠干 | 精品在线视频一区二区三区 | 免费精品视频在线观看 | 美女视频黄在线 | 在线视频欧美亚洲 | 97视频免费在线看 | 9幺看片| 黄色片毛片 | 国产三级精品在线 | 亚洲黄色成人网 | 在线观看av网站 | 天天干天天干天天干天天干天天干天天干 | 国产精品免费视频网站 | 99这里都是精品 | 日韩欧美在线国产 | 九草在线视频 | 日韩一区在线播放 | 男女啪啪免费网站 | 亚洲成人午夜av | 亚洲春色综合另类校园电影 | 亚洲人久久 | 在线播放日韩av | 欧美在线1| 色99色 | 国产中出在线观看 | 国产特级毛片aaaaaa高清 | 夜色资源站国产www在线视频 | 天天天天爽| 欧美精品久久人人躁人人爽 | 91大神免费视频 | 久久激情视频 久久 | 亚洲欧美国内爽妇网 | 黄色小说在线免费观看 | 黄色影院在线免费观看 | 成年人电影毛片 | 日韩欧美视频一区二区 | 三级av在线播放 | 日本少妇高清做爰视频 | 美国av大片 | 国产精品小视频网站 | 国产又粗又猛又爽 | 亚州国产精品 | 国产精品久久一区二区三区, | 婷婷黄色片 | 免费观看xxxx9999片 | 四虎国产精品免费 | 国产乱视频 | 97理论电影| 亚洲91av| 精品久久亚洲 | 亚洲五月 | 久久人人97超碰国产公开结果 | 成人免费ⅴa| 中文字幕专区高清在线观看 | 国产精品一区二区三区在线看 | 91精品婷婷国产综合久久蝌蚪 | 91刺激视频| 97夜夜澡人人爽人人免费 | 亚洲狠狠丁香婷婷综合久久久 | 精品国产久 | 国产.精品.日韩.另类.中文.在线.播放 | 亚洲人成在线观看 | 中文国产在线观看 | 综合久久久 | 亚洲永久在线 | 色婷婷88av视频一二三区 | 亚洲欧美日韩精品久久奇米一区 | 国产精品第一视频 | 国产精品麻豆99久久久久久 | 久久99久久久久 | 久草男人天堂 | 免费看短| 91精品播放| 亚洲精选视频免费看 | 亚洲精品乱码久久久久久写真 | 91久久精品一区二区三区 | 日日操日日插 | 97色综合 | 精品国产伦一区二区三区观看体验 | 日韩成人精品一区二区 | 天天干国产| 九九免费在线观看视频 | av在观看| 最近能播放的中文字幕 | 日本精品一区二区三区在线观看 | 一区二区视频在线看 | 精品影院 | 国产免费观看高清完整版 | 欧美a级片免费看 | 狠狠躁日日躁夜夜躁av | 日操干| 天天综合网~永久入口 | av免费观看网站 | 国产精品成人久久 | 超碰在线亚洲 | 天天干,夜夜操 | 欧美激情va永久在线播放 | 免费成人在线观看视频 | 亚洲日本一区二区在线 | 伊人导航| 99精品免费久久久久久久久 | 在线欧美小视频 | 美女国产免费 | 在线免费观看视频一区 | 在线观看免费av网站 | 欧美黄在线 | 国产99久久久精品 | 日韩精品专区 | 国产精品网在线观看 | 亚洲国产精品成人精品 | 91麻豆精品久久久久久 | 国产精品久久久久久久午夜片 | 精品不卡av | 91传媒91久久久 | 99精品视频在线 | 97精品视频在线 | 97色在线观看免费视频 | 国产 日韩 在线 亚洲 字幕 中文 | 天天干,天天射,天天操,天天摸 | 久久只精品99品免费久23小说 | 成人天堂网 | 欧美少妇18p| 在线91精品 | 天堂在线一区 | 国产不卡视频在线 | 精品国产诱惑 | av 一区二区三区 | japanesexxxxfreehd乱熟 | 99日韩精品 | 97在线观看免费观看高清 | av一级在线| 青春草视频 | 欧美另类xxx | 亚洲片在线观看 | 欧美极品少妇xxxxⅹ欧美极品少妇xxxx亚洲精品 | 天天爽夜夜爽精品视频婷婷 | 亚洲国产人午在线一二区 | 欧美婷婷色 | 欧美一级片在线 | 国产美女精品人人做人人爽 | 国产精品美| 亚洲 中文字幕av | 狠狠躁夜夜a产精品视频 | 久久久久久久影院 | 天天操操操操操操 | 天天做天天爱天天爽综合网 | 久久国产精品久久国产精品 | 91日韩免费 | 亚洲久在线 | 国产黄在线 | 日韩三级精品 | 激情欧美日韩一区二区 | 亚洲黄色av | 成人免费观看大片 | 日韩精品久久久久久中文字幕8 | 久久99国产精品久久99 | 久久久污| 亚洲人人爱 | 久久人人爽人人人人片 | 91av视频在线观看 | av成人黄色 | 国产精品99久久久久久小说 | 91精选在线 | 日韩免费一区二区三区 | 天天干天天射天天插 | 91天堂素人约啪 | 国产区欧美 | 久久综合九色综合97_ 久久久 | 久久夜色精品国产欧美一区麻豆 | 久久久久久久久久久免费 | 亚洲精品福利在线 | a黄色片| 久久色亚洲 | 四虎影视成人永久免费观看亚洲欧美 | av免费在线看网站 | 91精品国产自产在线观看 | 97av在线视频 | 伊人影院99 | 欧美精品久久久久久久久老牛影院 | 日日干日日操 | 久久久久久久久久久综合 | 国产在线综合视频 | 天天插日日射 | 精品国产乱码久久久久久久 | 91最新网址在线观看 | 国产剧情一区二区在线观看 | 亚洲欧美精品一区二区 | 中文字幕在线视频第一页 | 国产香蕉视频 | 中文资源在线官网 | 天天色天天操综合网 | 最新久久免费视频 | 国产流白浆高潮在线观看 | 中文字幕免费观看视频 | 免费在线视频一区二区 | 天天操天天操天天 | 亚洲精品66 | 综合网婷婷 | 国产成人一区二区精品非洲 | 国产色视频一区 | 亚洲国产精品500在线观看 | 国产视频精品在线 | 黄网站色视频免费观看 | 一级欧美日韩 | 中国成人一区 | 午夜精品一区二区三区免费视频 | 丁香视频在线观看 | 亚洲激情在线播放 | 九九九九精品 | 韩国av在线播放 | 日本久久中文字幕 | 成年在线观看 | 久久久国产在线视频 | 最近日韩免费视频 | 婷婷综合伊人 | 日韩最新中文字幕 | 国产精品高清免费在线观看 | 欧美成人黄色 | 91av视频免费在线观看 | 国产精品久久久久婷婷二区次 | 国产精品一区二区三区在线看 | 在线观看岛国av | 国产精品自产拍 | www.啪啪.com| 狠狠干五月天 | 国产精品免费一区二区三区 | 色综合久久久久久久久五月 | 婷婷色影院 | 免费观看成年人视频 | 欧美日韩免费一区二区 | 一级精品视频在线观看宜春院 | 手机在线日韩视频 | 亚洲精品色婷婷 | 色噜噜狠狠狠狠色综合久不 | 国产片网站 | 亚洲区精品 | 久草在线久 | 亚洲自拍自偷 | 玖玖精品视频 | 国产精品成人一区二区三区吃奶 | 免费在线a | 免费看黄在线 | 狠狠色狠狠色综合日日92 | 国产午夜在线观看视频 | 麻豆系列在线观看 | 久久国语露脸国产精品电影 | 国产理论片在线观看 | 91香蕉视频好色先生 | 日韩在线色视频 | 97av.com| 91视频在线观看下载 | 婷婷国产视频 | 色是在线视频 | 国产精品免费人成网站 | 美女精品国产 | 91一区二区在线 | 久香蕉| 日本黄色免费电影网站 | 成人小电影在线看 | 免费黄色网址网站 | 亚洲精品视频一二三 | 免费视频久久久久久久 | 狠狠色噜噜狠狠狠 | 91麻豆精品国产91久久久久 | 视频福利在线 | 乱子伦av| 99亚洲精品在线 | 日日操天天操狠狠操 | 成人免费视频在线观看 | 日批在线看 | 亚州av网站| 欧美色图亚洲图片 | 国产香蕉97碰碰碰视频在线观看 | 色小说在线 | 开心激情五月婷婷 | 91免费在线看片 | 成人av免费在线播放 | 日韩av影片在线观看 |