日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

运维实操——日志分析系统ELK(中)之logstash采集数据、伪装rsyslog、多行过滤、grok切片

發布時間:2023/12/20 windows 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 运维实操——日志分析系统ELK(中)之logstash采集数据、伪装rsyslog、多行过滤、grok切片 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

日志分析系統ELK(中)之logstash

  • 1、什么是logstash?
  • 2、Logstash安裝
  • 3、logstash簡單命令行測試
  • 4、logstash文件測試
    • (1)命令行輸入,輸出到文件
    • (2)命令行輸入,輸出到elasticsearch
    • (3)文件輸入,輸出到elasticsearch
  • 5、logstash可以偽裝成日志服務器,直接接受遠程日志
  • 6、多行過濾插件
    • (1)命令行多行輸入,文件輸出
    • (2)文件多行輸入,輸出到elasticsearch
  • 7、grok切片過濾插件
    • (1)命令行輸入,過濾,命令行輸出
    • (2)apache日志輸入,切片,es輸出

接上篇,server3、server4、server5,是Elasticsearch集群。本文介紹logstash

1、什么是logstash?

Logstash是一個開源的服務器端數據處理管道,聚合器。logstash擁有200多個插件,能夠同時從多個來源采集數據,轉換數據,過濾數據,然后將數據發送到您最喜歡的 “存儲庫” 中。(大多都是 Elasticsearch。)

Logstash管道有兩個必需的元素,輸入和輸出,以及一個可選元素過濾器。如下圖

(1)輸入:采集各種樣式、大小和來源的數據
Logstash 支持各種輸入選擇 ,同時從眾多常用來源捕捉事件。
能夠以連續的流式傳輸方式,輕松地從您的日志、指標、Web 應用、數據存儲以及各種 AWS 服務采集數據。

(2)過濾器:實時解析和轉換數據
數據從源傳輸到存儲庫的過程中,Logstash 過濾器能夠解析各個事件,識別已命名的字段以構建結構,并將它們轉換成通用格式,以便更輕松、更快速地分析和實現商業價值。

  • 利用 Grok 從非結構化數據中派生出結構
  • 從 IP 地址破譯出地理坐標
  • 將 PII 數據匿名化,完全排除敏感字段
  • 簡化整體處理,不受數據源、格式或架構的影響

(3)輸出:選擇您的存儲庫,導出您的數據
盡管 Elasticsearch 是我們的首選輸出方向,能夠為我們的搜索和分析帶來無限可能,但它并非唯一選擇。Logstash 提供眾多輸出選擇,您可以將數據發送到您要指定的地方,并且能夠靈活地解鎖眾多下游用例。

2、Logstash安裝

官網https://elasticsearch.cn/download/下載Logstash軟件包,并準備java的jdk包
準備新的虛擬機serever6(172.25.77.6),分配內存1G

server6安裝jdk和logstash

3、logstash簡單命令行測試

找到logstash命令的路徑執行,標準輸入到標準輸出,即命令行輸入,命令行輸出

輸入hello word,標準輸出hello word;輸入test,標準輸出test

4、logstash文件測試

(1)命令行輸入,輸出到文件

在/etc/logstash/conf.d/目錄下.conf結尾的文件都可以讀到,

編輯test.conf文件如下

[root@server6 conf.d]# cat test.conf input {stdin {} %輸入來自命令行標準輸入 }output {file { %輸出到/tmp/testfile文件中,格式為custom format: {輸入內容}path => "/tmp/testfile"codec => line { format => "custom format: %{message}"}} }


執行test.conf文件

命令行輸入heiheihei,輸出到了/tmp/testfile文件中

上面的方法無法直接看到結果,不太舒服,修改test.conf文件

[root@server6 conf.d]# cat test.conf input {stdin {} }output {stdout{} %一份標準輸出到命令行file { %一份輸出到/tmp/testfile文件中path => "/tmp/testfile"codec => line { format => "custom format: %{message}"}} }


執行test.conf文件

