日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka使用_Kafka介绍与使用

發布時間:2025/3/15 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka使用_Kafka介绍与使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在研究kafka,覺得需要輸出點東西才能更好的吸收,遂總結與大家分享,話不多說。

一、先上思維導圖:

二、再上kafka整體架構圖:

2.1、Producer:消息生產者,就是向kafka broker發消息的客戶端。

2.2、Consumer :消息消費者,向kafka broker取消息的客戶端

2.3、Topic :每條發布到kafka集群的消息都有一個類別,這個類別被稱為主題Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處)。

2.4、Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個partition只會把消息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。

2.5、Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

2.6、Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。

2.7、Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka。

三、部分小點請看導圖

四、Kafka集群部署 (提前備好ZK集群環境)

4.1、下載安裝包

http://kafka.apache.org/downloads

或者在linux中使用wget命令下載安裝包

wget http://mirrors.hust.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

4.2、解壓安裝包

tar -zxvf /root/mysoftpackage/kafka_2.13-2.5.0.tgz -C /root/apps/

4.3、創建軟鏈接

如后續配置環境變量后,升級版本啥的不用再重新配置環境變量。

cd /root/apps/

ln -s kafka_2.13-2.5.0 kafka

4.4、修改配置文件

cp /root/apps/kafka/config/server.properties?

/root/apps/kafka/config/server.properties.bak

vi /root/apps/kafka/config/server.properties

修改以下配置:

# Broker的全局唯一編號,集群內不重復即可

broker.id=0

#kafka運行日志存放的路徑

log.dirs=/root/kafkadata/logs

#kafka依賴的ZK集群

zookeeper.connect=hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181

vi /root/apps/kafka/config/producer.properties

修改以下配置:

bootstrap.servers=hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092

vi /root/apps/kafka/config/consumer.properties

修改以下配置:

bootstrap.servers=hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092

4.5、分發安裝包

scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-02:/root/apps

scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-03:/root/apps

然后分別在各機器上創建軟連

cd /root/apps/

ln -s kafka_2.13-2.5.0 kafka

4.6、再次修改配置文件(重要)

依次修改各服務器上配置文件的的broker.id,分別是0,1,2不得重復。

4.7、環境變量配置

vi /etc/profile

export KAFKA_HOME=/root/apps/kafka

export PATH=$PATH:$KAFKA_HOME/bin

刷新下系統環境變量

source /etc/profile

4.8、守護進程啟動集群

依次在各節點上啟動kafka

kafka-server-start.sh -daemon /root/apps/kafka/config/server.properties

4.9、編寫腳本批量啟動集群kafka服務(kafkaBatchStart.sh)

#!/bin/bash

for i in 1 2 3

do

ssh hdp-node-0$i "source /etc/profile;/root/apps/kafka/bin/kafka-server-start.sh -daemon /root/apps/kafka/config/server.properties"

done

五、基本管理操作Shell命令

5.1、查看當前服務器中的所有topic

kafka-topics.sh --list --zookeeper?hdp-node-01:2181

5.2、創建topic

replication-facto:副本數、partition:分區數

kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 3 --partitions 3 --topic goodsMq

5.3、刪除topic

kafka-topics.sh --delete --zookeeper hdp-node-01:2181 --topic goodsMq

5.4、通過shell命令發送消息

kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic goodsMq

kafka-console-producer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq

5.5、通過shell消費消息

--from-beginning:指定偏移量從頭開始消費

kafka-console-consumer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq --from-beginning

5.6、查看某個Topic的詳情

kafka-topics.sh --topic goodsMq --describe --zookeeper hdp-node-01:2181,hdp-node-02:2181

六、Java簡單代碼示例

6.1、引入pom依賴

<dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>2.5.0version>dependency

6.2、消息生產者

