日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka官方文档翻译——实现

發(fā)布時間:2023/12/19 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka官方文档翻译——实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

IMPLEMENTATION

1. API Design

Producer APIs

Producer API封裝了底層兩個Producer:

  • kafka.producer.SyncProducer
  • kafka.producer.async.AsyncProducer
class Producer {/* Sends the data, partitioned by key to the topic using either the *//* synchronous or the asynchronous producer */public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);/* Sends a list of data, partitioned by key to the topic using either *//* the synchronous or the asynchronous producer */public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);/* Closes the producer and cleans up */public void close();}

這么做的目的是通過一個簡答的API暴露把所有Producer的功能暴露給Client。Kafka的Producer可以:

  • 排隊/緩存多個發(fā)送請求并且異步的批量分發(fā)出去:

kafka.producer.Producer提供批量化多個發(fā)送請求(producer.type=async),之后進行序列化并發(fā)送的分區(qū)的能力。批大小可以通過一些配置參數(shù)進行設(shè)置。將事件加入到queue中,他們會被緩沖在queue中,直到滿足queue.time或batch.size達到配置的值。后臺線程(kafka.producer.async.ProducerSendThread)從queue中獲取數(shù)據(jù)并使用kafka.producer.EventHandler對數(shù)據(jù)進行序列化并發(fā)送到合適的分區(qū)。可以通過event.handler參數(shù)以插件的形式添加自定義的event handler程序。在producer queue pipeline處理的各個階段可以注入回調(diào),用于自定義的日志/跟蹤代碼或者監(jiān)控邏輯。這可以通過實現(xiàn)kafka.producer.async.CallbackHandler接口并設(shè)置callback.handler參數(shù)來實現(xiàn)。

  • 使用用戶指定的Encoder來序列化數(shù)據(jù):
interface Encoder<T> {public Message toMessage(T data);}

默認使用kafka.serializer.DefaultEncoder。

  • 通過用戶可選的Partitioner來實現(xiàn)亂負載均衡:

Partition的路由由kafka.producer.Partitioner決定。

interface Partitioner<T> {int partition(T key, int numPartitions);}

分區(qū)選擇API使用key和分區(qū)總數(shù)來選擇最終的partition(返回選擇的partition id)。id用于從排序的partition列表中選擇最終的一個分區(qū)去發(fā)送數(shù)據(jù)。默認的分區(qū)策略是hash(key)%numPartitions。如果key是null,會隨機選擇一個分區(qū)。可以通過partitioner.class參數(shù)來配置特定的分區(qū)選擇策略。

Consumer APIs

我們有兩個級別的Consumer API。低級別的“簡單的”API和單個Broker之間保持鏈接并且和發(fā)送到服務(wù)端的網(wǎng)絡(luò)請求有緊密的對應關(guān)系。這個API是無狀態(tài)的,每個請求都包含offset信息,允許用戶維護這個元數(shù)據(jù)。

高級別的API在Consumer端隱藏了Broker的細節(jié),并且允許從集群消費數(shù)據(jù)而不關(guān)心底層的拓撲結(jié)構(gòu)。同樣維持了“哪些數(shù)據(jù)已經(jīng)被消費過”的狀態(tài)。高級別的API還提供了通過表達式訂閱的Topic的功能(例如通過白名單或者黑名單的方式訂閱)。

Low-level API

class SimpleConsumer {/* Send fetch request to a broker and get back a set of messages. */public ByteBufferMessageSet fetch(FetchRequest request);/* Send a list of fetch requests to a broker and get back a response set. */public MultiFetchResponse multifetch(List<FetchRequest> fetches);/*** Get a list of valid offsets (up to maxSize) before the given time.* The result is a list of offsets, in descending order.* @param time: time in millisecs,* if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.* if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.*/public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);}

低級別的API用于實現(xiàn)高級別的API,也被直接使用在一些在狀態(tài)上有特殊需求的“離線”Consumer。

High-level API

