日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

2、Flume1.7.0入门:安装、部署、及flume的案例

發布時間:2025/4/16 编程问答 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2、Flume1.7.0入门:安装、部署、及flume的案例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、什么是Flume?

flume 作為 cloudera 開發的實時日志收集系統,受到了業界的認可與廣泛應用。

flume的特點:

flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。

flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)并且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。

flume的可靠性?

當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復后,繼續發送),Besteffort(數據發送到接收方后,不會進行確認)。

flume的可恢復性:

還是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統里(性能較差)?

flume的一些核心概念:

  • Agent:使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。
  • Client:生產數據,運行在一個獨立的線程。
  • Source:從Client專門用來收集數據,傳遞給Channel,可以處理各種類型、各種格式的日志數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。
  • Sink:從Channel收集數據,運行在一個獨立線程,sink組件是用于把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。?
  • Channel:連接 sources 和 sinks ,這個有點像一個隊列,source組件把數據收集來以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對采集到的數據進行簡單的緩存,可以存放在memory、jdbc、file等等。
  • Events:可以是日志記錄、 avro 對象等。

Agent的概念

Flumeagent為最小的獨立運行單位。一個agent就是一個JVM,agent本身是一個Java進程,運行在日志收集節點—所謂日志收集節點就是服務器節點。?

agentSourceSinkChannel三大組件構成,類似生產者、倉庫、消費者的架構.如下圖:

Event的概念?

flume的核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存數據(channel),待數據真正到達目的地(sink)后,flume在刪除自己緩存的數據。?

在整個數據的傳輸的過程中,流動的是event,即事務保證是在event級別進行的。那么什么是event呢?—–event將傳輸的數據進行封裝,是flume傳輸數據的基本單位,如果是文本文件,通常是一行記錄,event也是事務的基本單位。event從source,流向channel,再到sink,本身為一個字節數組,并可攜帶headers(頭信息)信息。event代表著一個數據的最小完整單元,從外部數據源來,向外部的目的地去。?

為了方便大家理解,給出一張event的數據流向圖:?

一個完整的event包括:event headers、event body、event信息(即文本文件中的單行記錄),如下所以:?

2017-03-29 14:00:58,227 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 64 ? ? ? ? ? ? ? ? ? hello word }

其中event信息就是flume收集到的日記記錄。?

flume的運行機制?

flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據的輸入——source,一個是數據的輸出sink,sink負責將數據發送到外部指定的目的地。source接收到數據之后,將數據發送給channel,chanel作為一個數據緩沖區會臨時存放這些數據,隨后sink會將channel中的數據發送到指定的地方—-例如HDFS等,注意:只有在sink將channel中的數據成功發送出去之后,channel才會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。?

flume的廣義用法?

flume可以支持多級flume的agent,即flume可以前后相繼,例如sink可以將數據寫到下一個agent的source中,這樣的話就可以連成串了,可以整體處理了。flume還支持扇入(fan-in)、扇出(fan-out)。所謂扇入就是source可以接受多個輸入,所謂扇出就是sink可以將數據輸出多個目的地destination中。?

值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。組合方式基于用戶設置的配置文件,非常靈活。比如:Channel可以把事件暫存在內存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。Flume支持用戶建立多級流,也就是說,多個agent可以協同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。如下圖所示:

二、安裝Flume

1、下載Flume

http://apache.mirrors.hoobly.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz?

2、安裝Flume

1)將下載的flume包,解壓到/opt目錄中.

2)修改 flume-env.sh 配置文件,主要是JAVA_HOME變量設置

mbp:apache-flume-1.7.0-bin$ cp conf/flume-env.sh.template? conf/flume-env.sh

mbp:apache-flume-1.7.0-bin$ vi conf/flume-env.sh

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.? See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership.? The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.? You may obtain a copy of the License at

#

# ? ? http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced

# during Flume startup.

# Enviroment variables can be set here.

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_111.jdk/Contents/Home

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX

# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

# Let Flume write raw event data and configuration information to its log files for debugging

# purposes. Enabling these flags is not recommended in production,

# as it may result in logging sensitive user information or encryption secrets.

# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "?

# Note that the Flume conf directory is always included in the classpath.

#FLUME_CLASSPATH=""

3)驗證是否安裝成功

mbp:apache-flume-1.7.0-bin$ bin/flume-ng version

Flume 1.7.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707

Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016

From source with checksum 0d21b3ffdc55a07e1d08875872c00523

mbp:apache-flume-1.7.0-bin$?

出現上面的信息,表示安裝成功了.

三、flume的案例

對于flume的原理其實很容易理解,我們更應該掌握flume的具體使用方法,flume提供了大量內置的Source、Channel和Sink類型。而且不同類型的Source、Channel和Sink可以自由組合—–組合方式基于用戶設置的配置文件,非常靈活。比如:Channel可以把事件暫存在內存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。下面我將用具體的案例詳述flume的具體用法。?

其實flume的用法很簡單—-書寫一個配置文件,在配置文件當中描述source、channel與sink的具體實現,而后運行一個agent實例,在運行agent實例的過程中會讀取配置文件的內容,這樣flume就會采集到數據。?

配置文件的編寫原則:

0)、案例1:Netcat?

NetCat Source:監聽一個指定的網絡端口,即只要應用程序向這個端口里面寫數據,這個source組件就可以獲取到信息。

1>從整體上描述代理agent中sources、sinks、channels所涉及到的組件??

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

2>詳細描述agent中每一個source、sink與channel的具體實現:即在描述source的時候,需要指定source到底是什么類型的,即這個source是接受文件的、還是接受http的、還是接受thrift的;對于sink也是同理,需要指定結果是輸出到HDFS中,還是Hbase中啊等等;對于channel需要指定是內存啊,還是數據庫啊,還是文件啊等等。

# Describe configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

3>通過channel將source與sink連接起來

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

啟動agent的shell操作:

flume-ng agent -n a1 -c conf -f conf/example.file -Dflume.root.logger=DEBUG,console

參數說明:

-n 指定agent名稱(與配置文件中代理的名字相同)?

-c 指定flume中配置文件的目錄?

-f 指定配置文件?

-Dflume.root.logger=DEBUG,console 設置日志等級

1)、案例1:Avro

Avro可以發送一個給定的文件給Flume,Avro 源使用AVRO RPC機制。

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/avro.conf?

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/avro.conf -Dflume.root.logger=INFO,console

c)創建指定文件

mbp:apache-flume-1.7.0-bin$ echo "hello word" > log.00

d)使用avro-client發送文件

mbp:apache-flume-1.7.0-bin$ bin/flume-ng avro-client -c conf -H mbp -p 4141 -F log.00

e)在mbp的控制臺,可以看到以下信息,注意最后一行:

2017-03-29 13:52:19,139 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting

.

.

.

2017-03-29 14:00:58,227 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 64 ? ? ? ? ? ? ? ? ? hello word }

2)、案例2:Spool

監聽一個指定的目錄,即只要應用程序向這個指定的目錄中添加新的文件,source組件就可以獲取到該信息,并解析該文件的內容,然后寫入到channle。寫入完成后,標記該文件已完成或者刪除該文件。

Spool監測配置的目錄下新增的文件,并將文件中的數據讀取出來。需要注意兩點:

1) 拷貝到spool目錄下的文件不可以再打開編輯。

2) spool目錄下不可包含相應的子目錄

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/spool.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.channels = c1

a1.sources.r1.spoolDir = /opt/apache-flume-1.7.0-bin/logs

a1.sources.r1.fileHeader = true

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/spool.conf -Dflume.root.logger=INFO,console

c)追加文件到/opt/apache-flume-1.7.0-bin/logs目錄

mbp:apache-flume-1.7.0-bin$ echo "spool test1" > logs/spool_text.log

d)在mbp的控制臺,可以看到以下相關信息:

2017-03-29 14:31:04,921 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/opt/apache-flume-1.7.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31? ? ? ? ? ? ? ? spool test1 }

3)、案例3:Exec

監聽一個指定的命令,獲取一條命令的結果作為它的數據源?

常用的是tail -F file指令,即只要應用程序向日志(文件)里面寫數據,source組件就可以獲取到日志(文件)中最新的內容 。