public static void main(String[] args) {???//指定當前kafka?producer生產的數據的目的地???String?topicName?=?"orderMq"; // 讀取配置文件 Properties props = new Properties(); //指定kafka服務器地址 如果是集群可以指定多個 但是就算只指定一個他也會去集群環境下尋找其他的節點地址 props.setProperty("bootstrap.servers","hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092"); //key序列化器 props.setProperty("key.serializer", StringSerializer.class.getName()); //value序列化器 props.setProperty("value.serializer",StringSerializer.class.getName()); //通過配置文件,創建生產者 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); //生產數據 for (int messageNo = 1; messageNo < 100; messageNo++) { //調用producer的send方法發送數據 ProducerRecord record = new ProducerRecord<String, String>(topicName, messageNo + "", "appid-" + UUID.randomUUID() + "-測試"); //發送記錄 producer.send(record); } producer.close(); System.out.println("done!!!");}

6.3、消息消費者

public static void main(String[] args) throws Exception{ Properties properties = new Properties(); properties.setProperty("bootstrap.servers","hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer",StringDeserializer.class.getName()); properties.setProperty("group.id","test-consumer-group"); KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Collections.singletonList("orderMq")); while (true){ ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : poll) { System.out.println(record.key() + "=" + record.value()); } }}

七、思考

7.1、Kafka為什么效率高吞吐量大

1)、硬盤的索引功能,二分查找法。

分區:找到相應的leader分區負責讀寫操作;

分段:根據文件segment的命名可以確認要查找的offset或timestamp在哪個文件中;

稀疏索引:快速確定要找的offset在哪個內存地址的附近。

2)、通過Partition實現并行處理

3)、I/O優化:

3.1)、磁盤的順序寫入(600MB/S)

3.2)、充分利用操作系統文件讀取緩存(PageCache)

? ? ? ?Kafka 的數據并不是實時的寫入硬盤,它充分利用了現代操作系統分頁存儲來利用內存提高 I/O 效率。再通過mmap(Memory Mapped Files)內存映射文件零拷貝的方式,它的工作原理是直接利用操作系統的 Page 來實現文件到物理內存的直接映射,完成映射之后你對物理內存的操作會被同步到硬盤上(操作系統在適當的時候)。通過 mmap,進程像讀寫硬盤一樣讀寫內存(當然是虛擬機內存),也不必關心內存的大小,有虛擬內存為我們兜底。

? ? ? ?使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內核空間復制的開銷。但也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數據并沒有被真正的寫到硬盤,操作系統會在程序主動調用 Flush 的時候才把數據真正的寫到硬盤。

? ? ? ?Kafka 提供了一個參數 producer.type 來控制是不是主動 Flush:

如果Kafka 寫入到 mmap 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。如果 Kafka 寫入 mmap 之后立即返回 Producer 不調用 Flush 叫異步 (Async)。

3.3.)、基于 Sendfile 實現零拷貝(Zero Copy)方式讀取磁盤數據

傳統模式下,當需要對一個文件進行傳輸的時候,其具體流程細節如下:

a、調用 Read 函數,文件數據被 Copy 到內核緩沖區。

b、Read 函數返回,文件數據從內核緩沖區 Copy 到用戶緩沖區

c、Write 函數調用,將文件數據從用戶緩沖區 Copy 到內核與 Socket 相關的緩沖區。

d、數據從 Socket 緩沖區 Copy 到相關協議引擎。

以上細節是傳統 Read/Write 方式進行網絡文件傳輸的方式,我們可以看到,在這個過程當中,文件數據實際上是經過了四次 Copy 操作:

硬盤—>內核 buf—>用戶 buf—>Socket 相關緩沖區—>協議引擎


Sendfile 的引入以減少數據復制,同時減少上下文切換

3.4)、批量壓縮減少網絡IO損耗

? ? ? 在很多情況下,系統的瓶頸不是 CPU 或磁盤,而是網絡 IO,對于需要在廣域網上的數據中心之間發送消息的數據流水線尤其如此。進行數據壓縮會消耗少量的 CPU 資源,不過對于 Kafka 而言,網絡 IO 更應該考慮:因為每個消息都壓縮,但是壓縮率相對很低,所以 Kafka 使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮。

? ? ? ?Kafka 允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費者解壓縮。