/* create a connection to the cluster */ConsumerConnector connector = Consumer.create(consumerConfig);interface ConsumerConnector {/*** This method is used to get a list of KafkaStreams, which are iterators over* MessageAndMetadata objects from which you can obtain messages and their* associated metadata (currently only topic).* Input: a map of <topic, #streams>* Output: a map of <topic, list of message streams>*/public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);/*** You can also obtain a list of KafkaStreams, that iterate over messages* from topics that match a TopicFilter. (A TopicFilter encapsulates a* whitelist or a blacklist which is a standard Java regex.)*/public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);/* Commit the offsets of all messages consumed so far. */public commitOffsets()/* Shut down the connector */public shutdown()}

這個API圍繞迭代器,通過KafkaStream類實現(xiàn)。一個KafkaStream表示了一個或多個分區(qū)(可以分布在不同的Broker上)組成的消息流。每個Stream被單個線程處理,客戶端可以在創(chuàng)建流時提供需要的個數(shù)。這樣,一個流背后可以是多個分區(qū),但是一個分區(qū)只會屬于一個流。

createMessageStreams調(diào)用會吧Consumer注冊到Topic,促使Consumer/Broker的重新分配。API鼓勵在單次調(diào)用中創(chuàng)建多個Stream以減少充分配的次數(shù)。createMessageStreamsByFilter方法的調(diào)用(另外的)用于注冊watcher去發(fā)現(xiàn)匹配過濾規(guī)則的topic。createMessageStreamsByFilter返回的迭代器可以迭代來此多個Topic的消息(如果多個Topic都符合過濾規(guī)則)。

2. Network Layer

網(wǎng)絡(luò)層是一個直接的NIO Server,不會詳細的描述。sendfile是通過給MessageSet增加writeTo方法實現(xiàn)的。這允許使用文件存儲的消息用更高效的transferTo實現(xiàn)代替讀取數(shù)據(jù)到進程內(nèi)的處理。線程模型是簡單的單acceptor多processor,每個線程處理固定數(shù)量的連接。這種設(shè)計已經(jīng)被很好的測試并且是易于實現(xiàn)的。協(xié)議非常簡單,以便在將來實現(xiàn)其他語言的客戶端。

3. Messages

消息包含一個固定大小的頭,一個變長且“不透明”的byte數(shù)組表示的key和一個變長且“不透明”的byte數(shù)組表示的內(nèi)容。header包含以下內(nèi)容:

  • A CRC32 checksum to detect corruption or truncation.
  • A format version.
  • An attributes identifier
  • A timestamp

保持key和value的不透明是正確的決定:現(xiàn)在序列化方面取得了巨大的進展,選擇特定的一個序列化方式都不太適合所有的場景。不用說使用Kafka的特定程序可能需要特定的序列化方式。MessageSet接口是批量讀寫消息到NIO Channel的迭代器。

4. Message Format

/*** 1. 4 byte CRC32 of the message* 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1* 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version* bit 0 ~ 2 : Compression codec.* 0 : no compression* 1 : gzip* 2 : snappy* 3 : lz4* bit 3 : Timestamp type* 0 : create time* 1 : log append time* bit 4 ~ 7 : reserved* 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0* 5. 4 byte key length, containing length K* 6. K byte key* 7. 4 byte payload length, containing length V* 8. V byte payload*/

5. Log

包含兩個partition,名稱為“my_topic”的Topic的日志包含兩個目錄(名稱為my_topic_0和my_topic_1),其中包含該Topic的消息的數(shù)據(jù)文件。日志文件的格式是log entry的序列;每個log entry都是4字節(jié)的消息長度N加上后面N個字節(jié)的消息數(shù)據(jù)。每條消息都有一個64位的offset標識這條消息在這個Topic的Partition中的偏移量。消息在磁盤中的存儲格式如下所示。每個日志文件都以它存儲的第一條消息的offset命名。所以第一個文件會命名為00000000000.kafka,隨后每個文件的文件名將是前一個文件的文件名加上S的正數(shù),S是配置中指定的單個文件的大小。

消息的確切的二進制格式都有版本,它保持一個標準的接口,讓消息集可以根據(jù)需要在Producer、Broker、Consumer之間傳輸而不需要重新拷貝或者轉(zhuǎn)換,其格式如下:

On-disk format of a messageoffset : 8 bytes message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)crc : 4 bytesmagic value : 1 byteattributes : 1 bytetimestamp : 8 bytes (Only exists when magic value is greater than zero)key length : 4 byteskey : K bytesvalue length : 4 bytesvalue : V bytes

