使用Logstash,JDBC将数据聚合并索引到Elasticsearch中
介紹
在我以前的帖子在這里和這里我展示了如何使用JDBC和Elasticsearch JDBC進(jìn)口商庫(kù)從SQL數(shù)據(jù)庫(kù)索引數(shù)據(jù)到Elasticsearch。 在這里的第一篇文章中,我提到了使用導(dǎo)入程序庫(kù)的一些缺點(diǎn),這些缺點(diǎn)我已在此處復(fù)制:
- 不支持ES版本5及更高版本
- 嵌套對(duì)象數(shù)組中可能存在重復(fù)的對(duì)象。 但是重復(fù)數(shù)據(jù)刪除可以在應(yīng)用程序?qū)舆M(jìn)行處理。
- 對(duì)最新ES版本的支持可能會(huì)延遲。
使用Logstash及其以下插件可以克服以上所有缺點(diǎn):
- JDBC Input插件 –用于使用JDBC從SQL DB讀取數(shù)據(jù)
- 聚合過(guò)濾器插件 –用于將SQL DB中的行聚合到嵌套對(duì)象中。
我將使用最新的ES版,即5.63可以從Elasticsearch網(wǎng)站下載這里 。 我們將使用此處可用的映射創(chuàng)建索引world_v2。
$ curl -XPUT --header "Content-Type: application/json" http://localhost:9200/world_v2 -d @world-index.json或使用Postman REST客戶端,如下所示:
要確認(rèn)索引已成功創(chuàng)建,請(qǐng)?jiān)跒g覽器中打開(kāi)此URL http:// localhost:9200 / world_v2,以得到類(lèi)似于以下內(nèi)容的內(nèi)容:
創(chuàng)建Logstash配置文件
我們應(yīng)該選擇等效的logstash版本,即5.6.3,可以從此處下載。 然后,我們需要使用以下命令安裝JDBC輸入插件,聚合過(guò)濾器插件和Elasticsearch輸出插件:
bin/logstash-plugin install logstash-input-jdbc bin/logstash-plugin install logstash-filter-aggregate bin/logstash-plugin install logstash-output-elasticsearch我們需要將以下內(nèi)容復(fù)制到bin目錄中,以便能夠運(yùn)行我們將在接下來(lái)定義的配置:
我們將上述內(nèi)容復(fù)制到Logstash的bin目錄或您將擁有l(wèi)ogstash配置文件的任何目錄中,這是因?yàn)槲覀冊(cè)谂渲弥惺褂眠@兩個(gè)文件的相對(duì)路徑來(lái)引用這兩個(gè)文件。 下面是Logstash配置文件:
input {jdbc {jdbc_connection_string => "jdbc:mysql://localhost:3306/world"jdbc_user => "root"jdbc_password => "mohamed"# The path to downloaded jdbc driverjdbc_driver_library => "mysql-connector-java-5.1.6.jar"jdbc_driver_class => "Java::com.mysql.jdbc.Driver"# The path to the file containing the querystatement_filepath => "world-logstash.sql"} } filter {aggregate {task_id => "%{code}"code => "map['code'] = event.get('code')map['name'] = event.get('name')map['continent'] = event.get('continent')map['region'] = event.get('region')map['surface_area'] = event.get('surface_area')map['year_of_independence'] = event.get('year_of_independence')map['population'] = event.get('population')map['life_expectancy'] = event.get('life_expectancy')map['government_form'] = event.get('government_form')map['iso_code'] = event.get('iso_code')map['capital'] = {'id' => event.get('capital_id'), 'name' => event.get('capital_name'),'district' => event.get('capital_district'),'population' => event.get('capital_population')}map['cities_list'] ||= []map['cities'] ||= []if (event.get('cities_id') != nil)if !( map['cities_list'].include? event.get('cities_id') ) map['cities_list'] << event.get('cities_id')map['cities'] << {'id' => event.get('cities_id'), 'name' => event.get('cities_name'),'district' => event.get('cities_district'),'population' => event.get('cities_population')}endendmap['languages_list'] ||= []map['languages'] ||= []if (event.get('languages_language') != nil)if !( map['languages_list'].include? event.get('languages_language') )map['languages_list'] << event.get('languages_language')map['languages'] << {'language' => event.get('languages_language'), 'official' => event.get('languages_official'),'percentage' => event.get('languages_percentage')}endendevent.cancel()"push_previous_map_as_event => truetimeout => 5}mutate { remove_field => ["cities_list", "languages_list"]} } output {elasticsearch {document_id => "%{code}"document_type => "world"index => "world_v2"codec => "json"hosts => ["127.0.0.1:9200"]} }我們將配置文件放置在logstash的bin目錄中。 我們使用以下命令運(yùn)行l(wèi)ogstash管道:
$ logstash -w 1 -f world-logstash.conf我們使用1個(gè)工作程序,因?yàn)楫?dāng)匯總發(fā)生時(shí),多個(gè)工作人員可能會(huì)破壞匯總,這是基于具有共同國(guó)家/地區(qū)代碼的事件序列。 成功完成Logstash管道后,我們將看到以下輸出:
在瀏覽器中打開(kāi)以下URL http:// localhost:9200 / world_v2 / world / IND ,以查看在Elasticsearch中索引的印度的信息,如下所示:
翻譯自: https://www.javacodegeeks.com/2017/10/aggregate-index-data-elasticsearch-using-logstash-jdbc.html
總結(jié)
以上是生活随笔為你收集整理的使用Logstash,JDBC将数据聚合并索引到Elasticsearch中的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: spark减少内存消耗_将内存消耗减少2
- 下一篇: 棋牌类游戏算法–牌分类_快速分类–三向和