javascript
kafka 可视化工具_两小时带你轻松实战SpringBoot+kafka+ELK分布式日志收集
一、背景
隨著業(yè)務(wù)復(fù)雜度的提升以及微服務(wù)的興起,傳統(tǒng)單一項(xiàng)目會(huì)被按照業(yè)務(wù)規(guī)則進(jìn)行垂直拆分,另外為了防止單點(diǎn)故障我們也會(huì)將重要的服務(wù)模塊進(jìn)行集群部署,通過負(fù)載均衡進(jìn)行服務(wù)的調(diào)用。那么隨著節(jié)點(diǎn)的增多,各個(gè)服務(wù)的日志也會(huì)散落在各個(gè)服務(wù)器上。這對(duì)于我們進(jìn)行日志分析帶來了巨大的挑戰(zhàn),總不能一臺(tái)一臺(tái)的登錄去下載日志吧。那么我們需要一種收集日志的工具將散落在各個(gè)服務(wù)器節(jié)點(diǎn)上的日志收集起來,進(jìn)行統(tǒng)一的查詢及管理統(tǒng)計(jì)。那么ELK就可以做到這一點(diǎn)。
ELK是ElasticSearch+Logstash+Kibana的簡稱,在這里我分別對(duì)如上幾個(gè)組件做個(gè)簡單的介紹:
1.1、ElasticSearch(簡稱ES)
Elasticsearch是一個(gè)高度可擴(kuò)展的開源全文搜索和分析引擎。它允許您快速、實(shí)時(shí)地存儲(chǔ)、搜索和分析大量數(shù)據(jù)。它通常用作底層引擎/技術(shù),為具有復(fù)雜搜索特性和需求的應(yīng)用程序提供動(dòng)力。我們可以借助如ElasticSearch完成諸如搜索,日志收集,反向搜索,智能分析等功能。ES設(shè)計(jì)的目標(biāo):
- 快速實(shí)時(shí)搜索
Elasticsearch是一個(gè)實(shí)時(shí)搜索平臺(tái)。這意味著,從索引文檔到可搜索文檔,存在輕微的延遲(通常為一秒)。
- 集群
集群是一個(gè)或多個(gè)節(jié)點(diǎn)(服務(wù)器)的集合,這些節(jié)點(diǎn)(服務(wù)器)一起保存整個(gè)數(shù)據(jù),并提供跨所有節(jié)點(diǎn)的聯(lián)合索引和搜索功能。集群由一個(gè)惟一的名稱來標(biāo)識(shí),默認(rèn)情況下該名稱為“elasticsearch”。這個(gè)名稱很重要,因?yàn)楣?jié)點(diǎn)只能是集群的一部分,如果節(jié)點(diǎn)被設(shè)置為通過其名稱加入集群的話。確保不要在不同的環(huán)境中重用相同的集群名稱,否則可能會(huì)導(dǎo)致節(jié)點(diǎn)加入錯(cuò)誤的集群。例如,您可以使用logging-dev、logging-test和logging-prod開發(fā)、測(cè)試和生產(chǎn)集群。
- 節(jié)點(diǎn)
節(jié)點(diǎn)是單個(gè)服務(wù)器,它是集群的一部分,它用來存儲(chǔ)數(shù)據(jù),并參與集群的索引和搜索功能。與集群一樣,節(jié)點(diǎn)的名稱默認(rèn)為在啟動(dòng)時(shí)分配給節(jié)點(diǎn)的隨機(jī)惟一標(biāo)識(shí)符(UUID)。如果不需要默認(rèn)值,可以定義任何節(jié)點(diǎn)名稱。這個(gè)名稱對(duì)于管理非常重要,因?yàn)槟胍_定網(wǎng)絡(luò)中的哪些服務(wù)器對(duì)應(yīng)于Elasticsearch集群中的哪些節(jié)點(diǎn)。
- 索引
索引是具有類似特征的文檔的集合。例如,您可以有一個(gè)客戶數(shù)據(jù)索引、另一個(gè)產(chǎn)品目錄索引和另一個(gè)訂單數(shù)據(jù)索引。索引由一個(gè)名稱標(biāo)識(shí)(必須是小寫的),該名稱用于在對(duì)其中的文檔執(zhí)行索引、搜索、更新和刪除操作時(shí)引用索引。在單個(gè)集群中,可以定義任意數(shù)量的索引。
- 文檔
文檔是可以建立索引的基本信息單元。例如,可以為單個(gè)客戶提供一個(gè)文檔,為單個(gè)產(chǎn)品提供一個(gè)文檔,為單個(gè)訂單提供另一個(gè)文檔。這個(gè)文檔用JSON (JavaScript對(duì)象符號(hào))表示。在索引中,可以存儲(chǔ)任意數(shù)量的文檔。請(qǐng)注意,盡管文檔在物理上駐留在索引中,但實(shí)際上文檔必須被索引/分配到索引中的類型中。
1.2、Logstash
Logstash是一個(gè)開源數(shù)據(jù)收集引擎,具有實(shí)時(shí)流水線功能。Logstash可以動(dòng)態(tài)地將來自不同數(shù)據(jù)源的數(shù)據(jù)統(tǒng)一起來,并將數(shù)據(jù)規(guī)范化后(通過Filter過濾)傳輸?shù)侥x擇的目標(biāo)。
在這里inputs代表數(shù)據(jù)的輸入通道,大家可以簡單理解為來源。常見的可以從kafka,FileBeat, DB等獲取日志數(shù)據(jù),這些數(shù)據(jù)經(jīng)過fliter過濾后(比如說:日志過濾,json格式解析等)通過outputs傳輸?shù)街付ǖ奈恢眠M(jìn)行存儲(chǔ)(Elasticsearch,Mogodb,Redis等)
簡單的實(shí)例:
cd logstash-6.4.1bin/logstash -e 'input { stdin { } } output { stdout {} }'1.3、Kibana
kibana是用于Elasticsearch檢索數(shù)據(jù)的開源分析和可視化平臺(tái)。我們可以使用Kibana搜索、查看或者與存儲(chǔ)在Elasticsearch索引中的數(shù)據(jù)交互。同時(shí)也可以輕松地執(zhí)行高級(jí)數(shù)據(jù)分析并在各種圖表、表和映射中可視化數(shù)據(jù)。基于瀏覽器的Kibana界面使您能夠快速創(chuàng)建和共享動(dòng)態(tài)儀表板,實(shí)時(shí)顯示對(duì)Elasticsearch查詢的更改。
1.4、處理方案
用戶通過java應(yīng)用程序的Slf4j寫入日志,SpringBoot默認(rèn)使用的是logback。我們通過實(shí)現(xiàn)自定義的Appender將日志寫入kafka,同時(shí)logstash通過input插件操作kafka訂閱其對(duì)應(yīng)的主題。當(dāng)有日志輸出后被kafka的客戶端logstash所收集,經(jīng)過相關(guān)過濾操作后將日志寫入Elasticsearch,此時(shí)用戶可以通過kibana獲取elasticsearch中的日志信息
二、SpringBoot中的配置
在SpringBoot當(dāng)中,我們可以通過logback-srping.xml來擴(kuò)展logback的配置。不過我們?cè)诖酥皯?yīng)當(dāng)先添加logback對(duì)kafka的依賴,代碼如下:
compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.2.0-RC1'添加好依賴之后我們需要在類路徑下創(chuàng)建logback-spring.xml的配置文件并做如下配置(添加kafka的Appender):
<configuration><!-- springProfile用于指定當(dāng)前激活的環(huán)境,如果spring.profile.active的值是哪個(gè),就會(huì)激活對(duì)應(yīng)節(jié)點(diǎn)下的配置 --><springProfile name="default"><!-- configuration to be enabled when the "staging" profile is active --><springProperty scope="context" name="module" source="spring.application.name"defaultValue="undefinded"/><!-- 該節(jié)點(diǎn)會(huì)讀取Environment中配置的值,在這里我們讀取application.yml中的值 --><springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers"defaultValue="localhost:9092"/><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!-- encoders are assigned the typech.qos.logback.classic.encoder.PatternLayoutEncoder by default --><encoder><pattern>%boldYellow(${module}) | %d | %highlight(%-5level)| %cyan(%logger{15}) - %msg %n</pattern></encoder></appender><!-- kafka的appender配置 --><appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender"><encoder><pattern>${module} | %d | %-5level| %logger{15} - %msg</pattern></encoder><topic>logger-channel</topic><keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/><deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/><!-- Optional parameter to use a fixed partition --><!-- <partition>0</partition> --><!-- Optional parameter to include log timestamps into the kafka message --><!-- <appendTimestamp>true</appendTimestamp> --><!-- each <producerConfig> translates to regular kafka-client config (format: key=value) --><!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs --><!-- bootstrap.servers is the only mandatory producerConfig --><producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig><!-- 如果kafka不可用則輸出到控制臺(tái) --><appender-ref ref="STDOUT"/></appender><!-- 指定項(xiàng)目中的logger --><logger name="org.springframework.test" level="INFO" ><appender-ref ref="kafka" /></logger><root level="info"><appender-ref ref="STDOUT" /></root></springProfile></configuration>在這里面我們主要注意以下幾點(diǎn):
- 日志輸出的格式是為模塊名 | 時(shí)間 | 日志級(jí)別 | 類的全名 | 日志內(nèi)容
- SpringProfile節(jié)點(diǎn)用于指定當(dāng)前激活的環(huán)境,如果spring.profile.active的值是哪個(gè),就會(huì)激活對(duì)應(yīng)節(jié)點(diǎn)下的配置
- springProperty可以讀取Environment中的值
三、ELK搭建過程
3.1、檢查環(huán)境
ElasticSearch需要jdk8,官方建議我們使用JDK的版本為1.8.0_131,原文如下:
Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131檢查完畢后,我們可以分別在官網(wǎng)下載對(duì)應(yīng)的組件
- ElasticSearch
- Kibana
- Logstash
- kafka
- zookeeper
3.2、啟動(dòng)zookeeper
首先進(jìn)入啟動(dòng)zookeeper的根目錄下,將conf目錄下的zoo_sample.cfg文件拷貝一份重新命名為zoo.cfg
mv zoo_sample.cfg zoo.cfg配置文件如下:
# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial # synchronization phase can takeinitLimit=10# The number of ticks that can pass between # sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just # example sakes.dataDir=../zookeeper-data# the port at which the clients will connectclientPort=2181# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the # administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to "0" to disable auto purge feature#autopurge.purgeInterval=1緊接著我們進(jìn)入bin目錄啟動(dòng)zookeeper:
./zkServer.sh start3.3、啟動(dòng)kafka
在kafka根目錄下運(yùn)行如下命令啟動(dòng)kafka:
./bin/kafka-server-start.sh config/server.properties啟動(dòng)完畢后我們需要?jiǎng)?chuàng)建一個(gè)logger-channel主題:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel3.4、配置并啟動(dòng)logstash
進(jìn)入logstash跟目錄下的config目錄,我們將logstash-sample.conf的配置文件拷貝到根目錄下重新命名為core.conf,然后我們打開配置文件進(jìn)行編輯:
```ruby
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
我們分別配置logstash的input,filter和output(懂ruby的童鞋們肯定對(duì)語法結(jié)構(gòu)不陌生吧):
- 在input當(dāng)中我們指定日志來源為kafka,具體含義可以參考官網(wǎng):kafka-input-plugin
- 在filter中我們配置grok插件,該插件可以利用正則分析日志內(nèi)容,其中patterns_dir屬性用于指定自定義的分析規(guī)則,我們可以在該文件下建立文件配置驗(yàn)證的正則規(guī)則。舉例子說明:55.3.244.1 GET /index.html 15824 0.043的 日志內(nèi)容經(jīng)過如下配置解析:
解析過后會(huì)變成:
client: 55.3.244.1method: GETrequest: /index.htmlbytes: 15824duration: 0.043這些屬性都會(huì)在elasticsearch中存為對(duì)應(yīng)的屬性字段。更詳細(xì)的介紹請(qǐng)參考官網(wǎng):grok ,當(dāng)然該插件已經(jīng)幫我們定義好了好多種核心規(guī)則,我們可以在這里查看所有的規(guī)則。
- 在output當(dāng)中我們將過濾過后的日志內(nèi)容打印到控制臺(tái)并傳輸?shù)絜lasticsearch中,我們可以參考官網(wǎng)上關(guān)于該插件的屬性說明:[地址]((https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html)
- 另外我們?cè)趐atterns文件夾中創(chuàng)建好自定義的規(guī)則文件logback,內(nèi)容如下:
# yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527 LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})
編輯好配置后我們運(yùn)行如下命令啟動(dòng)logstash:
bin/logstash -f first-pipeline.conf --config.reload.automatic該命令會(huì)實(shí)時(shí)更新配置文件而不需啟動(dòng)
3.5、啟動(dòng)ElasticSearch
啟動(dòng)ElasticSearch很簡單,我們可以運(yùn)行如下命令:
./bin/elasticsearch我們可以發(fā)送get請(qǐng)求來判斷啟動(dòng)成功:
GET http://localhost:9200我們可以得到類似于如下的結(jié)果:
{"name" : "Cp8oag6","cluster_name" : "elasticsearch","cluster_uuid" : "AT69_T_DTp-1qgIJlatQqA","version" : {"number" : "6.4.0","build_flavor" : "default","build_type" : "zip","build_hash" : "f27399d","build_date" : "2016-03-30T09:51:41.449Z","build_snapshot" : false,"lucene_version" : "7.4.0","minimum_wire_compatibility_version" : "1.2.3","minimum_index_compatibility_version" : "1.2.3"},"tagline" : "You Know, for Search"}3.5.1 配置IK分詞器(可選)
我們可以在github上下載elasticsearch的IK分詞器,地址如下:ik分詞器,然后把它解壓至your-es-root/plugins/ik的目錄下,我們可以在{conf}/analysis-ik/config/IKAnalyzer.cfg.xmlor {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml 里配置自定義分詞器:
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"><properties><comment>IK Analyzer 擴(kuò)展配置</comment><!--用戶可以在這里配置自己的擴(kuò)展字典 --><entry key="ext_dict">custom/mydict.dic;custom/single_word_low_freq.dic</entry><!--用戶可以在這里配置自己的擴(kuò)展停止詞字典--><entry key="ext_stopwords">custom/ext_stopword.dic</entry><!--用戶可以在這里配置遠(yuǎn)程擴(kuò)展字典 --><entry key="remote_ext_dict">location</entry><!--用戶可以在這里配置遠(yuǎn)程擴(kuò)展停止詞字典--><entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry></properties>首先我們添加索引:
curl -XPUT http://localhost:9200/my_index我們可以把通過put請(qǐng)求來添加索引映射:
PUT my_index {"mappings": {"doc": { "properties": { "title": { "type": "text" }, "name": { "type": "text" }, "age": { "type": "integer" }, "created": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"}}}}}其中doc是映射名 my_index是索引名稱
3.5.2 logstash與ElasticSearch
logstash默認(rèn)情況下會(huì)在ES中建立logstash-*的索引,*代表了yyyy-MM-dd的時(shí)間格式,根據(jù)上述logstash配置filter的示例,其會(huì)在ES中建立module ,logmessage,class,level等索引。(具體我們可以根據(jù)grok插件進(jìn)行配置)
3.6 啟動(dòng)Kibana
在kibana的bin目錄下運(yùn)行./kibana即可啟動(dòng)。啟動(dòng)之后我們可以通過瀏覽器訪問http://localhost:5601 來訪問kibanaUI。我們可以看到如下界面:
總結(jié)
以上是生活随笔為你收集整理的kafka 可视化工具_两小时带你轻松实战SpringBoot+kafka+ELK分布式日志收集的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python中sys模块有什么用_Pyt
- 下一篇: python续行符是啥_python续行