使用消息的Offset作為消息ID是不常見的。我們初始的想法是在Producer生成一個GUID作為Message ID,并在Broker上維持ID和Offset之間的映射關(guān)系。但是因為Consumer需要為每個Server維持一個ID,那么GUID的全局唯一性就變得沒什么意義了。此外,維持一個隨機的ID和Offset的映射關(guān)系將給索引的構(gòu)建帶來巨大的負擔,本質(zhì)上需要一個完整的持久化的隨機存取的數(shù)據(jù)結(jié)構(gòu)。因此,為了簡化查找結(jié)構(gòu),我們決定使用每個分區(qū)的原子計數(shù)器,它可以和分區(qū)ID加上ServerID來唯一標識一條消息。一旦使用了計數(shù)器,直接使用Offset進行跳轉(zhuǎn)是順其自然的,兩者都是分區(qū)內(nèi)單調(diào)遞增的整數(shù)。由于偏移量從消費者API中隱藏起來,因此這個決定是最終的實現(xiàn)細節(jié),所以我們采用更有效的方法。

Writes

日志允許連續(xù)追加到最后一個文件。當文件達到配置的大小時(如1GB)將滾動到一個新文件。日志采用兩個配置:M,配置達到多少條消息后進行刷盤;S,配置多長時間之后進行刷盤。這個持久化策略保證最多只丟失M條消息或者S秒之內(nèi)的消息。

Reads

讀取通過提供64位的offset和S-byte的chunk大小來實現(xiàn)。這將返回包含在S-byte的buffer的消息跌代替。S比任意單條消息都大,但是如果在異常的超大消息的情況下,讀取操作可以通過多次重試,每次都將buffer大小翻倍,直到消息被讀取成功。最大消息大小和buffer大小可以配置,用于拒絕超過特定大小的消息,以限制客戶端讀取消息時需要拓展的buffer大小。buffer可能以不完整的消息作為結(jié)尾,這可以通過消息大小來輕松的檢測到。

實際的讀取操作首先需要定位offset所在的文件,再將offset轉(zhuǎn)化為文件內(nèi)相對的偏移量,然后從文件的這個偏移量開始讀取數(shù)據(jù)。搜索操作通過內(nèi)存中映射的文件的簡單的二分查找完成。

日志提供了獲取最近寫入消息的能力以允許客戶端從“當前時間”開始訂閱。這在客戶端無法在指定天數(shù)內(nèi)消費掉消息的場景中非常有用。在這種情況下,如果客戶端嘗試消費一個不存在的offset將拋出OutOfRangeException異常并且可以根據(jù)場景重置或者失敗。

這面是發(fā)送到Consumer的數(shù)據(jù)的格式:

MessageSetSend (fetch result)total length : 4 byteserror code : 2 bytesmessage 1 : x bytes...message n : x bytes MultiMessageSetSend (multiFetch result)total length : 4 byteserror code : 2 bytesmessageSetSend 1...messageSetSend n

Deletes

數(shù)據(jù)刪除一次刪除一個日志段。日志管理器允許通過插件的形式實現(xiàn)刪除策略來選擇那些文件是合適刪除的。當前的刪除策略是日志文件的修改時間已經(jīng)過去N天,保留最近N GB數(shù)據(jù)的策略也是有用的。為了避免刪除時鎖定讀取操作,我們采用copy-on-write的方式來實現(xiàn),以保證一致性的視圖。

Guarantees

日志提供了配置參數(shù)M,用于控制寫入多少條消息之后進行一次刷盤。在日志恢復過程中遍歷最新的日志段的所有消息并驗證每一條消息是有效的。如果消息的大小和偏移量之和小于文件長度并且消息的CRC32和存儲的CRC相同,那么消息是有效的。在異常事件被檢測到時,日志會被截取到最后一條有效的消息的offset。

