【产品环境】使用ELK搭建日志系统
為什么80%的碼農都做不了架構師?>>> ??
隨著業務不斷完善與發展,日志的重要性穩步上升。我們需要從日志中排查錯誤,以及分析用戶行為,為業務發展提供參考意見。因此,需要一套專門的日志系統幫助我們收集、分析、處理日志。
以前我曾經寫過一個logstash的blog: http://my.oschina.net/abcfy2/blog/372138 ,版本比較低,但是logstash的配置沒變過。此篇blog將比上述blog更詳盡一些,擴展到產品環境搭建完整的日志系統,但是logstash本身的配置不多做介紹,因為舊Blog已經介紹的比較詳細了。
本篇Blog主要介紹我們目前使用的日志系統的總體架構和部分配置。Kibana的使用暫時不在本篇Blog的覆蓋范圍之內,以后也許會單獨寫一篇kibana的使用,讀者也可以參考饒琛琳的《ELK stack權威指南》一書的相關章節。
本篇Blog的內容也并非自己獨自完成,關于log4j 1.2部分的配置和使用是開發同事共同探究實現的。
最后要說的一點是,日志系統的實現并不只是運維的工作,開發也需要配合,規范日志格式,規范項目埋點,便于排查問題。最后歸結與一點,要有執行力,要有人推動,不能隨隨便便的打日志,更不允許產品環境有亂七八糟的println這種調試方式的日志輸出。
關于ELK
ELK Stack指代三個獨立的組件: Elasticsearch,Logstash,Kibana。這三個獨立的組件組合使用,可以形成一套完整的日志解決方案。目前這三個產品先后歸于Elastic.co公司旗下,該公司圍繞Elasticsearch這個核心產品逐步打造一整套生態環境,使得ELK Stack這套架構日益成熟,而且周邊也逐步開始完善。
其中,Logstash的作用是處理日志,將日志解析為JSON格式進行傳遞。Elasticsearch的作用是數據庫,將最終解析的結果存庫,用于日后查詢與分析使用。Kibana是Elasticsearch的dashboard,用于圖形化展示elasticsearch數據庫的查詢結果。這三個組件搭配使用,將十分靈活,有以下幾個優點(以下內容節選自饒琛琳的《ELK stack權威指南》一書,感謝作者的努力):
- 處理方式靈活。Elasticsearch是實時全文索引,不需要像Storm那樣預先編程才能使用。
- 配置簡易上手。Elasticsearch全部采用JSON接口,Logstash是Ruby DSL設計,都是目前業界最通用的配置語法設計。
- 檢索性能高效。雖然每次查詢都是實時計算,但是優秀的設計和實現基本可以達到百億級數據查詢的秒級響應。
- 集群線性擴展。不管是Elasticsearch集群還是Logstash集群都是可以線性擴展的。
- 前端操作炫麗。Kibana界面上,只需點擊鼠標,就可以完成搜索、聚合功能,生成炫麗的儀表盤。
Kibana的可視化效果:
Logstash的處理流程
其中,ELK的靈活性得益于Logstash的插件式設計,而且插件之間都是松耦合(通過JSON事件交互,接口統一)。數據流在Logstash會經過三個階段: Input -> Filter -> Output,而且Filter可以無限制串聯,形成流式處理,甚至可以干脆沒有。這三個階段既可以在單節點上完成,也可以直接Output到其他節點上,分布處理與卸載壓力。整個Logstash的基本流程圖如下:
Logstash的數據處理過程描述如下:
整個Logstash的ruby DSL配置語法看起來像這樣:
input {插件名1 {# 插件相關配置屬性}插件名2 {# 插件相關配置屬性}... SNIP ...插件名3 {# 插件相關配置屬性} }filter {插件名1 {# 插件相關配置屬性}插件名2 {# 插件相關配置屬性}... SNIP ...插件名3 {# 插件相關配置屬性} }output {插件名1 {# 插件相關配置屬性}插件名2 {# 插件相關配置屬性}... SNIP ...插件名3 {# 插件相關配置屬性} }舉個例子,比如存儲于日志文件中的某http access log日志:
55.3.244.1 GET /index.html 15824 0.043經過了Logstash的inputs-file插件,輸入成為Logstash的一個事件,在Logstash會變成這樣(以rubydebug格式顯示):
{"message" => "55.3.244.1 GET /index.html 15824 0.043","@version" => "1","@timestamp" => "2016-03-01T03:37:33.081Z","host" => "fengyu-Vostro-3900" }日志本身內容會存放在message這個field中,除此之外還會加上一些元數據,如host,@timestamp等。
加上filters-grok這個Filter進行正則解析處理,解析message這個field(詳細配置參考filters-grok的文檔),最終將該事件解析成如下的事件:
{"message" => "55.3.244.1 GET /index.html 15824 0.043","@version" => "1","@timestamp" => "2016-03-01T03:51:03.914Z","host" => "fengyu-Vostro-3900","client" => "55.3.244.1","method" => "GET","request" => "/index.html","bytes" => "15824","duration" => "0.043" }最后,通過outputs-elasticsearch這個output插件,將解析過的日志推送至Elasticsearch數據庫中存儲。
通過elasticsearch中的各種查詢方式,即可按照自己的需求展示這些數據了。
Logstash的這種設計,可以很容易進行線性擴展,比如不做filter處理,直接output到其他logstash實例的input端,將處理分散在不同的節點上。最極端的情況,甚至可以擴展成這個架構,兼顧HA(High Availability)與HP(High Performance):
三個logstash實例互為冗余,將解析的結果推送至消息隊列,由另一個logstash實例將日志從消息隊列取出,推送至elasticsearch集群中。
架構設計
整個日志數據流的模型圖:
每臺服務器上部署有我們自己開發的應用程序,以及這些應用程序的第三方依賴服務項(如數據庫,web服務器等)。
因此日志源主要有兩種: 自己開發的應用程序的日志,依賴的第三方軟件的日志。
我們自己開發的程序,直接將日志以JSON格式寫入消息隊列中。第三方服務大部分無法直接將日志寫入消息隊列中,而是輸出為日志文件,這種日志源通過logstash的filters-grok插件,解析日志文件后推送至消息隊列中。
需要收集的第三方依賴的日志,以及收集哪些日志,詳見文檔末的附錄。
消息隊列使用kafka + zookeeper的方式實現。日志專用的消息隊列部署在日志服務器中。
注: 如果日志量比較小的話,可以沒必要這么復雜,比如省略掉kafka這個消息隊列,日志服務器也無需部署logstash,直接在應用服務器上用logstash將解析過的log推送至日志服務器上的es數據庫中。
安全問題:
所有服務盡可能只對內網ip暴露(通過防火墻實現),減少對外暴露的服務,并且以低權限賬戶運行。跨節點的服務(如mongodb復制集,kafka+zookeeper,postgresql集群等)連接一律采用SSL雙向認證的方案,提高安全性。
詳細配置參考文檔末附錄的內容。
解決方案
根據上述描述,我們需要搭建一臺日志服務器,安裝ELK與日志專用的消息隊列。
應用程序產生的日志直接推送至日志服務器的消息隊列中,經過logstash的處理最終推送至elasticsearch中,在kibana上進行展示。
可以在logstash的Filter上定義報警規則,當日志有嚴重的錯誤時Output郵件報警。
部署方案
服務器上應用程序列表如圖所示:
多臺產品服務器上,每臺服務器分別部署有應用程序和logstash,其他第三方服務按照需要組成集群(如postgresql集群,mongodb復制集等)。日志服務器上部署完整的ELK Stack和Kafka+Zookeeper。
- 日志信息由應用程序生成時,直接寫入日志服務器的kafka隊列中。相關規范與配置參考文檔末的附錄內容。
- 由第三方依賴程序產生的log,通常以文件形式存儲在產品服務器上。通過產品服務器的logstash解析日志文件后,推送至日志服務器的kafka。需要收集的日志列表參考文檔末的附錄。
- 日志統一輸出到logs這個TOPIC中。
- 日志服務器的logstash負責從日志服務器的kafka隊列中取出日志信息,推送至elasticsearch儲存,同時做報警規則,遇到需要報警的日志通過郵件方式報警。kibana作為elasticsearch的dashboard使用,對es數據庫存儲的內容進行可視化展示。
擴展問題
此架構在擴展上將即為便利,共有三個可擴展的點:
這些擴展方案均可在無需原程序改動的條件下進行擴展。
部署步驟
單節點部署
推薦使用elastic.co的倉庫(RHEL/CentOS和Ubuntu/Debian倉庫為官方維護):
- Elasticsearch地址: https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html
- Logstash地址: https://www.elastic.co/guide/en/logstash/current/package-repositories.html
- Kibana地址: https://www.elastic.co/guide/en/logstash/current/package-repositories.html
推薦使用清華大學鏡像倉庫,我的issue已經被tuna接受,國內安裝速度會快許多(官方倉庫在S3上,所以你懂的)。tuna鏡像地址: http://mirrors.tuna.tsinghua.edu.cn/ELK/
按照官方文檔的步驟,安裝之后根據需要定制配置,啟動服務,啟用開機自啟動即可。
包管理器安裝的logstash,啟動配置存放于/etc/logstash/conf.d/,這個目錄一開始是空的,自己將logstash的配置文件以.conf結尾扔到這個目錄后,即可使用service logstash start啟動服務,日志存放于/var/log/logstash/。logstash的配置文件可以拆分成多個.conf文件,以規范配置,比如input.conf,filter.conf,output.conf。
特別注意: logstash可以在一個目錄下存放多個.conf文件,logstash內部會將多個.conf文件合并為一個大的配置文件,合并的順序為文件名順序。所以特別注意你的filter配置,如果多個配置文件都有filter配置,特別注意filter的加載次序!否則會搞亂你的配置。如果你的filter只針對某個應用的日志使用,那么推薦你使用if [type] == "appname" { filter配置 }這種方式限制住你的filter的作用范圍。
批量部署
我用的是salt,產品環境Ubuntu Server 14.04 LTS,當然你也可以使用其他類似的工具,如puppet,chef,ansible等等。
salt的logstash這個state的目錄結構如下:
$ tree /srv/salt/logstash/ /srv/salt/logstash/ ├── config │?? ├── logagent │?? │?? ├── 00_log4j.conf │?? │?? ├── 01_vertx.conf │?? │?? ├── 02_mongod.conf │?? │?? ├── 03_postgresql.conf │?? │?? ├── 04_nginx.conf │?? │?? └── 99_output.conf │?? └── logserver │?? └── logserver.conf └── init.sls $ cat /srv/salt/logstash/init.sls logstash_repo:pkgrepo.managed:- name: deb http://mirrors.tuna.tsinghua.edu.cn/ELK/apt/logstash/2.3/ stable main- file: /etc/apt/sources.list.d/logstash.list- key_url: https://packages.elastic.co/GPG-KEY-elasticsearch- clean_file: Truelogstash:pkg.latest:- require:- pkgrepo: logstash_repologstash_grains:grains.list_present:- name: roles- value: logstashlogstash-config:file.recurse:- name: /etc/logstash/conf.d{% if 'logserver' in grains.get('roles', '') %}- source: salt://logstash/config/logserver/{% else %}- source: salt://logstash/config/logagent/{% endif %}- clean: True- makedirs: True- template: jinja{% if 'postgresql' in grains.get('roles', '') %} logstash-user:group.present:- name: adm- addusers: - "logstash" {% endif %}logstash-service:service.running:- name: logstash- enable: True- watch:- pkg: logstash- file: logstash-config最終推送到/etc/logstash/conf.d/目錄下的文件為00_log4j.conf,01_vertx.conf,02_mongod.conf,03_postgresql.conf,04_nginx.conf,99_output.conf,這樣命名是為了按照自己預期的文件順序疊加input,filter,output配置,而不會造成混亂。有關00_log4j.conf的配置內容參考博客開頭提供的舊的blog,這里基本沒大改過。
測試用例
為了演示這套架構的流程與效果,所以將這套架構最小化,將產品服務器的應用與日志服務器的應用全部部署在一個節點上測試。
日志文件數據源以Nginx的access log為例,使用logstash將nginx access log中的內容推送至kafka隊列中,另一個logstash實例從kafka將nginx的log取出存入elasticsearch中。
自己開發的應用程序直接按照上述日志規范打印日志進入kafka,由logstash從kafka中取出應用程序的日志,推送至elasticsearch中。
日志文件用例
修改Nginx的配置文件,使之打印出JSON格式的access log,配置方法見附錄內容。 access log內容如下:
{"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":191,"responsetime":0.000,"http_host":"172.16.250.3","url":"/mirror/","xff":"-","referer":"-","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":200} {"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":0,"responsetime":0.000,"http_host":"172.16.250.3","url":"/favicon.ico","xff":"-","referer":"http://172.16.250.3/mirror/","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":204} ...模擬產品環境的Logstash的配置文件如下所示:
input {file {path => "/var/log/nginx/access.log"codec => jsontype => "nginx"tags => "access"} }output {# stdout這個output插件僅作為調試階段使用,用于將處理過的結果打印在終端# 真實產品環境不需要這個outputstdout {codec => "rubydebug"}kafka {topic_id => "logs"bootstrap_servers => "172.16.250.10:9092" # 真實產品環境需要修改對應的kafka集群列表} }啟動logstash,將會看到終端上顯示解析過的事件:
{"@timestamp" => "2016-03-03T05:11:03.000Z","host" => "sinoiot-172-16-250-3","clientip" => "172.16.1.34","size" => 191,"responsetime" => 0.0,"http_host" => "172.16.250.3","url" => "/mirror/","xff" => "-","referer" => "-","agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status" => 200,"@version" => "1","path" => "/var/log/nginx/access.log","type" => "nginx","tags" => [[0] "access"] } {"@timestamp" => "2016-03-03T05:11:03.000Z","host" => "sinoiot-172-16-250-3","clientip" => "172.16.1.34","size" => 0,"responsetime" => 0.0,"http_host" => "172.16.250.3","url" => "/favicon.ico","xff" => "-","referer" => "http://172.16.250.3/mirror/","agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status" => 204,"@version" => "1","path" => "/var/log/nginx/access.log","type" => "nginx","tags" => [[0] "access"] }從kafka隊列中的logs這個topic獲取日志信息,將看到下列內容:
$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning {"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/elasticsearch/2.x/debian/dists/stable/main/i18n/Translation-en","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]} {"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/kibana/4.4/debian/dists/stable/main/i18n/Translation-en_US","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]}證明logstash已經將解析過的事件推送至kafka隊列中。
由于消息隊列中存儲的日志都是解析過的,所以日志服務器上的配置就簡單多了,只需要通過logstash將kafka中的日志推送至elasticsearch存儲即可。
日志服務器的logstash配置就簡單的多(真實產品環境下需要配置email filter插件,用于郵件報警)。
模擬日志服務器的logstash配置:
input {kafka {topic_id => "logs"zk_connect => "172.16.250.10:2181" # 真實產品環境替換為對應的zookeeper集群列表} }output {elasticsearch {codec => json}# 產品環境調試完畢,不需要stdout這個output pluginstdout {codec => "rubydebug"}# 產品環境需要郵件報警的話,加入email output# if 報警條件 {# email {# # email output插件的配置# }#} }最后,在kibana中將看到如下的效果:
應用程序日志
自己開發的應用程序直接按照JSON格式推送至Kafka消息隊列中,因此不需要通過logstash output kafka這種方式。log4j 1.2版本需要手工格式化成JSON,log4j 2.x版本提供了JSON appender,不過目前來看log4j 1.x版本依舊占據主流。輸出到kafka的配置參考附錄。
由于推送的topic_id是一樣的,因此日志服務器中的logstash配置也無需修改。
從kafka隊列中取出log,看看格式:
$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning {"@timestamp":"2016-03-03T17:03:32.772+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}} {"@timestamp":"2016-03-03T17:03:32.773+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}} {"@timestamp":"2016-03-03T17:03:32.813+08:00","host":"172.16.1.4","type":"rtds","loglevel":"ERROR","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"errormsg":" java.math.BigDecimal.divide(Unknown Source)\n org.codehaus.groovy.runtime.typehandling.BigDecimalMath.divideImpl(BigDecimalMath.java:68)\n org.codehaus.groovy.runtime.typehandling.IntegerMath.divideImpl(IntegerMath.java:49)\n org.codehaus.groovy.runtime.dgmimpl.NumberNumberDiv$NumberNumber.invoke(NumberNumberDiv.java:323)\n org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.call(PojoMetaMethodSite.java:56)\n org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:125)\n hawkeyes.rtds.MainVerticle.test(MainVerticle.groovy:69)\n hawkeyes.rtds.MainVerticle.deployInStandaloneMode(MainVerticle.groovy:63)\n sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\n sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n java.lang.reflect.Method.invoke(Unknown Source)\n org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:93)\n groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1210)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1077)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1019)\n groovy.lang.Closure.call(Closure.java:426)\n groovy.lang.Closure.call(Closure.java:420)\n java_util_concurrent_Callable$call.call(Unknown Source)\n org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)\n hawkeyes.rtds.MainVerticle.start(MainVerticle.groovy:30)\n io.vertx.lang.groovy.GroovyVerticle.start(GroovyVerticle.groovy:64)\n io.vertx.lang.groovy.GroovyVerticle$1.start(GroovyVerticle.groovy:93)\n io.vertx.core.impl.DeploymentManager.lambda$doDeploy$159(DeploymentManager.java:429)\n io.vertx.core.impl.ContextImpl.lambda$wrapTask$16(ContextImpl.java:335)\n io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)\n io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)\n io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)\n java.lang.Thread.run(Unknown Source)\n"}} {"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}} {"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}}最終在kibana中展示的效果如圖:
總結
ELK這套架構的設計由于其外部組件的松耦合性,幾乎可以滿足各種規模日志收集,組合消息隊列,更是帶來了彈性伸縮的可能性。
這套架構的引入,將對今后日志的收集管理提供便利,通過日志提供的數據,也便于業務跟蹤。而且此架構在未來也容易擴展。
此架構涉及到的組件也相對較多,需要有一定的維護量。數據分析時不但需要有規范化的數據結構,也需要熟悉elasticsearch的聚合表達式,需要一些專業知識與學習成本。
附錄
附錄一 應用程序log輸出到Kafka的方法
修改log4j.properties文件,配置kafka appender即可將log內容輸入到kafka消息隊列中。
log4j.logger.hawkeyes.rtds=INFO, Kafka log4j.appender.Kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.Kafka.layout=org.apache.log4j.EnhancedPatternLayout log4j.appender.Kafka.layout.ConversionPattern=%m log4j.appender.Kafka.brokerList=127.0.0.1:9092 log4j.appender.Kafka.topic=logs log4j.appender.Kafka.requiredNumAcks=1LOG4J主要由三大組件組成:
- Logger: 決定什么日志信息應該被輸出、什么日志信息應該被忽略;
- Appender: 指定日志信息應該輸出到什么地方, 這些地方可以是控制臺、文件、網絡設備;
- Layout: 指定日志信息的輸出格式;
按照原來配置log4j.rootLogger=DEBUG, Kafka這使程序中所有日志都會向Kafka中寫入。但KafkaLog4jAppender在初始化時,本身會打印log,它在獲取logger對象時又會繼續創建KafkaLog4jAppender,新的KafkaLog4jAppender又會打log, 這就成了死循環,因此定義了一個輸出范圍log4j.category.hawkeyes.rtds=INFO, Kafka,所有hawkeyes.rtds包下的類才會向kafka消息隊列中輸出,這不會影響KafkaLog4jAppender中log輸出。
附錄二 應用程序動態調整日志級別的實現方法
為滿足不重啟程序就能修改日志級別的需求,可以使用log4j的動態改變log輸出級別的功能。
動態修改loglevel原理
改變Logger中level屬性即可。
參考代碼:
def rtdsLogger = Logger.getLogger("hawkeyes.rtds") rtdsLogger.setLevel(Level.toLevel("info"))然后將這種方法進行封裝,對外提供一個可以操作的api即可(如REST api)。
附錄三 部分相關服務的配置參考
Nginx輸出JSON格式的log配置方法
編輯/etc/nginx/nginx.conf配置文件,加入以下內容:
log_format json '{"@timestamp":"$time_iso8601",''"host":"$hostname",''"clientip":"$remote_addr",''"size":$body_bytes_sent,''"responsetime":$request_time,''"http_host":"$host",''"url":"$uri",''"xff":"$http_x_forwarded_for",''"referer":"$http_referer",''"agent":"$http_user_agent",''"status":$status}';access_log /var/log/nginx/access.log json;刪掉原來默認的配置行access_log /var/log/nginx/access.log。重啟nginx,之后nginx的access log文件/var/log/nginx/access.log將以json_lines的格式打印日志。
以上配置參考了饒琛琳的《ELK stack權威指南》的相關章節
Zookeeper相關配置參考
Zookeeper集群配置范例:
需要改動的文件有兩個。在zookeeper的配置目錄中
- myid: 這個文件的內容修改為一個正整數,要求每個節點的數值不同
- zoo.cfg: 修改server.${id}=${ip}:2888:3888。這個id和myid中的數字一一對應,后面的ip是節點的ip(注意不要使用環回ip,必須是能被其他節點訪問到的ip,也可以是域名)。參考范例:
Zookeeper啟用SSL雙向認證: //TODO
Kafka相關配置參考
Kafka集群配置范例:
修改config目錄下的主配置文件server.properties。關鍵的幾個配置參數如下:
broker.id=1 advertised.host.name=172.16.250.10 zookeeper.connect=172.16.250.10:2181,172.16.250.13:2181,172.16.250.14:2181- broker.id: 同zookeeper集群配置,每個節點的id均為不重復的正整數。
- advertised.host.name: 同zookeeper的集群配置,設置為能被其他節點訪問到的ip或域名(該選項默認為系統主機名,不用hosts或dns基本無法被其他節點訪問到)。
- zookeeper.connect: 為zookeeper集群列表,格式為ip:port。多個節點使用,分割。
Kafka啟用SSL雙向認證: //TODO
Logstash配置參考
產品服務器的logstash將日志從文件取出,格式化后推送至日志服務器的Kafka中:
input {file {path => "/path/to/log/file" # 日志文件路徑type => "app" # 應用名,如nginx,postgresql等... SNIP ... # 這里根據不同的文件格式可能需要做不同處理} }filter {# filter這里主要是grok正則,nginx配置JSON日志格式后不需要grok解析grok {... SNIP ...} }output {kafka {topic_id => "logs"bootstrap_servers => "kafka" # 真實產品環境需要修改對應的kafka集群列表... SINP ...} }日志服務器logstash從kafka消息隊列中取出對應的日志消息,推送至elasticsearch存儲。 日志報警規則在日志服務器指定,便于修改報警規則。
input {kafka {zk_connect => "zookeeper_cluster:2181"topic_id => "logs"... SNIP ...} }filter {# 這里詳細指定郵件報警規則if "email_alert" in [tags] {email {... SNIP ...}} }output {elasticsearch {... SNIP ...} }參考文獻
- ELKstack 中文指南: https://www.gitbook.com/book/chenryn/kibana-guide-cn/details
- Logstash官方文檔: https://www.elastic.co/guide/en/logstash/2.2/index.html
- Kafka官方文檔集群配置: http://kafka.apache.org/documentation.html#quickstart_multibroker
- Zookeeper官方文檔集群配置: https://zookeeper.apache.org/doc/r3.3.2/zookeeperAdmin.html#sc_zkMulitServerSetup
轉載于:https://my.oschina.net/abcfy2/blog/703158
總結
以上是生活随笔為你收集整理的【产品环境】使用ELK搭建日志系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux下查看线程数的几种方法
- 下一篇: 操作系统结构