日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume 实战开发指南

發布時間:2024/8/23 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume 实战开发指南 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Flume

文章目錄

  • Flume
    • Flume介紹
    • Flume核心概念
    • Flume NG的體系結構
      • Source
      • Channel
      • Sink
    • Flume的部署類型
      • 單一流程
      • 多代理流程(多個agent順序連接)
      • 流的合并(多個Agent的數據匯聚到同一個Agent )
      • 多路復用流(多級流)
      • load balance功能
    • Flume組件選型
      • Source
      • Channel
        • FileChannel和MemoryChannel區別
        • FileChannel優化
      • Sink
        • HDFS小文件處理
    • 案例:日志采集發送到KafkaFlume配置
      • 日志采集流程
      • 配置如下
      • Flume攔截器開發
      • Flume采集腳本批量啟動停止
    • 案例:消費Kafka保存到HDFS
      • 配置如下
        • FileChannel優化
        • HDFS小文件處理
      • Flume時間戳攔截器
      • 項目經驗之Flume內存優化

Flume介紹

Flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。

flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)并且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。

  • flume的可靠性

    當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復后,繼續發送),Besteffort(數據發送到接收方后,不會進行確認)。

  • flume的可恢復性
    還是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統里(性能較差)。

  • Flume核心概念

    ClientClient生產數據,運行在一個獨立的線程。
    Event一個數據單元,消息頭和消息體組成。(Events可以是日志記錄、 avro 對象等。)
    FlowEvent從源點到達目的點的遷移的抽象。
    Agent一個獨立的Flume進程,包含組件Source、 Channel、 Sink。(Agent使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。)
    Source數據收集組件。(source從Client收集數據,傳遞給Channel)
    Channel中轉Event的一個臨時存儲,保存由Source組件傳遞過來的Event。(Channel連接 sources 和 sinks ,這個有點像一個隊列。)
    Sink從Channel中讀取并移除Event, 將Event傳遞到FlowPipeline中的下一個Agent(如果有的話)(Sink從Channel收集數據,運行在一個獨立線程。)

    Flume NG的體系結構

    Flume 運行的核心是 Agent。Flume以agent為最小的獨立運行單位。一個agent就是一個JVM。它是一個完整的數據收集工具,含有三個核心組件,分別是sourcechannelsink。通過這些組件, Event 可以從一個地方流向另一個地方,如下圖所示。

    Source

    Source是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到事件(event) 里,然后將事件推入Channel中。

    Flume提供了各種source的實現,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果內置的Source無法滿足需要, Flume還支持自定義Source。

    Channel

    Channel是連接Source和Sink的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直到Sink處理完該事件。

    Flume對于Channel,則提供了Memory Channel、JDBC Chanel、File Channel,etc。

    MemoryChannel可以實現高速的吞吐,但是無法保證數據的完整性。

    MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。

    FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日志文件保存的目錄設成不同的磁盤,以便提高效率。

    Sink

    Flume Sink取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。

    Flume也提供了各種sink的實現,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

    Flume Sink在設置存儲數據時,可以向文件系統中,數據庫中,hadoop中儲數據,在日志數據較少時,可以將數據存儲在文件系中,并且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便于日后進行相應的數據分析。

    Flume的部署類型

    單一流程

    多代理流程(多個agent順序連接)

    可以將多個Agent順序連接起來,將最初的數據源經過收集,存儲到最終的存儲系統中。這是最簡單的情況,一般情況下,應該控制這種順序連接的Agent 的數量,因為數據流經的路徑變長了,如果不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。

    流的合并(多個Agent的數據匯聚到同一個Agent )

    這種情況應用的場景比較多,比如要收集Web網站的用戶行為日志, Web網站為了可用性使用的負載集群模式,每個節點都產生用戶行為日志,可以為每 個節點都配置一個Agent來單獨收集日志數據,然后多個Agent將數據最終匯聚到一個用來存儲數據存儲系統,如HDFS上。

    多路復用流(多級流)

    Flume還支持多級流,什么多級流?來舉個例子,當syslog, java, nginx、 tomcat等混合在一起的日志流開始流入一個agent后,可以agent中將混雜的日志流分開,然后給每種日志建立一個自己的傳輸通道。

    load balance功能

    下圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink組件上,而每個Sink組件分別連接到一個獨立的Agent上 。

    Flume組件選型

    Source

  • Taildir Source相比Exec Source、Spooling Directory Source的優勢?

    TailDir Source:斷點續傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現斷點續傳。不會丟數據,但是有可能會導致數據重復。

    Exec Source:可以實時搜集數據,但是在Flume不運行或者Shell命令出錯的情況下,數據將會丟失。

    Spooling Directory Source:監控目錄,支持斷點續傳。

  • batchSize大小如何設置?

    答:Event 1K左右時,500-1000合適(默認為100)

  • Channel

    ? 采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel數據存儲在Kafka里面,所以數據是存儲在磁盤中。

    ? 注意在Flume1.7以前,Kafka Channel很少有人使用,因為發現parseAsFlumeEvent這個配置起不了作用。也就是無論parseAsFlumeEvent配置為true還是false,都會轉為Flume Event。這樣的話,造成的結果是,會始終都把Flume的headers中的信息混合著內容一起寫入Kafka的消息中,這顯然不是我所需要的,我只是需要把內容寫入即可。

    FileChannel和MemoryChannel區別

    MemoryChannel:

    傳輸數據速度更快,但因為數據保存在JVM的堆內存中,Agent進程掛掉會導致數據丟失,適用于對數據質量要求不高的需求。

    FileChannel:

    傳輸速度相對于Memory慢,但數據安全保障高,Agent進程掛掉也可以從失敗中恢復數據。

    選型:

    金融類公司、對錢要求非常準確的公司通常會選擇FileChannel

    傳輸的是普通日志信息(京東內部一天丟100萬-200萬條,這是非常正常的),通常選擇MemoryChannel。

    FileChannel優化

    ? 通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。

    官方說明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

    checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據。

    FileChannel原理:

    Sink

    HDFS小文件處理

    HDFS存入大量小文件,有什么影響?

    **元數據層面:**每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命

    **計算層面:**默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。

    官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

    基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:

    ①文件在達到128M時會滾動生成新文件

    ②文件創建超3600秒時會滾動生成新文件

    案例:日志采集發送到KafkaFlume配置

    日志采集流程

    配置如下

    在/opt/module/flume/conf目錄下創建file-flume-kafka.conf文件

    > vim file-flume-kafka.conf# 為各組件命名 a1.sources = r1 a1.channels = c1# 描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/program/logs/app.* a1.sources.r1.positionFile = /opt/program/logs/taildir_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder# 描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = 192.168.60.14:9092,192.168.60.15:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false# 綁定source和channel以及sink和channel的關系 a1.sources.r1.channels = c1

    注意:com.jast.flume.ETLInterceptor是自定義的攔截器的全類名。需要根據用戶自定義的攔截器做相應修改。

    Flume攔截器開發

  • 創建Maven工程flume-interceptor

  • 創建包名:com.jast.flume.interceptor

  • 在pom.xml文件中添加如下配置

    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.19</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

    注意:scope中provided的含義是編譯時用該jar包。打包時時不用。因為集群上已經存在flume的jar包。只是本地編譯時用一下。

  • 在com.jast.flume.ETLInterceptor包下創建ETLInterceptor類

    package com.jast.flume;import cn.hutool.json.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List;/*** @author Jast* @description 自定義攔截器*/ public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String text = new String(body, StandardCharsets.UTF_8);if(JSONUtil.isJson(text)){return event;}System.out.println("非json格式過濾掉");return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}} }
  • 打包

  • 自己部署的Flume:需要先將打好的包放入到flume/lib文件夾下面

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/lib
  • CDH版本Flume:需要先將打好的包放入到flume/lib文件夾下面

    具體的目錄/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib
  • 啟動Flume

    nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /opt/program/flume/log1.txt 2>&1 &
  • 測試

    隨便在app.log加入一條數據,攔截器檢測到,然后打印出非json格式過濾掉。

    2022-03-17 09:42:42,148 INFO taildir.ReliableTaildirEventReader: Pos 9 is larger than file size! Restarting from pos 0, file: /opt/program/logs/app.log, inode: 5810713 2022-03-17 09:42:42,148 INFO taildir.TailFile: Updated position, file: /opt/program/logs/app.log, inode: 5810713, pos: 0 非json格式過濾掉 2022-03-17 09:45:17,236 INFO taildir.TaildirSource: Closed file: /opt/program/logs/app.log, inode: 5810713, pos: 22

    輸入一條json數據,在kafka消費時正常消費,配置成功

    22/03/17 09:15:41 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-3667] Resetting offset for partition topic_log-0 to offset 0. {"en":"小張"}
  • Flume采集腳本批量啟動停止

  • 在/home/atguigu/bin目錄下創建腳本f1.sh

    > vim f1.sh#! /bin/bashcase $1 in "start"){for i in localhostdoecho " --------啟動 $i 采集flume-------"ssh $i "nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /opt/program/flume/log1.txt 2>&1 &"done };; "stop"){for i in localhostdoecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka-1.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done};; esac

    說明1:nohup,該命令可以在你退出帳戶/關閉終端之后繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令。

    說明2:awk 默認分隔符為空格

    說明3:$2是在“”雙引號內部會被解析為腳本的第二個參數,但是這里面想表達的含義是awk的第二個值,所以需要將他轉義,用$2表示。

    說明4:xargs 表示取出前面命令運行的結果,作為后面命令的輸入參數。ls

  • 增加腳本執行權限

    chmod u+x f1.sh
  • f1集群啟動腳本

    f1.sh start
  • f1集群停止腳本

    f1.sh stop
  • 案例:消費Kafka保存到HDFS

    配置如下

    ## 組件 a1.sources=r1 a1.channels=c1 a1.sinks=k1## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = 192.168.60.15:9092,192.168.60.14:9092 a1.sources.r1.kafka.topics=topic_log #a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/program/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/program/flume/data/behavior1/## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /jast_root/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = log- a1.sinks.k1.hdfs.round = false#控制生成的小文件 # 每隔多少秒生成一個 a1.sinks.k1.hdfs.rollInterval = 10 # 128M生成一個文件 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0## 控制輸出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = GzipCodec## 拼裝 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1

    CDH 版本:Flume寫入HDFS時,Flume部署的服務器需要安裝HDFS Gateway

    自己部署版本:Flume與Hadoop集群不在一起的話,需要配置Hadoop環境變量

    FileChannel優化

    ? 通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。

    官方說明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

    checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據。

    FileChannel原理:

    HDFS小文件處理

    HDFS存入大量小文件,有什么影響?

    **元數據層面:**每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命

    **計算層面:**默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。

    官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

    基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:

    ①文件在達到128M時會滾動生成新文件

    ②文件創建超3600秒時會滾動生成新文件

    Flume時間戳攔截器

    ? 由于Flume默認會用Linux系統時間,作為輸出到HDFS路徑的時間。如果數據是23:59分產生的。Flume消費Kafka里面的數據時,有可能已經是第二天了,那么這部門數據會被發往第二天的HDFS路徑。我們希望的是根據日志里面的實際時間,發往HDFS的路徑,所以下面攔截器作用是獲取日志中的實際時間。

    ? 解決的思路:攔截json日志,通過fastjson框架解析json,獲取實際時間ts。將獲取的ts時間寫入攔截器header頭,header的key必須是timestamp,因為Flume框架會根據這個key的值識別為時間,寫入到HDFS。

  • 在com.jast.flume.interceptor包下創建TimeStampInterceptor類

    package com.jast.flume.interceptor;import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}} }
  • 重新打包

  • 自己部署的Flume:需要先將打好的包放入到flume/lib文件夾下面

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/lib
  • CDH版本Flume:需要先將打好的包放入到flume/lib文件夾下面

    具體的目錄/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

    cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib
  • 項目經驗之Flume內存優化

  • 問題描述:如果啟動消費Flume拋出如下異常

    ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded
  • 解決方案步驟

    在服務器的flume/conf/flume-env.sh文件中增加如下配置

    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

  • Flume內存參數設置及優化

    JVM heap一般設置為4G或更高

    -Xmx與-Xms最好設置一致,減少內存抖動帶來的性能影響,如果設置不一致容易導致頻繁fullgc。

    -Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大允許的尺寸,按需分配。如果不設置一致,容易在初始化時,由于內存不夠,頻繁觸發fullgc。

    參考內容:
    https://www.cnblogs.com/qingyunzong/p/8994494.html
    https://flume.apache.org/

  • 總結

    以上是生活随笔為你收集整理的Flume 实战开发指南的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 久免费一级suv好看的国产 | 美女啪啪国产 | 法国少妇愉情理伦片 | 中文字幕亚洲一区二区三区五十路 | 精品日本一区二区三区 | 超碰福利在线观看 | 婷婷激情小说网 | 亚洲天堂激情 | 岛国免费视频 | 丝袜一区二区三区四区 | 国产精品一区二区入口九绯色 | 国 产 黄 色 大 片 | 人人干人人干 | 91超碰在线播放 | 久久精品国产亚洲7777 | 双性高h1v1| 日韩欧美一区二区三区免费观看 | 高潮一区二区三区乱码 | 肉丝肉足丝袜一区二区三区 | 中国av一区 | 黄色无遮挡网站 | 又黄又爽的免费视频 | 国产中文字幕免费 | 国产香蕉在线 | 国产又粗又猛又爽又黄 | 2021天天干 | 给我看高清的视频在线观看 | 五月婷婷综合激情 | 激情第一页| 国产精品人成 | 国产精品igao视频 | 国产精品自拍合集 | 国产午夜精品理论片在线 | 九色激情网 | 96精品视频 | 12av毛片| 精品少妇爆乳无码av无码专区 | 无码精品一区二区免费 | 让男按摩师摸好爽 | 欧美一区,二区 | 午夜欧美福利 | 男女无遮挡猛进猛出 | 黄色一毛片 | 色欲av伊人久久大香线蕉影院 | 美女屁股眼视频免费 | 日本视频黄色 | 91人妻一区二区三区 | 三级视频网站在线观看 | 亚洲国产精品999 | 久久视频在线免费观看 | 人人叉人人 | 国产精品一区二区免费在线观看 | 成人av社区 | 欧美日韩亚洲二区 | 精品一区二区三区四 | 欧美日韩国产精品一区二区三区 | 国产精品短视频 | 日韩男女视频 | 老局长的粗大高h | 我们的2018在线观看免费高清 | 91国在线 | av成人在线免费观看 | 99久久久国产精品无码性 | 国产精品高潮呻吟视频 | 无码人妻久久一区二区三区不卡 | 国内免费精品视频 | 免费在线网站 | 手机av网址 | 噜噜噜久久久 | 黄色av片三级三级三级免费看 | 我们俩电影网mp4动漫官网 | 中文字幕永久在线观看 | 午夜鲁鲁| 一女被多男玩喷潮视频 | 国产毛片儿 | 国产伦精品一区二区三区视频孕妇 | 久久久久人妻精品色欧美 | kendra lust free xxx| 男欢女爱久石 | 日韩在线视频观看 | 国产草逼视频 | 免费观看污视频 | 日韩欧美高清dvd碟片 | 四虎成人精品永久免费av九九 | 国产丝袜一区二区三区 | 手机av免费| 亚洲男人精品 | 在线观看久草 | 国产精品国语对白 | 日韩精品免费观看 | 欧美www视频 | 五月六月丁香 | 国产一区二区在线观看视频 | 久青草免费视频 | bt天堂av| 男人天堂tv | 成 年人 黄 色 片 | 国产精品免费视频观看 | 成人精品999 |