Logstash filter plugins explained

1. The grok regex capture plugin

grok is an extremely powerful Logstash filter plugin. It uses regular expressions to parse arbitrary text, turning unstructured log data into a structured, easily queried form. It is currently the best way to parse unstructured log data in Logstash.
grok syntax:

```
%{PATTERN_TYPE:custom_label}
```
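Conceptually, each `%{TYPE:label}` reference expands into a named capture group of the underlying regular expression. A minimal Python sketch of that expansion, using a deliberately simplified `IPV4` stand-in rather than the real grok pattern:

```python
import re

# Simplified stand-in for a built-in pattern type (illustrative only)
PATTERNS = {"IPV4": r"\d{1,3}(?:\.\d{1,3}){3}"}

def grok_to_regex(expr):
    # Rewrite each %{TYPE:label} as a named group (?P<label>pattern)
    return re.sub(r"%\{(\w+):(\w+)\}",
                  lambda g: f"(?P<{g.group(2)}>{PATTERNS[g.group(1)]})",
                  expr)

line = '172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039'
m = re.search(grok_to_regex("%{IPV4:ip}"), line)
print(m.groupdict())  # {'ip': '172.16.213.132'}
```

The real grok implementation works the same way, except the pattern table is loaded from the grok-patterns files described below.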
grok's built-in pattern types:

```
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# URN, allowing use of RFC 2141 section 2.3 reserved characters
URN urn:[0-9A-Za-z][0-9A-Za-z-]{0,31}:(?:%[0-9a-fA-F]{2}|[0-9A-Za-z()+,.:=@;$_!*'/?#-])+

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
IPORHOST (?:%{IP}|%{HOSTNAME})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (/([\w_%!$@:.,+~-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]([A-Za-z0-9+\-.]+)+
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[APMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG [\x21-\x5a\x5c\x5e-\x7e]+
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
```

These built-in patterns ship with Logstash in the grok-patterns file:
```
{logstash-install-path}/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns
```
You can also define custom patterns:
Edit the grok-patterns file in your Logstash installation, append your custom rules at the end of the file (the names must not clash with existing ones), then restart the service for them to take effect.
For example:

```
# custom pattern
URL (http(s)?:\/\/)?%{URIHOST:domain_name}%{URIPATH}
```
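To see roughly what this custom URL pattern captures, here is a loose Python approximation; the character classes below are intentionally simpler than the real URIHOST/URIPATH definitions, so this is a sketch, not the exact grok semantics:

```python
import re

# Loose approximation of: URL (http(s)?:\/\/)?%{URIHOST:domain_name}%{URIPATH}
url_re = re.compile(
    r"(?:https?://)?(?P<domain_name>[0-9A-Za-z.-]+(?::\d+)?)(?P<path>/[^\s?#]*)?"
)

m = url_re.match("https://www.example.com/docs/index.html")
print(m.group("domain_name"), m.group("path"))  # www.example.com /docs/index.html
```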
2. grok in practice

2.1 Extracting an IP address
Create t1.conf:

```
input {
  stdin {}
}
filter {
  grok {
    match => { "message" => "%{IPV4:ip}" }
  }
}
output {
  stdout {}
}
```

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
```
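The `IPV4` type is plain (if verbose) regex. As a sanity check, the exact pattern from grok-patterns can be exercised against the sample line in Python:

```python
import re

# The exact IPV4 pattern from grok-patterns, split here for readability
IPV4 = (r"(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.]"
        r"(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.]"
        r"(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.]"
        r"(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])")

line = '172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039'
m = re.search(IPV4, line)
print(m.group(0))  # 172.16.213.132
```

Note the lookarounds `(?<![0-9])` and `(?![0-9])`: they stop the pattern from matching a four-octet slice out of a longer digit run.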
2.2 Extracting the timestamp

Create t1.conf:

```
input {
  stdin {}
}
filter {
  grok {
    match => { "message" => "%{IPV4:ip}\ \[%{HTTPDATE:timestamp}\]" }
  }
}
output {
  stdout {}
}
```

Note: spaces, double quotes, single quotes, square brackets, and similar characters in the log must be escaped with `\`.

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
```
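The same escaping rule applies in any regex dialect. A small Python sketch that pulls the bracketed timestamp out of the sample line (the field names are illustrative):

```python
import re

# "[" and "]" are regex metacharacters, so they are escaped here
# exactly as in the grok expression above.
line = '172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039'
m = re.search(r"(?P<ip>\S+) \[(?P<timestamp>[^\]]+)\]", line)
print(m.group("timestamp"))  # 07/Feb/2018:16:24:19 +0800
```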
2.3 Extracting the quoted request string

Create t1.conf:

```
input {
  stdin {}
}
filter {
  grok {
    match => { "message" => "\ %{QS:referrer}\ " }
  }
}
output {
  stdout {}
}
```

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
```
3. The date plugin

The date filter parses a date from a field and then uses that date or timestamp as the event's Logstash timestamp.

For example, syslog events usually carry a timestamp like this:

"Apr 17 09:32:01"

You would use the date format MMM dd HH:mm:ss to parse it.
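The Joda-style `MMM dd HH:mm:ss` used by the date filter corresponds to `%b %d %H:%M:%S` in Python's strptime, which makes the mapping easy to check:

```python
from datetime import datetime

# Syslog timestamps carry no year, so one is pinned here
# purely for the sake of the illustration.
ts = datetime.strptime("Apr 17 09:32:01", "%b %d %H:%M:%S").replace(year=2018)
print(ts.isoformat())  # 2018-04-17T09:32:01
```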
The date filter is especially important for sorting events and for backfilling old data: if your events do not get the correct date, searching them later may return them out of order.

Without this filter, Logstash chooses a timestamp based on when it first saw the event (at input time), if the timestamp is not already set on the event. For the file input, for example, the timestamp is set to the time each line was read.
Date format notes

date filter configuration options:

| Setting | Input type | Required |
| --- | --- | --- |
| locale | string | No |
| match | array | No |
| tag_on_failure | array | No |
| target | string | No |
| timezone | string | No |
4. date in practice

In an earlier example we extracted a timestamp field, i.e. the time taken from the log line itself. But the output shows not only that field but also an @timestamp field, and the two differ: @timestamp is the system's current time, not the log's. In an ELK pipeline this matters, because Elasticsearch uses @timestamp to mark when a log entry was produced; leaving it at ingest time scrambles the log timeline. The fix is the date plugin: it converts the time string captured from the log record into a Logstash::Timestamp object and stores it in the @timestamp field.
Create t1.conf:

```
input {
  stdin {}
}
filter {
  grok {
    match => { "message" => "\ \[%{HTTPDATE:timestamp}\]" }
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
}
output {
  stdout {}
}
```

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
```
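What the date filter does here can be mimicked in Python: parse the captured string with the matching format (`dd/MMM/yyyy:HH:mm:ss Z` corresponds to `%d/%b/%Y:%H:%M:%S %z`), then normalize to UTC, which is how @timestamp is stored:

```python
from datetime import datetime, timezone

# Parse the timestamp captured by grok, keeping its +0800 offset,
# then convert to UTC the way @timestamp is stored.
local = datetime.strptime("07/Feb/2018:16:24:19 +0800", "%d/%b/%Y:%H:%M:%S %z")
utc = local.astimezone(timezone.utc)
print(utc.isoformat())  # 2018-02-07T08:24:19+00:00
```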
5. The mutate plugin for modifying data

mutate is another essential Logstash plugin. It provides a rich set of basic data-manipulation operations on the fields of a log event: renaming, deleting, replacing, and modifying. The most commonly used options include type conversion (convert), regex substitution (gsub), splitting a string into an array on a separator (split), renaming a field (rename), and removing a field (remove_field):
- add_field: add a field
- remove_field: delete a field
- rename: rename a field
- replace: set a field's value (may reference other fields; creates the field if it does not exist)
- update: set a field's value (only if the field already exists)
- convert: convert a field's type
- copy: copy a field
- lowercase: convert a value to lowercase
- uppercase: convert a value to uppercase
- split: split a field on a separator
- strip: strip leading and trailing whitespace
- gsub: regex substitution (string fields only)
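As a rough illustration (not Logstash code), here is what a few of these operations do to an event, modeled as a Python dict; the field names and values are made up:

```python
# Event before mutate: all values are strings fresh from grok
event = {"status": "403", "path": "/a/b", "agent": "  Chrome  "}

event["status"] = int(event["status"])    # convert => "integer"
event["path"] = event["path"].split("/")  # split on "/"
event["agent"] = event["agent"].strip()   # strip surrounding whitespace
event["ua"] = event.pop("agent")          # rename agent -> ua
print(event)  # {'status': 403, 'path': ['', 'a', 'b'], 'ua': 'Chrome'}
```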
6. geoip lookups

geoip is a widely used free IP geolocation database. Given an IP address, the geoip plugin returns the corresponding location information: country, region and city, latitude and longitude, and so on. It is very useful for map visualizations and per-region statistics.
7. geoip in practice

Create t1.conf:

```
input {
  stdin {}
}
filter {
  grok {
    match => ["message", "%{IP:ip}"]
    remove_field => ["message"]
  }
  geoip {
    source => "ip"
  }
}
output {
  stdout {}
}
```

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
112.156.245.15 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
```
The output above contains a lot of information; we can restrict it to selected fields with geoip's fields option.

Create t1.conf (the field list below matches the one used in the comprehensive example later on):

```
input {
  stdin {}
}
filter {
  grok {
    match => ["message", "%{IP:ip}"]
    remove_field => ["message"]
  }
  geoip {
    source => "ip"
    fields => ["city_name", "region_name", "country_name", "ip"]
  }
}
output {
  stdout {}
}
```

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
112.156.245.15 [07/Feb/2018:16:24:19 +0800] "GET /HTTP/1.1" 403 5039
```
8. Putting the filter plugins together

Create t1.conf:

```
input {
  stdin {}
}
filter {
  grok {
    match => ["message", "%{IPORHOST:client_ip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:status}\ %{NUMBER:bytes}\ \"-\"\ \"%{DATA:browser_info}\ %{GREEDYDATA:extra_info}\"\ \"-\""]
  }
  geoip {
    source => "client_ip"
    target => "geoip"
    fields => ["city_name", "region_name", "country_name", "ip"]
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
  mutate {
    remove_field => ["message", "timestamp"]
  }
}
output {
  stdout {}
}
```

Start Logstash:

```
/opt/logstash-6.8.23/bin/logstash -f /opt/logstash/config/t1.conf --path.data=/data/logstash/data/t1
```

Once it is up, feed in a line:

```
112.156.245.15 [20/Feb/2018:12:12:14 +0800] "GET / HTTP/1.1" 200 190 "-" "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Mobile Safari/537.36" "-"
```
Note:

DATA and GREEDYDATA match differently: GREEDYDATA is greedy, while DATA matches as little as it possibly can.
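The difference is simply the regex lazy quantifier `.*?` (DATA) versus the greedy `.*` (GREEDYDATA); a quick Python demonstration:

```python
import re

# DATA is defined as .*? (lazy) and GREEDYDATA as .* (greedy);
# against the same text they capture very different spans.
text = 'a "one" b "two"'
lazy = re.search(r'"(?P<data>.*?)"', text).group("data")
greedy = re.search(r'"(?P<greedy>.*)"', text).group("greedy")
print(lazy)    # one
print(greedy)  # one" b "two
```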
總結
以上是生活随笔為你收集整理的logstash filter 过滤器详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: echarts实现中国地图(Vue)
- 下一篇: 数据结构(一)求矩阵中的鞍点