EXEC執行一個給定的命令獲得輸出的源,如果要使用tail命令,必選使得file足夠大才能看到輸出內容

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/exec_tail.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /opt/apache-flume-1.7.0-bin/log_exec_tail

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/exec_tail.conf -Dflume.root.logger=INFO,console

c)生成足夠多的內容在文件里

mbp:apache-flume-1.7.0-bin$ for i in {1..100};do echo "exec tail$i" >> /opt/apache-flume-1.7.0-bin/logs/log_exec_tail done

e)在mbp的控制臺,可以看到以下信息:

2017-03-29 15:26:25,990 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting

.

.

.

2017-03-29 15:26:44,336 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 34? ? ? ? ? ? ? ? exec tail94 }

2017-03-29 15:26:44,336 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 35? ? ? ? ? ? ? ? exec tail95 }

2017-03-29 15:26:44,336 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 36? ? ? ? ? ? ? ? exec tail96 }

2017-03-29 15:26:44,336 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 37? ? ? ? ? ? ? ? exec tail97 }

2017-03-29 15:26:44,336 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38? ? ? ? ? ? ? ? exec tail98 }

2017-03-29 15:26:44,337 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39? ? ? ? ? ? ? ? exec tail99 }

2017-03-29 15:26:44,337 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30 ? ? ? ? ? ? exec tail100 }

4)、案例4:Syslogtcp

Syslogtcp監聽TCP的端口做為數據源?

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/syslog_tcp.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/syslog_tcp.conf -Dflume.root.logger=INFO,console

c)測試產生syslog

mbp:apache-flume-1.7.0-bin$ echo "hello idoall.org syslog" | nc localhost 5140

d)在mbp的控制臺,可以看到以下信息:

2017-03-29 15:33:43,305 (New I/O worker #1) [WARN - org.apache.flume.source.SyslogUtils.buildEvent(SyslogUtils.java:317)] Event created from Invalid Syslog data.

2017-03-29 15:33:46,303 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }

5)、案例5:JSONHandler

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/post_json.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 8888

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/post_json.conf -Dflume.root.logger=INFO,console

c)生成JSON 格式的POST request

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:8888

d)在mbp的控制臺,可以看到以下信息:

2017-03-29 15:37:30,565 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26

2017-03-29 15:37:30,713 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:8888

2017-03-29 15:37:30,713 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.

2017-03-29 15:37:30,713 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started

2017-03-29 15:38:00,451 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{a=a1, b=b1} body: 69 64 6F 61 6C 6C 2E 6F 72 67 5F 62 6F 64 79? ? idoall.org_body }?

總結Exec source:Exec source和Spooling Directory Source是兩種常用的日志采集的方式,其中Exec source可以實現對日志的實時采集,Spooling Directory Source在對日志的實時采集上稍有欠缺,盡管Exec source可以實現對日志的實時采集,但是當Flume不運行或者指令執行出錯時,Exec source將無法收集到日志數據,日志會出現丟失,從而無法保證收集日志的完整性。

6)、案例6:Avro Source

監聽一個指定的Avro 端口,通過Avro 端口可以獲取到Avro client發送過來的文件 。即只要應用程序通過Avro 端口發送文件,source組件就可以獲取到該文件中的內容。 其中 Sink:hdfs Channel:file?

(注:Avro和Thrift都是一些序列化的網絡端口–通過這些網絡端口可以接受或者發送信息,Avro可以發送一個給定的文件給Flume,Avro 源使用AVRO RPC機制)?

Avro Source運行原理如下圖:

flume配置文件的書寫是相當靈活的—-不同類型的Source、Channel和Sink可以自由組合!

最后對上面用的幾個flume source進行適當總結:?

① NetCat Source:監聽一個指定的網絡端口,即只要應用程序向這個端口里面寫數據,這個source組件?

就可以獲取到信息。?

②Spooling Directory Source:監聽一個指定的目錄,即只要應用程序向這個指定的目錄中添加新的文?

件,source組件就可以獲取到該信息,并解析該文件的內容,然后寫入到channle。寫入完成后,標記?

該文件已完成或者刪除該文件。?

③Exec Source:監聽一個指定的命令,獲取一條命令的結果作為它的數據源?

常用的是tail -F file指令,即只要應用程序向日志(文件)里面寫數據,source組件就可以獲取到日志(文件)中最新的內容 。?

④Avro Source:監聽一個指定的Avro 端口,通過Avro 端口可以獲取到Avro client發送過來的文件 。即只要應用程序通過Avro 端口發送文件,source組件就可以獲取到該文件中的內容。

7)、案例7:Hadoop sink

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/hdfs_sink.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://localhost:8020/user/flume/syslogtcp

a1.sinks.k1.hdfs.filePrefix = Syslog

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/hdfs_sink.conf -Dflume.root.logger=INFO,console

c)測試產生syslog

mbp:apache-flume-1.7.0-bin$ echo "hello idoall flume -> hadoop testing one" | nc localhost 5140

d)在mbp的控制臺,可以看到以下信息:

2017-03-29 19:10:14,820 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false

2017-03-29 19:10:14,834 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating hdfs://localhost:8020/user/flume/syslogtcp/Syslog.1490785814821.tmp

2017-03-29 19:10:44,861 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing hdfs://localhost:8020/user/flume/syslogtcp/Syslog.1490785814821.tmp

2017-03-29 19:10:44,880 (hdfs-k1-call-runner-9) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming hdfs://localhost:8020/user/flume/syslogtcp/Syslog.1490785814821.tmp to hdfs://localhost:8020/user/flume/syslogtcp/Syslog.1490785814821

2017-03-29 19:10:44,884 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

e)在mbp上再打開一個窗口,去hadoop上檢查文件是否生成

mbp:hadoop-2.7.3$ bin/hadoop fs -ls /user/flume/syslogtcp

Found 1 items

-rw-r--r-- ? 3 liudebin supergroup? ? ? ? 175 2017-03-29 19:10 /user/flume/syslogtcp/Syslog.1490785779051

mbp:hadoop-2.7.3$ bin/hadoop fs -cat /user/flume/syslogtcp/Syslog.1490785779051

SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?L??????? ??[???(hello idoall flume -> hadoop testing one?????L??????? ??

?8)、案例8:File Roll Sink

a)創建agent配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/file_roll.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5555

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.sink.directory = /opt/apache-flume-1.7.0-bin/logs

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

b)啟動flume agent a1

mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/file_roll.conf -Dflume.root.logger=INFO,console

c)測試產生log

mbp:apache-flume-1.7.0-bin$ echo "hello idoall.org syslog" | nc localhost 5555

mbp:apache-flume-1.7.0-bin$ echo "hello idoall.org syslog 2" | nc localhost 5555

d)查看/home/hadoop/flume-1.5.0-bin/logs下是否生成文件,默認每30秒生成一個新文件

mbp:apache-flume-1.7.0-bin$ ls -l logs/

total 24

-rw-r--r--? 1 liudebin? wheel? ? 50? 3 30 13:06 1490850370723-1

-rw-r--r--? 1 liudebin? wheel ? ? 0? 3 30 13:06 1490850370723-2

-rw-r--r--? 1 liudebin? wheel? 6429? 3 29 14:06 flume.log.COMPLETED

mbp:apache-flume-1.7.0-bin$ cat logs/1490850370723-1 logs/1490850370723-2

hello idoall.org syslog

hello idoall.org syslog 2

9)、案例9:Replicating Channel Selector

Flume支持Fan out流從一個源到多個通道。有兩種模式的Fan out,分別是復制和復用。在復制的情況下,流的事件被發送到所有的配置通道。在復用的情況下,事件被發送到可用的渠道中的一個子集。Fan out流需要指定源和Fan out通道的規則。

這次我們需要用到mbp1,mbp2兩臺機器

a)在mbp1創建replicating_Channel_Selector.conf配置文件

mbp1:apache-flume-1.7.0-bin$ vi conf/replicating_Channel_Selector.conf

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.type = replicating

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = mbp1

a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = mbp2

a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