兩種錯誤需要處理:因為Crash導致的written塊丟失和無意義的block被添加到文件中。這樣做的原因是,一般的操作系統(tǒng)不保證file inode和實際數(shù)據(jù)塊之間的寫入順序,所以除了丟失丟失written data,文件還會獲得無意義的數(shù)據(jù),在inode更新大小但是在block寫入數(shù)據(jù)之前。CRC檢測這個錯誤并防止損壞日志(unwritten的消息肯定會丟失)。

6. Distribution

Consumer Offset Tracking

high-level的Consume保持它自己消費過的每個分區(qū)的最大的offset并且周期性的提交,所以可以在重啟的時候恢復offset信息。Kafka提供在offset manager中保存所有offset的選項。任何Consumer實例都需要發(fā)送offset到offset manager。high-level的Consumer自動化的處理offset。如果使用simple consumer,需要自己手動管理offset。在Java simple consumer中現(xiàn)在還不支持,Java simple consumer只能從ZooKeeper提交和獲取offset。如果使用Scala simple consumer,你會找到offset manager并且可以明確指定向offset manager提交和獲取offset。Consumer通過向Broker發(fā)送GroupCoordinatorRequest請求并獲取包含offset manager的GroupCoordinatorResponse來獲取offset。之后Consumer可以向offset manager提交和獲取offset。如果offset manager移動,Consumer需要重新發(fā)現(xiàn)。如果你期望手動管理offset,可以查看這些解釋如果提交OffsetCommitRequest和OffsetFetchRequest的代碼。

當offset manager收到OffsetCommitRequest,將其添加到一個特殊的、壓縮的,成為_consumeroffsets的Kafka topic中。offset manager返回一個成功的offset commit的響應給Consumer,當所有的備份都收到offset之后。如果在配置的timeout時間內(nèi)所有副本沒有完成備份,commit offset認為是失敗的,將在之后重試(high-consumer將自動執(zhí)行)。broker周期性的壓縮offset信息,因為它只需要保存每個Partition最近的offset信息即可。為了更快的響應獲取offset的請求,offset manager也會在內(nèi)存緩存offset數(shù)據(jù)。

當offset manager收到fetch offset的請求,它從cache中返回最近commit的offset。如果offset manager是剛啟動或者剛成為一些group的offset manager(通過成為一下offset topic的leader partition),它需要加載offset信息。這種情況下,fetch offset request或返回OffsetsLoadInProgress異常,Consumer需要之后重試(high-level consumer會自動處理)。

Migrating offsets from ZooKeeper to Kafka

