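How do you import different types of data into Elasticsearch?
Once you understand how Elasticsearch works, you will probably find yourself with many different kinds of data on hand, for example:
1) single records, such as JSON documents constructed in your own program;
2) bulk data that already conforms to the Elasticsearch indexing format;
3) log files in *.log format;
4) structured data stored in relational databases such as MySQL and Oracle;
5) unstructured data stored in MongoDB.
How do you get each of these into Elasticsearch? This article covers them one by one.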
1. Indexing a single document into Elasticsearch
This method is similar to a MySQL INSERT statement: it inserts one record.
[root@yang json_input]# curl -XPUT 'http://192.168.1.1:9200/blog/article/1' -d '
{
"title":"New version of Elasticsearch released!",
"content":"Version 1.0 released today!",
"tags":["announce","elasticsearch","release"]
}'
The indexed document can be retrieved as follows:
[root@yang json_input]# curl -XGET 'http://192.168.1.1:9200/blog/article/1?pretty'
{
  "_index" : "blog",
  "_type" : "article",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "title" : "New version of Elasticsearch released!",
    "content" : "Version 1.0 released today!",
    "tags" : [ "announce", "elasticsearch", "release" ]
  }
}
[Figure: graphical view of the indexed document]
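The same single-document request can be issued from a program rather than from curl. The following is a minimal sketch using only the Python standard library; the helper names `build_index_request` and `send` are illustrative, and the explicit `Content-Type` header is an assumption that the curl command above does not set (newer Elasticsearch versions require it).

```python
import json
import urllib.request


def build_index_request(base_url, index, doc_type, doc_id, document):
    """Build (but do not send) a PUT request that indexes one document."""
    url = f"{base_url}/{index}/{doc_type}/{doc_id}"
    body = json.dumps(document).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        # Assumption: the curl example sends no Content-Type header.
        headers={"Content-Type": "application/json"},
    )


def send(request):
    """Send the request; needs a reachable Elasticsearch node."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


article = {
    "title": "New version of Elasticsearch released!",
    "content": "Version 1.0 released today!",
    "tags": ["announce", "elasticsearch", "release"],
}
request = build_index_request(
    "http://192.168.1.1:9200", "blog", "article", 1, article
)
# Call send(request) once the node at the address above is reachable.
```

Keeping request construction separate from sending makes it possible to inspect the URL and body before anything touches the cluster.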
2. Bulk-indexing documents into Elasticsearch
(1) Index structure mapping
Much as a schema describes the data in SQL, a mapping defines and controls the structure of the documents.
[root@yang json_input]# cat mapping.json
{
  "book" : {
    "_all": {
      "enabled": false
    },
    "properties" : {
      "author" : {
        "type" : "string"
      },
      "characters" : {
        "type" : "string"
      },
      "copies" : {
        "type" : "long",
        "ignore_malformed" : false
      },
      "otitle" : {
        "type" : "string"
      },
      "tags" : {
        "type" : "string"
      },
      "title" : {
        "type" : "string"
      },
      "year" : {
        "type" : "long",
        "ignore_malformed" : false,
        "index" : "analyzed"
      },
      "available" : {
        "type" : "boolean"
      }
    }
  }
}
[root@yang json_input]# curl -XPUT 'http://110.0.11.120:9200/library/book/_mapping' -d @mapping.json
{"acknowledged":true}
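(2) Bulk indexing: import the prepared JSON metadata and data into Elasticsearch
Elasticsearch can merge multiple operations into a single package and send that package as one request. This lets you combine several kinds of operation:
1) adding a document to the index or replacing an existing one (index);
2) removing a document from the index (delete);
3) adding a new document only when no document with the same identifier already exists in the index (create).
The request format was chosen for processing efficiency. It works in pairs of lines: the first line of each pair holds a JSON object describing the operation, and the second line holds the document itself as a JSON object.
You can think of the first line as the information line and the second as the data line. The only exception is the delete operation, which consists of the information line alone.
For example: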
[root@yang json_input]# cat documents_03.json
{ "index": {"_index": "library", "_type": "book", "_id": "1"}}
{ "title": "All Quiet on the Western Front","otitle": "Im Westen nichts Neues","author": "Erich Maria Remarque","year": 1929,"characters": ["Paul Bäumer", "Albert Kropp", "Haie Westhus", "Fredrich Müller", "Stanislaus Katczinsky", "Tjaden"],"tags": ["novel"],"copies": 1, "available": true, "section" : 3}
{ "index": {"_index": "library", "_type": "book", "_id": "2"}}
{ "title": "Catch-22","author": "Joseph Heller","year": 1961,"characters": ["John Yossarian", "Captain Aardvark", "Chaplain Tappman", "Colonel Cathcart", "Doctor Daneeka"],"tags": ["novel"],"copies": 6, "available" : false, "section" : 1}
{ "index": {"_index": "library", "_type": "book", "_id": "3"}}
{ "title": "The Complete Sherlock Holmes","author": "Arthur Conan Doyle","year": 1936,"characters": ["Sherlock Holmes","Dr. Watson", "G. Lestrade"],"tags": [],"copies": 0, "available" : false, "section" : 12}
{ "index": {"_index": "library", "_type": "book", "_id": "4"}}
{ "title": "Crime and Punishment","otitle": "Преступлéние и наказáние","author": "Fyodor Dostoevsky","year": 1886,"characters": ["Raskolnikov", "Sofia Semyonovna Marmeladova"],"tags": [],"copies": 0, "available" : true}
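A file in this line-pair format does not have to be written by hand. The sketch below shows one way to generate the request body from a list of operations in Python; the function name `build_bulk_body` and the `(action, metadata, source)` tuple layout are assumptions made for illustration, not part of any Elasticsearch client.

```python
import json


def build_bulk_body(actions):
    """Serialize (action, metadata, source) triples into a _bulk body.

    Every action produces an information line. index/create actions are
    followed by a data line; delete actions have no data line.
    """
    lines = []
    for action, metadata, source in actions:
        lines.append(json.dumps({action: metadata}, ensure_ascii=False))
        if action != "delete":
            lines.append(json.dumps(source, ensure_ascii=False))
    # Each JSON object sits on its own line, and the body ends with a newline.
    return "\n".join(lines) + "\n"


actions = [
    ("index", {"_index": "library", "_type": "book", "_id": "1"},
     {"title": "All Quiet on the Western Front", "year": 1929}),
    ("delete", {"_index": "library", "_type": "book", "_id": "3"}, None),
]
body = build_bulk_body(actions)
print(body, end="")
```

`json.dumps` never emits raw line breaks inside an object, which is what keeps each information line and data line on exactly one physical line.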
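To execute bulk requests, Elasticsearch provides the _bulk endpoint, which can be addressed as /_bulk, as /index_name/_bulk, or even as /index_name/type_name/_bulk.
Elasticsearch returns detailed information about every operation, so for a large bulk request the response is correspondingly large.
(3) The result of the bulk request is as follows: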
[root@yang json_input]# curl -s -XPOST '10.0.1.30:9200/_bulk' --data-binary @documents_03.json
{"took":150,"errors":false,"items":[{"index":{"_index":"library","_type":"book","_id":"1","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"2","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"3","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}},{"index":{"_index":"library","_type":"book","_id":"4","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"status":201}}]}
[Figure: result of the bulk request]
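Because the bulk response contains one entry per operation, it is usually easier to scan it programmatically than by eye. The following sketch picks out only the operations that did not succeed; the function name `failed_items` is hypothetical, and the sample response is a shortened copy of the output above with the `_shards` details left out.

```python
import json


def failed_items(bulk_response):
    """Return (action, _id, status) for every item whose status is not 2xx."""
    failures = []
    for item in bulk_response.get("items", []):
        # Each item is a one-key object: {"index": {...}}, {"delete": {...}}, ...
        for action, result in item.items():
            status = result.get("status", 0)
            if not 200 <= status < 300:
                failures.append((action, result.get("_id"), status))
    return failures


raw = (
    '{"took":150,"errors":false,"items":['
    '{"index":{"_index":"library","_type":"book","_id":"1","_version":1,"status":201}},'
    '{"index":{"_index":"library","_type":"book","_id":"2","_version":1,"status":201}}]}'
)
response = json.loads(raw)
print(response["errors"], failed_items(response))
```

Checking the top-level `errors` flag first avoids walking a large `items` array when every operation succeeded.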
3. Using Logstash to import log files into Elasticsearch
The following example imports MprobeDebug.log, a log file from a real project, into ES.
[root@yang logstash_conf]# tail -f MrobeDebug.log
[DEBUG][2015-07-23 23:59:58,138] : After CurProbe.Update()....lineNo:233, function:main
[DEBUG][2015-07-23 23:59:58,594] : lineNo:960, function:MNetworker::MessageTranslator, revoke nMsgRes = m_MsgPool.PeekMessage(CurMsg);
[DEBUG][2015-07-23 23:59:58,608] : ProbeTaskType_FTP lineNo:148, function:TempProbe::Update
........
The core configuration file looks like this:
[root@yang logstash_conf]# cat three.conf
input {
    file {
        path=> "/opt/logstash/bin/logstash_conf/MrobeDebug.log"
        type=>"ttlog"
    }
}

output {
    elasticsearch {
        hosts => "110.10.11.120:9200"
        index => "tt_index"
    }
    stdout { codec => json_lines }
}
[Figure: import result]
Total records imported: 200,414.
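The configuration above contains no filter section, so each log line reaches Elasticsearch as one unparsed message. If separate fields are wanted, the line has to be split first; in Logstash that is typically the job of a filter such as grok. The Python sketch below only illustrates the splitting logic for the line format shown in the tail output. The field names `level`, `timestamp` and `message` are assumptions, not names taken from the original setup.

```python
import re
from datetime import datetime

# Matches lines such as: [DEBUG][2015-07-23 23:59:58,138] : some text
LINE_PATTERN = re.compile(
    r"^\[(?P<level>[A-Z]+)\]\[(?P<timestamp>[^\]]+)\] : (?P<message>.*)$"
)


def parse_log_line(line):
    """Split one log line into level, timestamp and message.

    Returns None when the line does not match the expected format.
    """
    match = LINE_PATTERN.match(line.strip())
    if match is None:
        return None
    fields = match.groupdict()
    # Convert "2015-07-23 23:59:58,138" into an ISO 8601 string.
    parsed = datetime.strptime(fields["timestamp"], "%Y-%m-%d %H:%M:%S,%f")
    fields["timestamp"] = parsed.isoformat()
    return fields


sample = (
    "[DEBUG][2015-07-23 23:59:58,138] : "
    "After CurProbe.Update()....lineNo:233, function:main"
)
print(parse_log_line(sample))
```

Lines that do not match, such as the `........` placeholder in the excerpt, come back as `None` so the caller can decide whether to skip them or index them unparsed.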
4. Importing data from MySQL/Oracle relational databases into Elasticsearch
See:
http://blog.csdn.net/laoyang360/article/details/51747266
http://blog.csdn.net/laoyang360/article/details/51824617
5. Importing data from the non-relational database MongoDB into Elasticsearch
See:
http://blog.csdn.net/laoyang360/article/details/51842822
Plugin used: mongo-connector
1) Connect mongo to the replica set members
2) Initialize the replica set configuration
3) Synchronize between Mongo and ES