Flume 实战开发指南
Flume
文章目錄
- Flume
- Flume介紹
- Flume核心概念
- Flume NG的體系結構
- Source
- Channel
- Sink
- Flume的部署類型
- 單一流程
- 多代理流程(多個agent順序連接)
- 流的合并(多個Agent的數據匯聚到同一個Agent )
- 多路復用流(多級流)
- load balance功能
- Flume組件選型
- Source
- Channel
- FileChannel和MemoryChannel區別
- FileChannel優化
- Sink
- HDFS小文件處理
- 案例:日志采集發送到KafkaFlume配置
- 日志采集流程
- 配置如下
- Flume攔截器開發
- Flume采集腳本批量啟動停止
- 案例:消費Kafka保存到HDFS
- 配置如下
- FileChannel優化
- HDFS小文件處理
- Flume時間戳攔截器
- 項目經驗之Flume內存優化
Flume介紹
Flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支持在日志系統中定制各類數據發送方,用于收集數據;同時,Flume提供對數據進行簡單處理,并寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。
flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日志數據(字節數組形式)并且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。
flume的可靠性
當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復后,繼續發送),Besteffort(數據發送到接收方后,不會進行確認)。
flume的可恢復性
還是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統里(性能較差)。
Flume核心概念
| Event | 一個數據單元,消息頭和消息體組成。(Events可以是日志記錄、 avro 對象等。) |
| Flow | Event從源點到達目的點的遷移的抽象。 |
| Agent | 一個獨立的Flume進程,包含組件Source、 Channel、 Sink。(Agent使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。) |
| Source | 數據收集組件。(source從Client收集數據,傳遞給Channel) |
| Channel | 中轉Event的一個臨時存儲,保存由Source組件傳遞過來的Event。(Channel連接 sources 和 sinks ,這個有點像一個隊列。) |
| Sink | 從Channel中讀取并移除Event, 將Event傳遞到FlowPipeline中的下一個Agent(如果有的話)(Sink從Channel收集數據,運行在一個獨立線程。) |
Flume NG的體系結構
Flume 運行的核心是 Agent。Flume以agent為最小的獨立運行單位。一個agent就是一個JVM。它是一個完整的數據收集工具,含有三個核心組件,分別是source、 channel、 sink。通過這些組件, Event 可以從一個地方流向另一個地方,如下圖所示。
Source
Source是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到事件(event) 里,然后將事件推入Channel中。
Flume提供了各種source的實現,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果內置的Source無法滿足需要, Flume還支持自定義Source。
Channel
Channel是連接Source和Sink的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直到Sink處理完該事件。
Flume對于Channel,則提供了Memory Channel、JDBC Chanel、File Channel,etc。
MemoryChannel可以實現高速的吞吐,但是無法保證數據的完整性。
MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。
FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日志文件保存的目錄設成不同的磁盤,以便提高效率。
Sink
Flume Sink取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。
Flume也提供了各種sink的實現,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。
Flume Sink在設置存儲數據時,可以向文件系統中,數據庫中,hadoop中儲數據,在日志數據較少時,可以將數據存儲在文件系中,并且設定一定的時間間隔保存數據。在日志數據較多時,可以將相應的日志數據存儲到Hadoop中,便于日后進行相應的數據分析。
Flume的部署類型
單一流程
多代理流程(多個agent順序連接)
可以將多個Agent順序連接起來,將最初的數據源經過收集,存儲到最終的存儲系統中。這是最簡單的情況,一般情況下,應該控制這種順序連接的Agent 的數量,因為數據流經的路徑變長了,如果不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。
流的合并(多個Agent的數據匯聚到同一個Agent )
這種情況應用的場景比較多,比如要收集Web網站的用戶行為日志, Web網站為了可用性使用的負載集群模式,每個節點都產生用戶行為日志,可以為每 個節點都配置一個Agent來單獨收集日志數據,然后多個Agent將數據最終匯聚到一個用來存儲數據存儲系統,如HDFS上。
多路復用流(多級流)
Flume還支持多級流,什么多級流?來舉個例子,當syslog, java, nginx、 tomcat等混合在一起的日志流開始流入一個agent后,可以agent中將混雜的日志流分開,然后給每種日志建立一個自己的傳輸通道。
load balance功能
下圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink組件上,而每個Sink組件分別連接到一個獨立的Agent上 。
Flume組件選型
Source
Taildir Source相比Exec Source、Spooling Directory Source的優勢?
TailDir Source:斷點續傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現斷點續傳。不會丟數據,但是有可能會導致數據重復。
Exec Source:可以實時搜集數據,但是在Flume不運行或者Shell命令出錯的情況下,數據將會丟失。
Spooling Directory Source:監控目錄,支持斷點續傳。
batchSize大小如何設置?
答:Event 1K左右時,500-1000合適(默認為100)
Channel
? 采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel數據存儲在Kafka里面,所以數據是存儲在磁盤中。
? 注意在Flume1.7以前,Kafka Channel很少有人使用,因為發現parseAsFlumeEvent這個配置起不了作用。也就是無論parseAsFlumeEvent配置為true還是false,都會轉為Flume Event。這樣的話,造成的結果是,會始終都把Flume的headers中的信息混合著內容一起寫入Kafka的消息中,這顯然不是我所需要的,我只是需要把內容寫入即可。
FileChannel和MemoryChannel區別
MemoryChannel:
傳輸數據速度更快,但因為數據保存在JVM的堆內存中,Agent進程掛掉會導致數據丟失,適用于對數據質量要求不高的需求。
FileChannel:
傳輸速度相對于Memory慢,但數據安全保障高,Agent進程掛掉也可以從失敗中恢復數據。
選型:
金融類公司、對錢要求非常準確的公司通常會選擇FileChannel
傳輸的是普通日志信息(京東內部一天丟100萬-200萬條,這是非常正常的),通常選擇MemoryChannel。
FileChannel優化
? 通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。
官方說明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformancecheckpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據。
FileChannel原理:
Sink
HDFS小文件處理
HDFS存入大量小文件,有什么影響?
**元數據層面:**每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命
**計算層面:**默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。
官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:
①文件在達到128M時會滾動生成新文件
②文件創建超3600秒時會滾動生成新文件
案例:日志采集發送到KafkaFlume配置
日志采集流程
配置如下
在/opt/module/flume/conf目錄下創建file-flume-kafka.conf文件
> vim file-flume-kafka.conf# 為各組件命名 a1.sources = r1 a1.channels = c1# 描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/program/logs/app.* a1.sources.r1.positionFile = /opt/program/logs/taildir_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder# 描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = 192.168.60.14:9092,192.168.60.15:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false# 綁定source和channel以及sink和channel的關系 a1.sources.r1.channels = c1注意:com.jast.flume.ETLInterceptor是自定義的攔截器的全類名。需要根據用戶自定義的攔截器做相應修改。
Flume攔截器開發
創建Maven工程flume-interceptor
創建包名:com.jast.flume.interceptor
在pom.xml文件中添加如下配置
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.19</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>注意:scope中provided的含義是編譯時用該jar包。打包時時不用。因為集群上已經存在flume的jar包。只是本地編譯時用一下。
在com.jast.flume.ETLInterceptor包下創建ETLInterceptor類
package com.jast.flume;import cn.hutool.json.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List;/*** @author Jast* @description 自定義攔截器*/ public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String text = new String(body, StandardCharsets.UTF_8);if(JSONUtil.isJson(text)){return event;}System.out.println("非json格式過濾掉");return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()){Event next = iterator.next();if(intercept(next)==null){iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}} }打包
自己部署的Flume:需要先將打好的包放入到flume/lib文件夾下面
cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/libCDH版本Flume:需要先將打好的包放入到flume/lib文件夾下面
具體的目錄/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/
cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib啟動Flume
nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /opt/program/flume/log1.txt 2>&1 &測試
隨便在app.log加入一條數據,攔截器檢測到,然后打印出非json格式過濾掉。
2022-03-17 09:42:42,148 INFO taildir.ReliableTaildirEventReader: Pos 9 is larger than file size! Restarting from pos 0, file: /opt/program/logs/app.log, inode: 5810713 2022-03-17 09:42:42,148 INFO taildir.TailFile: Updated position, file: /opt/program/logs/app.log, inode: 5810713, pos: 0 非json格式過濾掉 2022-03-17 09:45:17,236 INFO taildir.TaildirSource: Closed file: /opt/program/logs/app.log, inode: 5810713, pos: 22輸入一條json數據,在kafka消費時正常消費,配置成功
22/03/17 09:15:41 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-3667] Resetting offset for partition topic_log-0 to offset 0. {"en":"小張"}Flume采集腳本批量啟動停止
在/home/atguigu/bin目錄下創建腳本f1.sh
> vim f1.sh#! /bin/bashcase $1 in "start"){for i in localhostdoecho " --------啟動 $i 采集flume-------"ssh $i "nohup flume-ng agent --conf-file /opt/program/flume/file-flume-kafka-1.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /opt/program/flume/log1.txt 2>&1 &"done };; "stop"){for i in localhostdoecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka-1.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done};; esac說明1:nohup,該命令可以在你退出帳戶/關閉終端之后繼續運行相應的進程。nohup就是不掛起的意思,不掛斷地運行命令。
說明2:awk 默認分隔符為空格
說明3:$2是在“”雙引號內部會被解析為腳本的第二個參數,但是這里面想表達的含義是awk的第二個值,所以需要將他轉義,用$2表示。
說明4:xargs 表示取出前面命令運行的結果,作為后面命令的輸入參數。ls
增加腳本執行權限
chmod u+x f1.shf1集群啟動腳本
f1.sh startf1集群停止腳本
f1.sh stop案例:消費Kafka保存到HDFS
配置如下
## 組件 a1.sources=r1 a1.channels=c1 a1.sinks=k1## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = 192.168.60.15:9092,192.168.60.14:9092 a1.sources.r1.kafka.topics=topic_log #a1.sources.r1.interceptors = i1 #a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/program/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/program/flume/data/behavior1/## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /jast_root/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = log- a1.sinks.k1.hdfs.round = false#控制生成的小文件 # 每隔多少秒生成一個 a1.sinks.k1.hdfs.rollInterval = 10 # 128M生成一個文件 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0## 控制輸出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = GzipCodec## 拼裝 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1CDH 版本:Flume寫入HDFS時,Flume部署的服務器需要安裝HDFS Gateway
自己部署版本:Flume與Hadoop集群不在一起的話,需要配置Hadoop環境變量
FileChannel優化
? 通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。
官方說明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformancecheckpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據。
FileChannel原理:
HDFS小文件處理
HDFS存入大量小文件,有什么影響?
**元數據層面:**每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在Namenode內存中。所以小文件過多,會占用Namenode服務器大量內存,影響Namenode性能和使用壽命
**計算層面:**默認情況下MR會對每個小文件啟用一個Map任務計算,非常影響計算性能。同時也影響磁盤尋址時間。
官方默認的這三個參數配置寫入HDFS后會產生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0幾個參數綜合作用,效果如下:
①文件在達到128M時會滾動生成新文件
②文件創建超3600秒時會滾動生成新文件
Flume時間戳攔截器
? 由于Flume默認會用Linux系統時間,作為輸出到HDFS路徑的時間。如果數據是23:59分產生的。Flume消費Kafka里面的數據時,有可能已經是第二天了,那么這部門數據會被發往第二天的HDFS路徑。我們希望的是根據日志里面的實際時間,發往HDFS的路徑,所以下面攔截器作用是獲取日志中的實際時間。
? 解決的思路:攔截json日志,通過fastjson框架解析json,獲取實際時間ts。將獲取的ts時間寫入攔截器header頭,header的key必須是timestamp,因為Flume框架會根據這個key的值識別為時間,寫入到HDFS。
在com.jast.flume.interceptor包下創建TimeStampInterceptor類
package com.jast.flume.interceptor;import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}} }重新打包
自己部署的Flume:需要先將打好的包放入到flume/lib文件夾下面
cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar flume/libCDH版本Flume:需要先將打好的包放入到flume/lib文件夾下面
具體的目錄/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/
cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib項目經驗之Flume內存優化
問題描述:如果啟動消費Flume拋出如下異常
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded解決方案步驟
在服務器的flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
Flume內存參數設置及優化
JVM heap一般設置為4G或更高
-Xmx與-Xms最好設置一致,減少內存抖動帶來的性能影響,如果設置不一致容易導致頻繁fullgc。
-Xms表示JVM Heap(堆內存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆內存)最大允許的尺寸,按需分配。如果不設置一致,容易在初始化時,由于內存不夠,頻繁觸發fullgc。
參考內容:
https://www.cnblogs.com/qingyunzong/p/8994494.html
https://flume.apache.org/
總結
以上是生活随笔為你收集整理的Flume 实战开发指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Windows驱动开发如何入门
- 下一篇: “__popcnt64 is undef