b)在mbp1創建replicating_Channel_Selector_avro.conf配置文件

mbp1:apache-flume-1.7.0-bin$ vi conf/replicating_Channel_Selector_avro.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5555

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

c)在mbp1上將2個配置文件復制到mbp2上一份

mbp1:apache-flume-1.7.0-bin$ scp -r conf/replicating_Channel_Selector.conf vagrant@mbp2:/opt/apache-flume-1.7.0/conf/

mbp1:apache-flume-1.7.0-bin$ scp -r conf/replicating_Channel_Selector_avro.conf vagrant@mbp2:/opt/apache-flume-1.7.0/conf/

d)打開4個窗口,在mbp1和mbp2上同時啟動兩個flume agent

mbp1:apache-flume-1.7.0-bin$ bin/flume-ng agent -c . -f conf/replicating_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

mbp1:apache-flume-1.7.0-bin$ bin/flume-ng agent -c . -f conf/replicating_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

e)然后在mbp1或mbp2的任意一臺機器上,測試產生syslog

mbp1:apache-flume-1.7.0-bin$ echo "hello idoall.org syslog" | nc localhost 5140

f)在mbp1和mbp2的sink窗口,分別可以看到以下信息,這說明信息得到了同步:

17/04/05 14:08:18 INFO ipc.NettyServer: Connection to /192.168.1.51:46844 disconnected.

17/04/05 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] OPEN

17/04/05 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555

17/04/05 14:08:52 INFO ipc.NettyServer: [id: 0x90f8fe1f, /192.168.1.50:35873 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35873

17/04/05 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] OPEN

17/04/05 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555

17/04/05 14:08:59 INFO ipc.NettyServer: [id: 0xd6318635, /192.168.1.51:46858 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46858

17/04/05 14:09:20 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }

10)、案例10:Multiplexing Channel Selector

a)在mbp1創建Multiplexing_Channel_Selector配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/Multiplexing_Channel_Selector.conf

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
#映射允許每個值通道可以重疊。默認值可以包含任意數量的通道。
a1.sources.r1.selector.mapping.baidu = c1
a1.sources.r1.selector.mapping.ali = c2
a1.sources.r1.selector.default = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = mbp1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = mbp2
a1.sinks.k2.port = 5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

b)在mbp1創建Multiplexing_Channel_Selector_avro配置文件

mbp:apache-flume-1.7.0-bin$ vi conf/Multiplexing_Channel_Selector_avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

c)將2個配置文件復制到mbp2上一份

mbp1:apache-flume-1.7.0-bin$scp -r conf/Multiplexing_Channel_Selector.conf??root@mbp2:/opt/apache-flume-1.7.0-bin/conf/Multiplexing_Channel_Selector.conf

mbp1:apache-flume-1.7.0-bin$scp -r conf/Multiplexing_Channel_Selector_avro.conf?root@mbp2:/opt/apache-flume-1.7.0-bin/conf/Multiplexing_Channel_Selector_avro.conf

d)打開4個窗口,在mbp1和mbp2上同時啟動兩個flume agent

mbp1:apache-flume-1.7.0-bin$bin/flume-ng agent -c . -f conf/Multiplexing_Channel_Selector_avro.conf -n a1 -Dflume.root.logger=INFO,console

mbp1:apache-flume-1.7.0-bin$bin/flume-ng agent -c . -f conf/Multiplexing_Channel_Selector.conf -n a1 -Dflume.root.logger=INFO,console

e)然后在mbp1或mbp2的任意一臺機器上,測試產生syslog

mbp1:apache-flume-1.7.0-bin$curl -X POST -d '[{ "headers" :{"type" : "baidu"},"body" : "idoall_TEST1"}]'?http://localhost:5140?&& curl -X POST -d '[{ "headers" :{"type" : "ali"},"body" : "idoall_TEST2"}]'?http://localhost:5140?&& curl -X POST -d '[{ "headers" :{"type" : "qq"},"body" : "idoall_TEST3"}]'?http://localhost:5140

f)在mbp1的sink窗口,可以看到以下信息:

17/04/05 14:32:21 INFO node.Application: Starting Sink k1
17/04/05 14:32:21 INFO node.Application: Starting Source r1
17/04/05 14:32:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
17/04/05 14:32:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
17/04/05 14:32:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
17/04/05 14:32:21 INFO source.AvroSource: Avro source r1 started.
17/04/05 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] OPEN
17/04/05 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
17/04/05 14:32:36 INFO ipc.NettyServer: [id: 0xcf00eea6, /192.168.1.50:35916 => /192.168.1.50:5555] CONNECTED: /192.168.1.50:35916
17/04/05 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] OPEN
17/04/05 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
17/04/05 14:32:44 INFO ipc.NettyServer: [id: 0x432f5468, /192.168.1.51:46945 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:46945
17/04/05 14:34:11 INFO sink.LoggerSink: Event: { headers:{type=baidu} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 31 idoall_TEST1 }
17/04/05 14:34:57 INFO sink.LoggerSink: Event: { headers:{type=qq} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 33

g)在mbp2的sink窗口,可以看到以下信息:

17/04/05 14:32:27 INFO node.Application: Starting Sink k1
17/04/05 14:32:27 INFO node.Application: Starting Source r1
17/04/05 14:32:27 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
17/04/05 14:32:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
17/04/05 14:32:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
17/04/05 14:32:27 INFO source.AvroSource: Avro source r1 started.
17/04/05 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] OPEN
17/04/05 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
17/04/05 14:32:36 INFO ipc.NettyServer: [id: 0x7c2f0aec, /192.168.1.50:38104 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38104
17/04/05 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] OPEN
17/04/05 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
17/04/05 14:32:44 INFO ipc.NettyServer: [id: 0x3d36f553, /192.168.1.51:48599 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48599
17/04/05 14:34:33 INFO sink.LoggerSink: Event: { headers:{type=ali} body: 69 64 6F 61 6C 6C 5F 54 45 53 54 32 idoall_TEST2 }

可以看到,根據header中不同的條件分布到不同的channel上

11)、案例11:Flume Sink Processors

failover的機器是一直發送給其中一個sink,當這個sink不可用的時候,自動發送到下一個sink。