? ? ? ?kafka在壓縮數據時使用的壓縮算法,可選參數有:none、gzip、snappy。none即不壓縮,gzip,和snappy壓縮算法之間取舍的話gzip壓縮率比較高,系統cpu占用比較大,但是帶來的好處是,網絡帶寬占用少,snappy壓縮比沒有gzip高,cpu占用率不是很高,性能也還行,如果網絡帶寬比較緊張的話。可以選擇gzip,一般推薦snappy。

7.2、數據生產時的分發策略是什么

Producer客戶端負責消息的分發。

? ? ? kafka集群中的任何一個broker都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表、partitions、leader列表"等信息;

? ? ?當producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接;

? ? ?消息由producer直接通過socket發送到broker,中間不會經過任何"路由層",事實上,消息被路由到哪個partition上由producer客戶端決定;

? ? ?比如可以采用"random"、"key-hash""輪詢"等,如果一個topic中有多個partitions,那么在producer端實現"消息均衡分發"是必要的。

? ? ?在producer端的配置文件中,開發者可以指定partition路由的方式。

7.3、如何保證數據不丟失完全生產

Producer消息發送的應答機制。

設置發送數據是否需要服務端的反饋,有三個值0,1,all

0: producer不會等待broker發送ack?

1: 當leader接收到消息之后發送ack?

all: 當所有的follower都同步消息成功后發送ack

request.required.acks=0

7.4、Partition如何分布在不同的Broker上

//第i個partition

int i = 0;

//broker列表

list{ broker01, broker02, broker03}

for(int i=0;i<5;i++){

brIndex = I % list.size;

? ? ? ?//第i個partition分布在hostName上

hostName = list.get(brIndex)

}

7.5、Broker如何保存數據其文件存儲機制是什么

1)、Kafka文件存儲基本結構

? ? ? ?在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。

? ? ? ?每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。

? ? ? ?默認保留7天的數據。

? ? ? 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。(什么時候創建,什么時候刪除)


2)、Kafka Partition Segment

? ? ?Segment file組成:由2大部分組成,分別為index file和data file,這兩個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件。

? ? ? ?Segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個segment文件最后一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。

? ? ? 索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。

? ? ? ?其中以索引文件中元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移地址為497。

3)、Kafka 查找message

讀取offset=368776的message,需要通過下面2個步驟查找。

第一步:查找segment file

00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0。

00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1。

00000000000000737337.index的起始偏移量為737338=737337 + 1

其他后續文件依次類推。

以起始偏移量命名并排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index和對應log文件。

第二步:通過segment file查找message

當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址

然后再通過00000000000000368769.log順序查找直到offset=368776為止。

7.6、消費者如何標記消費狀態

通過偏移量來標識。

擴展偏移量與偏移量提交:

? ? ? 偏移量是一個自增長的ID,用來標識當前分區的哪些消息被消費過了,這個ID會保存在kafka的broker當中,而且消費者本地也會存儲一份。

? ? ? ?因為每次消費每一條消息都要更新一下偏移量的話,難免會影響整個broker的吞吐量,所以一般這個偏移量在每次發生改動時,先由消費者本地改動,默認情況下,消費者每5秒鐘會提交一次改動的偏移量,這樣做雖然說吞吐量上來了,但是可能會出現重復消費的問題:?

? ? ? ?因為可能在下一次提交偏移量之前,消費者本地消費了一些消息,然后發生了分區再均衡(分區再均衡在下面有講) 那么就會出現一個問題。

? ? ? ?假設上次提交的偏移量是 2000 在下一次提交之前,其實消費者又消費了500條數據,也就是說當前的偏移量應該是2500 但是這個2500只在消費者本地,也就是說,假設其他消費者去消費這個分區的時候拿到的偏移量是2000,那么又會從2000開始消費消息,那么2000到2500之間的消息又會被消費一遍,這就是重復消費的問題。

? ? ??kafka對于這種問題也提供了解決方案:手動提交偏移量