輸入linux,可以看到標準輸出,也輸出一份到/tmp/testfile文件中

(2)命令行輸入,輸出到elasticsearch

編輯es.conf 文件

[root@server6 conf.d]# cat es.conf input {stdin {} }output {stdout {} %標準輸出一份elasticsearch { %給elasticsearch輸出一份hosts => ["172.25.77.3:9200"] %目標elasticsearch主機ipindex => "logstash-%{+yyyy.MM.dd}" %索引格式為logstash-年月日} }


執行es.conf 文件

輸入linux haha,標準輸出一份

數據瀏覽->指定索引logstash-2021.08.14,可以看到elasticsearch輸出一份

(3)文件輸入,輸出到elasticsearch

現在我們想把日志文件作為輸入,首先要把權限改為644,因為logstash讀取時是logstash身份,所以必須開放讀的權力。

修改es.conf 文件

[root@server6 conf.d]# cat es.conf input {file { %從文件/var/log/messages輸入,從頭開始輸入path => "/var/log/messages"start_position => "beginning"} }output {stdout {} %標準輸出elasticsearch { %輸出elasticsearchhosts => ["172.25.77.3:9200"] index => "logstash-%{+yyyy.MM.dd}" } }


執行es.conf 文件

可以看到輸入了很多/var/log/messages

elasticsearch也可以看到很多數據

假如我們把剛才創建的索引刪除了,再次創建可以恢復嗎?

刪除索引

再次執行es.conf 文件,會發現沒有數據輸入

由于終端被占用了,再開啟一個窗口,輸入一條日志

現在再返回去看,有輸入了,一個是遠程登錄,一個是剛創建的日志,都是新的日志,那舊的日志呢?我寫的從頭開始輸入阿

數據輸入到elasticsearch了

再次刪除索引

在/usr/share/logstash/data/plugins/inputs/file/目錄下,有一個.sincedb的文件,它負責記錄數據偏移量,之前看到那個位置了,不會重復輸入,所以刪除他,就可以重新全部輸入了

這里補充一下,sincedb文件一共6個字段,分別表示inode編號、文件系統的主要設備號、文件系統的次要設備號、文件中的當前字節偏移量、最后一個活動時間戳(浮點數)、與此記錄匹配的最后一個已知路徑

再次執行es.conf 文件

/var/log/messages的數據又全部輸入了一遍

數據輸入到elasticsearch了

5、logstash可以偽裝成日志服務器,直接接受遠程日志

如果按照前面的方法收集日志信息,需要每臺服務器上都部署logstash,這樣太累了,那能不能讓logstash偽裝成日志服務器,每個節點服務器遠程發送日志給logstash呢?

編輯es.conf 文件

[root@server6 conf.d]# cat es.conf input {#file {# path => "/var/log/messages"# start_position => "beginning"#}syslog { %偽裝syslog,開放端口514port => 514} }output {stdout {}elasticsearch {hosts => ["172.25.77.3:9200"]index => "syslog-%{+yyyy.MM.dd}" %索引為syslog-年月日} }


執行es.conf 文件

新開一個窗口查看端口514已開放

遠程主機server3編輯/etc/rsyslog.conf文件
打開514端口

所有的日志發送給172.25.77.6:514一份

server3重啟rsyslog服務

server6的窗口有輸入了

現在elasticsearch就可以看到server3的日志了

同理,修改遠程主機server4的/etc/rsyslog.conf文件


server4重啟rsyslog服務

server6的窗口又有輸入了

elasticsearch可以看到server4的日志了

這時server6查看514端口有三個,第一個是自己的接受端口,第二個是server3的發送端口,第三個是server4的發送端口

6、多行過濾插件

錯誤日志一般都有很多行,如果按照前面的做法會分成很多條,分開讀,單獨看根本不知道什么意思,多行過濾可以把多行日志記錄合并為一行輸出。

(1)命令行多行輸入,文件輸出

編輯test.conf文件