Kafka較早的版本將offset信息存儲在ZooKeeper中。可以通過以下步驟將這些數(shù)據(jù)遷移到Kafka中:

  • 在Consumer配置中設(shè)置offsets.storage=kafka,dual.commit.enabled=true
  • 驗證Consumer正常
  • 設(shè)置dual.commit.enabled=false
  • 驗證Consumer正常
  • 回滾(從Kafka到ZooKeeper)也可以通過以上的步驟執(zhí)行,只需要將offsets.storage設(shè)置為zookeeper。

    ZooKeeper Directories

    以下說明ZooKeeper用于統(tǒng)籌Consumer和Broker的結(jié)構(gòu)和算法。

    Notation

    路徑中標志位[xyz]表示xyz是不固定的,如/topics/[topic]表示/topics下每個topic都有一個對應的目錄。[0...5]表示0,1,2,3,4,5的序列。->符號用于指示一個節(jié)點的值,如/hello -> world表示/hello存儲的值是“world”。

    Broker Node Registry

    /brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)

    這是一個所有存在的Broker的節(jié)點列表,每個都提供一個唯一標識用于Consumer識別(必須作為配置的一部分)。在啟動時,Broker通過在/brokers/ids中創(chuàng)建一個znode來注冊自己。使用邏輯上的Broker ID的目的是可以在不影響Consumer的前提下將Broker移動到另外的物理機上。如果嘗試注冊一個已經(jīng)存在的ID的Broker會失敗。

    一旦Broker通過臨時節(jié)點注冊到ZK,注冊信息是動態(tài)的并且在Broker宕機或者關(guān)閉后會丟失(那么通知消費者Broker不再可用)。

    Broker Topic Registry

    /brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)

    每個Broker將自己注冊到它包含的Topic下面,并保存Topic的partition數(shù)量。

    Consumers and Consumer Groups

    Consumer同樣將自己注冊到ZK中,為了協(xié)調(diào)其他的Consumer并且做消費數(shù)據(jù)的負載均衡。Consumer還可以通過offsets.storage=zookeeper將offset信息也存儲在ZK中。但是這個存儲一直在未來的版本中將被廢棄。因此建議將offset信息遷移到Kafka中。

    多個Consumer可以組成一個集群共同消費一個Topic。一個Group中的每個Consumer實例共享一個group_id。

    一個Group中的Consumer盡可能公平的分配partition,每個partition只能被一個group中的一個Consumer消費。

    Consumer Id Registry

    除了一個group內(nèi)的所有Consumer實例共享一個groupid,每個Consumer實例還擁有一個唯一的consumerid(hostname:uuid)用于區(qū)分不同的實例。Consumer的id注冊在以下的目錄中。

    /consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)

    每個Consumer將自己注冊到Group目錄下并創(chuàng)建一個包含id的znode。節(jié)點的值包含<topic, #stream="">的Map。id用于標識group中哪些Consumer是活躍的。節(jié)點是臨時節(jié)點,所以在Consumer進程關(guān)閉之后節(jié)點會丟失。

    Consumer Offsets

    Consumer記錄每個分區(qū)消費過的最大的offset信息。如果配置了offsets.storage=zookeeper,這個數(shù)據(jù)會被記錄到ZK中。

    /consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)

    Partition Owner registry

    每個Partition都被一個group內(nèi)的一個Consumer消費。這個Consumer必須在消費這個Partition之前建立對這個Partition的所有權(quán)。為了建立所有權(quán),Consumer需要將id寫入到Partition下面。

    /consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)

    Cluster Id

    Cluster id是唯一且不可變的,用于表示Cluster。Cluster id最長可以擁有22個字符,由[a-zA-Z0-9_-]+組成。從概念上講,它在Cluster第一次啟動的時候生成。

    實現(xiàn)層面上,它在Broker(0.10.1或更新的版本)第一次成功啟動后產(chǎn)生。Broker在啟動時嘗試從/cluster/id節(jié)點獲取cluster id。如果不存在,Broker穿件一個新的cluster id寫入到這個節(jié)點中。

    Broker node registration

    Broker節(jié)點基本上是相互獨立的,所以他們只是發(fā)布他們自己擁有的信息。當一個Broker加入時,它將自己注冊到broker node,并寫入自己的信息(host name和port)。Broker還將自己的Topic和Partition注冊到對應的目錄中。新Topic會被動態(tài)的注冊,當他們在Broker上創(chuàng)建的時候。

    Consumer registration algorithm

    當啟動一個Consumer時,它按照如下步驟操作:

  • 在group下注冊自己的consumer id
  • 注冊監(jiān)聽器用于監(jiān)聽新Consumer的加入和Consumer的關(guān)閉,Consumer變更都會觸發(fā)Partition的分配(負載均衡)。
  • 注冊監(jiān)聽器用于監(jiān)聽Broker的加入和關(guān)閉,Broker變更都會觸發(fā)Partition的分配(負載均衡)。
  • 如果Consumer通過filter創(chuàng)建了一個消息流,同樣會注冊一個監(jiān)聽器用于監(jiān)聽新topic的加入。
  • 強制自己在消費group內(nèi)重新平衡。
  • Consumer rebalancing algorithm

    Consumer的負載均衡算法允許一個group內(nèi)的所有Consumer對哪個Consumer消費哪些Partition達成一個共識。Consumer的負載均衡被Broker和同一個Group中的其他Consumer的添加和移除觸發(fā)。對于給定的topic和group,partitions被平均的分配個consumers。這個設(shè)計是為了簡化實現(xiàn)。如果我們允許一個分區(qū)同時被多個Consumer消費,那么在這個分區(qū)上會有沖突,需要一些鎖去保證。如果consumer的數(shù)量超過了分區(qū)數(shù),部分consumer會拿不到任何數(shù)據(jù)。在充分配算法中,我們分配分區(qū)時盡量使consumer需要和最少的Broker通信。

    每個Consumer按照如下步驟進行重分配:

    1. 對于Ci(Ci表示Consumer Instance)訂閱的Topic T2. PT表示Topic T的所有分區(qū)T3. CG表示group內(nèi)所有的Consumer4. 對PT進行排序 (那么相同Broker上的分區(qū)會被集中到一起)5. 對GC排序6. i表示Ci在CG中的位置,N = size(PT)/size(CG)7. 分配 i*N to (i+1)*N - 1 給Consumer Ci8. 從分區(qū)所有者注冊表中刪除Ci擁有的這些條目9. 將分區(qū)添加到所有者的分區(qū)注冊表中(我們可能需要重試直到分區(qū)原來的擁有者釋放分區(qū)所有權(quán))

    當分區(qū)重分配在一個Consumer觸發(fā)時,重分配應該在同一時間在相同group內(nèi)的其他Consumer上也觸發(fā)。

    ?

    --------------------------------------------------------------------------------

    這是我的個人公眾號,計劃寫一個系列的文章《MQ從入門到...》,會從最基礎(chǔ)的MQ介紹到最終設(shè)計和實現(xiàn)一個MQ,歡迎長期關(guān)注和交流。

    ?

    總結(jié)

    以上是生活随笔為你收集整理的Kafka官方文档翻译——实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 亚洲欧美一区二区三区久久 | 国产一区二区女内射 | 男人天堂亚洲 | 天天av天天 | 久久看片 | 网站黄在线观看 | 色呦呦入口 | 亚洲精品视频网 | 日韩视频免费观看高清完整版在线观看 | 日日操日日爽 | 无码国产精品一区二区色情男同 | 五月天开心激情 | 男女操网站| 国产精品视频一 | 日韩女同一区二区三区 | 中文在线最新版天堂 | 成人免费在线视频观看 | 亚洲色图 一区二区 | 手机在线看片日韩 | 欧美性生交xxxxx久久久缅北 | 欧美在线视频网站 | 欧美一级在线播放 | 国产1区2区3区 | 疯狂伦交 | 亚洲是色| 黄色成人在线网站 | 解开人妻的裙子猛烈进入 | 欧洲亚洲国产精品 | 国产精品成人av久久 | 国产成人无码av | 亚洲三级网 | 日韩1区2区3区 | 中文字幕av一区二区三区人妻少妇 | 欧洲一区二区 | 欧美一区二区三区四 | 欧美青草视频 | 黄色一级大片在线免费看国产 | 日本精品人妻无码免费大全 | 亚洲精品鲁一鲁一区二区三区 | 久久精品国产亚洲av久一一区 | 国产无遮挡呻吟娇喘视频 | julia一区 | 欧美日韩国产亚洲一区 | 成年人黄色网址 | 欧州一区二区三区 | www.日韩在线观看 | 国产精品suv一区二区三区 | 国产激情啪啪 | 视频一区在线播放 | 久久人人爽人人爽人人片av高清 | 成人羞羞在线观看网站 | 亚洲性生活 | 国产人妻精品久久久久野外 | 亚洲国产精品麻豆 | 宅男深夜视频 | 国产精品无遮挡 | 亚洲精久久 | 在线看日韩av | 伊人久久精品 | 亚洲国产精品女人久久久 | 徐锦江一级淫片免费看 | 91美女免费看 | 亚洲爱爱网站 | 欧美黄色性 | 毛片看| 国产精品99一区二区三区 | 性做久久久久久久免费看 | 成年免费视频黄网站在线观看 | 亚洲淫 | 91视频h| 少妇激情网 | 免费黄色欧美 | 99色网| 中文字幕在线看 | 老局长的粗大高h | 米奇影音| 在线一区二区不卡 | av啊啊| 99av视频| 欧美亚洲另类在线 | 91久久精品www人人做人人爽 | 国产一区二区在线不卡 | 特级黄色网 | 插少妇视频 | 国产一区二区在线电影 | 1000部av| 久久综合欧美 | 91网站大全 | 欧美精品一区二区成人 | 伊人网综合在线 | 国产精品成人免费精品自在线观看 | 一本到在线 | 成人小视频在线播放 | 97青草 | 成人激情社区 | 久久精品高清视频 | 国产成人综合欧美精品久久 | 日韩精品福利在线 | 九九色综合网 |