Flume架构及应用
Flume架構及應用
零、 目錄
一、 flume 架構介紹
二、 flume 應用 – 日志采集
從整體上描述代理agent中sources、sinks、channels所涉及到的組件
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1詳細描述agent中每一個source、sink與channel的具體實現:即在描述source的時候,需要指定source到底是什么類型的,即這個source是接受文件的、還是接受http的、還是接受thrift的;對于sink也是同理,需要指定結果是輸出到HDFS中,還是Hbase中啊等等;對于channel需要指定是內存啊,還是數據庫啊,還是文件啊等等。
# Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100通過channel將source與sink連接起來
# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1啟動agent cd 到flume bin 目錄
./flume-ng agent -c ../conf/ -f ../conf/你的配置文件名稱 -n 你的agent名稱 -Dflume.root.logger=INFO,console參數說明: -n 指定agent名稱(與配置文件中代理的名字相同)-c 指定flume中配置文件的目錄-f 指定配置文件-Dflume.root.logger=DEBUG,console 設置日志等級案例1: NetCat Source:監聽一個指定的網絡端口,即只要應用程序向這個端口里面寫數據,這個source組件就可以獲取到信息。 其中 Sink為logger Channel為memory
flume官網中NetCat Source描述:
Property Name Default Description channels – type – The component type name, needs to be netcat bind – 日志需要發送到的主機名或者Ip地址,該主機運行著netcat類型的source在監聽 port – 日志需要發送到的端口號,該端口號要有netcat類型的source在監聽配置文件:
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = 192.168.80.80a1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1啟動flume agent a1
flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console使用telnet發送數據
telnet 192.168.80.80 44444 big data world!(windows中運行的)在控制臺上查看flume收集到的日志數據:
案例2:NetCat Source:監聽一個指定的網絡端口,即只要應用程序向這個端口里面寫數據,這個source組件就可以獲取到信息。 其中 Sink為hdfs Channel為file (相比于案例1的兩個變化)
flume官網中HDFS Sink的描述:
編寫配置文件:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1啟動flume agent a1 服務端:
flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console使用telnet發送數據
telnet 192.168.80.80 44444 big data world!(windows中運行的)在HDFS中查看flume收集到的日志數據:
案例3:Spooling Directory Source:監聽一個指定的目錄,即只要應用程序向這個指定的目錄中添加新的文件,source組件就可以獲取到該信息,并解析該文件的內容,然后寫入到channle。寫入完成后,標記該文件已完成或者刪除該文件。其中 Sink為logger, Channel為memory
flume官網中Spooling Directory Source描述:
Property Name Default Descriptionchannels – type – The component type name, needs to be spooldir.spoolDir – Spooling Directory Source監聽的目錄fileSuffix .COMPLETED 文件內容寫入到channel之后,標記該文件deletePolicy never 文件內容寫入到channel之后的刪除策略: never or immediatefileHeader false Whether to add a header storing the absolute path filename.ignorePattern ^$ Regular expression specifying which files to ignore (skip)interceptors – 指定傳輸中event的head(頭信息),常用timestampSpooling Directory Source的兩個注意事項:
①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing. 即:拷貝到spool目錄下的文件不可以再打開編輯②If a file name is reused at a later time, Flume will print an error to its log file and stop processing.即:不能將具有相同文件名字的文件拷貝到這個目錄下編寫配置文件:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/datainput a1.sources.r1.fileHeader = true a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp# Describe the sink a1.sinks.k1.type = logger# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1啟動flume agent a1 服務端:
flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console使用cp命令向Spooling Directory 中發送數據
cp datafile /usr/local/datainput (注:datafile中的內容為:big data world!)在控制臺上查看flume收集到的日志數據: 從控制臺顯示的結果可以看出event的頭信息中包含了時間戳信息。 同時我們查看一下Spooling Directory中的datafile信息—-文件內容寫入到channel之后,該文件被標記了:
[root@hadoop80 datainput]# ls datafile.COMPLETED案例4:Spooling Directory Source:監聽一個指定的目錄,即只要應用程序向這個指定的目錄中添加新的文件,source組件就可以獲取到該信息,并解析該文件的內容,然后寫入到channle。寫入完成后,標記該文件已完成或者刪除該文件。 其中 Sink:hdfs Channel:file (相比于案例3的兩個變化)
編寫配置文件:
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /usr/local/datainputa1.sources.r1.fileHeader = truea1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutputa1.sinks.k1.hdfs.writeFormat = Texta1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%Sa1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in filea1.channels.c1.type = filea1.channels.c1.checkpointDir = /usr/flume/checkpointa1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1啟動flume agent a1 服務端
flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console使用cp命令向Spooling Directory 中發送數據
cp datafile /usr/local/datainput (注:datafile中的內容為:big data world!)在控制臺上查看flume收集到的日志數據:
在HDFS中查看flume收集到的日志數據:
案例5:Exec Source:監聽一個指定的命令,獲取一條命令的結果作為它的數據源常用的是tail -F file指令,即只要應用程序向日志(文件)里面寫數據,source組件就可以獲取到日志(文件)中最新的內容 。 其中 Sink:hdfs Channel:file .這個案列為了方便顯示Exec Source的運行效果,結合Hive中的external table進行來說明。
編寫配置文件:
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /usr/local/log.file# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput1.sinks.k1.hdfs.writeFormat = Texta1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%Sa1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in filea1.channels.c1.type = filea1.channels.c1.checkpointDir = /usr/flume/checkpointa1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目錄,方便查看日志捕獲內容
hive> create external table t1(infor string)> row format delimited> fields terminated by '\t'> location '/dataoutput/';OKTime taken: 0.284 seconds啟動flume agent a1 服務端:
flume-ng agent -n a1 -c ../conf -f ../conf/exec.conf -Dflume.root.logger=DEBUG,console使用echo命令向/usr/local/datainput 中發送數據
echo big data > log.file在HDFS和Hive分別中查看flume收集到的日志數據:
hive> select * from t1;OKbig dataTime taken: 0.086 seconds使用echo命令向/usr/local/datainput 中在追加一條數據
echo big data world! >> log.file在HDFS和Hive再次分別中查看flume收集到的日志數據:
hive> select * from t1;OKbig databig data world!Time taken: 0.511 secondsExec source:Exec source和Spooling Directory Source是兩種常用的日志采集的方式,其中Exec source可以實現對日志的實時采集,Spooling Directory Source在對日志的實時采集上稍有欠缺,盡管Exec source可以實現對日志的實時采集,但是當Flume不運行或者指令執行出錯時,Exec source將無法收集到日志數據,日志會出現丟失,從而無法保證收集日志的完整性。
案例6:Avro Source:監聽一個指定的Avro 端口,通過Avro 端口可以獲取到Avro client發送過來的文件 。即只要應用程序通過Avro 端口發送文件,source組件就可以獲取到該文件中的內容。 其中 Sink:hdfs Channel:file (注:Avro和Thrift都是一些序列化的網絡端口–通過這些網絡端口可以接受或者發送信息,Avro可以發送一個給定的文件給Flume,Avro 源使用AVRO RPC機制)
Avro Source運行原理如下圖:
flume官網中Avro Source的描述:
Property Name Default Descriptionchannels – type – The component type name, needs to be avrobind – 日志需要發送到的主機名或者ip,該主機運行著ARVO類型的sourceport – 日志需要發送到的端口號,該端口要有ARVO類型的source在監聽編寫配置文件
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = avroa1.sources.r1.bind = 192.168.80.80a1.sources.r1.port = 4141# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutputa1.sinks.k1.hdfs.writeFormat = Texta1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%Sa1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in filea1.channels.c1.type = filea1.channels.c1.checkpointDir = /usr/flume/checkpointa1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1啟動flume agent a1 服務端
flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console使用avro-client發送文件
flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F /usr/local/log.file注:log.file文件中的內容為:[root@hadoop80 local]# more log.filebig databig data world!在HDFS中查看flume收集到的日志數據:
三、 總結
總結
以上是生活随笔為你收集整理的Flume架构及应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty实战 IM即时通讯系统(八)服
- 下一篇: Netty实战 IM即时通讯系统(九)实