[root@server6 conf.d]# cat test.conf input {stdin {codec => multiline { %多行輸入pattern => "EOF" %結束標志詞為EOFnegate => "true"what => "previous"}} }output {stdout {}file { path => "/tmp/testfile" %輸出到文件/tmp/testfilecodec => line { format => "custom format: %{message}"} %格式為custom format:{數據}} }


執行test.conf文件

測試:多行輸入,以EOF結束輸入,可以看到標準輸出是一條

/tmp/testfile文件中也是一條

(2)文件多行輸入,輸出到elasticsearch

接下來用文件輸入來測試,使用es集群的server3的日志(之前有一些報錯日志),把my-es.log發給server6的/var/log

先看一下正確的日志都是以時間開頭的,并且被中括號[ ]括起來的一行,而錯誤日志有很多行,比如下圖的at org開頭的這些,他們合起來應該是一條錯誤日志。

修改test.conf文件,先不加多行輸入模塊,看效果

[root@server6 conf.d]# cat test.conf input {file { path => "/var/log/my-es.log" %文件/var/log/my-es.log作為輸入start_position => "beginning" %從頭開始輸入# codec => multiline {# pattern => "EOF"# negate => "true"# what => "previous"# }} }output {stdout {}elasticsearch { %輸出到eshosts => ["172.25.77.3:9200"]index => "eslog-%{+yyyy.MM.dd}"} }


執行test.conf文件

顯示輸入了很多數據

在es中查看eslog索引,搜索at org,可以看到他們分成了一條一條的單獨的數據

現在刪除該索引

并刪除相關的sincedb

修改test.conf文件

[root@server6 conf.d]# cat test.conf input {file { path => "/var/log/my-es.log" start_position => "beginning" codec => multiline {pattern => "^\[" %[開頭的為結束詞negate => "true"what => "previous"}} }output {stdout {}elasticsearch { %輸出到eshosts => ["172.25.77.3:9200"]index => "eslog-%{+yyyy.MM.dd}"} }


再次執行test.conf文件

看到這個是一條數據

當然也可以在es中查看,是一條數據

7、grok切片過濾插件

我們平時查看日志,比如查看apache的日志,可以發現很有規律,如下圖,先是訪問的ip,時間等等,那么能不能只看其中一組數據,比如只想要ip這一列。現在就需要logstash的切片這個功能了

我們可以根據日志的特征自定義grok的書寫,得到想要的切片

(1)命令行輸入,過濾,命令行輸出

編輯grok.conf文件

[root@server6 conf.d]# cat grok.conf input {stdin {} }filter {grok { %把輸入切片成五塊,分別對應match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }} }output {stdout {} }


執行grok.conf文件

輸入一串數據,根據設定的切片方法,一一對應

(2)apache日志輸入,切片,es輸出

server6安裝apache

開啟apache,寫入默認發布目錄

真機訪問172.25.77.6

server6查看apache的日志

在/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns目錄下,有很多軟件的日志的輸出形式

看httpd的規定,如何寫日志已經提前用變量的方法定義了,所以我們只需要按照這個規定切片就好了

把apache的日志作為grok的輸入,日志文件需要給讀的權限,日志文件的目錄需要給讀和執行的權限,讀的時候是logstash的身份

修改grok.conf文件

[root@server6 conf.d]# cat grok.conf input {file {path => "/var/log/httpd/access_log" %/var/log/httpd/access_log文件作為輸入start_position => "beginning" %從頭開始}}filter {grok {match => { "message" => "%{HTTPD_COMBINEDLOG}" } %按照默認的HTTPD_COMBINEDLOG方式切片} }output {stdout {}elasticsearch {hosts => ["172.25.77.3:9200"]index => "apachelog-%{+yyyy.MM.dd}" %索引名字叫apachelog}}


執行grok.conf文件

按照默認定義好的模式切片

es查看,成功切片

總結

以上是生活随笔為你收集整理的运维实操——日志分析系统ELK(中)之logstash采集数据、伪装rsyslog、多行过滤、grok切片的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。