日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flume架构及应用

發(fā)布時間:2024/4/30 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flume架构及应用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Flume架構(gòu)及應(yīng)用

零、 目錄

  • flume 架構(gòu)介紹
  • flume概念
  • flume特點
  • flume可靠性
  • flume核心概念
  • flume架構(gòu)介紹
  • flume運行機制
  • flume廣義用法
  • flume 應(yīng)用 – 日志采集
  • flume 配置啟動過程
  • 具體案例
  • 總結(jié)
  • 一、 flume 架構(gòu)介紹

  • flume 概念
  • 在具體介紹flume 之前 , 先給大家看一下Hadoop業(yè)務(wù)的整體開發(fā)流程: 從Hadoop 的業(yè)務(wù)流程中可以看出 , 數(shù)據(jù)采集是十分重要的一步 , 也是不可或缺的一步。
  • flume 作為 cloudera 開發(fā)的實時日志收集系統(tǒng),受到了業(yè)界的認可與廣泛應(yīng)用。Flume 初始的發(fā)行版本目前被統(tǒng)稱為 Flume OG(original generation),屬于 cloudera。但隨著 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設(shè)計不合理、核心配置不標(biāo)準(zhǔn)等缺點暴露出來,尤其是在 Flume OG 的最后一個發(fā)行版本 0.94.0 中,日志傳輸不穩(wěn)定的現(xiàn)象尤為嚴重,為了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構(gòu)核心組件、核心配置以及代碼架構(gòu),重構(gòu)后的版本統(tǒng)稱為 Flume NG(next generation);改動的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。
  • flume 特點
  • flume是一個分布式、可靠地、高可用的海量日志采集、聚合和傳輸?shù)南到y(tǒng)。 支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于手機數(shù)據(jù)。 同時Flume 提供對數(shù)據(jù)進行簡單的處理 , 并寫道各種數(shù)據(jù)接收方(HDFS、HBase…)的能力
  • flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(source)收集過來,在將收集到的數(shù)據(jù)送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存數(shù)據(jù)(channel),待數(shù)據(jù)真正到達目的地(sink)后,flume在刪除自己緩存的數(shù)據(jù)。
  • flume的數(shù)據(jù)流由事件(Event)貫穿始終。事件是Flume的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)(字節(jié)數(shù)組形式)并且攜帶有頭信息,這些Event由Agent外部的Source生成,當(dāng)Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區(qū),它將保存事件直到Sink處理完該事件。Sink負責(zé)持久化日志或者把事件推向另一個Source。
  • flume 可靠性
  • 當(dāng)節(jié)點出現(xiàn)故障時,日志能夠被傳送到其他節(jié)點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數(shù)據(jù)agent首先將event寫到磁盤上,當(dāng)數(shù)據(jù)傳送成功后,再刪除;如果數(shù)據(jù)發(fā)送失敗,可以重新發(fā)送。),Store on failure(這也是scribe采用的策略,當(dāng)數(shù)據(jù)接收方crash時,將數(shù)據(jù)寫到本地,待恢復(fù)后,繼續(xù)發(fā)送),Besteffort(數(shù)據(jù)發(fā)送到接收方后,不會進行確認)。
  • flume 核心概念
  • Agent:使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。
  • Source:從Client收集數(shù)據(jù),傳遞給Channel。
  • Sink:從Channel收集數(shù)據(jù),運行在一個獨立線程,并將數(shù)據(jù)發(fā)送到指定的地方去。
  • Channel:連接 sources 和 sinks ,這個有點像一個隊列。
  • Events:在整個數(shù)據(jù)的傳輸?shù)倪^程中,流動的是event,即事務(wù)保證是在event級別進行的。event將傳輸?shù)臄?shù)據(jù)進行封裝,是flume傳輸數(shù)據(jù)的基本單位,如果是文本文件,通常是一行記錄,event也是事務(wù)的基本單位。event從source,流向channel,再到sink,本身為一個字節(jié)數(shù)組,并可攜帶headers(頭信息)信息。event代表著一個數(shù)據(jù)的最小完整單元,從外部數(shù)據(jù)源來,向外部的目的地去。 event數(shù)據(jù)流向圖: . 一個完整的event包括:event headers、event body、event信息(即文本文件中的單行記錄),如下所示:
  • Client:生產(chǎn)數(shù)據(jù),運行在一個獨立的線程。
  • flume 架構(gòu)介紹
  • flume之所以這么神奇,是源于它自身的一個設(shè)計,這個設(shè)計就是agent,agent本身是一個java進程,運行在日志收集節(jié)點—所謂日志收集節(jié)點就是服務(wù)器節(jié)點。
  • agent里面包含3個核心的組件:source—->channel—–>sink,類似生產(chǎn)者、倉庫、消費者的架構(gòu)。
  • source:source組件是專門用來收集數(shù)據(jù)的,可以處理各種類型、各種格式的日志數(shù)據(jù),包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。 Source 類型:
  • Avro Source: 支持Avro協(xié)議(實際上是Avro RPC),內(nèi)置支持
  • Thrift Source: 支持Thrift協(xié)議,內(nèi)置支持
  • Exec Source: 基于Unix的command在標(biāo)準(zhǔn)輸出上生產(chǎn)數(shù)據(jù)
  • JMS Source: 從JMS系統(tǒng)(消息、主題)中讀取數(shù)據(jù)
  • Spooling Directory Source: 監(jiān)控指定目錄內(nèi)數(shù)據(jù)變更
  • Twitter 1% firehose Source: 通過API持續(xù)下載Twitter數(shù)據(jù),試驗性質(zhì)
  • Netcat Source: 監(jiān)控某個端口,將流經(jīng)端口的每一個文本行數(shù)據(jù)作為Event輸入
  • Sequence Generator Source: 序列生成器數(shù)據(jù)源,生產(chǎn)序列數(shù)據(jù)
  • Syslog Sources: 讀取syslog數(shù)據(jù),產(chǎn)生Event,支持UDP和TCP兩種協(xié)議
  • HTTP Source: 基于HTTP POST或GET方式的數(shù)據(jù)源,支持JSON、BLOB表示形式
  • Legacy Sources: 兼容老的Flume OG中Source(0.9.x版本)
  • channel:source組件把數(shù)據(jù)收集來以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數(shù)據(jù)的——對采集到的數(shù)據(jù)進行簡單的緩存,可以存放在memory、jdbc、file等等。 Channel類型:
  • Memory Channel:Event數(shù)據(jù)存儲在內(nèi)存中
  • JDBC Channel:Event數(shù)據(jù)存儲在持久化存儲中,當(dāng)前Flume Channel內(nèi)置支持Derby
  • File Channel:Event數(shù)據(jù)存儲在磁盤文件中
  • Spillable Memory Channel:Event數(shù)據(jù)存儲在內(nèi)存中和磁盤上,當(dāng)內(nèi)存隊列滿了,會持久化到磁盤文件
  • Pseudo Transaction Channel:測試用途
  • Custom Channel:自定義Channel實現(xiàn)
  • sink:sink組件是用于把數(shù)據(jù)發(fā)送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定義。 Sink類型:
  • HDFS Sink:數(shù)據(jù)寫入HDFS
  • Logger Sink:數(shù)據(jù)寫入日志文件
  • Avro Sink:數(shù)據(jù)被轉(zhuǎn)換成Avro Event,然后發(fā)送到配置的RPC端口上
  • Thrift Sink:數(shù)據(jù)被轉(zhuǎn)換成Thrift Event,然后發(fā)送到配置的RPC端口上
  • IRC Sink:數(shù)據(jù)在IRC上進行回放
  • File Roll Sink:存儲數(shù)據(jù)到本地文件系統(tǒng)
  • Null Sink:丟棄到所有數(shù)據(jù)
  • HBase Sink:數(shù)據(jù)寫入HBase數(shù)據(jù)庫
  • Morphline Solr Sink:數(shù)據(jù)發(fā)送到Solr搜索服務(wù)器(集群)
  • ElasticSearch Sink:數(shù)據(jù)發(fā)送到Elastic Search搜索服務(wù)器(集群)
  • Kite Dataset Sink:寫數(shù)據(jù)到Kite Dataset,試驗性質(zhì)的
  • Custom Sink:自定義Sink實現(xiàn)
  • flume 運行機制
  • flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數(shù)據(jù)的輸入——source,一個是數(shù)據(jù)的輸出sink,sink負責(zé)將數(shù)據(jù)發(fā)送到外部指定的目的地。source接收到數(shù)據(jù)之后,將數(shù)據(jù)發(fā)送給channel,chanel作為一個數(shù)據(jù)緩沖區(qū)會臨時存放這些數(shù)據(jù),隨后sink會將channel中的數(shù)據(jù)發(fā)送到指定的地方—-例如HDFS等,注意:只有在sink將channel中的數(shù)據(jù)成功發(fā)送出去之后,channel才會將臨時數(shù)據(jù)進行刪除,這種機制保證了數(shù)據(jù)傳輸?shù)目煽啃耘c安全性。
  • flume 廣義用法
  • flume之所以這么神奇—-其原因也在于flume可以支持多級flume的agent,即flume可以前后相繼,例如sink可以將數(shù)據(jù)寫到下一個agent的source中,這樣的話就可以連成串了,可以整體處理了。flume還支持扇入(fan-in)、扇出(fan-out)。所謂扇入就是source可以接受多個輸入,所謂扇出就是sink可以將數(shù)據(jù)輸出多個目的地destination中。
  • 二、 flume 應(yīng)用 – 日志采集

  • flume 配置啟動過程
  • 對于flume的原理其實很容易理解,我們更應(yīng)該掌握flume的具體使用方法,flume提供了大量內(nèi)置的Source、Channel和Sink類型。而且不同類型的Source、Channel和Sink可以自由組合—–組合方式基于用戶設(shè)置的配置文件,非常靈活。比如:Channel可以把事件暫存在內(nèi)存里,也可以持久化到本地硬盤上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。下面我將用具體的案例詳述flume的具體用法。
  • 其實flume的用法很簡單—-書寫一個配置文件,在配置文件當(dāng)中描述source、channel與sink的具體實現(xiàn),而后運行一個agent實例,在運行agent實例的過程中會讀取配置文件的內(nèi)容,這樣flume就會采集到數(shù)據(jù)。
  • 配置文件編寫原則:
  • 從整體上描述代理agent中sources、sinks、channels所涉及到的組件

    # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1
  • 詳細描述agent中每一個source、sink與channel的具體實現(xiàn):即在描述source的時候,需要指定source到底是什么類型的,即這個source是接受文件的、還是接受http的、還是接受thrift的;對于sink也是同理,需要指定結(jié)果是輸出到HDFS中,還是Hbase中啊等等;對于channel需要指定是內(nèi)存啊,還是數(shù)據(jù)庫啊,還是文件啊等等。

    # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
  • 通過channel將source與sink連接起來

    # Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
  • 啟動agent cd 到flume bin 目錄

    ./flume-ng agent -c ../conf/ -f ../conf/你的配置文件名稱 -n 你的agent名稱 -Dflume.root.logger=INFO,console參數(shù)說明: -n 指定agent名稱(與配置文件中代理的名字相同)-c 指定flume中配置文件的目錄-f 指定配置文件-Dflume.root.logger=DEBUG,console 設(shè)置日志等級
  • 具體案例:
  • 案例1: NetCat Source:監(jiān)聽一個指定的網(wǎng)絡(luò)端口,即只要應(yīng)用程序向這個端口里面寫數(shù)據(jù),這個source組件就可以獲取到信息。 其中 Sink為logger Channel為memory

  • flume官網(wǎng)中NetCat Source描述:

    Property Name Default Description channels – type – The component type name, needs to be netcat bind – 日志需要發(fā)送到的主機名或者Ip地址,該主機運行著netcat類型的source在監(jiān)聽 port – 日志需要發(fā)送到的端口號,該端口號要有netcat類型的source在監(jiān)聽
  • 配置文件:

    # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = 192.168.80.80a1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
  • 啟動flume agent a1

    flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
  • 使用telnet發(fā)送數(shù)據(jù)

    telnet 192.168.80.80 44444 big data world!(windows中運行的)
  • 在控制臺上查看flume收集到的日志數(shù)據(jù):

  • 案例2:NetCat Source:監(jiān)聽一個指定的網(wǎng)絡(luò)端口,即只要應(yīng)用程序向這個端口里面寫數(shù)據(jù),這個source組件就可以獲取到信息。 其中 Sink為hdfs Channel為file (相比于案例1的兩個變化)

  • flume官網(wǎng)中HDFS Sink的描述:

  • 編寫配置文件:

    # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 啟動flume agent a1 服務(wù)端:

    flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
  • 使用telnet發(fā)送數(shù)據(jù)

    telnet 192.168.80.80 44444 big data world!(windows中運行的)
  • 在HDFS中查看flume收集到的日志數(shù)據(jù):

  • 案例3:Spooling Directory Source:監(jiān)聽一個指定的目錄,即只要應(yīng)用程序向這個指定的目錄中添加新的文件,source組件就可以獲取到該信息,并解析該文件的內(nèi)容,然后寫入到channle。寫入完成后,標(biāo)記該文件已完成或者刪除該文件。其中 Sink為logger, Channel為memory

  • flume官網(wǎng)中Spooling Directory Source描述:

    Property Name Default Descriptionchannels – type – The component type name, needs to be spooldir.spoolDir – Spooling Directory Source監(jiān)聽的目錄fileSuffix .COMPLETED 文件內(nèi)容寫入到channel之后,標(biāo)記該文件deletePolicy never 文件內(nèi)容寫入到channel之后的刪除策略: never or immediatefileHeader false Whether to add a header storing the absolute path filename.ignorePattern ^$ Regular expression specifying which files to ignore (skip)interceptors – 指定傳輸中event的head(頭信息),常用timestamp
  • Spooling Directory Source的兩個注意事項:

    ①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing. 即:拷貝到spool目錄下的文件不可以再打開編輯②If a file name is reused at a later time, Flume will print an error to its log file and stop processing.即:不能將具有相同文件名字的文件拷貝到這個目錄下
  • 編寫配置文件:

    # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1# Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/datainput a1.sources.r1.fileHeader = true a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp# Describe the sink a1.sinks.k1.type = logger# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
  • 啟動flume agent a1 服務(wù)端:

    flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console
  • 使用cp命令向Spooling Directory 中發(fā)送數(shù)據(jù)

    cp datafile /usr/local/datainput (注:datafile中的內(nèi)容為:big data world!)
  • 在控制臺上查看flume收集到的日志數(shù)據(jù): 從控制臺顯示的結(jié)果可以看出event的頭信息中包含了時間戳信息。 同時我們查看一下Spooling Directory中的datafile信息—-文件內(nèi)容寫入到channel之后,該文件被標(biāo)記了:

    [root@hadoop80 datainput]# ls datafile.COMPLETED
  • 案例4:Spooling Directory Source:監(jiān)聽一個指定的目錄,即只要應(yīng)用程序向這個指定的目錄中添加新的文件,source組件就可以獲取到該信息,并解析該文件的內(nèi)容,然后寫入到channle。寫入完成后,標(biāo)記該文件已完成或者刪除該文件。 其中 Sink:hdfs Channel:file (相比于案例3的兩個變化)

  • 編寫配置文件:

    # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /usr/local/datainputa1.sources.r1.fileHeader = truea1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutputa1.sinks.k1.hdfs.writeFormat = Texta1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%Sa1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in filea1.channels.c1.type = filea1.channels.c1.checkpointDir = /usr/flume/checkpointa1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
  • 啟動flume agent a1 服務(wù)端

    flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console
  • 使用cp命令向Spooling Directory 中發(fā)送數(shù)據(jù)

    cp datafile /usr/local/datainput (注:datafile中的內(nèi)容為:big data world!)
  • 在控制臺上查看flume收集到的日志數(shù)據(jù):

  • 在HDFS中查看flume收集到的日志數(shù)據(jù):

  • 案例5:Exec Source:監(jiān)聽一個指定的命令,獲取一條命令的結(jié)果作為它的數(shù)據(jù)源常用的是tail -F file指令,即只要應(yīng)用程序向日志(文件)里面寫數(shù)據(jù),source組件就可以獲取到日志(文件)中最新的內(nèi)容 。 其中 Sink:hdfs Channel:file .這個案列為了方便顯示Exec Source的運行效果,結(jié)合Hive中的external table進行來說明。

  • 編寫配置文件:

    # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /usr/local/log.file# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput1.sinks.k1.hdfs.writeFormat = Texta1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%Sa1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in filea1.channels.c1.type = filea1.channels.c1.checkpointDir = /usr/flume/checkpointa1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
  • 在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目錄,方便查看日志捕獲內(nèi)容

    hive> create external table t1(infor string)> row format delimited> fields terminated by '\t'> location '/dataoutput/';OKTime taken: 0.284 seconds
  • 啟動flume agent a1 服務(wù)端:

    flume-ng agent -n a1 -c ../conf -f ../conf/exec.conf -Dflume.root.logger=DEBUG,console
  • 使用echo命令向/usr/local/datainput 中發(fā)送數(shù)據(jù)

    echo big data > log.file
  • 在HDFS和Hive分別中查看flume收集到的日志數(shù)據(jù):

    hive> select * from t1;OKbig dataTime taken: 0.086 seconds
  • 使用echo命令向/usr/local/datainput 中在追加一條數(shù)據(jù)

    echo big data world! >> log.file
  • 在HDFS和Hive再次分別中查看flume收集到的日志數(shù)據(jù):

    hive> select * from t1;OKbig databig data world!Time taken: 0.511 seconds
  • Exec source:Exec source和Spooling Directory Source是兩種常用的日志采集的方式,其中Exec source可以實現(xiàn)對日志的實時采集,Spooling Directory Source在對日志的實時采集上稍有欠缺,盡管Exec source可以實現(xiàn)對日志的實時采集,但是當(dāng)Flume不運行或者指令執(zhí)行出錯時,Exec source將無法收集到日志數(shù)據(jù),日志會出現(xiàn)丟失,從而無法保證收集日志的完整性。

  • 案例6:Avro Source:監(jiān)聽一個指定的Avro 端口,通過Avro 端口可以獲取到Avro client發(fā)送過來的文件 。即只要應(yīng)用程序通過Avro 端口發(fā)送文件,source組件就可以獲取到該文件中的內(nèi)容。 其中 Sink:hdfs Channel:file (注:Avro和Thrift都是一些序列化的網(wǎng)絡(luò)端口–通過這些網(wǎng)絡(luò)端口可以接受或者發(fā)送信息,Avro可以發(fā)送一個給定的文件給Flume,Avro 源使用AVRO RPC機制)

  • Avro Source運行原理如下圖:

  • flume官網(wǎng)中Avro Source的描述:

    Property Name Default Descriptionchannels – type – The component type name, needs to be avrobind – 日志需要發(fā)送到的主機名或者ip,該主機運行著ARVO類型的sourceport – 日志需要發(fā)送到的端口號,該端口要有ARVO類型的source在監(jiān)聽
  • 編寫配置文件

    # Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = avroa1.sources.r1.bind = 192.168.80.80a1.sources.r1.port = 4141# Describe the sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutputa1.sinks.k1.hdfs.writeFormat = Texta1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 0a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%Sa1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in filea1.channels.c1.type = filea1.channels.c1.checkpointDir = /usr/flume/checkpointa1.channels.c1.dataDirs = /usr/flume/data# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
  • 啟動flume agent a1 服務(wù)端

    flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console
  • 使用avro-client發(fā)送文件

    flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F /usr/local/log.file注:log.file文件中的內(nèi)容為:[root@hadoop80 local]# more log.filebig databig data world!
  • 在HDFS中查看flume收集到的日志數(shù)據(jù):

  • 三、 總結(jié)

  • NetCat Source:監(jiān)聽一個指定的網(wǎng)絡(luò)端口,即只要應(yīng)用程序向這個端口里面寫數(shù)據(jù),這個source組件就可以獲取到信息。
  • Spooling Directory Source:監(jiān)聽一個指定的目錄,即只要應(yīng)用程序向這個指定的目錄中添加新的文件,source組件就可以獲取到該信息,并解析該文件的內(nèi)容,然后寫入到channle。寫入完成后,標(biāo)記該文件已完成或者刪除該文件。
  • Exec Source:監(jiān)聽一個指定的命令,獲取一條命令的結(jié)果作為它的數(shù)據(jù)源 。常用的是tail -F file指令,即只要應(yīng)用程序向日志(文件)里面寫數(shù)據(jù),source組件就可以獲取到日志(文件)中最新的內(nèi)容 。
  • Avro Source:監(jiān)聽一個指定的Avro 端口,通過Avro 端口可以獲取到Avro client發(fā)送過來的文件 。即只要應(yīng)用程序通過Avro 端口發(fā)送文件,source組件就可以獲取到該文件中的內(nèi)容。
  • 總結(jié)

    以上是生活随笔為你收集整理的Flume架构及应用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。