运维实操——日志分析系统ELK(中)之logstash采集数据、伪装rsyslog、多行过滤、grok切片
日志分析系統ELK(中)之logstash
- 1、什么是logstash?
- 2、Logstash安裝
- 3、logstash簡單命令行測試
- 4、logstash文件測試
- (1)命令行輸入,輸出到文件
- (2)命令行輸入,輸出到elasticsearch
- (3)文件輸入,輸出到elasticsearch
- 5、logstash可以偽裝成日志服務器,直接接受遠程日志
- 6、多行過濾插件
- (1)命令行多行輸入,文件輸出
- (2)文件多行輸入,輸出到elasticsearch
- 7、grok切片過濾插件
- (1)命令行輸入,過濾,命令行輸出
- (2)apache日志輸入,切片,es輸出
接上篇,server3、server4、server5,是Elasticsearch集群。本文介紹logstash
1、什么是logstash?
Logstash是一個開源的服務器端數據處理管道,聚合器。logstash擁有200多個插件,能夠同時從多個來源采集數據,轉換數據,過濾數據,然后將數據發送到您最喜歡的 “存儲庫” 中。(大多都是 Elasticsearch。)
Logstash管道有兩個必需的元素,輸入和輸出,以及一個可選元素過濾器。如下圖
(1)輸入:采集各種樣式、大小和來源的數據
Logstash 支持各種輸入選擇 ,同時從眾多常用來源捕捉事件。
能夠以連續的流式傳輸方式,輕松地從您的日志、指標、Web 應用、數據存儲以及各種 AWS 服務采集數據。
(2)過濾器:實時解析和轉換數據
數據從源傳輸到存儲庫的過程中,Logstash 過濾器能夠解析各個事件,識別已命名的字段以構建結構,并將它們轉換成通用格式,以便更輕松、更快速地分析和實現商業價值。
- 利用 Grok 從非結構化數據中派生出結構
- 從 IP 地址破譯出地理坐標
- 將 PII 數據匿名化,完全排除敏感字段
- 簡化整體處理,不受數據源、格式或架構的影響
(3)輸出:選擇您的存儲庫,導出您的數據
盡管 Elasticsearch 是我們的首選輸出方向,能夠為我們的搜索和分析帶來無限可能,但它并非唯一選擇。Logstash 提供眾多輸出選擇,您可以將數據發送到您要指定的地方,并且能夠靈活地解鎖眾多下游用例。
2、Logstash安裝
官網https://elasticsearch.cn/download/下載Logstash軟件包,并準備java的jdk包
準備新的虛擬機serever6(172.25.77.6),分配內存1G
server6安裝jdk和logstash
3、logstash簡單命令行測試
找到logstash命令的路徑執行,標準輸入到標準輸出,即命令行輸入,命令行輸出
輸入hello word,標準輸出hello word;輸入test,標準輸出test
4、logstash文件測試
(1)命令行輸入,輸出到文件
在/etc/logstash/conf.d/目錄下.conf結尾的文件都可以讀到,
編輯test.conf文件如下
執行test.conf文件
命令行輸入heiheihei,輸出到了/tmp/testfile文件中
上面的方法無法直接看到結果,不太舒服,修改test.conf文件
執行test.conf文件
輸入linux,可以看到標準輸出,也輸出一份到/tmp/testfile文件中
(2)命令行輸入,輸出到elasticsearch
編輯es.conf 文件
[root@server6 conf.d]# cat es.conf input {stdin {} }output {stdout {} %標準輸出一份elasticsearch { %給elasticsearch輸出一份hosts => ["172.25.77.3:9200"] %目標elasticsearch主機ipindex => "logstash-%{+yyyy.MM.dd}" %索引格式為logstash-年月日} }
執行es.conf 文件
輸入linux haha,標準輸出一份
數據瀏覽->指定索引logstash-2021.08.14,可以看到elasticsearch輸出一份
(3)文件輸入,輸出到elasticsearch
現在我們想把日志文件作為輸入,首先要把權限改為644,因為logstash讀取時是logstash身份,所以必須開放讀的權力。
修改es.conf 文件
執行es.conf 文件
可以看到輸入了很多/var/log/messages
elasticsearch也可以看到很多數據
假如我們把剛才創建的索引刪除了,再次創建可以恢復嗎?
刪除索引
再次執行es.conf 文件,會發現沒有數據輸入
由于終端被占用了,再開啟一個窗口,輸入一條日志
現在再返回去看,有輸入了,一個是遠程登錄,一個是剛創建的日志,都是新的日志,那舊的日志呢?我寫的從頭開始輸入阿
數據輸入到elasticsearch了
再次刪除索引
在/usr/share/logstash/data/plugins/inputs/file/目錄下,有一個.sincedb的文件,它負責記錄數據偏移量,之前看到那個位置了,不會重復輸入,所以刪除他,就可以重新全部輸入了
這里補充一下,sincedb文件一共6個字段,分別表示inode編號、文件系統的主要設備號、文件系統的次要設備號、文件中的當前字節偏移量、最后一個活動時間戳(浮點數)、與此記錄匹配的最后一個已知路徑
再次執行es.conf 文件
/var/log/messages的數據又全部輸入了一遍
數據輸入到elasticsearch了
5、logstash可以偽裝成日志服務器,直接接受遠程日志
如果按照前面的方法收集日志信息,需要每臺服務器上都部署logstash,這樣太累了,那能不能讓logstash偽裝成日志服務器,每個節點服務器遠程發送日志給logstash呢?
編輯es.conf 文件
[root@server6 conf.d]# cat es.conf input {#file {# path => "/var/log/messages"# start_position => "beginning"#}syslog { %偽裝syslog,開放端口514port => 514} }output {stdout {}elasticsearch {hosts => ["172.25.77.3:9200"]index => "syslog-%{+yyyy.MM.dd}" %索引為syslog-年月日} }
執行es.conf 文件
新開一個窗口查看端口514已開放
遠程主機server3編輯/etc/rsyslog.conf文件
打開514端口
所有的日志發送給172.25.77.6:514一份
server3重啟rsyslog服務
server6的窗口有輸入了
現在elasticsearch就可以看到server3的日志了
同理,修改遠程主機server4的/etc/rsyslog.conf文件
server4重啟rsyslog服務
server6的窗口又有輸入了
elasticsearch可以看到server4的日志了
這時server6查看514端口有三個,第一個是自己的接受端口,第二個是server3的發送端口,第三個是server4的發送端口
6、多行過濾插件
錯誤日志一般都有很多行,如果按照前面的做法會分成很多條,分開讀,單獨看根本不知道什么意思,多行過濾可以把多行日志記錄合并為一行輸出。
(1)命令行多行輸入,文件輸出
編輯test.conf文件
[root@server6 conf.d]# cat test.conf input {stdin {codec => multiline { %多行輸入pattern => "EOF" %結束標志詞為EOFnegate => "true"what => "previous"}} }output {stdout {}file { path => "/tmp/testfile" %輸出到文件/tmp/testfilecodec => line { format => "custom format: %{message}"} %格式為custom format:{數據}} }
執行test.conf文件
測試:多行輸入,以EOF結束輸入,可以看到標準輸出是一條
/tmp/testfile文件中也是一條
(2)文件多行輸入,輸出到elasticsearch
接下來用文件輸入來測試,使用es集群的server3的日志(之前有一些報錯日志),把my-es.log發給server6的/var/log
先看一下正確的日志都是以時間開頭的,并且被中括號[ ]括起來的一行,而錯誤日志有很多行,比如下圖的at org開頭的這些,他們合起來應該是一條錯誤日志。
修改test.conf文件,先不加多行輸入模塊,看效果
執行test.conf文件
顯示輸入了很多數據
在es中查看eslog索引,搜索at org,可以看到他們分成了一條一條的單獨的數據
現在刪除該索引
并刪除相關的sincedb
修改test.conf文件
再次執行test.conf文件
看到這個是一條數據
當然也可以在es中查看,是一條數據
7、grok切片過濾插件
我們平時查看日志,比如查看apache的日志,可以發現很有規律,如下圖,先是訪問的ip,時間等等,那么能不能只看其中一組數據,比如只想要ip這一列。現在就需要logstash的切片這個功能了
我們可以根據日志的特征自定義grok的書寫,得到想要的切片
(1)命令行輸入,過濾,命令行輸出
編輯grok.conf文件
[root@server6 conf.d]# cat grok.conf input {stdin {} }filter {grok { %把輸入切片成五塊,分別對應match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }} }output {stdout {} }
執行grok.conf文件
輸入一串數據,根據設定的切片方法,一一對應
(2)apache日志輸入,切片,es輸出
server6安裝apache
開啟apache,寫入默認發布目錄
真機訪問172.25.77.6
server6查看apache的日志
在/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns目錄下,有很多軟件的日志的輸出形式
看httpd的規定,如何寫日志已經提前用變量的方法定義了,所以我們只需要按照這個規定切片就好了
把apache的日志作為grok的輸入,日志文件需要給讀的權限,日志文件的目錄需要給讀和執行的權限,讀的時候是logstash的身份
修改grok.conf文件
執行grok.conf文件
按照默認定義好的模式切片
es查看,成功切片
總結
以上是生活随笔為你收集整理的运维实操——日志分析系统ELK(中)之logstash采集数据、伪装rsyslog、多行过滤、grok切片的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux给文本繁简转换,linux -
- 下一篇: 操作系统——时间片轮转调度算法(RR)