a)在m1創建Flume_Sink_Processors配置文件

  • root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf
  • a1.sources = r1
  • a1.sinks = k1 k2
  • a1.channels = c1 c2
  • #這個是配置failover的關鍵,需要有一個sink group
  • a1.sinkgroups = g1
  • a1.sinkgroups.g1.sinks = k1 k2
  • #處理的類型是failover
  • a1.sinkgroups.g1.processor.type = failover
  • #優先級,數字越大優先級越高,每個sink的優先級必須不相同
  • a1.sinkgroups.g1.processor.priority.k1 = 5
  • a1.sinkgroups.g1.processor.priority.k2 = 10
  • #設置為10秒,當然可以根據你的實際狀況更改成更快或者很慢
  • a1.sinkgroups.g1.processor.maxpenalty = 10000
  • # Describe/configure the source
  • a1.sources.r1.type = syslogtcp
  • a1.sources.r1.port = 5140
  • a1.sources.r1.channels = c1 c2
  • a1.sources.r1.selector.type = replicating
  • # Describe the sink
  • a1.sinks.k1.type = avro
  • a1.sinks.k1.channel = c1
  • a1.sinks.k1.hostname = m1
  • a1.sinks.k1.port = 5555
  • a1.sinks.k2.type = avro
  • a1.sinks.k2.channel = c2
  • a1.sinks.k2.hostname = m2
  • a1.sinks.k2.port = 5555
  • # Use a channel which buffers events in memory
  • a1.channels.c1.type = memory
  • a1.channels.c1.capacity = 1000
  • a1.channels.c1.transactionCapacity = 100
  • a1.channels.c2.type = memory
  • a1.channels.c2.capacity = 1000
  • a1.channels.c2.transactionCapacity = 100
  • b)在m1創建Flume_Sink_Processors_avro配置文件

  • root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf
  • a1.sources = r1
  • a1.sinks = k1
  • a1.channels = c1
  • # Describe/configure the source
  • a1.sources.r1.type = avro
  • a1.sources.r1.channels = c1
  • a1.sources.r1.bind = 0.0.0.0
  • a1.sources.r1.port = 5555
  • # Describe the sink
  • a1.sinks.k1.type = logger
  • # Use a channel which buffers events in memory
  • a1.channels.c1.type = memory
  • a1.channels.c1.capacity = 1000
  • a1.channels.c1.transactionCapacity = 100
  • # Bind the source and sink to the channel
  • a1.sources.r1.channels = c1
  • a1.sinks.k1.channel = c1
  • c)將2個配置文件復制到m2上一份

  • root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf??root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf
  • root@m1:/home/hadoop/flume-1.5.0-bin# scp -r /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf?root@m2:/home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf
  • d)打開4個窗口,在m1和m2上同時啟動兩個flume agent

  • root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
  • root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
  • e)然后在m1或m2的任意一臺機器上,測試產生log

  • root@m1:/home/hadoop# echo "idoall.org test1 failover" | nc localhost 5140
  • f)因為m2的優先級高,所以在m2的sink窗口,可以看到以下信息,而m1沒有:

  • 14/08/10 15:02:46 INFO ipc.NettyServer: Connection to /192.168.1.51:48692 disconnected.
  • 14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] OPEN
  • 14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
  • 14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0x09a14036, /192.168.1.51:48704 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48704
  • 14/08/10 15:03:26 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
  • g)這時我們停止掉m2機器上的sink(ctrl+c),再次輸出測試數據:

  • root@m1:/home/hadoop# echo "idoall.org test2 failover" | nc localhost 5140
  • h)可以在m1的sink窗口,看到讀取到了剛才發送的兩條測試數據:

  • 14/08/10 15:02:46 INFO ipc.NettyServer: Connection to /192.168.1.51:47036 disconnected.
  • 14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] OPEN
  • 14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] BOUND: /192.168.1.50:5555
  • 14/08/10 15:03:12 INFO ipc.NettyServer: [id: 0xbcf79851, /192.168.1.51:47048 => /192.168.1.50:5555] CONNECTED: /192.168.1.51:47048
  • 14/08/10 15:07:56 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
  • 14/08/10 15:07:56 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
  • i)我們再在m2的sink窗口中,啟動sink:

  • root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
  • j)輸入兩批測試數據:

  • root@m1:/home/hadoop# echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140
  • k)在m2的sink窗口,我們可以看到以下信息,因為優先級的關系,log消息會再次落到m2上:

  • 14/08/10 15:09:47 INFO node.Application: Starting Sink k1
  • 14/08/10 15:09:47 INFO node.Application: Starting Source r1
  • 14/08/10 15:09:47 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5555 }...
  • 14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
  • 14/08/10 15:09:47 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
  • 14/08/10 15:09:47 INFO source.AvroSource: Avro source r1 started.
  • 14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] OPEN
  • 14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
  • 14/08/10 15:09:54 INFO ipc.NettyServer: [id: 0x96615732, /192.168.1.51:48741 => /192.168.1.51:5555] CONNECTED: /192.168.1.51:48741
  • 14/08/10 15:09:57 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
  • 14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] OPEN
  • 14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] BOUND: /192.168.1.51:5555
  • 14/08/10 15:10:43 INFO ipc.NettyServer: [id: 0x12621f9a, /192.168.1.50:38166 => /192.168.1.51:5555] CONNECTED: /192.168.1.50:38166
  • 14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
  • 14/08/10 15:10:43 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
  • ?

    12)、案例12:Load balancing Sink Processor

    load balance type和failover不同的地方是,load balance有兩個配置,一個是輪詢,一個是隨機。兩種情況下如果被選擇的sink不可用,就會自動嘗試發送到下一個可用的sink上面。

    a)在m1創建Load_balancing_Sink_Processors配置文件

    mbp:apache-flume-1.7.0-bin$ vi conf/load_balancing_sink_processors.conf

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1

    #這個是配置Load balancing的關鍵,需要有一個sink group

    a1.sinkgroups = g1

    a1.sinkgroups.g1.sinks = k1 k2

    a1.sinkgroups.g1.processor.type = load_balance

    a1.sinkgroups.g1.processor.backoff = true

    a1.sinkgroups.g1.processor.selector = round_robin

    # Describe/configure the source

    a1.sources.r1.type = syslogtcp

    a1.sources.r1.port = 5140

    a1.sources.r1.channels = c1

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.channel = c1

    a1.sinks.k1.hostname = m1

    a1.sinks.k1.port = 5555

    a1.sinks.k2.type = avro

    a1.sinks.k2.channel = c1

    a1.sinks.k2.hostname = m2

    a1.sinks.k2.port = 5555

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    b)在m1創建Load_balancing_Sink_Processors_avro配置文件

    mbp:apache-flume-1.7.0-bin$ vi conf/load_balancing_sink_processors_avro.conf

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = avro

    a1.sources.r1.channels = c1

    a1.sources.r1.bind = 0.0.0.0

    a1.sources.r1.port = 5555

    # Describe the sink

    sinks.k1.type = logger

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    c)將2個配置文件復制到mbp2上一份

    root@mbp1:/opt/apache-flume-1.7.0-bin# scp -r conf/Load_balancing_Sink_Processors.conf??root@mbp2:/opt/apache-flume-1.7.0-bin/conf/Load_balancing_Sink_Processors.conf

    root@mbp1:/opt/apache-flume-1.7.0-bin# scp -r conf/Load_balancing_Sink_Processors_avro.conf?root@mbp2:/opt/apache-flume-1.7.0-bin/conf/Load_balancing_Sink_Processors_avro.conf

    d)打開4個窗口,在mbp1和mbp2上同時啟動兩個flume agent

    root@mbp1:/opt# /opt/apache-flume-1.7.0-bin/bin/flume-ng agent -c . -f conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

    root@mbp1:/opt# /opt/apache-flume-1.7.0-bin/bin/flume-ng agent -c . -f conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console

    e)然后在mbp1或mbp2的任意一臺機器上,測試產生log,一行一行輸入,輸入太快,容易落到一臺機器上

    root@mbp1:/opt# echo "idoall.org test1" | nc localhost 5140

    root@mbp1:/opt# echo "idoall.org test2" | nc localhost 5140

    root@mbp1:/opt# echo "idoall.org test3" | nc localhost 5140

    root@mbp1:/opt# echo "idoall.org test4" | nc localhost 5140

    f)在mbp1的sink窗口,可以看到以下信息:

    17/04/05 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }

    17/04/05 15:35:33 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }

    g)在mbp2的sink窗口,可以看到以下信息:

    17/04/05 15:35:27 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
    17/08/05 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }

    說明輪詢模式起到了作用。

    13)、案例13:Hbase sink

    a)在測試之前,請先參考《ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1分布式環境部署》將hbase啟動

    b)然后將以下文件復制到flume中:

    cp hbase-1.3.0-bin/lib/protobuf-java-2.5.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/habase-client-1.3.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/hbase-common-1.3.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/hbase-protocol-1.3.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/hbase-server-1.3.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/hbase-hadoop2-compat-1.3.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/hbase-hadoop-compat-1.3.0.jar apache-flume-1.7.0-bin/lib/

    cp hbase-1.3.0-bin/lib/htrace-core-3.1.0-incubating.jar apache-flume-1.7.0-bin/lib/

    c)確保test_idoall_org表在hbase中已經存在

    d)在m1創建hbase_simple配置文件

    mbp:apache-flume-1.7.0-bin$ vi conf/hbase_simple.conf

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # Describe/configure the source

    a1.sources.r1.type = syslogtcp

    a1.sources.r1.port = 5140

    a1.sources.r1.host = localhost

    a1.sources.r1.channels = c1

    # Describe the sink

    a1.sinks.k1.type = logger

    a1.sinks.k1.type = hbase

    a1.sinks.k1.table = test_idoall_org

    a1.sinks.k1.columnFamily = name

    a1.sinks.k1.column = idoall

    a1.sinks.k1.serializer =??org.apache.flume.sink.hbase.RegexHbaseEventSerializer

    a1.sinks.k1.channel = memoryChannel

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    e)啟動flume agent

    mbp:apache-flume-1.7.0-bin$ bin/flume-ng agent -n a1 -c conf -f conf/hbase_simple.conf -Dflume.root.logger=INFO,console

    f)測試產生syslog

    mbp:apache-flume-1.7.0-bin$ echo "hello idoall.org from flume" | nc localhost 5140

    g)這時登錄到hbase中,可以發現新數據已經插入

    mbp@opt# hbase-1.3.0/bin/hbase shell
    2017-04-05 16:09:48,984 INFO [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
    HBase Shell; enter 'help<RETURN>' for list of supported commands.
    Type "exit<RETURN>" to leave the HBase Shell
    Version 0.96.2-hadoop2, r1581096, Mon Mar 24 16:03:18 PDT 2017
    hbase(main):001:0> list
    TABLE
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/hbase-1.3.0/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/hadoop-2.8.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    hbase2hive_idoall
    hive2hbase_idoall
    test_idoall_org
    3 row(s) in 2.6880 seconds
    => ["hbase2hive_idoall", "hive2hbase_idoall", "test_idoall_org"]
    hbase(main):002:0> scan "test_idoall_org"
    ROW COLUMN+CELL
    10086 column=name:idoall, timestamp=1406424831473, value=idoallvalue
    1 row(s) in 0.0550 seconds
    hbase(main):003:0> scan "test_idoall_org"
    ROW COLUMN+CELL
    10086 column=name:idoall, timestamp=1406424831473, value=idoallvalue
    1407658495588-XbQCOZrKK8-0 column=name:payload, timestamp=1407658498203, value=hello idoall.org from flume
    2 row(s) in 0.0200 seconds
    hbase(main):004:0> quit

    經過這么多flume的例子測試,如果你全部做完后,會發現flume的功能真的很強大,可以進行各種搭配來完成你想要的工作,俗話說師傅領進門,修行在個人,如何能夠結合你的產品業務,將flume更好的應用起來,快去動手實踐吧。

    ?

    ?

    ?

    ?

    ?

    轉載于:https://www.cnblogs.com/netbloomy/p/6666683.html

    總結

    以上是生活随笔為你收集整理的2、Flume1.7.0入门:安装、部署、及flume的案例的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    一区二区三区动漫 | 天天色综合久久 | 黄色美女免费网站 | 国产精品嫩草55av | 久久亚洲电影 | 91精品国自产在线观看 | 色网免费观看 | 国产成人精品一区二区 | www色,com| 丝袜足交在线 | 在线你懂 | www.夜夜爱 | 中文视频在线播放 | 视频直播国产精品 | 免费在线一区二区 | 蜜桃传媒一区二区 | 国产又粗又猛又黄又爽的视频 | 五月婷婷,六月丁香 | 综合视频在线 | 18久久久久久 | 在线观看涩涩 | 免费精品久久久 | 久久电影网站中文字幕 | 久久综合操 | 国产高清中文字幕 | 久久一区二区免费视频 | 99 视频 高清 | 99视频播放 | 亚洲天堂色婷婷 | 国产区 在线 | 成人在线一区二区 | 人人狠狠综合久久亚洲 | 91久久一区二区 | 天天干天天射天天爽 | 国产精品久久久久永久免费观看 | 国产精品一区二区美女视频免费看 | 在线观看免费黄色 | 日韩精品一区二区免费 | 黄色片免费在线 | 成年人在线看视频 | 亚洲更新最快 | 欧美精品在线观看一区 | 国产中文字幕免费 | 黄网站色成年免费观看 | 日韩精品久久中文字幕 | 美女视频是黄的免费观看 | av天天色 | 日韩欧美在线免费观看 | 日韩一区二区免费视频 | 成人av免费在线 | 国产裸体bbb视频 | 久草在线视频在线观看 | 在线观看亚洲 | 国产精品久久久久9999吃药 | 91视频啊啊啊| 91麻豆精品国产 | 成人国产精品 | 91在线影视 | 亚洲精品乱码久久久久久高潮 | 国产三级在线播放 | 97精品视频在线播放 | 亚洲精品在线观 | a天堂一码二码专区 | 欧美日比视频 | 亚洲国产精品久久久久婷婷884 | 久久国产精品一区二区三区 | 国产精品免费一区二区三区 | 2017狠狠干| 日韩在线第一 | 人人爱人人爽 | 四虎伊人| 手机在线视频福利 | 欧美乱大交 | 国产精品久久久久久久久久 | 天天天操操操 | 福利视频导航网址 | 成人91免费视频 | 日韩中文字幕在线不卡 | 日日干天天插 | 国产一区二区精品久久 | 九九综合在线 | 深爱激情综合 | 五月婷婷开心 | 久久成人在线 | 日韩影视大全 | 网站免费黄 | 免费看成人av | 久久y| 欧美日韩久久久 | 国产看片 色 | 一区二区三区三区在线 | 四川bbb搡bbb爽爽视频 | 中文字幕免费观看全部电影 | 国产成人黄色片 | 天天操夜夜拍 | 中文字幕在线视频免费播放 | 最新av在线播放 | 黄色的片子 | 黄色a级片在线观看 | 国产 中文 日韩 欧美 | 日韩精品在线一区 | 在线 成人 | 成人黄视频| 91视频在线播放视频 | 久久黄色免费视频 | 日韩乱理 | 夜夜操网站 | 国产成人久久精品亚洲 | 免费久久久 | 四虎8848免费高清在线观看 | 日韩欧美一区二区三区视频 | 天天操天天舔天天干 | 国产精品正在播放 | 九九九在线 | 成年人在线观看 | 久久精品二区 | 国产精品日韩久久久久 | 黄色片网站大全 | 日韩有码在线观看视频 | 婷婷亚洲五月 | 国产激情免费 | 中文成人字幕 | jizz18欧美18 | 色综合久久五月天 | 色激情五月 | 香蕉视频网址 | 精品一区二区三区电影 | 日韩 在线观看 | 五月婷婷综合在线视频 | 黄色av三级在线 | 91免费看黄 | 808电影免费观看三年 | 久久天天躁夜夜躁狠狠85麻豆 | 亚洲免费在线观看视频 | 日韩中文字幕国产精品 | 91精品伦理 | av 在线观看 | 成人在线观看av | 成人av高清在线观看 | 久久乱码卡一卡2卡三卡四 五月婷婷久 | 欧美日韩调教 | 免费a现在观看 | 国产精品va | 午夜视频在线观看网站 | av+在线播放在线播放 | 人人射人人爽 | 黄色a视频免费 | 91九色蝌蚪国产 | 久久精品99久久 | 2019精品手机国产品在线 | 婷婷国产v亚洲v欧美久久 | 亚洲欧美视频网站 | 毛片3| 久久久精品国产一区二区电影四季 | 一区二区视频在线观看免费 | 福利视频入口 | 手机看片中文字幕 | 午夜久久网站 | 超碰国产在线播放 | 亚洲最新av | 亚洲精品乱码久久久久久蜜桃欧美 | 一本一本久久a久久精品牛牛影视 | 久久久免费观看 | 18pao国产成视频永久免费 | 国产 欧美 日韩 | 免费av试看 | 激情黄色av | 久青草视频在线观看 | 久久乱码卡一卡2卡三卡四 五月婷婷久 | 久久国产精品视频观看 | 国产日韩精品在线观看 | 亚洲黄a | 99精品久久99久久久久 | 久久人人爽人人爽人人片av免费 | 国产精品伦一区二区三区视频 | 亚洲最大激情中文字幕 | 在线观看国产高清视频 | 久久8 | 国产精品久久在线观看 | 色婷婷成人网 | 西西www4444大胆在线 | 国产精品原创av片国产免费 | 久久久久成人精品免费播放动漫 | 91毛片在线观看 | 精品v亚洲v欧美v高清v | www.久久免费视频 | 亚洲欧美成人 | 欧美日韩中文字幕在线视频 | 草免费视频 | 欧美精品久久久久久久 | 亚洲精品久久激情国产片 | 国产精品久久久久久久久久白浆 | 黄色网址在线播放 | 国产精品精品国产婷婷这里av | 国内成人综合 | 成人av高清| 亚洲色视频 | 国产尤物在线 | 国产剧情一区二区在线观看 | 久草9视频 | 中文字幕av在线 | 日韩欧美高清一区二区三区 | 中文av日韩 | 午夜精品电影 | 色多多视频在线 | 国产精品黑丝在线观看 | 亚洲国产精品视频在线观看 | 99视频国产精品免费观看 | 国产亚洲精品v | 不卡的av电影 | 国产成人av一区二区三区在线观看 | 2021国产视频 | 337p日本欧洲亚洲大胆裸体艺术 | 麻豆传媒视频在线播放 | 国产在线色视频 | 少妇资源站| 天天操夜夜看 | 久久夜夜操| 免费成人在线电影 | 亚洲精品国产综合99久久夜夜嗨 | 中文字幕在线播放一区 | 激情视频久久 | 91av视频观看 | 成人丁香花 | 中文高清av | 日韩中文字| 日韩精品专区 | 91福利区一区二区三区 | 亚洲永久精品国产 | 999国产 | 欧美日韩不卡在线视频 | 国产精在线 | 最近中文字幕免费 | 91久久人澡人人添人人爽欧美 | 99人久久精品视频最新地址 | 在线中文视频 | 亚洲九九| h网站免费在线观看 | 中文字幕在| 成人a v视频 | 最新国产中文字幕 | 在线免费观看黄色av | 国产69精品久久久久久 | 97福利社| 中文资源在线官网 | 国产人成看黄久久久久久久久 | 成人av免费播放 | 婷婷精品视频 | 曰本三级在线 | 一区二区三区日韩视频在线观看 | 久久免费看 | 在线观看黄色免费视频 | 国产精品白丝jk白祙 | mm1313亚洲精品国产 | 日韩在线观看视频在线 | 伊人久久精品久久亚洲一区 | 国产在线久草 | 草久在线播放 | 中文字幕日韩高清 | 精品久久九九 | 在线观看av小说 | 中文字幕av在线不卡 | 99久久久久久久 | 色姑娘综合网 | 中文字幕国产一区 | 国产精品系列在线观看 | 99热在线国产 | 夜夜躁天天躁很躁波 | 超碰在线亚洲 | 99久免费精品视频在线观看 | 99国产精品视频免费观看一公开 | 国产一在线精品一区在线观看 | 亚洲久草在线 | 欧美在线一 | 欧美日韩国产在线精品 | 久久久久久国产精品 | 中文字幕乱偷在线 | av线上看 | 处女av在线 | 亚洲性少妇性猛交wwww乱大交 | 国产亚洲精品久久久久秋 | a级国产乱理论片在线观看 特级毛片在线观看 | 国产精品手机视频 | 久久人人爽爽人人爽人人片av | 久久一二区 | 中文字幕在线看片 | 美国av片在线观看 | 久久久久免费精品国产 | 亚洲在线视频播放 | 右手影院亚洲欧美 | 国产在线91在线电影 | 九九电影在线 | 亚洲视频分类 | 福利一区视频 | 久久视频免费观看 | 黄色免费大片 | 狠狠的日日 | 日韩视频三区 | 久热精品国产 | 国产伦精品一区二区三区照片91 | 久久大视频 | 日本三级久久久 | av超碰免费在线 | 免费成人在线电影 | 中文字幕在线观看一区二区三区 | 午夜精品久久久久久久久久久久久久 | 夜夜夜影院 | 欧美福利久久 | 99国内精品久久久久久久 | 久久久免费视频播放 | 中午字幕在线 | 狠狠干干 | 亚洲精品美女久久久久 | 欧美另类美少妇69xxxx | 人人干干人人 | 91精选在线观看 | 啪啪激情网| 日韩成人一级大片 | 久久亚洲私人国产精品 | 区一区二区三在线观看 | 黄色综合 | 久久黄色影视 | 欧美精品在线一区二区 | 欧美激情精品久久久久久 | 中文字幕在线观看免费高清完整版 | 国产一区二区网址 | 亚洲国产精品推荐 | 精品久久久国产 | 99久久99久国产黄毛片 | 99免费在线视频观看 | 久久久久久久久久久电影 | 中文字幕 影院 | 伊甸园av在线 | 亚洲精品免费在线播放 | 国产在线第三页 | 天天色综合天天 | 天天干天天草天天爽 | 欧美极度另类 | av解说在线观看 | 国产精品普通话 | 欧美婷婷色 | 国产高清精 | 97精品国产aⅴ | 亚洲国产日韩精品 | 久久久69 | 中文字幕在线观看一区二区 | 2023av在线| 亚洲国产精品一区二区尤物区 | 成人黄色在线视频 | 最近中文字幕免费视频 | 黄色三级免费片 | 亚洲精品视频免费在线观看 | 色姑娘综合网 | 日韩激情第一页 | 看av在线 | 亚洲第一av在线播放 | 亚洲天堂网视频 | 国产群p视频 | 色综合久久久久 | 91免费视频国产 | 高清在线观看av | 在线看v片| 国产精品久久久av久久久 | 久草在线视频首页 | 久久精品综合 | 亚洲三级在线 | 中文字幕丝袜一区二区 | 97偷拍在线视频 | 色婷婷av一区二 | 国产传媒中文字幕 | 欧美精品久久久久 | 人人cao | 免费黄色网址大全 | 日本爽妇网| 激情av综合 | 日日干狠狠操 | 91久久久久久久一区二区 | 色婷婷激情网 | 狠狠色丁香九九婷婷综合五月 | 久久精品国产亚洲 | 在线观看www91 | 色婷婷97| 五月婷婷操| 97视频免费 | 狠狠ri | 免费不卡中文字幕视频 | 青春草免费视频 | 欧美成人久久 | av综合在线观看 | 月下香电影| 在线观看aa | 色偷偷人人澡久久超碰69 | 日韩免费av片 | 五月婷在线观看 | 婷婷六月综合亚洲 | 久久短视频 | 黄色av影院 | 亚洲激精日韩激精欧美精品 | 国产黄色免费观看 | 97成人精品视频在线观看 | 永久免费的av电影 | 9在线观看免费高清完整版 玖玖爱免费视频 | 九七在线视频 | 四虎成人精品永久免费av | 日韩免 | 日韩在线播放欧美字幕 | 日日麻批40分钟视频免费观看 | 激情五月婷婷综合网 | 欧洲亚洲女同hd | 国产69久久久 | 天天操天天干天天综合网 | 亚洲综合在线一区二区三区 | 久久神马影院 | 在线观看一区二区视频 | 99操视频 | 国产亚洲精品av | 久久99热这里只有精品国产 | 911av视频| 久久高清毛片 | 日韩欧美电影 | 午夜色性片 | 一区在线电影 | 久久艹欧美| 69亚洲精品| 亚洲一级在线观看 | 99色国产 | 成人av在线资源 | 四虎国产| 国产夫妻av在线 | 免费日韩高清 | 精品国产视频在线观看 | 国产亚洲久久 | 久久精品99国产精品亚洲最刺激 | 精品久久久久一区二区国产 | 99精品观看 | 久久男人影院 | 日韩精品中文字幕在线不卡尤物 | 狠狠干 狠狠操 | 怡红院成人在线 | www黄色软件 | 精品国产自在精品国产精野外直播 | 国产日韩欧美在线一区 | 91污在线观看 | 欧美激情综合色 | 欧美一级片免费在线观看 | 欧美另类69 | 91传媒激情理伦片 | 午夜视频免费播放 | 国产精品永久 | 一级片视频免费观看 | 97成人精品视频在线播放 | 黄色软件视频大全免费下载 | 日韩一二三区不卡 | 欧美专区国产专区 | 亚洲精品国偷拍自产在线观看蜜桃 | 亚洲天堂免费视频 | 激情综合五月天 | 日韩手机在线 | 国产无遮挡猛进猛出免费软件 | 国产成人福利 | 欧美日韩中文字幕视频 | 人人超碰人人 | 欧美三级高清 | 九九九在线观看 | 91精品国产三级a在线观看 | 日韩网站一区 | 欧美日韩国产一区二区三区 | 五月天网页| 日韩高清观看 | 2020天天干天天操 | 亚洲 欧美 91 | 久久久免费电影 | 日韩激情中文字幕 | 国产亚洲免费观看 | 日韩免费视频播放 | 色亚洲激情 | 国产精品久久久久久久久久久免费看 | 在线看片中文字幕 | 99国产一区二区三精品乱码 | 亚洲视频播放 | 国产精品9999久久久久仙踪林 | 中文字幕一二 | 亚洲精品视频在线 | 国产一级免费观看视频 | 视频一区二区视频 | 激情伊人五月天 | 久久婷婷亚洲 | 国产精品v欧美精品 | 91最新视频| 激情欧美一区二区三区免费看 | 亚洲一本视频 | 久草线 | 九九九九精品九九九九 | 精品久久精品 | 婷婷在线视频观看 | 综合久久网站 | 丁香六月天婷婷 | 婷婷五天天在线视频 | 一区二区三区在线观看中文字幕 | 国产精品视频免费 | 国产精品欧美激情在线观看 | 中文字幕久久久精品 | 免费观看成人 | 韩国av一区二区三区在线观看 | 五月天电影免费在线观看一区 | 四虎国产精品免费观看视频优播 | 日韩中文久久 | 亚洲一区二区91 | 欧美一区二视频在线免费观看 | 婷婷av电影 | 天天天天天天天天操 | 性色大片在线观看 | www.天天成人国产电影 | 97电影网站 | 日本中出在线观看 | 狠狠狠的干 | 香蕉视频网站在线观看 | 97国产超碰 | 午夜精品导航 | 午夜精品一区二区三区免费 | 91精品推荐 | 日韩电影中文字幕 | 久久久电影网站 | 久久试看 | 精品欧美一区二区精品久久 | 久草9视频 | 亚洲国产免费网站 | 黄色毛片视频免费 | 少妇搡bbb| 91桃色在线播放 | 日日夜夜天天 | 亚洲一区天堂 | 午夜国产一区二区三区四区 | 天堂av在线中文在线 | 西西大胆免费视频 | 色婷婷免费视频 | 91av在线免费观看 | 91视频久久久 | 黄色小说视频在线 | 91视频专区| 日韩av片无码一区二区不卡电影 | 中文字幕在线国产精品 | 成人免费看视频 | 日日操日日 | 婷婷丁香色 | 激情久久伊人 | 九色福利视频 | 99国产精品| 国产第一页精品 | 波多野结衣电影一区二区三区 | 久久黄色小说视频 | 久久av高清| 蜜臀av一区二区 | 欧美色综合天天久久综合精品 | 2020天天干夜夜爽 | 国产视频观看 | 一级成人免费视频 | 久久久久久久久久久免费av | 免费中文字幕视频 | 日韩啪啪小视频 | 久久久久一区 | 成人av一级片 | 综合黄色网 | 亚洲国产日韩欧美在线 | 在线观看你懂的网站 | 久草网在线观看 | 亚洲视频在线播放 | 久色网 | 一区二区三区日韩在线观看 | 亚洲成人av电影在线 | 久久99视频 | 98涩涩国产露脸精品国产网 | 97在线看 | 丁香久久激情 | 九九免费在线视频 | 最新一区二区三区 | www91在线观看 | 国产在线黄色 | 欧美精品国产精品 | 激情综合五月天 | 91在线超碰 | 中文理论片 | 在线亚洲播放 | 色网站视频 | 精品久久网 | 在线视频免费观看 | 国产一区91 | 最新成人在线 | 99国产视频 | 国产成人精品亚洲精品 | 久久国产剧场电影 | 国产精品精品视频 | 免费网站黄 | 亚洲国产日韩一区 | 亚洲涩涩网站 | 欧美日一级片 | 免费观看视频的网站 | 日韩午夜视频在线观看 | 一区二区不卡在线观看 | 日韩av黄| 久久99久久99精品免观看粉嫩 | 欧美极品xxx| 中文字幕在线观 | 射射射综合网 | 久草精品视频 | 日本免费一二三区 | 天天色天天干天天 | 探花视频免费观看高清视频 | av在线播放中文字幕 | 91黄色免费看 | 亚洲国产中文字幕在线 | 人人澡人人爽欧一区 | 欧美国产日韩一区 | 久久久久久高潮国产精品视 | 国产高清视频在线观看 | 精品国偷自产在线 | 欧美性久久久 | 久久天天躁狠狠躁亚洲综合公司 | 精品国产人成亚洲区 | 色欧美视频 | 欧美老人xxxx18 | 亚洲综合在 | 午夜国产一区二区三区四区 | 国产精品999久久久 久产久精国产品 | 在线观看日韩国产 | 黄色国产区 | 欧美精品在线视频 | 久碰视频在线观看 | 日韩理论视频 | 久久久久99999 | 狠狠狠的干 | 欧美在线一 | 久久国产成人午夜av影院潦草 | 成人欧美日韩国产 | 成人黄色大片网站 | 最近最新最好看中文视频 | 日日摸日日碰 | 日韩精品视频免费看 | 高清中文字幕 | 天天爱天天舔 | 最新日韩精品 | 一区二区高清在线 | 久久久久国产成人精品亚洲午夜 | 99婷婷狠狠成为人免费视频 | 91精品区| 操久在线 | 国产91精品看黄网站在线观看动漫 | 中国一 片免费观看 | 免费在线观看av网站 | 在线观看视频在线 | 五月婷婷另类国产 | www在线观看国产 | 国产精品永久免费观看 | 乱男乱女www7788 | 中文字幕在线观看2018 | 菠萝菠萝蜜在线播放 | 久久久久久蜜桃一区二区 | 中国一级片在线播放 | 五月婷婷六月丁香在线观看 | 精品国产精品久久一区免费式 | 天天躁日日 | 国产小视频在线观看 | 人人爽人人 | 久久黄色小说视频 | 免费在线观看91 | 精品国产成人av在线免 | 五月天堂网 | 中文字幕欧美激情 | 国产一二区免费视频 | 丁香午夜| 午夜精品久久久久99热app | 欧洲亚洲女同hd | 日日插日日干 | 国产一区二区在线免费 | 精品视频资源站 | 日韩精品一区电影 | 精品中文字幕视频 | 黄免费在线观看 | 成人a在线观看 | 亚洲专区 国产精品 | 91香蕉视频污在线 | www.干| 日韩精品一区二区三区免费视频观看 | 国产精品一区在线观看 | 亚洲精品国精品久久99热 | 久草在线资源免费 | 激情网站网址 | 在线视频一二区 | 在线观看一 | 欧美日韩视频网站 | 国产精久久久久久久 | 国产一级电影免费观看 | 色偷偷人人澡久久超碰69 | 色www精品视频在线观看 | 欧美日本三级 | 亚洲成人黄色网址 | 亚洲三级av | 久久国产免费视频 | 中文字幕欧美激情 | 国产日韩一区在线 | 国产精品一区二区视频 | 欧美精品v国产精品v日韩精品 | 人人澡人摸人人添学生av | 欧美日韩精品免费观看 | 午夜影院先 | 精品国产一区二区久久 | 黄色网在线免费观看 | 久久精品国产一区二区三区 | 免费人成在线观看 | 久久亚洲精品国产亚洲老地址 | 99国产一区 | 国产精品日韩 | 日韩精品一区二区免费 | 超碰激情在线 | 国产精品久久久久久久久久久久午夜 | 欧美一级电影 | 亚洲精品国产精品99久久 | 日韩在线播放视频 | 国产精品久久久久久久久久直播 | 天天操狠狠操夜夜操 | 中文字幕有码在线观看 | 亚洲高清视频在线播放 | 久久久三级视频 | www.午夜| 成人av影院在线观看 | 成人av亚洲 | 91免费在线播放 | 国产精品久久久久永久免费看 | 五月在线视频 | 国产精品大片在线观看 | 天天射综合网站 | 国产在线视频一区二区三区 | av福利免费 | 久久视频在线观看中文字幕 | 国产精品精品久久久久久 | 国产精品成人一区二区三区吃奶 | 中文一区二区三区在线观看 | 免费观看黄色av | 99精品在线直播 | 日韩理论片在线观看 | 日三级在线 | 亚洲蜜桃在线 | 99精品国产99久久久久久97 | 在线观看岛国av | 成人久久亚洲 | 天天鲁天天干天天射 | 国内精品在线看 | 亚洲精品在线观看不卡 | 黄色综合 | 天天操夜夜操 | 伊香蕉大综综综合久久啪 | 97视频在线观看成人 | 久久午夜色播影院免费高清 | 正在播放国产一区二区 | 夜夜躁狠狠燥 | 精品久久国产 | 日韩中文字幕电影 | 4438全国亚洲精品观看视频 | 香蕉在线观看视频 | 成人黄色在线视频 | 中文字幕888 | 美女av在线免费 | 麻豆免费视频网站 | 美女精品国产 | 午夜精品久久久久久久久久久 | 精品国产成人av在线免 | 91视频在线自拍 | 五月婷婷丁香六月 | 又爽又黄又刺激的视频 | 激情av在线资源 | 国产伦理剧| 久久热首页 | 黄色的视频网站 | 美腿丝袜一区二区三区 | 成人在线视频网 | 69国产盗摄一区二区三区五区 | 国产成人久久久77777 | 99久久精品午夜一区二区小说 | 综合久久2023| 亚洲五月综合 | 国产精品一区二区三区在线免费观看 | 色成人亚洲| 在线观看色视频 | 麻豆视频91 | 丰满少妇在线观看资源站 | 日韩久久在线 | 国产成人一区二区三区影院在线 | 视频二区在线视频 | 三级黄色欧美 | 中文字幕在线观看完整版 | 97碰在线| 亚洲少妇久久 | 国产999精品久久久影片官网 | 成人99免费视频 | 色婷婷狠狠五月综合天色拍 | 国产黄色片免费观看 | 美女久久久 | 国产99久久九九精品免费 | 99久久久久久 | 91av久久 | 五月天久久久久 | 色综合天天视频在线观看 | 91中文字幕视频 | 中文字幕 欧美性 | 又大又硬又黄又爽视频在线观看 | 99久久电影 | 日韩三区在线观看 | 在线观看国产一区二区 | 国产丝袜制服在线 | 狠狠做六月爱婷婷综合aⅴ 日本高清免费中文字幕 | 日韩中文在线字幕 | 超碰在线97免费 | 黄色av成人在线 | 天天躁日日躁狠狠躁av麻豆 | 国产五码一区 | 免费a视频在线 | 天天插天天狠天天透 | 天天爽天天爽天天爽 | 婷婷开心久久网 | 久久黄色影院 | 日日爽日日操 | 国产一区精品在线观看 | 在线免费观看一区二区三区 | 四虎影院在线观看av | 91午夜精品 | 国产精品视频不卡 | 久久久久激情视频 | 99久久精品一区二区成人 | 在线播放日韩av | 亚洲一区二区麻豆 | 亚洲国产中文在线 | 久久网页 | 久草com| 午夜91在线| 日韩午夜精品 | 人人搞人人爽 | 国产精品久久久久永久免费观看 | 久久久蜜桃一区二区 | 亚洲高清av在线 | 亚洲亚洲精品在线观看 | 97福利在线观看 | 国产精品第二十页 | 黄av免费在线观看 | 国内精品久久久久久久影视麻豆 | 国产精品18久久久久久久久久久久 | 亚洲精品免费看 | 麻豆va一区二区三区久久浪 | av不卡免费在线观看 | 91污污| 色综久久 | 91九色在线播放 | 在线观看免费91 | av资源中文字幕 | 午夜久久久久久久久久久 | 草在线 | 国产精品h在线观看 | 日b视频国产 | 欧美一区二区三区在线观看 | 91精品一区二区三区久久久久久 | 日韩免费高清在线观看 | 天天插天天射 | 日韩专区av| 超碰在线人人97 | 亚洲欧洲精品一区二区 | av.com在线| 日韩在线理论 | 91精品国产自产在线观看永久 | 国产成人a亚洲精品v | 狠狠操电影网 | 久久三级毛片 | 成人免费观看av | 国产精品美女999 | 精品视频亚洲 | 国产黄色一级片 | 中文字幕之中文字幕 | 国产中的精品av小宝探花 | 99久高清在线观看视频99精品热在线观看视频 | 色综合久久精品 | 黄色一二级片 | 中日韩欧美精彩视频 | 中文字幕丝袜制服 | 久久久久亚洲国产精品 | 国产在线国偷精品产拍 | 国产精品人人做人人爽人人添 | 亚洲欧美日韩一区二区三区在线观看 | 国产一区久久 | 日本最新一区二区三区 | 国产麻豆视频网站 | 国产精品99久久久久 | 国产女教师精品久久av | 狠狠狠色狠狠色综合 | 四虎成人精品永久免费av | 免费成人黄色 | 在线播放一区二区三区 | 日日射av| 91久久人澡人人添人人爽欧美 | 精品国产电影一区 | 久久免费精品国产 | 国产精品久久久久一区二区三区 | 成人福利av | 精品国产1区2区3区 国产欧美精品在线观看 | 天天曰夜夜爽 | 91香蕉视频 mp4 | 欧美国产日韩在线视频 | 国内外成人在线视频 | 欧美视频在线观看免费网址 | 一区二区三区四区五区在线 | 亚洲国产精品成人综合 | 中文字幕在线观看视频网站 | 欧美在线视频精品 | 91精品国产电影 | 日韩91av | 亚洲成年人免费网站 | 亚洲一级黄色 | 女人18毛片a级毛片一区二区 | 久久99精品国产99久久 | 欧美日韩国产一区二 | 色综合久久久久综合体桃花网 | 国产亚洲小视频 | 在线成人欧美 | 一区二区三区四区五区在线 | 在线免费精品视频 | 349k.cc看片app| 9999国产| 97免费公开视频 | 国产美女免费观看 | 在线亚洲欧美视频 | 国产成人综合在线观看 | 91麻豆精品一区二区三区 | 美女视频永久黄网站免费观看国产 | 午夜日b视频 | 国产精品丝袜久久久久久久不卡 | 日本公妇色中文字幕 | 日韩成人在线免费观看 | 日韩高清毛片 | 日韩网 | 国产欧美久久久精品影院 | 操操操影院 | 九九在线国产视频 | 久久视频精品在线 | 一区二区在线不卡 | 日韩精品在线一区 | 亚洲国产精品影院 | 91超碰免费在线 | av在线官网 | 欧美精品在线观看免费 | 五月婷婷在线视频观看 | 一区二区三区四区在线免费观看 | 久产久精国产品 | 中文字幕永久免费 | 欧美国产视频在线 | 国产偷在线 | 在线成人中文字幕 | 日本中文字幕电影在线免费观看 | 精品国产一区二区三区久久 | 久久精品久久99精品久久 | 精品视频 | 成人精品一区二区三区中文字幕 | 国产一卡二卡四卡国 | 久久天天躁夜夜躁狠狠躁2022 | 狠狠狠干 | 激情在线五月天 | 中文av网 | 日韩精品视频免费专区在线播放 | 色婷婷国产精品 | 香蕉久草在线 | 国产精品一区免费在线观看 | 天天操夜夜操国产精品 | 日韩伦理一区二区三区av在线 | 成人精品视频 | 3d黄动漫免费看 | 三日本三级少妇三级99 | 国产午夜精品免费一区二区三区视频 | 婷婷综合影院 | 天天·日日日干 | 99r国产精品| av三级av| 国产网红在线观看 | 久久精品999 | 亚洲国产成人久久综合 | 69av网| 日韩在线视频免费播放 | 欧美在线一级片 | 国产精品一区二区三区免费看 | 精品国产资源 | 色婷婷国产精品 | 天天色天天操综合网 | 福利网址在线观看 | 国内精品久久久久影院优 | 91视视频在线直接观看在线看网页在线看 | 久久久精品国产免费观看一区二区 | 日韩久久久久久久久久 | 中文av日韩 | 最近免费观看的电影完整版 | 超碰在线97观看 | 久草在线电影网 | 激情网婷婷 | 中文字幕在线看片 |