ELK+Kafka集群日志分析系统
因為是自己本地寫好的word文檔復制進來的。格式有些出入還望體諒。如有錯誤請回復。謝謝!
一、?系統介紹 2
二、?版本說明 3
三、?服務部署 3
1)?JDK部署 3
2)?Elasticsearch集群部署及優化 3
3)?Elasticsearch健康插件安裝 13
4)?Shield之elasticsearch安全插件 15
5)Zookeeper集群搭建 15
6)Kafka集群搭建 17
7)測試Kafka和Zookeeper集群連通性 19
8)?Logstash部署 20
9)?Kibana部署 20
四、?系統使用示例 22
1)?Logstash?作為kafka生產者示例 22
2)?Logstash?index?消費kafka示例 23
?
一、系統介紹
隨著實時分析技術的發展及成本的降低,用戶已經不僅僅滿足于離線分析。下面來介紹一下架構
?
?
?
這是一個再常見不過的架構了:
(1)Kafka:接收用戶日志的消息隊列
(2)Logstash:做日志解析,統一成json輸出給Elasticsearch
(3)Elasticsearch:實時日志分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。
(4)Kibana:基于Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK?stack的重要原因。
(5)Zookeeper:?狀態管理,監控進程等服務
?
二、版本說明
本次系統為:Centos6.5?64位
Java版本為:1.8.0_74
Elasticsearch為:2.4.0
Logstash?:2.4.0
Kibana:4.6.1
Shield:2.0+
Kafka:2.10-0.10.0.1
Zookeeper:3.4.9
?
相應的版本最好下載對應的插件。
?
?
三、服務部署
1)?JDK部署
下載JDK包到/data目錄下解壓,并將變量導入/etc/profile末尾。
export?JAVA_HOME=/usr/local/jdk
export?PATH=$JAVA_HOME/bin:$PATH
export?CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
?
2)?Elasticsearch集群部署及優化
下載并安裝es?rpm包:
Rpm?-ivh?https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.4.0/elasticsearch-2.4.0.rpm
啟動方式:
/etc/init.d/elasticsearch?start|stop
Elasticsearch?bin文件內存優化:
由于是rpm?方式安裝將/usr/share/Elasticsearch/bin/Elasticsearch?加入如下參數:
ES_HEAP_SIZE=16g?????#ES_HEAP_SIZE表示JVM參數的-Xms?and?-Xmx設置
MAX_OPEN_FILES=65535
Elasticsearch?配置文件優化以及說明:
我們需要配置如下:
cluster.name:?es-ajb-cluster
node.name:?es-node-1
node.master:?true
node.data:?true
index.number_of_shards:?8
index.number_of_replicas:?1
path.data:?/data/es
path.logs:?/data/eslogs
bootstrap.mlockall:?true
network.host:?0.0.0.0
http.port:?9200
discovery.zen.ping.unicast.hosts:?["172.16.38.133",?"172.16.38.134","172.16.38.135"]
以下為更詳細yml配置文件參數解釋:
#####################?Elasticsearch?Configuration?Example?################
#?我只是挑些重要的配置選項進行注釋,其實自帶的已經有非常細致的英文注釋了.有理解偏差的地方請以英文原版解釋為準.
###################################?Cluster#############################
#?代表一個集群,集群中有多個節點,其中有一個為主節點,這個主節點是可以通過選舉產生的,主從節點是對于集群內部來說的.
#?es的一個概念就是去中心化,字面上理解就是無中心節點,這是對于集群外部來說的,因為從外部來看es集群,在邏輯上是個整體,你與任何一個節點的通信和與整個es集群通信是等價的。
#?cluster.name可以確定你的集群名稱,當你的elasticsearch集群在同一個網段中elasticsearch會自動的找到具有相同cluster.name的elasticsearch服務.
#?所以當同一個網段具有多個elasticsearch集群時cluster.name就成為同一個集群的標識.
#cluster.name:?elasticsearch
####################################?Node?##############################
#?節點名稱同理,可自動生成也可手動配置.
#node.name:?"Franz?Kafka"
#?允許一個節點是否可以成為一個master節點,es是默認集群中的第一臺機器為master,如果這臺機器停止就會重新選舉master.
#node.master:?true
#?允許該節點存儲數據(默認開啟)
#node.data:?true
#?配置文件中給出了三種配置高性能集群拓撲結構的模式,如下:
#?1.?如果你想讓節點從不選舉為主節點,只用來存儲數據,可作為負載器
#?node.master:?false
#?node.data:?true
#
#?2.?如果想讓節點成為主節點,且不存儲任何數據,并保有空閑資源,可作為協調器
#?node.master:?true
#?node.data:?false
#
#?3.?如果想讓節點既不稱為主節點,又不成為數據節點,那么可將他作為搜索器,從節點中獲取數據,生成搜索結果等
#?node.master:?false
#?node.data:?false
#?監控集群狀態有一下插件和API可以使用:
#?Use?the?Cluster?Health?API?[http://localhost:9200/_cluster/health],?the
#?Node?Info?API?[http://localhost:9200/_nodes]?or?GUI?tools
#?such?as?<http://www.elasticsearch.org/overview/marvel/>,
#?<http://github.com/karmi/elasticsearch-paramedic>,
#?<http://github.com/lukas-vlcek/bigdesk>?and
#?<http://mobz.github.com/elasticsearch-head>?to?inspect?the?cluster?state.
#?A?node?can?have?generic?attributes?associated?with?it,?which?can?later?be?used
#?for?customized?shard?allocation?filtering,?or?allocation?awareness.?An?attribute
#?is?a?simple?key?value?pair,?similar?to?node.key:?value,?here?is?an?example:
#
#node.rack:?rack314
#?By?default,?multiple?nodes?are?allowed?to?start?from?the?same?installation?location
#?to?disable?it,?set?the?following:
#node.max_local_storage_nodes:?1
?
####################################?Index?#############################
#?設置索引的分片數,默認為5
#index.number_of_shards:?5
#?設置索引的副本數,默認為1:
#index.number_of_replicas:?1
#?配置文件中提到的最佳實踐是,如果服務器夠多,可以將分片提高,盡量將數據平均分布到大集群中去
#?同時,如果增加副本數量可以有效的提高搜索性能
#?需要注意的是,"number_of_shards"?是索引創建后一次生成的,后續不可更改設置
#?"number_of_replicas"?是可以通過API去實時修改設置的
####################################?Paths?####################################
#?配置文件存儲位置
#path.conf:?/path/to/conf
#?數據存儲位置(單個目錄設置)
#path.data:?/path/to/data
#?多個數據存儲位置,有利于性能提升
#path.data:?/path/to/data1,/path/to/data2
#?臨時文件的路徑
#path.work:?/path/to/work
#?日志文件的路徑
#path.logs:?/path/to/logs
#?插件安裝路徑
#path.plugins:?/path/to/plugins
####################################?Plugin?############################
#?設置插件作為啟動條件,如果一下插件沒有安裝,則該節點服務不會啟動
#plugin.mandatory:?mapper-attachments,lang-groovy
###################################?Memory?##############################
#?當JVM開始寫入交換空間時(swapping)ElasticSearch性能會低下,你應該保證它不會寫入交換空間
#?設置這個屬性為true來鎖定內存,同時也要允許elasticsearch的進程可以鎖住內存,linux下可以通過?`ulimit?-l?unlimited`?命令
#bootstrap.mlockall:?true
#?確保?ES_MIN_MEM?和?ES_MAX_MEM?環境變量設置為相同的值,以及機器有足夠的內存分配給Elasticsearch
#?注意:內存也不是越大越好,一般64位機器,最大分配內存別才超過32G
##############################?Network?And?HTTP?#######################
#?設置綁定的ip地址,可以是ipv4或ipv6的,默認為0.0.0.0
#network.bind_host:?192.168.0.1
#?設置其它節點和該節點交互的ip地址,如果不設置它會自動設置,值必須是個真實的ip地址
#network.publish_host:?192.168.0.1
#?同時設置bind_host和publish_host上面兩個參數
#network.host:?192.168.0.1
#?設置節點間交互的tcp端口,默認是9300
#transport.tcp.port:?9300
?
#?設置是否壓縮tcp傳輸時的數據,默認為false,不壓縮
#transport.tcp.compress:?true
#?設置對外服務的http端口,默認為9200
#http.port:?9200
#?設置請求內容的最大容量,默認100mb
#http.max_content_length:?100mb
#?使用http協議對外提供服務,默認為true,開啟
#http.enabled:?false
###################################?Gateway?#############################
#?gateway的類型,默認為local即為本地文件系統,可以設置為本地文件系統
#gateway.type:?local
#?下面的配置控制怎樣以及何時啟動一整個集群重啟的初始化恢復過程
#?(當使用shard?gateway時,是為了盡可能的重用local?data(本地數據))
#?一個集群中的N個節點啟動后,才允許進行恢復處理
#gateway.recover_after_nodes:?1
#?設置初始化恢復過程的超時時間,超時時間從上一個配置中配置的N個節點啟動后算起
#gateway.recover_after_time:?5m
#?設置這個集群中期望有多少個節點.一旦這N個節點啟動(并且recover_after_nodes也符合),
#?立即開始恢復過程(不等待recover_after_time超時)
#gateway.expected_nodes:?2
#############################?Recovery?Throttling?#######################
#?下面這些配置允許在初始化恢復,副本分配,再平衡,或者添加和刪除節點時控制節點間的分片分配
#?設置一個節點的并行恢復數
#?1.初始化數據恢復時,并發恢復線程的個數,默認為4
#cluster.routing.allocation.node_initial_primaries_recoveries:?4
#
#?2.添加刪除節點或負載均衡時并發恢復線程的個數,默認為2
#cluster.routing.allocation.node_concurrent_recoveries:?2
#?設置恢復時的吞吐量(例如:100mb,默認為0無限制.如果機器還有其他業務在跑的話還是限制一下的好)
#indices.recovery.max_bytes_per_sec:?20mb
?
#?設置來限制從其它分片恢復數據時最大同時打開并發流的個數,默認為5
#indices.recovery.concurrent_streams:?5
#?注意:?合理的設置以上參數能有效的提高集群節點的數據恢復以及初始化速度
##################################?Discovery?##########################
#?設置這個參數來保證集群中的節點可以知道其它N個有master資格的節點.默認為1,對于大的集群來說,可以設置大一點的值(2-4)
#discovery.zen.minimum_master_nodes:?1
#?探查的超時時間,默認3秒,提高一點以應對網絡不好的時候,防止腦裂
#discovery.zen.ping.timeout:?3s
#?For?more?information,?see
#?<http://elasticsearch.org/guide/en/elasticsearch/reference/current/modules-discovery-zen.html>
#?設置是否打開多播發現節點.默認是true.
#?當多播不可用或者集群跨網段的時候集群通信還是用單播吧
#discovery.zen.ping.multicast.enabled:?false
#?這是一個集群中的主節點的初始列表,當節點(主節點或者數據節點)啟動時使用這個列表進行探測
#discovery.zen.ping.unicast.hosts:?["host1",?"host2:port"]
#?Slow?Log部分與GC?log部分略,不過可以通過相關日志優化搜索查詢速度
##############?Memory(重點需要調優的部分)?################
#?Cache部分:
#?es有很多種方式來緩存其內部與索引有關的數據.其中包括filter?cache
#?filter?cache部分:
#?filter?cache是用來緩存filters的結果的.默認的cache?type是node?type.node?type的機制是所有的索引內部的分片共享filter?cache.node?type采用的方式是LRU方式.即:當緩存達到了某個臨界值之后,es會將最近沒有使用的數據清除出filter?cache.使讓新的數據進入es.
#?這個臨界值的設置方法如下:indices.cache.filter.size?值類型:eg.:512mb?20%。默認的值是10%。
#?out?of?memory錯誤避免過于頻繁的查詢時集群假死
#?1.設置es的緩存類型為Soft?Reference,它的主要特點是據有較強的引用功能.只有當內存不夠的時候,才進行回收這類內存,因此在內存足夠的時候,它們通常不被回收.另外,這些引用對象還能保證在Java拋出OutOfMemory異常之前,被設置為null.它可以用于實現一些常用圖片的緩存,實現Cache的功能,保證最大限度的使用內存而不引起OutOfMemory.在es的配置文件加上index.cache.field.type:?soft即可.
#?2.設置es最大緩存數據條數和緩存失效時間,通過設置index.cache.field.max_size:?50000來把緩存field的最大值設置為50000,設置index.cache.field.expire:?10m把過期時間設置成10分鐘.
#index.cache.field.max_size:?50000
#index.cache.field.expire:?10m
#index.cache.field.type:?soft
#?field?data部分&&circuit?breaker部分:
#?用于field?data?緩存的內存數量,主要用于當使用排序,faceting操作時,elasticsearch會將一些熱點數據加載到內存中來提供給客戶端訪問,但是這種緩存是比較珍貴的,所以對它進行合理的設置.
#?可以使用值:eg:50mb?或者?30%(節點?node?heap內存量),默認是:unbounded
#indices.fielddata.cache.size:?unbounded
#?field的超時時間.默認是-1,可以設置的值類型:?5m
#indices.fielddata.cache.expire:?-1
#?circuit?breaker部分:
#?斷路器是elasticsearch為了防止內存溢出的一種操作,每一種circuit?breaker都可以指定一個內存界限觸發此操作,這種circuit?breaker的設定有一個最高級別的設定:indices.breaker.total.limit?默認值是JVM?heap的70%.當內存達到這個數量的時候會觸發內存回收
#?另外還有兩組子設置:
#indices.breaker.fielddata.limit:當系統發現fielddata的數量達到一定數量時會觸發內存回收.默認值是JVM?heap的70%
#indices.breaker.fielddata.overhead:在系統要加載fielddata時會進行預先估計,當系統發現要加載進內存的值超過limit?*?overhead時會進行進行內存回收.默認是1.03
#indices.breaker.request.limit:這種斷路器是elasticsearch為了防止OOM(內存溢出),在每次請求數據時設定了一個固定的內存數量.默認值是40%
#indices.breaker.request.overhead:同上,也是elasticsearch在發送請求時設定的一個預估系數,用來防止內存溢出.默認值是1
#?Translog部分:
#?每一個分片(shard)都有一個transaction?log或者是與它有關的預寫日志,(write?log),在es進行索引(index)或者刪除(delete)操作時會將沒有提交的數據記錄在translog之中,當進行flush?操作的時候會將tranlog中的數據發送給Lucene進行相關的操作.一次flush操作的發生基于如下的幾個配置
#index.translog.flush_threshold_ops:當發生多少次操作時進行一次flush.默認是?unlimited
#index.translog.flush_threshold_size:當translog的大小達到此值時會進行一次flush操作.默認是512mb
#index.translog.flush_threshold_period:在指定的時間間隔內如果沒有進行flush操作,會進行一次強制flush操作.默認是30m
#index.translog.interval:多少時間間隔內會檢查一次translog,來進行一次flush操作.es會隨機的在這個值到這個值的2倍大小之間進行一次操作,默認是5s
#index.gateway.local.sync:多少時間進行一次的寫磁盤操作,默認是5s
#?以上的translog配置都可以通過API進行動態的設置
?
集群部署
集群名稱不修改,節點名稱修改、將elasticsearch.yml?復制到其他節點,并替換其配置文件。并關閉自動發現,防止其他外來節點連入。
?
3)?Elasticsearch健康插件安裝
elasticsearch-head是一個elasticsearch的集群管理工具,它是完全由html5編寫的獨立網頁程序,你可以通過插件把它集成到es。
?
?插件安裝方法1:
1.elasticsearch/bin/plugin?-install?mobz/elasticsearch-head
2.運行es
3.打開http://localhost:9200/_plugin/head/
插件安裝方法2:
1.https://github.com/mobz/elasticsearch-head下載zip?解壓
2.建立elasticsearch\plugins\head\_site文件
3.將解壓后的elasticsearch-head-master文件夾下的文件copy到_site
4.運行es
5.打開http://localhost:9200/_plugin/head/
?
在地址欄輸入es服務器的ip地址和端口點connect就可以連接到集群。下面是連接后的視圖。這是主界面,在這里可以看到es集群的基本信息(如:節點情況,索引情況)。
?
通過head插件可以對索引執行api?操作。
?
4)?Shield之elasticsearch安全插件
官方安裝步驟以及驗證
?
Shield?2.0+
Compatible?with?the?latest?versions?of?Elasticsearch?and?Kibana
Step?1:?Install?Shield?into?Elasticsearch?
bin/plugin?install?license
bin/plugin?install?shield
Step?2:?Start?Elasticsearch? bin/elasticsearch
Step?3:?Add?an?admin?user? bin/shield/esusers?useradd?es_admin?-r?admin
Step?4:?Test?your?authenticated?user? curl?-u?es_admin?-XGET?'http://localhost:9200/'
Step?5:?Install?Shield?Into?Kibana?for?full?session?support?(Shield?2.2+)
Step?6:?Dive?into?the?Getting?Started?Guide
目前測試尚未通過,等待官方發布新版本
?
?
?
?
?
?
?
?
?
5)Zookeeper集群搭建
Kafka依賴Zookeeper管理自身集群(Broker、Offset、Producer、Consumer等),所以先要安裝?Zookeeper。自然,為了達到高可用的目的,Zookeeper自身也不能是單點,接下來就介紹如何搭建一個最小的Zookeeper集群(3個?zk節點)
????此處選用Zookeeper的版本是3.4.9,此為Kafka0.10中推薦的Zookeeper版本。
????首先解壓
????tar?-xzvf?zookeeper-3.4.9.tar.gz??
????進入zookeeper的conf目錄,將zoo_sample.cfg復制一份,命名為zoo.cfg,此即為Zookeeper的配置文件
????cp?zoo_sample.cfg?zoo.cfg??
????編輯zoo.cfg
#?The?number?of?milliseconds?of?each?tick?
tickTime=2000?
#?The?number?of?ticks?that?the?initial?
#?synchronization?phase?can?take?
initLimit=10?
#?The?number?of?ticks?that?can?pass?between?
#?sending?a?request?and?getting?an?acknowledgement?
syncLimit=5?
#?the?directory?where?the?snapshot?is?stored.?
dataDir=/data/zk/zk0/data?
dataLogDir=/data/zk/zk0/logs?
#?the?port?at?which?the?clients?will?connect?
clientPort=2181?
server.176?=?172.16.38.176:2888:3888
server.177?=?172.16.38.177:2888:3888
server.1771?=?172.16.38.177:2999:3999
autopurge.purgeInterval=1????
dataDir和dataLogDir的路徑需要在啟動前創建好
clientPort為zookeeper的服務端口
server.0/1/2為zk集群中三個node的信息,定義格式為hostname:port1:port2,其中port1是node間通信使用的端口,port2是node選舉使用的端口,需確保三臺主機的這兩個端口都是互通的
在另外兩臺主機上執行同樣的操作,安裝并配置zookeeper
????分別在三臺主機的dataDir路徑下創建一個文件名為myid的文件,文件內容為該zk節點的編號。例如在第一臺主機上建立的myid文件內容是0,第二臺是1。
????接下來,啟動三臺主機上的zookeeper服務:
????bin/zkServer.sh?start??
????3個節點都啟動完成后,可依次執行如下命令查看集群狀態:
????bin/zkServer.sh?status??
????命令輸出如下:
????Mode:?leader?或?Mode:?follower
????3個節點中,應有1個leader和兩個follower
????驗證zookeeper集群高可用性:
假設目前3個zk節點中,server0為leader,server1和server2為follower
我們停掉server0上的zookeeper服務:
????bin/zkServer.sh?stop??
????再到server1和server2上查看集群狀態,會發現此時server1(也有可能是server2)為leader,另一個為follower。
????再次啟動server0的zookeeper服務,運行zkServer.sh?status檢查,發現新啟動的server0也為follower
????至此,zookeeper集群的安裝和高可用性驗證完成。
????附:Zookeeper默認會將控制臺信息輸出到啟動路徑下的zookeeper.out中,顯然在生產環境中我們不能允許Zookeeper這樣做,通過如下方法,可以讓Zookeeper輸出按尺寸切分的日志文件:
修改conf/log4j.properties文件,將zookeeper.root.logger=INFO,?CONSOLE改為
zookeeper.root.logger=INFO,?ROLLINGFILE修改bin/zkEnv.sh文件,將
????ZOO_LOG4J_PROP="INFO,CONSOLE"改為ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
然后重啟zookeeper,就ok了
?
?
?
6)Kafka集群搭建
Kafka?是一個高吞吐量的分布式發布訂閱日志服務,具有高可用、高性能、分布式、高擴展、持久性等特性。目前已經在各大公司中廣泛使用。和之前采用?Redis?做輕量級消息隊列不同,Kafka?利用磁盤作隊列,所以也就無所謂消息緩沖時的磁盤問題。此外,如果公司內部已有?Kafka?服務在運行,logstash?也可以快速接入,免去重復建設的麻煩。
kafka?基本概念
?
????Topic?主題,聲明一個主題,producer指定該主題發布消息,訂閱該主題的consumer對該主題進行消費
????Partition?每個主題可以分為多個分區,每個分區對應磁盤上一個目錄,分區可以分布在不同broker上,producer在發布消息時,可以通過指定partition?key映射到對應分區,然后向該分區發布消息,在無partition?key情況下,隨機選取分區,一段時間內觸發一次(比如10分鐘),這樣就保證了同一個producer向同一partition發布的消息是順序的。?消費者消費時,可以指定partition進行消費,也可以使用high-level-consumer?api,自動進行負載均衡,并將partition分給consumer,一個partition只能被一個consumer進行消費
????Consumer?消費者,可以多實例部署,可以批量拉取,有兩類API可供選擇:一個simpleConsumer,暴露所有的操作給用戶,可以提交offset、fetch?offset、指定partition?fetch?message;另外一個high-level-consumer(ZookeeperConsumerConnector),幫助用戶做基于partition自動分配的負載均衡,定期提交offset,建立消費隊列等。simpleConsumer相當于手動擋,high-level-consumer相當于自動擋。
????simpleConsumer:無需像high-level-consumer那樣向zk注冊brokerid、owner,甚至不需要提交offset到zk,可以將offset提交到任意地方比如(mysql,本地文件等)。
????high-level-consumer:一個進程中可以啟多個消費線程,一個消費線程即是一個consumer,假設A進程里有2個線程(consumerid分別為1,2),B進程有2個線程(consumerid分別為1,2),topic1的partition有5個,那么partition分配是這樣的:
????partition1?--->?A進程consumerid1
????partition2?--->?A進程consumerid1
????partition3?--->?A進程consumerid2
????partition4?--->?B進程consumer1
????partition5?--->?B進程consumer2
????Group?High-level-consumer可以聲明group,每個group可以有多個consumer,每group各自管理各自的消費offset,各個不同group之間互不關聯影響。
由于目前版本消費的offset、owner、group都是consumer自己通過zk管理,所以group對于broker和producer并不關心,一些監控工具需要通過group來監控,simpleComsumer無需聲明group。
?
部署安裝
?
此例中,我們會安裝配置一個有兩個Broker組成的Kafka集群,并在其上創建一個兩個分區的Topic
本例中使用Kafka最新版本0.10.0
首先解壓官網下載kafkatar包
編輯config/server.properties文件,下面列出關鍵的參數
#此Broker的ID,集群中每個Broker的ID不可相同
broker.id=0
#監聽器,端口號與port一致即可
listeners=PLAINTEXT://:9092
#Broker監聽的端口
port=19092
#Broker的Hostname,填主機IP即可
host.name=172.16.38.176
#向Producer和Consumer建議連接的Hostname和port(此處有坑,具體見后)
advertised.host.name=172.16.38.176
advertised.port=9092
#進行IO的線程數,應大于主機磁盤數
num.io.threads=8
#消息文件存儲的路徑
log.dirs=/data/kafka-logs
#消息文件清理周期,即清理x小時前的消息記錄
log.retention.hours=168
#每個Topic默認的分區數,一般在創建Topic時都會指定分區數,所以這個配成1就行了
num.partitions=1
#Zookeeper連接串,此處填寫上一節中安裝的三個zk節點的ip和端口即可
zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182
將172.16.38.177?也按照上圖配置文件中,只是修改brokerid即可。
?
?
?
7)測試Kafka和Zookeeper集群連通性
(1)建立一個主題
[root@kafka1?~]#?/usr/local/kafka/bin/kafka-topics.sh?--create?--zookeeper?localhost:2181?--replication-factor?2?--partitions?1?--topic?summer
#注意:factor大小不能超過broker數
?
(2)查看有哪些主題已經創建
[root@kafka1?~]#?/usr/local/kafka/bin/kafka-topics.sh?--list?--zookeeper?localhost:2181???#列出集群中所有的topic
summer??#已經創建成功
?
(3)查看summer這個主題的詳情
[root@kafka1?~]#?/usr/local/kafka/bin/kafka-topics.sh?--describe?--zookeeper?localhost:2181?--topic?summer
Topic:summer PartitionCount:16 ReplicationFactor:2 Configs:
Topic:?summer Partition:?0 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?1 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?2 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?3 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?4 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?5 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?6 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?7 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?8 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?9 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?10 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?11 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?12 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?13 Leader:?1 Replicas:?1,0 Isr:?0,1
Topic:?summer Partition:?14 Leader:?0 Replicas:?0,1 Isr:?0,1
Topic:?summer Partition:?15 Leader:?1 Replicas:?1,0 Isr:?0,1
#主題名稱:summer
#Partition:16個,從0開始
#leader?:id為0和1的broker
#Replicas?副本存在于broker?id為0,1的上面
#Isr:活躍狀態的broker
?
(4)發送消息,這里使用的是生產者角色
[root@kafka1?data]#?/bin/bash?/usr/local/kafka/bin/kafka-console-producer.sh?--broker-list?172.16.38.176:9092,172.16.38.177:9092?--topic?test1
?
(5)接收消息,這里使用的是消費者角色
[root@kafka2?data]#?/usr/local/kafka/bin/kafka-console-consumer.sh?--zookeeper??172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182?--topic?test1
?
(6)驗證的效果,生產者和消費者
?
?
(7)刪除一個消息主題
[root@kafka2data]#/usr/local/kafka/bin/kafka-topics.sh--zookeeper?172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182?--delete?--topic?test1
Topic?test1?is?marked?for?deletion.
Note:?This?will?have?no?impact?if?delete.topic.enable?is?not?set?to?true.
?
這樣kafka和zookeeper集群配置完畢。
?
8)Logstash部署
Logstash?requires?Java?7?or?later.?Use?the?official?Oracle?distribution?or?an?open-source?distribution?such?as?OpenJDK.
Download?and?install?the?public?signing?key:
?
rpm?--import?https://packages.elastic.co/GPG-KEY-elasticsearch
?
Add?the?following?in?your?/etc/yum.repos.d/?directory?in?a?file?with?a?.repo?suffix,?for?example?logstash.repo
?
[logstash-2.4]
name=Logstash?repository?for?2.4.x?packages
baseurl=https://packages.elastic.co/logstash/2.4/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
?
And?your?repository?is?ready?for?use.?You?can?install?it?with:
?
yum?install?logstash
?
?
?
?
?
9)Kibana部署
?????1、Download?and?install?the?public?signing?key:
?
????rpm?--import?https://packages.elastic.co/GPG-KEY-elasticsearch
?
????2、Create?a?file?named?kibana.repo?in?the?/etc/yum.repos.d/?directory?with?the?following?contents:
?
????[kibana-4.6]
????name=Kibana?repository?for?4.6.x?packages
????baseurl=https://packages.elastic.co/kibana/4.6/centos
????gpgcheck=1
????gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
????enabled=1
?
????3、Install?Kibana?by?running?the?following?command:
?
????yum?install?kibana
?
????4、Configure?Kibana?to?automatically?start?during?bootup.?If?your?distribution?is?using?the?System?V?version?of?init?(check?with?ps?-p?1),?run?the?following?command:
?
????chkconfig?--add?kibana
?
???5、If?your?distribution?is?using?systemd,?run?the?following?commands?instead:
?
????sudo?/bin/systemctl?daemon-reload
sudo?/bin/systemctl?enable?kibana.service
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
四、系統使用示例
按照logstash-->kafka-->zookeeper-->logstash-->elasticsearch-->kibana?數據順序
1)?Logstash?作為kafka生產者示例
?
?
2)?Logstash?index?消費kafka示例
?
來源:http://www.cnblogs.com/shiyiwen/p/6150213.html
總結
以上是生活随笔為你收集整理的ELK+Kafka集群日志分析系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 国家开发银行怎么还款
- 下一篇: ELK+kafka日志系统搭建-实战