

A Comprehensive Flume Tutorial


Table of Contents

1. Preface
1.1 What is Flume?
1.2 Flume features

2. Flume core concepts
2.1 Agent
2.2 Event: the envelope for data moving through Flume
2.3 Transaction: the transaction-control mechanism
2.4 Interceptors

3. Flume installation and deployment
3.1 Configuration
3.2 Startup command

4. A first Flume example
4.1 Data flow
4.2 Component selection
4.3 Deployment configuration

5. Flume built-in components overview

6. Common components in detail: Source
6.1 netcat source
6.2 exec source
6.3 spooldir source
6.4 avro source
6.5 kafka source
6.6 taildir source

7. Common components in detail: Interceptor
7.1 timestamp interceptor
7.2 static interceptor
7.3 host interceptor
7.4 UUID interceptor

8. Common components in detail: Channel
8.1 memory channel
8.2 file channel
8.3 kafka channel

9. Common components in detail: Sink
9.1 hdfs sink
9.2 kafka sink
9.3 avro sink

10. Common components in detail: Selector
10.1 Practice 1: replicating selector
10.2 Practice 2: multiplexing selector

11. Common components in detail: grouping processor

12. Custom Flume components
12.1 Custom source
12.2 Custom interceptor

13. A comprehensive case study

14. Interview reinforcement
14.1 Flume transaction mechanism
14.2 Flume agent internals
14.3 Ganglia and Flume monitoring
14.4 Flume tuning


1. Preface

Flume is a distributed log-collection system originally built at the software company Cloudera and later donated to the Apache Software Foundation, joining the Hadoop family of projects. In recent years Flume has been steadily refined across successive releases, notably Flume NG, and its set of built-in components keeps growing, which has made development with it far more convenient. It is now an Apache top-level project.

Side note: Cloudera's flagship product is CDH, an enterprise commercial distribution of Hadoop.

1.1 What is Flume?

Apache Flume is a tool/service for collecting data such as logs and events from many different sources and concentrating those large volumes of data into centralized storage. Flume is highly available, distributed, and richly configurable.

In short, Flume is a data-collection tool: it collects data from all kinds of sources (servers) and transports (aggregates) it into the various storage systems of the big-data ecosystem (HDFS, HBase, Hive, Kafka).

It works out of the box: install it, then just edit configuration files.

1.2 Flume features

Flume is a distributed, reliable, highly available system for collecting, aggregating, and transporting massive volumes of log data.

Flume can collect from files, socket streams (network ports), directories, Kafka, MySQL databases, and many other kinds of sources, and it can sink the collected data into HDFS, HBase, Hive, Kafka, and many other external storage systems.

Ordinary collection and transport requirements can be met with simple Flume configuration alone, without writing a single line of code.

For special scenarios Flume also offers good extension points for custom components, so it fits most day-to-day data-collection needs.

2. Flume core concepts

2.1 Agent

The most central role in Flume is the agent: a Flume collection system is one or more agents connected into a simple or complex data-transport pipeline.

Each agent is an independent daemon process (a JVM). It receives data from a data source and forwards it to the next destination.

Each agent acts as a courier for the data (wrapped as Event objects) and contains three components:

Source: the collection component; it connects to the data source to obtain data, and comes in many built-in implementations.

Sink: the delivery component; it passes data to the next-level agent or to the final storage system.

Channel: the transport-channel component; it carries data from the source to the sink.

(Figure: a single agent collecting data)

(Figure: multiple agents chained in series)

2.2 Event: the envelope for data moving through Flume

Inside Flume, data exists in the form of Events.

The Source therefore wraps the raw data it obtains into Events before putting them into the channel;

the Sink takes Events out of the channel and, according to its configuration, converts them into whatever output form is required.

An Event has two parts: Headers and Body.

Headers is a Map[String, String] carrying key-value metadata (flags, descriptions, and so on).

Body is a byte array holding the actual payload.

2018-11-03 18:44:44,913 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 61 20 61 20 61 61 61 20 61 20 0D a a aaa a . }
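For a feel of the API, here is a minimal sketch of building such an Event programmatically with Flume's EventBuilder (the header value and body text below are made-up samples, not from the log above):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        // headers: arbitrary KV metadata describing the payload
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("type", "weblog");
        // body: a plain byte array carrying the data itself
        Event event = EventBuilder.withBody("a a aaa a", StandardCharsets.UTF_8, headers);
        System.out.println(event.getHeaders() + " / " + new String(event.getBody()));
    }
}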

2.3 Transaction: the transaction-control mechanism

Flume's transaction mechanism is similar to a database's.

Flume uses two independent transactions: one covers event delivery from Source to Channel, the other from Channel to Sink. For example, the spooling directory source creates one transaction per event batch of a file; only once every event in the transaction has been delivered to the channel and the commit succeeds does the source mark that event batch as complete.

Delivery from Channel to Sink is handled the same way: if for any reason the events cannot be recorded, the transaction rolls back and all of its events remain in the channel, awaiting redelivery.

The transaction mechanism involves these key parameters:

a1.sources.s1.batchSize = 100

a1.sinks.k1.batchSize = 200

a1.channels.c1.transactionCapacity = 300 (should be larger than the source's and sink's batch sizes)

transactionCapacity is the number of events one channel transaction can hold. Keep it distinct from the channel's data-buffer capacity:

a1.channels.c1.capacity = 10000

So how do transactions guarantee end-to-end data integrity? Consider the case of two chained agents:

Data flow:

  1. Source 1 produces an Event and places it into Channel 1 via "put" and "commit" operations.
  2. Sink 1 removes the Event from Channel 1 via a "take" operation and sends it to Source 2.
  3. Source 2 places the Event into Channel 2 via "put" and "commit" operations.
  4. Source 2 signals success to Sink 1, and Sink 1 commits the "take" from step 2 (which effectively deletes the Event from Channel 1).

Note: at any moment, the Event is complete and valid in at least one channel.

2.4 Interceptors

An interceptor works just after the source: events the source produces are passed through the interceptor for whatever processing is needed.

Interceptors can also be chained.

Flume ships with several commonly used built-in interceptors,

and users can develop custom interceptors for their own data-processing needs.

Interceptors are thus one of Flume's extension points.

3. Flume installation and deployment

3.1 Configuration

Installing Flume is trivial: just unpack it (assuming a Hadoop environment is already in place).

1. Upload the package to the node where the data source lives,

then unpack it: tar -zxvf apache-flume-1.8.0-bin.tar.gz

2. Write the collection plan for your requirements in a configuration file (the file name is arbitrary).

3. Start a Flume agent on the appropriate node, pointing it at that configuration file.

3.2 Startup command

bin/flume-ng agent -c ./conf ...

commands:

  help                      show this help text

  agent                     start a Flume agent process

  avro-client               start a test client for an avro source (it sends an avro-serialized stream)

  version                   show the Flume version

global options:

  --conf,-c <conf>          directory containing Flume's own configuration files

  --classpath,-C <cp>       append extra jars to the classpath

  --dryrun,-d               do not actually start the agent, just print the command

  --plugins-path <dirs>     path(s) of plugin jars

  -Dproperty=value          set a Java system property

  -Xproperty=value          set a JVM option

agent options:

  --name,-n <name>          the agent's name (as used in the collection-plan configuration file)

  --conf-file,-f <file>     path of the collection-plan configuration file

  --zkConnString,-z <str>   ZooKeeper connection string

  --zkBasePath,-p <path>    ZooKeeper path holding the agent configuration, e.g. /flume/config

  --no-reload-conf          disable dynamic reloading of the configuration file

  --help,-h                 display help text

avro-client options:

  --rpcProps,-P <file>      RPC client properties file with server connection params

  --host,-H <host>          target host for the avro-serialized data (where the avro source runs)

  --port,-p <port>          target port

  --dirname <dir>           directory of the data to serialize and send (prepare test data in a file beforehand)

  --filename,-F <file>      file of the data to serialize and send (default: std input)

  --headerFile,-R <file>    file holding header key-value pairs

  --help,-h                 display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first

in the classpath.

To enable the built-in monitoring endpoint, add:

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
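With those flags set, the agent exposes its counters as JSON over HTTP; a quick sanity check could look like this (the host name and abbreviated output are illustrative):

curl http://localhost:34545/metrics

# {"CHANNEL.c1":{"EventPutSuccessCount":"100","EventTakeSuccessCount":"100",...}}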

4. A first Flume example

Let's start with the simplest possible example, to verify that the environment works.

4.1 Data flow

(Figure: data from a netcat source flows through a memory channel to a logger sink)

4.2 Component selection

  • Source component: netcat

  • Channel component:

Memory Channel

capacity: buffer capacity, i.e. the number of events the channel can hold

transactionCapacity: transaction capacity, i.e. the number of events that can be rolled back on failure

  • Sink component: logger sink

4.3 Deployment configuration

  • Create the configuration file

Under the Flume installation directory, create a new folder named myconf:

# cd myconf

# vi netcat-logger.conf

# name the components of this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# describe and configure the source component: r1

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 9999

# bind the source to the channel

a1.sources.r1.channels = c1

# describe and configure the sink component: k1

a1.sinks.k1.type = logger

# the sink must be bound to a channel too

a1.sinks.k1.channel = c1

# describe and configure the channel component; here we buffer in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

Pattern of the flume-ng command:

  • Start a collector:

[root@hdp-01 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf -Dflume.root.logger=INFO,console

agent: run a collector

-n a1: the name of this agent

-c conf: the directory holding Flume's own configuration files

-f myconf/netcat-logger.conf: the custom collection plan

In a production environment the command would be:

nohup bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf 1>/dev/null 2>&1 &

  • Test

Send data to the port the agent's source listens on, so the agent has something to collect.

Send messages to the port with telnet:

[root@hdp-01 ~]# telnet hdp-01 9999

If telnet is missing, install it with yum:

yum -y install telnet

You can then watch the events arrive in the agent's log.

Note: comments must not follow a config value on the same line; write them on their own line.
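A typical session looks roughly like this; the netcat source acknowledges each accepted line with OK, and "hello flume" is just sample input:

[root@hdp-01 ~]# telnet hdp-01 9999

Trying ...

Connected to hdp-01.

hello flume

OK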

5. Flume built-in components overview

Flume supports a large number of source and sink types; see the official manual for the full reference:

Flume 1.9.0 User Guide — Apache Flume

6. Common components in detail: Source

6.1 netcat source

6.1.1 Mechanism:

It starts a socket server listening on a port;

whatever arrives on the port is turned into events and written to the channel.

6.1.2 Configuration file:

a1.sources = s1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 44444

a1.sources.s1.channels = c1

6.2 exec source

6.2.1 Mechanism:

It launches a user-specified Linux shell command;

the command's standard output is collected as the data, converted into events, and written to the channel.

6.2.2 Parameters:

channels: the channel(s) this source writes to (required)
type: the component type name: exec (required)
command: the Linux command to run, e.g. tail -F /path/file (required)
shell: the shell used to invoke the command
restartThrottle (default: 10000): how long to wait, in ms, before restarting after the command dies
restart (default: false): whether to restart the command if it dies
logStdErr (default: false): whether to collect the command's stderr output as well
batchSize (default: 20): number of events per batch committed to the channel
batchTimeout (default: 3000): how long to wait, in ms, before flushing an incomplete batch downstream
selector.type (default: replicating): channel selector: replicating or multiplexing
selector.*: selector-specific parameters
interceptors: space-separated list of interceptors
interceptors.*: interceptor-specific parameters

6.2.3 Configuration file:

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

Test run:

1. Prepare a log file.

2. Write a small script that keeps appending to it:

for i in {1..10000}; do echo ${i}--------------------------- >> access.log ; sleep 0.5; done

3. Create a Flume configuration file.

4. Start the Flume collection.

Note: deliberate crash tests show that the exec source does not record the offset of the data it had collected before dying, so data can be lost after a restart!

6.3 spooldir source

6.3.1 Mechanism:

It monitors a given directory; any new, not-yet-collected files appearing there are read, converted into events, and written to the channel.

Note: files placed in the spooling directory must be immutable and uniquely named; otherwise the source will loudly fail!

6.3.2 Parameters:

channels (required)
type: the component type name, needs to be spooldir (required)
spoolDir: the directory from which to read files (required)
fileSuffix (default: .COMPLETED): suffix appended to fully ingested files
deletePolicy (default: never): whether to delete completed files: never or immediate
fileHeader (default: false): whether to add the absolute path of the ingested file to the event header
fileHeaderKey (default: file): header key for the above
basenameHeader (default: false): whether to add the file name to the header
basenameHeaderKey (default: basename): header key for the above
includePattern (default: ^.*$): regex for file names to ingest
ignorePattern (default: ^$): regex for file names to exclude; a file matching both includePattern and ignorePattern is excluded
trackerDir (default: .flumespool): directory holding the tracking metadata; may be absolute, or relative to the spool directory
trackingPolicy (default: rename): progress-tracking policy, "rename" or "tracker_dir"; only effective when deletePolicy=never. "rename": completed files are renamed according to fileSuffix. "tracker_dir": an empty file of the same name is created in trackerDir
consumeOrder (default: oldest): ingestion order: oldest, youngest or random. oldest and youngest may cost some efficiency (a full scan of the directory is needed to find the oldest or youngest file)
pollDelay (default: 500): delay (in milliseconds) used when polling for new files
recursiveDirectorySearch (default: false): whether to monitor subdirectories for new files to read
maxBackoff (default: 4000): the maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full; the source starts at a low backoff and increases it exponentially each time the channel throws a ChannelException, up to this value
batchSize (default: 100): number of events transferred to the channel per batch
inputCharset (default: UTF-8): character set used by deserializers that treat the input file as text
decodeErrorPolicy (default: FAIL): what to do on a non-decodable character in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the "replacement character", typically Unicode U+FFFD. IGNORE: drop the unparseable character sequence
deserializer (default: LINE): the deserializer used to parse the file into events; defaults to one event per line; a custom class must implement EventDeserializer.Builder
deserializer.*: varies per event deserializer
bufferMaxLines: (obsolete) this option is now ignored
bufferMaxLineLength (default: 5000): (deprecated) maximum length of a line in the commit buffer; use deserializer.maxLineLength instead
selector.type (default: replicating): replicating or multiplexing
selector.*: depends on the selector.type value
interceptors: space-separated list of interceptors
interceptors.*

6.3.3 Configuration file

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = spooldir

a1.sources.s1.spoolDir = /root/weblog

a1.sources.s1.batchSize = 200

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.3.4 Test run

bin/flume-ng agent -n a1 -c conf -f myconf/spooldir-mem-logger.conf -Dflume.root.logger=DEBUG,console

Note: unlike the exec source, the spooldir source is inherently reliable: it records how far it had collected before a crash!
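To watch the rename behavior described in 6.3.2, drop a finished file into the spool directory and list it again after ingestion (the file name here is illustrative):

cp /tmp/batch-001.txt /root/weblog/

ls /root/weblog/

# once fully ingested, the file shows up as: batch-001.txt.COMPLETED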

6.4 avro source

The avro source receives data by listening on a network port, and the incoming data must have been serialized with the Avro serialization framework.

Avro is a cross-language serialization framework.

Aside: what are serialization and serialization frameworks?

Serialization: turning a data block with complex structure (an object) into a flat (linear) binary sequence.

Serialization framework: ready-made software that converts objects into binary sequences following a fixed strategy.

For example: the JDK has ObjectOutputStream;

Hadoop has Writable;

Avro is a cross-platform serialization framework.

6.4.1 Mechanism

It starts a network service listening on a port and collects the avro-serialized data stream arriving there.

The source carries an Avro deserializer, so it can correctly deserialize the received binary stream, wrap the records in events, and write them to the channel.

6.4.2 Parameters

channels (required)
type: the source's alias: avro (required)
bind: the address to bind to (required)
port: the port to bind to (required)
threads: maximum number of worker threads for the service
selector.type
selector.*
interceptors: space-separated list of interceptors
interceptors.*
compression-type (default: none): compression type; must match the incoming data: none | deflate
ssl (default: false): set this to true to enable SSL encryption. If SSL is enabled, you must also specify a "keystore" and a "keystore-password", either through component-level parameters (see below) or as global SSL parameters (see the SSL/TLS support section)
keystore: path to a Java keystore file; if not specified here, the global keystore is used (if defined, otherwise configuration error)
keystore-password: password for the Java keystore; if not specified here, the global keystore password is used (if defined, otherwise configuration error)
keystore-type (default: JKS): type of the Java keystore, "JKS" or "PKCS12"; if not specified here, the global keystore type is used (if defined, otherwise the default is JKS)
exclude-protocols (default: SSLv3): space-separated list of SSL/TLS protocols to exclude; SSLv3 is always excluded in addition to the protocols specified
include-protocols: space-separated list of SSL/TLS protocols to include; the enabled protocols are the included ones minus the excluded ones; if empty, every supported protocol is included
exclude-cipher-suites: space-separated list of cipher suites to exclude
include-cipher-suites: space-separated list of cipher suites to include; the enabled suites are the included ones minus the excluded ones; if empty, every supported suite is included
ipFilter (default: false): set this to true to enable ipFiltering for netty
ipFilterRules: define N netty ipFilter pattern rules with this config

6.4.3 Configuration file

a1.sources = r1

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.4.4 Test run

Start the agent:

bin/flume-ng agent -c ./conf -f ./myconf/avro-mem-logger.conf -n a1 -Dflume.root.logger=DEBUG,console

Then use a client to send avro-serialized data to the running source:

bin/flume-ng avro-client --host c703 --port 4141
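The avro-client can also read its payload from a file instead of standard input (the data file path below is illustrative):

bin/flume-ng avro-client --host c703 --port 4141 --filename /root/weblog/test.data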

6.4.5 Test run: chaining agents with an avro source and an avro sink

6.4.5.1 Requirement

6.4.5.2 Configuration files

  • Upstream configuration file

vi exec-m-avro.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/logs/access.log

a1.sources.r1.batchSize = 100

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = h3

a1.sinks.k1.port = 4455

  • Downstream configuration file

vi avro-m-log.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4455

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = logger

6.4.5.3 Test run

  • Start the downstream agent first:

bin/flume-ng agent -n a1 -c conf/ -f avro-m-log.conf -Dflume.root.logger=INFO,console

  • Then start the upstream agent:

bin/flume-ng agent -n a1 -c conf/ -f exec-m-avro.conf

  • Then run a script on h1 to generate test data:

while true

do

echo "hello " ?>> /tmp/logs/access.log

sleep 0.1

done

6.5 kafka source

6.5.1 Mechanism

The kafka source is simply a Kafka consumer: it connects to Kafka, reads data, converts it into events, and writes them to the channel.

6.5.2 Parameters

channels: the channel(s) the data is sent to (required)
type: the source's name: org.apache.flume.source.kafka.KafkaSource (required)
kafka.bootstrap.servers: comma-separated list of Kafka brokers (required)
kafka.consumer.group.id (default: flume): Kafka consumer group id
kafka.topics: comma-separated list of Kafka topics
kafka.topics.regex: a regex specifying a set of topics; takes precedence over kafka.topics
batchSize (default: 1000): maximum number of events per batch written to the channel
batchDurationMillis (default: 1000): maximum time for writing one batch to the channel
backoffSleepIncrement (default: 1000): initial and incremental wait time triggered when the Kafka topic appears empty
maxBackoffSleep (default: 5000): maximum wait time triggered when the Kafka topic appears empty
useFlumeEventFormat (default: false): by default, events are taken from the Kafka topic directly as bytes into the event body. Set to true to read events as Flume Avro binary format. Used together with the same property on a Kafka sink, or with parseAsFlumeEvent on a Kafka channel, this preserves any Flume headers sent on the producing side
setTopicHeader (default: true): whether to add a KV pair with the topic to the header
topicHeader (default: topic): per the switch above, the header key mapping to the topic name
kafka.consumer.security.protocol (default: PLAINTEXT): set to SASL_PLAINTEXT, SASL_SSL or SSL if reading from Kafka with some level of security; see below for additional info on secure setup
more consumer security props: if using SASL_PLAINTEXT, SASL_SSL or SSL, refer to Kafka security for additional properties that need to be set on the consumer
Other Kafka Consumer Properties: this source allows any Kafka consumer parameter to be configured directly, e.g. kafka.consumer.auto.offset.reset (i.e. prefix the consumer parameter uniformly with kafka.consumer.)

6.5.3 Configuration file

a1.sources = s1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.s1.channels = c1

a1.sources.s1.batchSize = 100

a1.sources.s1.batchDurationMillis = 2000

a1.sources.s1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sources.s1.kafka.topics = TAOGE

a1.sources.s1.kafka.consumer.group.id = g1

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.5.4 Test run

1. First prepare the topic in Kafka:

# list the topics in the Kafka cluster
bin/kafka-topics.sh --list --zookeeper c701:2181

# create a new topic
bin/kafka-topics.sh --create --topic TAOGE --partitions 3 --replication-factor 2 --zookeeper c701:2181

# show topic details
bin/kafka-topics.sh --describe --topic TAOGE --zookeeper c701:2181

# console producer, to write data into the topic
bin/kafka-console-producer.sh --broker-list c701:9092,c702:9092,c703:9092 --topic TAOGE

2. Start the Flume agent to collect the data from Kafka:

bin/flume-ng agent -n a1 -c conf/ -f myconf/kfk-mem-logger.conf -Dflume.root.logger=INFO,console

Note:

the batch size the source writes to the channel must be <= the channel's transactionCapacity.

6.6 taildir source

6.6.1 Mechanism

It watches a set of files under specified directories; whenever a file gains newly written lines, they are tailed.

It records the position it has tailed each file to in a designated position file, in JSON format (which you can even edit by hand to make the source start reading from any position you like).

So this source really is, as the official docs advertise, reliable!

It never modifies the files it has finished collecting (no renaming, no deleting, ...).

The taildir source updates the recorded offset only after the data it read has been successfully written to the channel.

This mechanism guarantees that no data is missed (lost), though duplicate data is possible!
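For reference, the position file is plain JSON with one entry per tailed file; an illustrative entry (the inode and pos values are made up):

[{"inode":2496272,"pos":12089,"file":"/root/weblog/access.log"}]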

6.6.2 Parameters

channels: the channel(s) to write to (required)
type: the source's alias: TAILDIR (required)
filegroups: space-separated group names, each representing a set of files, e.g. g1 g2 (required)
filegroups.<filegroupName>: absolute path of each file group; the file name may be a regex (required)
positionFile (default: ~/.flume/taildir_position.json): path of the file recording the offsets
headers.<filegroupName>.<headerKey>: header value set with the header key; multiple headers can be specified for one file group
byteOffsetHeader (default: false): whether to add the byte offset of a tailed line to a header called 'byteoffset'
skipToEnd (default: false): whether to skip the position to EOF for files not present in the position file
idleTimeout (default: 120000): delay before closing an inactive file; if the closed file later gains new lines, the source detects this and reopens it
writePosInterval (default: 3000): offsets are written to the position file every 3 s
batchSize (default: 100): maximum number of events per batch committed to the channel
maxBatchCount (default: Long.MAX_VALUE): maximum number of consecutive batches read from one file (if some file is being written at high speed, raise this to the maximum so the source can focus on that file)
backoffSleepIncrement (default: 1000): the increment for the delay before reattempting to poll for new data, when the last attempt found none
maxBackoffSleep (default: 5000): the maximum delay between reattempts to poll for new data, when the last attempt found none
cachePatternMatching (default: true): listing directories and applying the filename regex may be time consuming for directories containing thousands of files; caching the list of matching files (and the consumption order) can improve performance. Requires that the file system tracks modification times with at least 1-second granularity
fileHeader (default: false): whether to add a header storing the absolute path of the file
fileHeaderKey (default: file): header key to use when appending the absolute path to the event header

6.6.3 Configuration file

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/flumedata/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.r1.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.6.4 Test run

bin/flume-ng agent -n a1 -c conf/ -f myconf/taildir-mem-logger.conf -Dflume.root.logger=DEBUG,console

Deliberate crash tests confirm that this source really is reliable:

it loses no data, though in extreme cases it can produce duplicates!

7. Common components in detail: Interceptor

What is an interceptor?

It sits just after the source: it receives events from the source, applies some processing logic, and returns the processed events.

This lets users inject data-processing logic without modifying the source's code.

Flume supports chaining of interceptors.

From reading the source code, the call order is:

SourceRunner

ExecSource

ChannelProcessor

SourceRunner -> the source's start() method -> a batch is read and channelProcessor.processEventBatch(events) is called -> the interceptors process the batch -> the selector picks the target channel(s) -> the data is committed.
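The interceptor contract itself is small. Below is a sketch of the interface a custom interceptor must satisfy; this pass-through example leaves every event unchanged, and the class name is made up for illustration:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class PassThroughInterceptor implements Interceptor {

    public void initialize() {
        // one-time setup
    }

    public Event intercept(Event event) {
        // process a single event; returning null drops it
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        // batch variant, invoked by the ChannelProcessor
        return events;
    }

    public void close() {
        // cleanup
    }

    // Flume instantiates interceptors through a Builder named in the config's type property
    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new PassThroughInterceptor();
        }
        public void configure(Context context) {
            // read interceptor parameters here
        }
    }
}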

7.1 timestamp interceptor

7.1.1 Purpose

Writes one KV pair into the event's header:

the key name is configurable; the value is the current timestamp in milliseconds.

7.1.2 Parameters

type: the interceptor's name: timestamp
headerName (default: timestamp): the header key to insert
preserveExisting (default: false): if the header already contains the same key, whether to preserve it rather than overwrite it

7.1.3 Configuration example

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

7.1.4 Test

2019-06-09 10:24:21,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1560047061012} body: 31 30 32 34 34 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 10244........... }

7.2 static interceptor

7.2.1 Purpose

Lets the user add a fixed, user-defined key-value pair to the event header; the pair is hard-coded in the configuration file.

7.2.2 Parameters

type: alias: static
preserveExisting (default: true): whether to preserve an existing header with the same key
key (default: key): the key to insert
value (default: value): the value to insert

7.2.3 Configuration example

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2 i3

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = hero

a1.sources.s1.interceptors.i3.value = TAOGE

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

7.2.4 Test
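If the chain is wired correctly, the logger sink should print events whose headers now carry the keys from all three interceptors; an illustrative line (all values made up):

Event: { headers:{timestamp=1560047061012, host=192.168.77.41, hero=TAOGE} body: 31 30 32 34 34 ... 10244... }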

7.3 host interceptor

7.3.1 Purpose

Inserts the host name (or IP) into the event header.

7.3.2 Parameters

type: the interceptor's alias: host
preserveExisting (default: false): whether to preserve an existing header key-value
useIP (default: true): insert the IP (true) or the host name (false)
hostHeader (default: host): the header key to insert

7.3.3 Configuration example

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

7.3.4 Test

7.4 UUID interceptor

7.4.1 Purpose

Generates a UUID and puts it into the event header.

7.4.2 Parameters

type: the full class name: org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName (default: id): the header key name
preserveExisting (default: true): whether to preserve an existing header with the same key
prefix (default: ""): prefix prepended to the UUID

7.4.3 Configuration

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2 i3 i4

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = hero

a1.sources.s1.interceptors.i3.value = TAOGE

a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

a1.sources.s1.interceptors.i4.headerName = duanzong

a1.sources.s1.interceptors.i4.prefix = 666_

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

7.4.4 Test

8. Common components in detail: Channel

The channel is the agent's repository for buffering events:

the source adds events to the channel;

the sink takes events from the channel and removes them.

The channel is deeply involved in transaction control,

and each channel type has its own capacity, reliability level, and transaction-capacity characteristics.

8.1 memory channel

8.1.1 Characteristics

Events are stored in an in-memory queue of the configured capacity.

It is fast, but less reliable: data can be lost.

8.1.2 Parameters

type: alias: memory
capacity (default: 100): maximum number of events the channel can store
transactionCapacity (default: 100): maximum transaction-control capacity
keep-alive (default: 3): timeout, in seconds, for adding or removing an event
byteCapacityBufferPercentage (default: 20): percentage of byteCapacity that bytes other than the body may occupy
byteCapacity (default: see description): maximum total bytes in the channel (bodies only)

8.1.3 Configuration example

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

8.1.4 Test

8.1.5 Digging deeper

Reading the memory channel source code:

// lock to guard queue, mainly needed to keep it locked down during resizes
// it should never be held through a blocking operation
private Object queueLock = new Object();

// queue is where the Memory Channel keeps its Events, backed by a LinkedBlockingDeque
@GuardedBy(value = "queueLock")
private LinkedBlockingDeque<Event> queue;

// the two semaphores below synchronize access: queueRemaining tracks the free
// space in the queue, queueStored tracks the occupied space
// invariant that tracks the amount of space remaining in the queue (with all uncommitted takeLists deducted)
// we maintain the remaining permits = queue.remaining - takeList.size()
// this allows local threads waiting for space in the queue to commit without denying access to the
// shared lock to threads that would make more space on the queue
private Semaphore queueRemaining;

// used to make "reservations" to grab data from the queue.
// by using this we can block for a while to get data without locking all other threads out
// like we would if we tried to use a blocking call on queue
private Semaphore queueStored;

// the fields below mirror the Memory Channel settings in the configuration file
// maximum number of Events in one transaction
private volatile Integer transCapacity;
// how long to wait when adding to / removing from the queue
private volatile int keepAlive;
// maximum space all Events in the queue may occupy
private volatile int byteCapacity;
private volatile int lastByteCapacity;
// percentage of byteCapacity that the Events' headers may occupy
private volatile int byteCapacityBufferPercentage;
// semaphore marking the remaining part of byteCapacity
private Semaphore bytesRemaining;
// records Memory Channel metrics; can be observed later through the monitoring setup
private ChannelCounter channelCounter;

Now for the key part: MemoryChannel's MemoryTransaction, a subclass of Transaction. Its documentation shows the standard usage pattern of any Transaction:

Channel ch = ...
Transaction tx = ch.getTransaction();
try {
    tx.begin();
    ...
    // ch.put(event) or ch.take()
    ...
    tx.commit();
} catch (ChannelException ex) {
    tx.rollback();
    ...
} finally {
    tx.close();
}

A Transaction centers on four methods: put, take, commit, and rollback, and implementing a subclass mainly means implementing those four.

To make this easier, Flume provides the helper class BasicTransactionSemantics; a developer only needs to extend it and implement doPut, doTake, doCommit, and doRollback. MemoryChannel's transaction does exactly that.

private class MemoryTransaction extends BasicTransactionSemantics {
    // like MemoryChannel itself, uses LinkedBlockingDeques to hold the uncommitted Events
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    // byte sizes of the Events put and taken in this transaction
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
        // initialize the put and take queues with transCapacity
        putList = new LinkedBlockingDeque<Event>(transCapacity);
        takeList = new LinkedBlockingDeque<Event>(transCapacity);
        channelCounter = counter;
    }

    @Override
    protected void doPut(Event event) throws InterruptedException {
        // doPut: if putList still has room, enqueue the Event and update putByteCounter;
        // otherwise throw a ChannelException immediately
        channelCounter.incrementEventPutAttemptCount();
        int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
        if (!putList.offer(event)) {
            throw new ChannelException("Put queue for MemoryTransaction of capacity " +
                putList.size() + " full, consider committing more frequently, " +
                "increasing capacity or increasing thread count");
        }
        putByteCounter += eventByteSize;
    }

    @Override
    protected Event doTake() throws InterruptedException {
        // doTake: first check whether takeList still has room
        channelCounter.incrementEventTakeAttemptCount();
        if (takeList.remainingCapacity() == 0) {
            throw new ChannelException("Take list for MemoryTransaction, capacity " +
                takeList.size() + " full, consider committing more frequently, " +
                "increasing capacity, or increasing thread count");
        }
        // then check, via the semaphore, whether the channel's queue holds any data
        if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
            return null;
        }
        Event event;
        // take one event out of the MemoryChannel's queue
        synchronized (queueLock) {
            event = queue.poll();
        }
        Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
            "signalling existence of entry");
        // park it in takeList, then update takeByteCounter
        takeList.put(event);
        int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
        takeByteCounter += eventByteSize;
        return event;
    }

    @Override
    protected void doCommit() throws InterruptedException {
        // commits the transaction
        // first compare the relative sizes of putList and takeList
        int remainingChange = takeList.size() - putList.size();
        // if takeList is smaller, more was put than taken, so check that the channel has room
        if (remainingChange < 0) {
            // 1. check the remaining byte space via the semaphore
            if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelException("Cannot commit transaction. Byte capacity " +
                    "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                    "reached. Please increase heap space/byte capacity allocated to " +
                    "the channel as the sinks may not be keeping up with the sources");
            }
            // 2. check whether enough queue space can be acquired within keepAlive
            if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
                bytesRemaining.release(putByteCounter);
                throw new ChannelFullException("Space for commit to queue couldn't be acquired."
                    + " Sinks are likely not keeping up with sources, or the buffer size is too tight");
            }
        }
        int puts = putList.size();
        int takes = takeList.size();
        // both checks passed: move the Events from putList into the channel's queue
        synchronized (queueLock) {
            if (puts > 0) {
                while (!putList.isEmpty()) {
                    if (!queue.offer(putList.removeFirst())) {
                        throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                    }
                }
            }
            // clear this transaction's putList and takeList, releasing resources
            putList.clear();
            takeList.clear();
        }
        // takeList was cleared, so hand takeByteCounter straight back to bytesRemaining
        bytesRemaining.release(takeByteCounter);
        takeByteCounter = 0;
        putByteCounter = 0;
        // the Events from putList are now in the queue, so add puts to queueStored
        queueStored.release(puts);
        // if takeList was bigger, the queue shrank; release the difference to queueRemaining
        if (remainingChange > 0) {
            queueRemaining.release(remainingChange);
        }
        if (puts > 0) {
            channelCounter.addToEventPutSuccessCount(puts);
        }
        if (takes > 0) {
            channelCounter.addToEventTakeSuccessCount(takes);
        }
        channelCounter.setChannelSize(queue.size());
    }

    @Override
    protected void doRollback() {
        // called when a transaction fails and must roll back
        // first push the Events in takeList back into the channel's queue
        int takes = takeList.size();
        synchronized (queueLock) {
            Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
                "Not enough space in memory channel " +
                "queue to rollback takes. This should never happen, please report");
            while (!takeList.isEmpty()) {
                queue.addFirst(takeList.removeLast());
            }
            // then clear putList
            putList.clear();
        }
        // putList was cleared, so hand its bytes back to bytesRemaining
        bytesRemaining.release(putByteCounter);
        putByteCounter = 0;
        takeByteCounter = 0;
        // the Events from takeList went back into the queue, so add takes to queueStored
        queueStored.release(takes);
        channelCounter.setChannelSize(queue.size());
    }
}

MemoryChannel's logic is fairly simple: MemoryTransaction's putList and takeList interact with MemoryChannel's queue. The queue plays the role of the persistence layer, except that it lives in memory; a FileChannel would put that queue in local files instead. The flow of an Event through an agent using MemoryChannel is:

source ---> putList ---> queue ---> takeList ---> sink

Note also that these transactions can nest:

when two agents are chained, the sink's transaction encloses a source's transaction, which again confirms the earlier statement:

at any moment, the Event is complete and valid in at least one channel.

8.2 file channel

8.2.1 Characteristics

Events are buffered in files on local disk.

Reliability is high: no data is lost,

though in extreme cases data may be duplicated.

8.2.3 Parameters

type: alias: file
checkpointDir (default: ~/.flume/file-channel/checkpoint): directory where checkpoint information is stored
useDualCheckpoints (default: false): whether to use the dual-checkpoint mechanism
backupCheckpointDir: directory for backup checkpoints
dataDirs (default: ~/.flume/file-channel/data): directory caching the event data
transactionCapacity (default: 10000): transaction-control capacity
checkpointInterval (default: 30000): interval between checkpoint writes
maxFileSize (default: 2146435071): size limit of one data file
minimumRequiredSpace (default: 524288000): minimum free disk space; below this, new data is no longer accepted
capacity (default: 1000000): maximum number of cached events
keep-alive (default: 3): maximum wait when adding data

8.2.4 Configuration example

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.r1.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/flume_chkp

a1.channels.c1.dataDirs = /root/flume_data

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

8.2.5 Test

With a taildir source and a file channel, repeated deliberate crashes of every kind produced no data loss;

but with unlucky timing (the sink had taken and written out a batch but had not yet committed the transaction back to the channel), duplicate data does appear!

8.3 kafka channel

8.3.1 Characteristics

The agent uses Kafka as the channel's data buffer.

Keep the kafka channel distinct from the kafka source and the kafka sink.

A kafka channel can be used without a source, or without a sink:

if Kafka is the final destination of collection, you need only a source + a kafka channel;

if Kafka is the data source and the data should be written to HDFS, you need only a kafka channel + an hdfs sink.

8.3.2 Parameters

type: the full name: org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers: Kafka broker addresses (required)
kafka.topic (default: flume-channel): the topic to use
kafka.consumer.group.id (default: flume): consumer group id
parseAsFlumeEvent (default: true): whether the data is parsed as Flume Event format; must match what the upstream and downstream of the topic produce and expect
pollTimeout (default: 500): timeout when polling data from Kafka
defaultPartitionId: partition id assigned by default
partitionIdHeader: header key used to route an event to a specific partition
kafka.consumer.auto.offset.reset (default: latest): strategy for initializing the read offset

8.3.3 Configuration and tests

Example 1: an exec source reads a file and writes into a kafka channel.

No sink is attached: you consume the collected data from the Kafka cluster yourself!

a1.sources = r1

# two channels are configured, to make observation easier

# c1 is the kafka channel; c2 is a memory channel

a1.channels = c1 c2

a1.sinks = k1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1 c2

# the kafka channel itself; this channel has no sink

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

a1.channels.c1.parseAsFlumeEvent = false

# the memory channel, hooked to a logger sink for observation

a1.channels.c2.type = memory

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c2

Test run:

1. Prepare the a.log file.

2. Start Kafka.

3. Start the agent.

4. Append data to a.log.

5. Consume the topic with Kafka's console consumer and check that the data arrives.
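For step 5, a console consumer against the channel's topic works; flume-channel is the default topic name when kafka.topic is not set explicitly:

bin/kafka-console-consumer.sh --bootstrap-server h1:9092 --topic flume-channel --from-beginning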

Example 2: a logger sink takes data from the kafka channel.

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

Test run:

1. Start Kafka.

2. Start the Flume agent.

3. Write data into the topic with Kafka's console producer.

4. Watch the Flume agent's console to confirm the data comes through.

9. Common components in detail: Sink

A sink takes data from the channel (removing it) and delivers it downstream, either to the next-level agent or to the final target storage system.

9.1 hdfs sink

9.1.1 Characteristics

The data ends up in HDFS.

It can write text or sequence files, with optional compression.

Generated files can be rolled periodically: by file size, by time interval, or by event count.

The target path may contain dynamic escape sequences, e.g. %D for the current date;

it can also pick up markers from the event headers for substitution:

with header {type=abc},

/weblog/%{type}/%D/ becomes /weblog/abc/19-06-09/

9.1.2 Parameters

channel: the channel to take data from (required)
type: alias: hdfs (required)
hdfs.path: target HDFS path (a URI) (required)
hdfs.filePrefix (default: FlumeData): prefix of generated file names
hdfs.fileSuffix: suffix of generated file names
hdfs.inUsePrefix: prefix marking files currently being written
hdfs.inUseSuffix (default: .tmp): suffix marking files currently being written
hdfs.rollInterval (default: 30): roll condition: seconds between rolls; 0 disables it
hdfs.rollSize (default: 134217728): roll condition: file size; 0 disables it
hdfs.rollCount (default: 10): roll condition: number of events; 0 disables it
hdfs.idleTimeout (default: 0): close inactive files after this timeout; 0 means never auto-close
hdfs.batchSize (default: 100): maximum size of one batch taken from the channel
hdfs.codeC: compression codec: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType (default: SequenceFile): target file format: SequenceFile, DataStream or CompressedStream. Note: DataStream cannot be compressed; CompressedStream requires a compression codec; SequenceFile works with or without compression
hdfs.maxOpenFiles (default: 5000): maximum number of files open at once; beyond that, the earliest-opened file is closed
hdfs.minBlockReplicas: number of block replicas for the target files
hdfs.writeFormat (default: Writable): object type inside sequence files, Text or Writable. Please use Text, otherwise downstream data-processing platforms may be unable to parse the data
hdfs.threadsPoolSize (default: 10): thread-pool size for HDFS operations
hdfs.rollTimerPoolSize (default: 1): number of threads checking whether files need rolling
hdfs.kerberosPrincipal: Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab: Kerberos keytab for accessing secure HDFS
hdfs.proxyUser
hdfs.round (default: false): whether the timestamps used in directory wildcards are rounded down
hdfs.roundValue (default: 10): how much of the time value to round off
hdfs.roundUnit (default: minute): unit of the rounding: second, minute or hour
hdfs.timeZone (default: Local Time): time zone used by the time wildcards
hdfs.useLocalTimeStamp (default: false): whether the time is taken locally on the sink's host
hdfs.closeTries (default: 0): number of retries for renaming a completed file; 0 keeps retrying forever
hdfs.retryInterval (default: 180): interval between retries when closing a file
serializer (default: TEXT): what format the event body in the channel is rendered as: Text | avro_event; a custom serializer can also be used
serializer.*

Tip: a URI (Uniform Resource Identifier) names a resource with a scheme prefix, e.g. hdfs://h1:8020/doitedu for the hdfs.path above.

9.1.3 Configuration example

## definitions

a1.sources = r1

a1.sinks = k1

a1.channels = c1

## source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

## channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 100000000

## sink

a1.sinks.k1.channel = c1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://h1:8020/doitedu/%Y-%m-%d/%H-%M

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.filePrefix = doit_

a1.sinks.k1.hdfs.fileSuffix = .log.gz

a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.rollSize = 102400

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

a1.sinks.k1.hdfs.writeFormat = Text

9.1.4 Test

1. Start HDFS.

2. Clear any old taildir-source position files, and the file channel's data and checkpoint directories.

3. Start the agent.

4. Use a for-loop script to keep appending new lines to the log file.

5. Check the results in HDFS.
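For step 5, listing the target directory should show rolled, gzip-compressed files; the date path and file name below are illustrative:

hdfs dfs -ls /doitedu/2019-06-09/10-20

# -rw-r--r-- ... /doitedu/2019-06-09/10-20/doit_.1560047061012.log.gz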

9.2 kafka sink

With the kafka channel available, the kafka sink matters less: you can simply use Kafka as the channel receiving the source's data!

9.2.1 Characteristics

9.2.2 Parameters

type: name: org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers: list of Kafka servers
kafka.topic (default: default-flume-topic): the Kafka topic
flumeBatchSize (default: 100): batch size when taking events from the channel
kafka.producer.acks (default: 1): acknowledgment level for the Kafka producer: 1 = respond once the leader has received it; 0 = do not wait for a response; -1 = respond only after replica synchronization completes
useFlumeEventFormat (default: false): whether to use Flume's Avro serialization
defaultPartitionId: partition id all events go to by default; if unset, events are distributed evenly by the Kafka producer's partitioner
partitionIdHeader: header key constraining which partition an event goes to
allowTopicOverride (default: true): allow the target topic to be specified in the event's header
topicHeader (default: topic): if the switch above is on, the header key holding the topic
Other Kafka Producer Properties: any Kafka producer parameter can be configured with the kafka.producer. prefix

9.2.3 Configuration example

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.r1.maxBatchCount = 1000

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/flume_chkp

a1.channels.c1.dataDirs = /root/flume_data

a1.sinks = k1

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

9.2.4 Test

1. Clear the taildir position file; clear the file channel's data cache and checkpoint directories.

2. Prepare a log file and keep appending to it.

3. Start the Flume agent.

4. Observe the results with a Kafka client.
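For step 4, a console consumer on the sink's topic does the job:

bin/kafka-console-consumer.sh --bootstrap-server c701:9092 --topic mytopic --from-beginning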

9.3 avro sink

9.3.1 Characteristics

The avro sink sends avro-serialized data to an avro source; this is how agents are chained.

9.3.2 Parameters

channel (required)
type: the component type name, needs to be avro (required)
hostname: host of the target avro source (required)
port: the port the target avro source is bound to (required)
batch-size (default: 100): number of events to batch together for sending
connect-timeout (default: 20000): connection timeout
request-timeout (default: 20000): request timeout
reset-connection-interval (default: none): amount of time (s) before the connection to the next hop is reset; this forces the avro sink to reconnect to the next hop, allowing it to pick up new hosts behind a hardware load balancer without restarting the agent
compression-type (default: none): "none" or "deflate"; must match the compression-type of the matching AvroSource
compression-level (default: 6): compression level for events; 0 = no compression, 1-9 = compression, the higher the number the more compression
ssl (default: false): set to true to enable SSL for this avro sink; when configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs"
trust-all-certs (default: false): if true, SSL server certificates of remote avro sources are not checked; do NOT use in production, as it makes man-in-the-middle attacks against the encrypted connection easier
truststore: path to a custom Java truststore file; Flume uses its certificate-authority information to decide whether the remote avro source's SSL credentials should be trusted; if unset, the global keystore is used, otherwise the default Java JSSE certificate-authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE)
truststore-password: password for the truststore; if unset, the global keystore password is used (if defined)
truststore-type (default: JKS): type of the Java truststore, "JKS" or another supported type; if unset, the global keystore type is used (if defined, otherwise the default is JKS)
exclude-protocols (default: SSLv3): space-separated list of SSL/TLS protocols to exclude; SSLv3 is always excluded in addition to the protocols specified
maxIoWorkers (default: 2 x the number of available processors): maximum number of I/O worker threads, configured on the NettyAvroRpcClient NioClientSocketChannelFactory

9.3.3 Configuration example

A chained setup needs at least two Flume agents to demonstrate.

On c703, configure the avro sink (the sender):

## c703 ##

a1.sources = s1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.channels = c1

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = c701

a1.sinks.k1.port = 4545

On c701, configure the avro source (the receiver):

## c701 ##

a1.sources = s1

a1.sources.s1.type = avro

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 4545

a1.sources.s1.channels = c1

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

9.3.4 Test run

First start the receiver, the avro source service, on c701:

bin/flume-ng agent -n a1 -c conf/ -f myconf/avro-mem-logger.conf -Dflume.root.logger=INFO,console

Then start the sender, the avro sink client, on c703:

bin/flume-ng agent -n a1 -c conf/ -f myconf/tail-mem-avro.conf -Dflume.root.logger=INFO,console

10. Common components in detail: Selector

One source can feed multiple channels;

how the source's data is distributed among those channels is controlled by the selector.

The selector configuration hangs off the source component.

10.1 Practice 1: replicating selector

The replicating selector is the default selector.

(See the official documentation for the configuration reference.)

10.1.1 Target scenario

The selector copies each event and fans it out to all downstream nodes.

10.1.2 Flume agent configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# http source, with replicating selector
a1.sources.r1.type = http
a1.sources.r1.port = 6666
a1.sources.r1.bind = master
a1.sources.r1.selector.type = replicating

# Describe the sinks (each binds to a remote host via RPC)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 6666
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave2
a1.sinks.k2.port = 6666

# 2 channels in selector test
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# bind source, sinks to channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

10.1.3 Collector1 configuration

# 01 specify agent, source, sink, channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 02 avro source, listening on local port 6666
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 6666

# 03 logger sink
a1.sinks.k1.type = logger

# 04 channel, memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 05 bind source, sink to channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

10.1.4 Collector2 configuration

# 01 specify agent, source, sink, channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 02 avro source, listening on local port 6666
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 6666

# 03 logger sink
a1.sinks.k1.type = logger

# 04 channel, memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 05 bind source, sink to channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

10.1.5 Verification

10.2 Practice 2: multiplexing selector

The multiplexing selector routes each event to a channel according to the value of a chosen header key. The routing is a configured mapping, for example:

a1.sources.r1.selector.mapping.CZ = c1 means that an event whose header value is CZ is written to channel c1.

(See the official documentation for the multiplexing selector configuration reference.)

10.2.1 Target scenario

10.2.2 First-tier configuration (agents 1 and 2)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/a.log
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = flag
# value=1 on the first machine, value=2 on the other
a1.sources.r1.interceptors.i1.value = 1

a1.channels.c1.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h3
a1.sinks.k1.port = 44444
a1.sinks.k1.channel = c1

10.2.3 Second-tier configuration

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# source
a1.sources.r1.channels = c1 c2
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# channel selector on the source
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = flag
a1.sources.r1.selector.default = c2
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2

# channels
a1.channels.c1.type = memory
a1.channels.c2.type = memory

# two sinks, one per channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = logger
a1.sinks.k2.channel = c2

10.2.4 Verification

11. Common components in detail: grouping processor

Within one agent, multiple sinks can be grouped together, and data is distributed among the sinks of a group in one of two modes:

Mode 1: Failover Sink Processor

Only the highest-priority sink in the group works; the others stand by.

If the high-priority sink fails to send, a lower-priority sink takes over; and after the configured penalty period, the high-priority sink is tried again!

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

## assign different priorities to the two sinks

a1.sinkgroups.g1.processor.priority.k1 = 200

a1.sinkgroups.g1.processor.priority.k2 = 100

## penalty period during which a failed primary sink stays benched

a1.sinkgroups.g1.processor.maxpenalty = 5000

Mode 2: Load balancing Sink Processor

Data in the channel is rotated across the sinks of a group, with one of two strategies:

round-robin (take turns)

random (pick at random)

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = random

12. Custom Flume components

12.1 Custom source

12.1.1 When you need one

Typically when some data source cannot be parsed by any built-in Flume source, e.g. XML documents.

The example in this tutorial: collect a text log while remembering the offset!

12.1.2 Approach

First, find the parent class/interfaces a custom source must extend/implement.

Then override the methods, inserting your own logic.

Then package the code as a jar and drop it into Flume's lib directory.

Finally, write a configuration file that uses the custom source.

12.1.3 Code architecture

A source is driven by a SourceRunner (EventDrivenSourceRunner):

the SourceRunner obtains the source instance,

then calls the source's start() method;

the source reads data and converts it into events,

then hands the events to channelProcessor.processEvent(event);

processEvent first runs the interceptors over the event, then writes it to the channel.

12.1.4 Implementation

  • Thread-pool version:

package cn.doitedu.flume.custom;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * A custom source that remembers the read offset of the file it collects.
 */
public class HoldOffesetSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(HoldOffesetSource.class);

    private String positionfilepath;
    private String logfile;
    private int batchsize;

    private ExecutorService exec;

    /**
     * The framework calls this method to start collecting data.
     * Our code reads the data and converts it into events,
     * obtains the framework's ChannelProcessor via getChannelProcessor()
     * (defined in the parent class), and uses it to submit the events to the channel.
     */
    @Override
    public synchronized void start() {

        super.start();
        // the processor used to submit events to the channel
        ChannelProcessor channelProcessor = getChannelProcessor();

        // load the previously recorded offset (0 if there is none yet)
        long offset = 0;
        try {
            File positionfile = new File(this.positionfilepath);
            String s = FileUtils.readFileToString(positionfile);
            offset = Long.parseLong(s.trim());
        } catch (IOException e) {
            logger.warn("no usable position file, starting from offset 0");
        }

        // build a single-thread pool
        exec = Executors.newSingleThreadExecutor();
        // submit the collection task to the pool
        exec.execute(new HoldOffsetRunnable(offset, logfile, channelProcessor, batchsize, positionfilepath));
    }

    /**
     * Called before the source stops;
     * resource cleanup can be done here.
     */
    @Override
    public synchronized void stop() {
        super.stop();
        if (exec != null) {
            exec.shutdownNow();
        }
    }

    /**
     * Reads the parameters from the configuration file to configure this source instance.
     *
     * Parameters needed:
     *   the path of the file that records the offset
     *   the path of the file to collect
     *
     * @param context
     */
    public void configure(Context context) {

        // path of the file this source uses to record the offset
        this.positionfilepath = context.getString("positionfile", "./");

        // path of the log file this source collects
        this.logfile = context.getString("logfile");

        // maximum batch size per collection transaction
        this.batchsize = context.getInteger("batchsize", 100);

        // fail fast if the log file path was not configured
        if (StringUtils.isBlank(logfile)) throw new RuntimeException("please configure the path of the file to collect");

    }

    /**
     * The worker task that actually collects the file.
     */
    private static class HoldOffsetRunnable implements Runnable {

        long offset;
        String logfilepath;
        String positionfilepath;
        ChannelProcessor channelProcessor;  // channel submitter (runs the interceptors and opens the put transaction)
        int batchsize; // batch size
        List<Event> events = new ArrayList<Event>();  // holds one batch of events
        SystemClock systemClock = new SystemClock();

        public HoldOffsetRunnable(long offset, String logfilepath, ChannelProcessor channelProcessor, int batchsize, String positionfilepath) {
            this.offset = offset;
            this.logfilepath = logfilepath;
            this.channelProcessor = channelProcessor;
            this.batchsize = batchsize;
            this.positionfilepath = positionfilepath;
        }

        public void run() {

            try {
                // seek to the recorded offset first
                RandomAccessFile raf = new RandomAccessFile(logfilepath, "r");
                raf.seek(offset);

                // then read in a loop
                String line = null;

                // time of the last batch commit
                long lastBatchTime = System.currentTimeMillis();
                while (true) {
                    line = raf.readLine();
                    if (line == null) {
                        Thread.sleep(2000);
                        continue;
                    }

                    // convert the line into an event
                    Event event = EventBuilder.withBody(line.getBytes());
                    // add it to the batch list
                    synchronized (HoldOffesetSource.class) {
                        events.add(event);
                    }

                    // is the batch full, or has the batch interval elapsed?
                    if (events.size() >= batchsize || timeout(lastBatchTime)) {
                        // yes: commit the batch
                        channelProcessor.processEventBatch(events);

                        // record the commit time
                        lastBatchTime = systemClock.currentTimeMillis();

                        // persist the new offset
                        long offset = raf.getFilePointer();
                        FileUtils.writeStringToFile(new File(positionfilepath), offset + "");

                        // clear this batch
                        events.clear();
                    }

                    // no: keep reading
                }
            } catch (FileNotFoundException e) {
                logger.error("the file to collect does not exist");
            } catch (IOException e) {
                logger.error("I/O error while collecting the file");
            } catch (InterruptedException e) {
                logger.error("collection thread interrupted");
            }
        }

        // has the batch interval timed out?
        private boolean timeout(long lastBatchTime) {
            return systemClock.currentTimeMillis() - lastBatchTime > 2000;
        }
    }
}

  • Single-thread version (a minimal sketch follows):
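A minimal sketch of a single-thread variant, built on Flume's PollableSource so that the framework's own runner thread drives the read loop by calling process() repeatedly (class name and details are illustrative assumptions):

package cn.doitedu.flume.custom;

import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

/**
 * Single-thread sketch: the framework's PollableSourceRunner thread
 * calls process() in a loop, so no thread pool is needed on our side.
 * Reopening the file on every call keeps the sketch short; a real
 * source would keep the RandomAccessFile open between calls.
 */
public class HoldOffsetPollableSource extends AbstractSource implements PollableSource, Configurable {

    private String positionfilepath;
    private String logfile;
    private int batchsize;

    public void configure(Context context) {
        this.positionfilepath = context.getString("positionfile", "./position");
        this.logfile = context.getString("logfile");
        this.batchsize = context.getInteger("batchsize", 100);
    }

    public Status process() throws EventDeliveryException {
        try {
            // load the last recorded offset (0 if none)
            long offset = 0;
            File pos = new File(positionfilepath);
            if (pos.exists()) offset = Long.parseLong(FileUtils.readFileToString(pos).trim());

            RandomAccessFile raf = new RandomAccessFile(logfile, "r");
            raf.seek(offset);

            // read up to one batch of lines
            List<Event> batch = new ArrayList<Event>();
            String line;
            while (batch.size() < batchsize && (line = raf.readLine()) != null) {
                batch.add(EventBuilder.withBody(line.getBytes()));
            }
            long newOffset = raf.getFilePointer();
            raf.close();

            if (batch.isEmpty()) {
                return Status.BACKOFF; // nothing new: the runner sleeps, then retries
            }
            getChannelProcessor().processEventBatch(batch);
            // persist the offset only after the batch was accepted by the channel
            FileUtils.writeStringToFile(pos, String.valueOf(newOffset));
            return Status.READY;
        } catch (Exception e) {
            throw new EventDeliveryException(e);
        }
    }

    public long getBackOffSleepIncrement() { return 1000; }

    public long getMaxBackOffSleepInterval() { return 5000; }
}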

12.1.5 Startup test

Package the code into a jar and upload it to Flume's lib directory, then write the agent configuration file:

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# source configuration (the keys match what the source reads in configure())
a1.sources.s1.type = cn.doitedu.flume.custom.HoldOffesetSource
a1.sources.s1.positionfile = /root/myposition
a1.sources.s1.logfile = /root/weblog/access.log
a1.sources.s1.batchsize = 100
a1.sources.s1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Then start the flume agent.

12.2 Custom interceptor

12.2.1 Requirement scenario

The company's clickstream logs are continuously generated in one directory: /var/log/click_streaming.log

When a log file reaches a size threshold (128 MB) it is renamed, e.g. to click_streaming.log.1, and a new click_streaming.log file is created to keep receiving the log stream.

The data format in the log files looks like this:

13888776677,click,/baoming,張飛,湖北省

13889976655,click,/xuexi,關(guān)羽,山西省

...

Flume now has to collect the data from the log server and write it to HDFS, with the requirement that the phone-number and name fields be encrypted (an MD5 digest whose result is then Base64-encoded).
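The encryption step itself is only two library calls. A minimal sketch of the field transformation using commons-codec (the same library the interceptor below relies on; the demo class and sample value are illustrative):

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;

public class EncryptDemo {

    // MD5-digest a field, then Base64-encode the 16-byte digest
    static String encryptField(String field) {
        byte[] md5 = DigestUtils.md5(field);
        return Base64.encodeBase64String(md5);
    }

    public static void main(String[] args) {
        // prints a fixed-length Base64 string instead of the raw phone number
        System.out.println(encryptField("13888776677"));
    }
}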

12.2.2 Implementation approach

New files keep appearing in the target directory,

so the source component of choice is taildir:

1. it detects newly created files;

2. it records the collection offset.

For reliability, the channel of choice is the file channel:

1. it buffers events on disk;

2. it records transaction state on disk.

The target store is HDFS, so the sink is naturally the hdfs sink.

Solving the encryption requirement:

Doing it in the source would mean modifying the taildir source's code;

doing it in the sink would mean modifying the hdfs sink's code.

Both require patching Flume itself, so neither is the best choice.

The best option is to transform the data with an interceptor. Flume has no built-in interceptor that can encrypt fields, so we write our own.

12.2.3 Developing the custom interceptor

12.2.3.1 The general pattern

The general pattern for extending a framework:

1. Implement or extend the interface or parent class the framework provides, and implement/override its methods.

2. Package the finished code into a jar and put it into Flume's lib directory.

3. Reference the custom class in the relevant agent configuration file.

12.2.3.2 Interceptor design

For this scenario the custom interceptor also needs a few parameters, because the fields to encrypt may change and the code must stay configurable:

1. A configuration parameter that specifies the fields to encrypt:

indices (the indexes of the fields to encrypt)

plus the separator between the indexes, idxSplitBy.

For example:  a1.sources.interceptors.i1.indices = 0:3

a1.sources.interceptors.i1.idxSplitBy = :

2. To split the fields of a record correctly, one more parameter is needed: the field separator dataSplitBy.

For example: a1.sources.interceptors.i1.dataSplitBy = ,

12.2.3.3 Flume's interceptor interface contract

First, add the Flume development dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

The working mechanism of a custom interceptor is simple:

Flume first calls the configure() method of an inner Builder class of the interceptor to read the parameters;

Flume then calls the Builder's build() method to obtain the interceptor instance (parameters can be passed in through the constructor);

Flume then repeatedly calls the interceptor's intercept(List<Event> events) method to transform events.

12.2.3.4 Code implementation

package cn.doitedu.flume.custom;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class EncryptInterceptor implements Interceptor {
    // indexes of the fields to encrypt
    String indices;
    // separator between the indexes
    String idxSplitBy;
    // field separator of the data body
    String dataSplitBy;

    /**
     * Constructor
     * @param indices
     * @param idxSplitBy
     * @param dataSplitBy
     */
    public EncryptInterceptor(String indices, String idxSplitBy, String dataSplitBy) {
        // e.g. "0:3"
        this.indices = indices;
        this.idxSplitBy = idxSplitBy;
        this.dataSplitBy = dataSplitBy;
    }

    // called once by the framework, for initialization work
    public void initialize() {

    }

    // intercept method -- processes a single event
    public Event intercept(Event event) {

        byte[] body = event.getBody();
        String dataStr = new String(body);

        // the fields of the record
        String[] dataFieldsArr = dataStr.split(dataSplitBy);

        // the indexes of the fields to encrypt
        String[] idxArr = indices.split(idxSplitBy);

        for (String s : idxArr) {

            int index = Integer.parseInt(s);
            // take the field to encrypt
            String field = dataFieldsArr[index];
            // MD5-digest the field, then Base64-encode the digest
            String encryptedField = Base64.encodeBase64String(DigestUtils.md5(field));
            // replace the original plain content
            dataFieldsArr[index] = encryptedField;
        }

        // re-join the fields into one record, using the original separator
        StringBuilder sb = new StringBuilder();
        for (String field : dataFieldsArr) {
            sb.append(field).append(dataSplitBy);
        }

        sb.deleteCharAt(sb.lastIndexOf(dataSplitBy));

        // return the event that wraps the encrypted record
        return EventBuilder.withBody(sb.toString().getBytes());
    }

    // intercept method -- processes a batch of events
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> lst = new ArrayList<Event>();

        for (Event event : events) {
            Event eventEncrpt = intercept(event);
            lst.add(eventEncrpt);
        }

        return lst;
    }

    // called once before the agent exits, for cleanup and closing work
    public void close() {

    }

    /**
     * Builder of the interceptor
     */
    public static class EncryptInterceptorBuilder implements Interceptor.Builder {
        // indexes of the fields to encrypt
        String indices;
        // separator between the indexes
        String idxSplitBy;
        // field separator of the data body
        String dataSplitBy;

        // build an interceptor instance
        public Interceptor build() {

            return new EncryptInterceptor(indices, idxSplitBy, dataSplitBy);
        }

        // read the interceptor parameters from the configuration file
        public void configure(Context context) {
            // indexes of the fields to encrypt
            this.indices = context.getString(Constants.INDICES);
            // separator between the indexes
            this.idxSplitBy = context.getString(Constants.IDX_SPLIT_BY);
            // field separator of the data body
            this.dataSplitBy = context.getString(Constants.DATA_SPLIT_BY);

        }
    }

    public static class Constants {
        public static final String INDICES = "indices";
        public static final String IDX_SPLIT_BY = "idxSplitBy";
        public static final String DATA_SPLIT_BY = "dataSplitBy";
    }
}
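Before deploying, the interceptor can be sanity-checked locally without starting an agent; a quick sketch (the test class and the sample record are illustrative):

package cn.doitedu.flume.custom;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

// quick local sanity check for EncryptInterceptor (not part of the agent)
public class EncryptInterceptorTest {
    public static void main(String[] args) {
        // encrypt fields 0 and 3; index separator ":", field separator ","
        Interceptor it = new EncryptInterceptor("0:3", ":", ",");
        it.initialize();

        Event in = EventBuilder.withBody("13888776677,click,/baoming,zhangfei,hubei".getBytes());
        Event out = it.intercept(in);
        // fields 0 and 3 should now be Base64-encoded MD5 digests
        System.out.println(new String(out.getBody()));

        it.close();
    }
}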

12.2.3.5 Run test

1. Package the code into a jar.

2. Upload it to the Flume node and put it into Flume's lib directory.

3. Write the collection configuration file:

a1.sources = s1
a1.sources.s1.channels = c1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /root/weblog/access.log
a1.sources.s1.batchSize = 100
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder
# encrypt fields 0 (phone) and 3 (name)
a1.sources.s1.interceptors.i1.indices = 0:3
a1.sources.s1.interceptors.i1.idxSplitBy = :
a1.sources.s1.interceptors.i1.dataSplitBy = ,

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 100

a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

# use this sink section instead to write to Kafka
# a1.sinks = k1
# a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092
# a1.sinks.k1.channel = c1

4. Prepare a log data file that matches the processing logic, for example:

13888776677,click,/baoming,張飛,湖北省

13889976655,click,/xuexi,關(guān)羽,山西省

...

5. Run:

bin/flume-ng agent -n a1 -c conf/ -f agentconf/spooldir-myi-m-log.conf -Dflume.root.logger=INFO,console

13 Comprehensive case

13.1 Case scenario

Log server machines A, B, etc. produce logs in real time, and the logs come in several types:

log1/access.log

log2/nginx.log

log3/web.log

Requirements:

Collect and aggregate the various logs from the log servers onto an intermediate agent, then write them into HDFS by category.

The required HDFS directory layout is:

/source/logs/access/20160101/**

/source/logs/nginx/20160101/**

/source/logs/web/20160101/**

In addition, the fields at specified indexes must be encrypted.

13.2 Implementation approach

1. Deploy one Flume agent on every log server (level 1); each agent configures three sources, one per log type.

2. While collecting, each level-1 agent adds a header that marks the category of the data.

3. Each level-1 agent configures two avro sinks, each connected to one downstream agent.

4. Each level-1 agent also configures a failover sink processor, so that only one avro sink works at a time, switching to the other avro sink on failure.

5. The level-1 agents also configure the field-encryption interceptor.

6. Level 2 consists of two Flume agents that receive data with an avro source.

7. The level-2 hdfs sink uses dynamic placeholders in its path, reading each event's category header so that different categories are written to different HDFS directories.

13.4 Configuration files

level_1 configuration file:

a1.sources = r1 r2 r3

## source configuration
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /root/chekp1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/weblog/log1/access.log
a1.sources.r1.maxBatchCount = 1000
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder
a1.sources.r1.interceptors.i2.indices = 0:4
a1.sources.r1.interceptors.i2.idxSplitBy = :
a1.sources.r1.interceptors.i2.dataSplitBy = ,
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = logtype
a1.sources.r1.interceptors.i3.value = access

a1.sources.r2.type = TAILDIR
a1.sources.r2.channels = c1
a1.sources.r2.positionFile = /root/chekp2/taildir_position.json
a1.sources.r2.filegroups = f1
a1.sources.r2.filegroups.f1 = /root/weblog/log2/nginx.log
a1.sources.r2.maxBatchCount = 1000
a1.sources.r2.interceptors = i1 i2 i3
a1.sources.r2.interceptors.i1.type = timestamp
a1.sources.r2.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder
a1.sources.r2.interceptors.i2.indices = 0:4
a1.sources.r2.interceptors.i2.idxSplitBy = :
a1.sources.r2.interceptors.i2.dataSplitBy = ,
a1.sources.r2.interceptors.i3.type = static
a1.sources.r2.interceptors.i3.key = logtype
a1.sources.r2.interceptors.i3.value = nginx

a1.sources.r3.type = TAILDIR
a1.sources.r3.channels = c1
a1.sources.r3.positionFile = /root/chekp3/taildir_position.json
a1.sources.r3.filegroups = f1
a1.sources.r3.filegroups.f1 = /root/weblog/log3/weblog.log
a1.sources.r3.maxBatchCount = 1000
a1.sources.r3.interceptors = i1 i2 i3
a1.sources.r3.interceptors.i1.type = timestamp
a1.sources.r3.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder
a1.sources.r3.interceptors.i2.indices = 0:4
a1.sources.r3.interceptors.i2.idxSplitBy = :
a1.sources.r3.interceptors.i2.dataSplitBy = ,
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = logtype
a1.sources.r3.interceptors.i3.value = weblog

## channel configuration
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.checkpointDir = /root/channel_chkp
a1.channels.c1.dataDirs = /root/channel_data

## sink configuration
a1.sinks = k1 k2
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = c704
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = c705
a1.sinks.k2.port = 4545

## sink processor - failover configuration
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
## assign different priorities to the two sinks
a1.sinkgroups.g1.processor.priority.k1 = 200
a1.sinkgroups.g1.processor.priority.k2 = 100
## penalty time during which a failed primary sink is sidelined
a1.sinkgroups.g1.processor.maxpenalty = 5000

level_2 configuration:

a1.sources = s1
a1.channels = c1
a1.sinks = k1

## source configuration
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 4545
a1.sources.s1.channels = c1
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp

## channel configuration
a1.channels.c1.type = file
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.checkpointDir = /root/lev2_channel_chkp
a1.channels.c1.dataDirs = /root/lev2_channel_data

## sink configuration
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://c701:8020/doitedu/%{logtype}/%Y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = doitedu-
a1.sinks.k1.hdfs.fileSuffix = .log.gz
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## compression
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## data format
a1.sinks.k1.hdfs.serializer = TEXT

13.5 Startup test

1. Put the custom interceptor jar into the lib directory of every level-1 Flume installation (C701/C702/C703).

2. Clear the old checkpoint and cache directories on all machines.

3. Start the two level-2 agents (on C704 and C705).

4. On all level-1 machines, create the log data directories and write a script that keeps appending to the three log types.

5. Start the level-1 Flume agents on all level-1 machines.

6. Observe the results on HDFS.

7. Kill the level-2 agent on C704 and verify that failover to the other agent works.

14 Interview reinforcement

14.1 Flume transaction mechanism

Delivery guarantees

Understanding Flume's event-delivery guarantee is important; it is often one of the deciding factors in whether Flume is the right tool for the problem.

There are three levels of message-delivery guarantee:

  1. At-least-once
  2. At-most-once
  3. Exactly-once

Nearly every user would like a framework that guarantees exactly-once, so that the design never has to handle lost or duplicated messages. In practice very few tools achieve it: the cost of truly doing so is usually large, or the side effects make it not worth it. Had Flume really implemented exactly-once, it would inevitably have sacrificed stability and throughput, so Flume chose at-least-once.

This "at-least-once" deserves its quotation marks: it does not mean that any assembly of Flume components will never lose messages. The at-least-once principle only covers the hand-offs between Source, Channel and Sink. If you choose MemoryChannel and the agent crashes and restarts, the events in the channel not yet consumed by the sink are lost, so the end-to-end pipeline can no longer guarantee at-least-once.

Flume's at-least-once guarantee is built on its own Transaction mechanism. A Flume Transaction has four lifecycle methods: begin, commit, rollback and close.

When a Source delivers a batch of events to a Channel, it first calls begin to open a transaction; after batch-putting the events it commits the transaction; if the commit fails it rolls back; then it closes the transaction; and only then does the Source ack the committed batch to the upstream service (for example, committing a new offset to Kafka). A Sink consumes a Channel in the same pattern, the only difference being that the Sink commits the transaction only after the write to the destination has completed. Both components ack upstream only after delivering downstream successfully, which is what makes delivery at-least-once.
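Written out as code, the sink-side version of this lifecycle looks roughly like the following sketch (simplified; real sinks add batching and error handling):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

// sketch of the sink-side transaction loop described above
public class TxSketch {
    static void drainOnce(Channel channel) {
        Transaction tx = channel.getTransaction();
        tx.begin(); // open the transaction
        try {
            Event event = channel.take(); // may be null if the channel is empty
            if (event != null) {
                // write the event to the downstream store here;
                // commit only after the write has succeeded
            }
            tx.commit(); // events are now removed from the channel
        } catch (Throwable t) {
            tx.rollback(); // events stay in the channel for a retry
        } finally {
            tx.close();
        }
    }
}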

14.2 Flume agent internals

Components:

1. ChannelSelector

The ChannelSelector decides which Channel an Event will be sent to. There are two types: Replicating and Multiplexing. The ReplicatingSelector sends the same Event to every Channel, while Multiplexing routes different Events to different Channels according to the configured rules.

2. SinkProcessor

(1) There are three SinkProcessor types: DefaultSinkProcessor, LoadBalancingSinkProcessor and FailoverSinkProcessor.

(2) DefaultSinkProcessor works with a single Sink; LoadBalancingSinkProcessor and FailoverSinkProcessor work with a Sink Group.

(3) LoadBalancingSinkProcessor provides load balancing; FailoverSinkProcessor provides failover.

14.3 Ganglia and Flume monitoring

Enable the built-in monitoring:

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

Send the monitoring data to Ganglia for display:

-Dflume.monitoring.type=ganglia -Dflume.monitoring.port=34890

14.4 Flume tuning

A flume-ng agent consists of three parts, source, channel and sink, all running in a JVM, which in turn runs on Linux. Tuning Flume therefore means tuning these three parts and the factors that affect them.

1. Source configuration

This project uses the taildir source; its read speed keeps up with the rate at which the logs are written, so no special treatment was needed.

2. Channel configuration

The usual channel choices are the memory channel and the file channel.

When memory is sufficient, prefer the memory channel.

With otherwise identical configuration, the file channel is clearly slower than the memory channel, and it writes log files to disk as a buffer: events that the source has received but the sink has not yet consumed are kept in those files. The benefit is that data will not be lost, so when data loss is unacceptable and real-time delivery is not critical, use the file channel.

The initial memory channel configuration used the defaults, and the console then printed this warning:

The channel is full or unexpected failure. The source will try again after 1000 ms

This happens when the file being collected is large; it can be mitigated by increasing keep-alive. The deeper cause is that the collection rate and the sink's drain rate are not matched.
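For reference, this warning corresponds roughly to the following pattern in a source: the put fails with ChannelException when no space frees up within keep-alive, and the source backs off and retries (a hypothetical helper for illustration, not Flume's actual source code):

import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;

import java.util.List;

// sketch: how a source typically reacts when the channel is full
public class BackoffPut {
    static void putWithRetry(ChannelProcessor cp, List<Event> batch) throws InterruptedException {
        while (true) {
            try {
                cp.processEventBatch(batch); // throws if no space frees up within keep-alive
                return;
            } catch (ChannelException e) {
                // "The channel is full or unexpected failure. The source will try again after 1000 ms"
                Thread.sleep(1000);
            }
        }
    }
}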

So the memory channel has three important parameters to configure:

# maximum number of events the channel can buffer
a1.channels.c1.capacity = 5000

# maximum number of events handed over per transaction (e.g. to the sink)
a1.channels.c1.transactionCapacity = 2000

# seconds a put waits for free space before failing
a1.channels.c1.keep-alive = 10

3. Sink configuration

Compression saves disk space and network bandwidth but costs CPU.

Batch size: the larger, the better the throughput, but too large hurts timeliness; batch size is usually matched to the batch size of the data source side.

4. JVM memory configuration

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

The key parameters are Xms and Xmx; size them according to the actual memory of the server.

5. OS kernel parameters

If many Flume agents run on a single server, the default kernel settings are too small and need adjusting. (To be expanded; not yet needed here.)
