Hadoop生态Flume(四)拦截器(Interceptor)介绍与使用(2)
轉載自?Flume中的攔截器(Interceptor)介紹與使用(二)
lume中的攔截器(interceptor),用戶Source讀取events發送到Sink的時候,在events header中加入一些有用的信息,或者對events的內容進行過濾,完成初步的數據清洗。這在實際業務場景中非常有用,Flume-ng 1.6中目前提供了以下攔截器:
Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;
本文接上一篇《Flume中的攔截器(Interceptor)介紹與使用(一)》,繼續對剩下幾種攔截器進行學習和介紹,并附上使用示例。
?
一、Search and Replace Interceptor
該攔截器用于將events中的正則匹配到的內容做相應的替換。
具體配置示例如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = search_replace agent_lxw1234.sources.sources1.interceptors.i1.searchPattern = [0-9]+ agent_lxw1234.sources.sources1.interceptors.i1.replaceString = lxw1234 agent_lxw1234.sources.sources1.interceptors.i1.charset = UTF-8# sink 1 配置 ##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink agent_lxw1234.sinks.sink1.type = logger agent_lxw1234.sinks.sink1.channel = fileChannel該配置將events中的數字替換為lxw1234。
原始的events內容為:
實際的events內容為:
?
二、Regex Filtering Interceptor
該攔截器使用正則表達式過濾原始events中的內容。
配置示例如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = regex_filter agent_lxw1234.sources.sources1.interceptors.i1.regex = ^lxw1234.* agent_lxw1234.sources.sources1.interceptors.i1.excludeEvents = false# sink 1 配置 ##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink agent_lxw1234.sinks.sink1.type = logger agent_lxw1234.sinks.sink1.channel = fileChannel該配置表示過濾掉不是以lxw1234開頭的events。
如果excludeEvents設為true,則表示過濾掉以lxw1234開頭的events。
原始events內容為:
攔截后的events內容為:
?
三、Regex Extractor Interceptor
該攔截器使用正則表達式抽取原始events中的內容,并將該內容加入events header中。
配置示例如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = regex_extractor agent_lxw1234.sources.sources1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*?) agent_lxw1234.sources.sources1.interceptors.i1.serializers = s1 s2 agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.type = default agent_lxw1234.sources.sources1.interceptors.i1.serializers.s1.name = cookieid agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.type = default agent_lxw1234.sources.sources1.interceptors.i1.serializers.s2.name = ip# sink 1 配置 ##agent_lxw1234.sinks.sink1.type = com.lxw1234.sink.MySink agent_lxw1234.sinks.sink1.type = logger agent_lxw1234.sinks.sink1.channel = fileChannel該配置從原始events中抽取出cookieid和ip,加入到events header中。
原始的events內容為:
events header中的內容為:
?
Flume的攔截器可以配合Sink完成許多業務場景需要的功能,
比如:按照時間及主機生成目標文件目錄及文件名;
配合Kafka Sink完成多分區的寫入等等。
?
?
總結
以上是生活随笔為你收集整理的Hadoop生态Flume(四)拦截器(Interceptor)介绍与使用(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 再用笔记本键盘笔记本再配键盘
- 下一篇: Hadoop生态hive(四)数据类型