ELK Study Notes 6: Kafka-Logstash-Elasticsearch Data Flow

Published: 2024/9/20

Logstash configuration

Create the Logstash configuration file that defines the input and output:

    [hadoop@Slave1 ~]$ cd /usr/local/
    [hadoop@Slave1 local]$ cd logstash/
    [hadoop@Slave1 logstash]$ ls
    bin           CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE     vendor
    CHANGELOG.md  Gemfile       lib                     NOTICE.TXT
    [hadoop@Slave1 logstash]$ mkdir -p conf
    [hadoop@Slave1 logstash]$ ls
    bin           conf          Gemfile                 lib      NOTICE.TXT
    CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
    [hadoop@Slave1 logstash]$ cd conf
    [hadoop@Slave1 conf]$ ls
    [hadoop@Slave1 conf]$ touch kafkaInput_esOutPut.conf
    [hadoop@Slave1 conf]$ ls
    kafkaInput_esOutPut.conf
    [hadoop@Slave1 conf]$
    [hadoop@Slave1 conf]$ vim kafkaInput_esOutPut.conf

Edit kafkaInput_esOutPut.conf. The contents used on this machine are as follows:

    input {
      kafka {
        zk_connect => "192.168.154.158:2181,192.168.154.159:2181,192.168.154.160:2181"
        group_id => "test-consumer-group"
        topic_id => "logStash"
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
      }
    }

    filter {
      mutate {
        # Split the message content on commas; the result is stored as an array.
        # For example, abc,efg => message[0] = abc, message[1] = efg
        split => ["message", ","]
      }
      # Add four fields taken from the elements of the split message.
      mutate {
        add_field => {
          "source_Ip" => "%{[message][0]}"
          "source_Port" => "%{[message][1]}"
          "dest_Ip" => "%{[message][2]}"
          "dest_Port" => "%{[message][3]}"
        }
      }
    }

    output {
      elasticsearch {
        host => "localhost"
      }
    }
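
The effect of the two mutate filters can be sketched outside Logstash. The Python snippet below is only an illustration under stated assumptions, not Logstash's implementation: the function name apply_filter is hypothetical, and it assumes that a %{[message][N]} reference stays as literal text when the element does not exist, which is what the second document in the search results further down suggests.

```python
FIELDS = ["source_Ip", "source_Port", "dest_Ip", "dest_Port"]


def apply_filter(message):
    """Mimic mutate split + add_field for one raw message (illustrative only)."""
    # Assumption: an empty message yields an empty array, as in the results shown later.
    parts = message.split(",") if message else []
    event = {"message": parts}
    for index, name in enumerate(FIELDS):
        if index < len(parts):
            event[name] = parts[index]
        else:
            # Assumption: an unresolvable reference is kept as literal text.
            event[name] = "%{[message][" + str(index) + "]}"
    return event


print(apply_filter("1,1,1,1"))
print(apply_filter(""))
```

The second call shows why an empty message ends up indexed with placeholder strings instead of real values.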


SSH into Slave2 and Slave3 in turn and copy kafkaInput_esOutPut.conf to both machines.

Create the conf directory on each machine:

    [hadoop@Slave1 conf]$ ssh Slave2
    Last login: Wed Oct 14 10:58:06 2015 from slave1
    [hadoop@Slave2 ~]$ cd /usr/local/logstash/
    [hadoop@Slave2 logstash]$ mkdir -p conf
    [hadoop@Slave2 logstash]$ ls
    bin           conf          Gemfile                 lib      NOTICE.TXT
    CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
    [hadoop@Slave2 logstash]$ exit
    logout
    Connection to Slave2 closed.
    [hadoop@Slave1 conf]$ ssh Slave3
    Last login: Wed Oct 14 10:59:01 2015 from slave2
    [hadoop@Slave3 ~]$ cd /usr/local/logstash/
    [hadoop@Slave3 logstash]$ mkdir -p conf
    [hadoop@Slave3 logstash]$ ls
    bin           conf          Gemfile                 lib      NOTICE.TXT
    CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
    [hadoop@Slave3 logstash]$ exit
    logout
    Connection to Slave3 closed.

Transfer the file:

    [hadoop@Slave1 conf]$ scp kafkaInput_esOutPut.conf Slave2:/usr/local/logstash/conf/
    kafkaInput_esOutPut.conf                      100% 1063     1.0KB/s   00:00
    [hadoop@Slave1 conf]$ scp kafkaInput_esOutPut.conf Slave3:/usr/local/logstash/conf/
    kafkaInput_esOutPut.conf                      100% 1063     1.0KB/s   00:00
    [hadoop@Slave1 conf]$ ssh Slave2
    Last login: Tue Oct 27 23:46:19 2015 from slave1
    [hadoop@Slave2 ~]$ cd /usr/local/logstash/conf/
    [hadoop@Slave2 conf]$ ls
    kafkaInput_esOutPut.conf
    [hadoop@Slave2 conf]$

Kafka operations

Start ZooKeeper on all three machines.

First stop the firewall:

    [hadoop@Slave1 bin]$ su
    Password:
    [root@Slave1 bin]# service iptables stop
    iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
    iptables: Flushing firewall rules:                         [  OK  ]
    iptables: Unloading modules:                               [  OK  ]
    [root@Slave1 bin]# exit
    exit
    [hadoop@Slave1 bin]$


Start ZooKeeper:

    [hadoop@Slave1 bin]$ ./zkServer.sh start
    JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED


After repeating the same steps on the other two machines, check the status:

    [hadoop@Slave1 bin]$ ./zkServer.sh status
    JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Mode: leader
    [hadoop@Slave1 bin]$


Start Kafka on all three machines, taking Slave1 as an example:

    [hadoop@Slave1 bin]$ cd /usr/local/kafka/
    [hadoop@Slave1 kafka]$ bin/kafka-server-start.sh config/server.properties

Create a topic named logStash:

    [hadoop@Slave1 ~]$ cd /usr/local/kafka/
    [hadoop@Slave1 kafka]$ cd bin
    [hadoop@Slave1 bin]$ sh kafka-topics.sh --create --topic logStash --replication-factor 1 --partitions 1 --zookeeper Slave1:2181
    Created topic "logStash".
    [hadoop@Slave1 bin]$


Starting Logstash

Start Logstash on all three machines:

    [hadoop@Slave1 ~]$ cd /usr/local/logstash/
    [hadoop@Slave1 logstash]$ ls
    bin           conf          Gemfile                 lib      NOTICE.TXT
    CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
    [hadoop@Slave1 logstash]$ cd bin
    [hadoop@Slave1 bin]$ ls
    logstash      logstash.lib.sh  plugin.bat  rspec.bat
    logstash.bat  plugin           rspec       setup.bat


During startup, output like the following is displayed, including some warnings:

    [hadoop@Slave2 bin]$ ./logstash agent -f ../conf/kafkaInput_esOutPut.conf
    log4j, [2015-10-28T21:52:07.116]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-3 for topic logStash
    log4j, [2015-10-28T21:52:07.118]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-2 for topic logStash
    log4j, [2015-10-28T21:52:07.119]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-0 for topic logStash
    log4j, [2015-10-28T21:52:07.119]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-4 for topic logStash
    log4j, [2015-10-28T21:52:07.120]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-1 for topic logStash
    log4j, [2015-10-28T21:52:33.934]  WARN: org.elasticsearch.bootstrap: JNA not found. native methods will be disabled.
    log4j, [2015-10-28T21:53:09.347]  WARN: org.elasticsearch.discovery: [logstash-Slave2-4244-11624] waited for 30s and no initial state was set by the discovery
    log4j, [2015-10-28T21:53:35.632]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-3 for topic logStash
    log4j, [2015-10-28T21:53:35.633]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-2 for topic logStash
    log4j, [2015-10-28T21:53:35.634]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-0 for topic logStash
    log4j, [2015-10-28T21:53:35.634]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-4 for topic logStash
    log4j, [2015-10-28T21:53:35.634]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-1 for topic logStash
    Failed to install template: waited for [30s] {:level=>:error}
    Logstash startup completed
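
The repeated RangeAssignor warnings are probably harmless in this setup. The topic was created with a single partition, while each of the three Logstash instances starts five consumer threads in the same group, so at most one thread can own that partition. The sketch below is a simplified model of range-style assignment, not Kafka's actual code; the function name range_assign and the short thread names are hypothetical stand-ins for the longer IDs in the log.

```python
def range_assign(num_partitions, consumer_threads):
    """Split partitions into contiguous ranges over the sorted consumer threads."""
    threads = sorted(consumer_threads)
    per_thread, extra = divmod(num_partitions, len(threads))
    assignment = {}
    for i, thread in enumerate(threads):
        # The first `extra` threads each receive one additional partition.
        start = per_thread * i + min(i, extra)
        count = per_thread + (1 if i < extra else 0)
        assignment[thread] = list(range(start, start + count))
    return assignment


# Hypothetical thread IDs: three hosts, five consumer threads each.
threads = [f"Slave{host}-{t}" for host in (1, 2, 3) for t in range(5)]
assignment = range_assign(1, threads)
idle = [name for name, parts in assignment.items() if not parts]
print(f"{len(idle)} of {len(threads)} threads have no partition")
```

Under this model, lowering consumer_threads or creating the topic with more partitions would reduce the number of idle threads.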


Sending and receiving data

Start a console producer for the topic created earlier:

    [hadoop@Slave1 ~]$ cd /usr/local/kafka/
    [hadoop@Slave1 kafka]$ ls
    bin  config  libs  LICENSE  logs  NOTICE
    [hadoop@Slave1 kafka]$ bin/kafka-console-producer.sh --broker-list Slave1:9092 --topic logStash

Start Elasticsearch (ES). This version does not recognize the -f flag, as the getopt message shows, but the node still starts:

    [hadoop@Slave1 ~]$ cd /usr/local/elasticsearch/
    [hadoop@Slave1 elasticsearch]$ bin/elasticsearch -f
    getopt: invalid option -- 'f'
    [2015-10-29 00:47:27,084][INFO ][node                     ] [Clown] version[1.7.3], pid[5208], build[05d4530/2015-10-15T09:14:17Z]
    [2015-10-29 00:47:27,131][INFO ][node                     ] [Clown] initializing ...
    [2015-10-29 00:47:27,920][INFO ][plugins                  ] [Clown] loaded [], sites []
    [2015-10-29 00:47:28,548][INFO ][env                      ] [Clown] using [1] data paths, mounts [[/ (/dev/sda2)]], net usable_space [9.7gb], net total_space [17.4gb], types [ext4]
    [2015-10-29 00:47:43,711][INFO ][node                     ] [Clown] initialized
    [2015-10-29 00:47:43,729][INFO ][node                     ] [Clown] starting ...
    [2015-10-29 00:47:46,089][INFO ][transport                ] [Clown] bound_address {inet[/0:0:0:0:0:0:0:0:9301]}, publish_address {inet[/192.168.154.158:9301]}
    [2015-10-29 00:47:46,606][INFO ][discovery                ] [Clown] elasticsearch/v-jkBhkxSheape14hvMAHw
    [2015-10-29 00:47:50,712][INFO ][cluster.service          ] [Clown] new_master [Clown][v-jkBhkxSheape14hvMAHw][Slave1][inet[/192.168.154.158:9301]], reason: zen-disco-join (elected_as_master)
    [2015-10-29 00:47:50,985][INFO ][http                     ] [Clown] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/192.168.154.158:9200]}
    [2015-10-29 00:47:50,986][INFO ][node                     ] [Clown] started
    [2015-10-29 00:47:51,345][INFO ][gateway                  ] [Clown] recovered [0] indices into cluster_state
    [2015-10-29 00:47:51,346][INFO ][cluster.service          ] [Clown] added {[logstash-Slave1-4083-11624][loTUXdCXRVC_WzqzhD3PWg][Slave1][inet[/192.168.154.158:9300]]{data=false, client=true},}, reason: zen-disco-receive(join from node[[logstash-Slave1-4083-11624][loTUXdCXRVC_WzqzhD3PWg][Slave1][inet[/192.168.154.158:9300]]{data=false, client=true}])
    [2015-10-29 00:47:54,185][INFO ][cluster.metadata         ] [Clown] [logstash-2015.10.29] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [logs]
    [2015-10-29 00:47:56,201][INFO ][cluster.metadata         ] [Clown] [logstash-2015.10.29] update_mapping [logs] (dynamic)
    [2015-10-29 00:47:57,166][INFO ][cluster.metadata         ] [Clown] [logstash-2015.10.29] update_mapping [logs] (dynamic)

Check that ES started successfully:

    [hadoop@Slave1 ~]$ curl -X GET http://localhost:9200
    {
      "status" : 200,
      "name" : "Clown",
      "cluster_name" : "elasticsearch",
      "version" : {
        "number" : "1.7.3",
        "build_hash" : "05d4530971ef0ea46d0f4fa6ee64dbc8df659682",
        "build_timestamp" : "2015-10-15T09:14:17Z",
        "build_snapshot" : false,
        "lucene_version" : "4.10.4"
      },
      "tagline" : "You Know, for Search"
    }
    [hadoop@Slave1 ~]$

Send data through the console producer started earlier.

(The data format is source IP, source port, destination IP, destination port; for simplicity, send 1,1,1,1.)

    [hadoop@Slave1 kafka]$ bin/kafka-console-producer.sh --broker-list Slave1:9092 --topic logStash
    [2015-10-29 00:39:33,085] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
    1,1,1,1
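
For test data closer to the stated format than 1,1,1,1, a line can be built and checked before it is typed or pasted into the producer prompt. This is a minimal sketch: the helper name format_record and the port numbers are assumptions made for illustration, and the two addresses are simply borrowed from the zk_connect setting.

```python
import ipaddress


def format_record(src_ip, src_port, dst_ip, dst_port):
    """Build one line: source IP, source port, destination IP, destination port."""
    for ip in (src_ip, dst_ip):
        ipaddress.ip_address(ip)  # raises ValueError for a malformed address
    for port in (src_port, dst_port):
        if not 0 <= port <= 65535:
            raise ValueError(f"port out of range: {port}")
    return f"{src_ip},{src_port},{dst_ip},{dst_port}"


# Hypothetical sample values; only the field order comes from the text above.
line = format_record("192.168.154.158", 51234, "192.168.154.159", 9200)
print(line)
```

Because the filter splits on commas, each of the four values lands in its own field as long as none of them contains a comma.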

Check the received data:

    [hadoop@Slave1 ~]$ curl -XGET 'localhost:9200/logstash-2015.10.27/_search'
    {"error":"IndexMissingException[[logstash-2015.10.27] missing]","status":404}
    [hadoop@Slave1 ~]$ curl -XGET 'localhost:9200/logstash-2015.10.29/_search'
    {"took":260,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":2,"max_score":1.0,"hits":[{"_index":"logstash-2015.10.29","_type":"logs","_id":"AVCykUgg6gAQTB_SuF_V","_score":1.0,"_source":{"message":["1","1","1","1"],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:50.871Z","kafka":{"msg_size":7,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"1","source_Port":"1","dest_Ip":"1","dest_Port":"1"}},{"_index":"logstash-2015.10.29","_type":"logs","_id":"AVCykUGv6gAQTB_SuF_U","_score":1.0,"_source":{"message":[],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:46.345Z","kafka":{"msg_size":0,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"%{[message][0]}","source_Port":"%{[message][1]}","dest_Ip":"%{[message][2]}","dest_Port":"%{[message][3]}"}}]}}
    [hadoop@Slave1 ~]$

    [hadoop@Slave1 ~]$ curl -XGET 'localhost:9200/logstash-2015.10.29/_search?pretty'
    {
      "took" : 26,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "failed" : 0
      },
      "hits" : {
        "total" : 2,
        "max_score" : 1.0,
        "hits" : [ {
          "_index" : "logstash-2015.10.29",
          "_type" : "logs",
          "_id" : "AVCykUgg6gAQTB_SuF_V",
          "_score" : 1.0,
          "_source":{"message":["1","1","1","1"],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:50.871Z","kafka":{"msg_size":7,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"1","source_Port":"1","dest_Ip":"1","dest_Port":"1"}
        }, {
          "_index" : "logstash-2015.10.29",
          "_type" : "logs",
          "_id" : "AVCykUGv6gAQTB_SuF_U",
          "_score" : 1.0,
          "_source":{"message":[],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:46.345Z","kafka":{"msg_size":0,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"%{[message][0]}","source_Port":"%{[message][1]}","dest_Ip":"%{[message][2]}","dest_Port":"%{[message][3]}"}
        } ]
      }
    }
    [hadoop@Slave1 ~]$
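
The search response shown above can also be checked programmatically. The sketch below parses a trimmed copy of that response (only the fields it needs) and flags documents whose added fields still hold unresolved %{...} references; the function name summarize is hypothetical. Both documents also carry a _jsonparsefailure tag, which suggests the input tried to decode the plain-text line as JSON; that is an inference from the output, not something the original post states.

```python
import json

# Trimmed copy of the search response shown above (only the fields used here).
response_text = """
{"hits": {"total": 2, "hits": [
  {"_id": "AVCykUgg6gAQTB_SuF_V",
   "_source": {"message": ["1", "1", "1", "1"],
               "source_Ip": "1", "source_Port": "1",
               "dest_Ip": "1", "dest_Port": "1"}},
  {"_id": "AVCykUGv6gAQTB_SuF_U",
   "_source": {"message": [],
               "source_Ip": "%{[message][0]}", "source_Port": "%{[message][1]}",
               "dest_Ip": "%{[message][2]}", "dest_Port": "%{[message][3]}"}}
]}}
"""

FIELDS = ("source_Ip", "source_Port", "dest_Ip", "dest_Port")


def summarize(text):
    """Return (document id, fields dict, resolved flag) for each hit."""
    rows = []
    for hit in json.loads(text)["hits"]["hits"]:
        source = hit["_source"]
        fields = {name: source.get(name) for name in FIELDS}
        # A value that still starts with "%{" was never filled in by the filter.
        resolved = all(
            value is not None and not value.startswith("%{")
            for value in fields.values()
        )
        rows.append((hit["_id"], fields, resolved))
    return rows


rows = summarize(response_text)
for doc_id, fields, resolved in rows:
    print(doc_id, fields, "ok" if resolved else "unresolved placeholders")
```

Here the first document is the 1,1,1,1 line, and the second is an empty message that reached the topic before it.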

References:

http://blog.csdn.net/xuguokun1986/article/details/49452101

This post extends the content of that blog post.

Source: http://blog.csdn.net/wang_zhenwei/article/details/49493131