可以關閉默認的自動提交(enable.auto.commit= false) 然后使用kafka提供的API來進行偏移量提交,kafka提供了兩種方式提交偏移量 :同步和異步

//同步提交偏移量kafkaConsumer.commitSync();//異步提交偏移量kafkaConsumer.commitAsync();

? ? ? ?他們之間的區別在于,同步提交偏移量會等待服務器應答,并且遇到錯誤會嘗試重試,但是會一定程度上影響性能不過能確保偏移量到底提交成功與否;而異步提交的對于性能肯定是有提示的,但是弊端也就像我們剛剛所提到遇到錯誤沒辦法重試,因為可能在收到你這個結果的時候又提交過偏移量了,如果這時候重試,又會導致消息重復的問題了。

? ? ? ? 其實,我們可以采用同步+異步的方式來保證提交的正確性以及服務器的性能。因為異步提交的話,如果出現問題但不是致命問題的話,可能下一次提交就不會出現這個問題了,所以有些異常是不需要解決的(可能單純的就是網絡抽風了呢? ) 所以,我們平時可以采用異步提交的方式,等到消費者中斷了(遇到了致命問題,或是強制中斷消費者) 的時候再使用同步提交(因為這次如果失敗了就沒有下次了,所以要讓他重試) 。

具體代碼:

try { while (true) { ConsumerRecords<String, String> poll = kafkaConsumer.poll(500); for (ConsumerRecord<String, String> context : poll) { System.out.println("消息所在分區:" + context.partition() + "-消息的偏移量:" + context.offset() + "key:" + context.key() + "value:" + context.value()); } //正常情況異步提交 kafkaConsumer.commitAsync(); } } catch (Exception e) { e.printStackTrace(); } finally { try { //當程序中斷時同步提交 kafkaConsumer.commitSync(); } catch (Exception e) { e.printStackTrace(); } finally { //關閉當前消費者 具體在下面有講 kafkaConsumer.close(); } }
? ? ? 值得一提的是,在手動提交時kafka提供了你可以傳入具體的偏移量來完成提交,也就是指定偏移量提交,但是非常不建議手動指定,因為如果指定的偏移量小于分區所存儲的偏移量大小的話,那么會導致消息重復消費,如果指定的偏移量大于分區所存儲的偏移量的話,那么會導致消息丟失

7.7.消費者的分區再均衡及負載均衡策略是什么

分區再均衡也是kafka里面非常重要的一個概念。

首先操作在以下情況下會觸發分區再均衡(Rebalance)操作:

a、組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了);

b、訂閱主題數發生變更,如果你使用了正則表達式的方式進行訂閱,那么新建匹配正則表達式的topic就會觸發rebalance;

c、訂閱主題的分區數發生變更。

當觸發Rebalance,kafka重新分配分區所有權

? ? ?何為分區所有權?我們之前有提到過,消費者有一個消費者組的概念, 而且一個消費者組在消費一個主題時有以下規則,一個消費者可以消費多個分區,但是一個分區只能被一個消費者消費。如果我有分區 0、1、2 現在有消費者 A,B 那么 kafka可能會讓消費者A 消費 0,1 這兩個分區,那么 這時候我們就會說,消費者A 擁有分區 0、1的所有權。

? ? ? 當觸發 Rebalance 的時候kafka會重新分配這個所有權,還是基于剛剛的比方,消費者A 擁有 0 和1 的所有權,消費者B 會有2的所有權。當消費者B離開kafka的時候 這時候 kafka會重新分配一下所有權,此時整個消費者組只有一個A 那么 0、1、2 三個分區的所有權都會屬于A ,同理如果這時候有消費者C進入這個消費者組,那么這時候kafka會確保每一個消費者都能消費一個分區。

? ? ? ?當觸發Rebalance時,由于kafka正在分配所有權,會導致消費者不能消費,而且還會引發一個重復消費的問題, 當消費者還沒來得及提交偏移量時,分區所有權遭到了重新分配,那么這時候就會導致一個消息被多個消費者重復消費。

? ? ? ?那么解決方案就是在消費者訂閱時,添加一個再均衡監聽器,也就是當kafka在做Rebalance操作前后,均會調用再均衡監聽器,那么這時候 我們可以在kafka Rebalance之前提交我們消費者最后處理的消息來解決這個問題。

拓展、Close():

? ? ? 當我們不需要某個消費者繼續消費kafka當中的數據時,我們可以選擇調用Close方法來關閉它,在關閉之前 close方法會發送一個通知告訴kafka我這個消費者要退出了,那么 kafka就會準備Rebalance 而且如果是采用的自動提交偏移量,消費者自身也會在關閉自己之前提交最后所消費的偏移量 。當然即使沒有調用close方法,而是直接強制中斷了消費者的進程 kafka也會根據我們后面會說到的系統參數捕捉到消費者退出了。

7.8.如何保證消費者消費的數據有序

? ? ? 只能保證同一個分區下的數據是有序的,可以讓同一類的數據進入到同一個分區里。

? ? ? 若想保證同一個主題的數據被消費時的順序和生產時的順序一致,那么只能設置一個分區。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的kafka使用_Kafka介绍与使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 天堂网在线最新版www中文网 | 成人av网站在线 | 欧美韩国日本一区 | 超碰97人人干 | 欧美精品一区二区性色a+v | 欧美自拍第一页 | 视频一区在线观看 | 国产欧美一区二 | 久久激情av | 欧美在线aa | 高跟91娇喘 | 岛国av毛片 | 97精品一区二区三区 | 狠狠2020| 国产精品久久久久999 | x88av视频 | 丝袜美腿av在线 | 激情五月色综合国产精品 | 日韩永久 | 在线播放一级片 | 亚洲图片一区 | 九九影院最新理论片 | 性欧美hd调教 | 国产精视频 | 性色视频在线观看 | 成人午夜精品 | 亚洲一区二区三区免费看 | 国产一区二区三区四区五区在线 | 美日韩在线视频 | 日本一区二区免费高清视频 | 久久精品8 | 亚洲精品美女网站 | 国产一级在线播放 | 九色首页 | 国产网站免费观看 | 天天天干干干 | 嫩草亚洲 | 日韩高清一级片 | 日韩欧美黄色 | 91热爆视频 | 日日操夜夜摸 | 视色影院 | 天堂网资源 | 91久久精 | 综合久久久久综合 | 亚州黄色 | 日韩黄色一级大片 | 在线涩涩 | 一级片麻豆 | 亚洲成色网 | 欧美一级不卡视频 | 17c在线 | 国产精品国产精品国产专区不卡 | 一级黄色在线观看 | 欧美日本一道 | 美女午夜激情 | 爱欲av| 99热这里只有精品7 青青草社区 | 久久久人妻无码一区二区 | 国产无套内射又大又猛又粗又爽 | 黄色三级在线 | 久久久久久久久99精品 | 成人高潮片免费视频 | 免费在线一级片 | 色婷婷精品久久二区二区密 | 久久99国产精品久久99果冻传媒 | 国产肥白大熟妇bbbb视频 | 黄色在线视频网站 | 秋霞av网| 乱子伦一区 | 成年人视频在线免费看 | 日韩一级大片 | 懂色aⅴ国产一区二区三区 亚洲欧美国产另类 | 久久精品国产99久久久 | 成人视屏在线观看 | 国内一区二区视频 | 天天看天天射 | 另类ts人妖一区二区三区 | 精品亚洲一区二区 | 色图18p| 伊人91| 大尺度在线观看 | 依人成人网 | 91热爆视频 | 欧美人与禽zozzo禽性配 | 日本亲近相奷中文字幕 | 自宅警备员在线观看 | 国产xxxxx | 日韩欧美自拍 | 亚洲字幕 | 天天操天天操天天操天天操 | 色伊人久久 | 欧亚毛片 | 人妻丰满熟妇av无码区免 | 欧美在线激情视频 | 成人性生活免费看 | 天堂…中文在线最新版在线 | 欧美日韩一二三区 | 欧美精品第一页 |