Elasticsearch Pipeline Explained
Table of Contents
- Ingest Node
- Introduction to Ingest Node
- Introduction to Pipeline and Processors
- Defining a Pipeline
- Introduction to the Simulate Pipeline API
- Accessing Content in a Pipeline
- Processor Types in Detail
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Fail Processor
- Foreach Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Remove Processor
- Rename Processor
- Script Processor
- Set Processor
- Split Processor
- Sort Processor
- Trim Processor
- Uppercase Processor
- Dot Expander Processor
- Custom Processor
Note
This article assumes some basic familiarity with Elasticsearch and a general awareness of the Pipeline concept.
Ingest Node
Introduction to Ingest Node
An Ingest Node (preprocessing node) is a node type that Elasticsearch names after its function. Whether a given node in the cluster acts as an Ingest Node is controlled by the following setting in elasticsearch.yml:

```
node.ingest: false
```

Setting node.ingest to false marks the node as not an Ingest Node, i.e. without preprocessing capability. By default, every Elasticsearch node is an Ingest Node, so every node in the cluster can preprocess documents.
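Conversely, a node can be dedicated to preprocessing by switching off its other roles. A minimal sketch of such an elasticsearch.yml for the 5.x series discussed here (an illustration, not a production recipe):

```
node.master: false
node.data: false
node.ingest: true
```

A node configured this way only runs pipelines, which keeps heavy Grok or script processing off the data nodes.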
So what exactly is preprocessing?
Anyone who has used Logstash to process logs knows that we usually do not load raw logs into an Elasticsearch cluster directly; instead, we (heavily) process the raw log data before storing it in Elasticsearch. Logstash supports many parsers, such as json, kv, and date, the classic one being grok. We will not describe Logstash's parsers in detail here; the point is simply that we sometimes need Logstash to transform data before it is loaded into Elasticsearch, and conceptually that transformation can also be called "preprocessing". The preprocessing discussed in this article is essentially the same idea.
Put another way: before Elasticsearch introduced the Ingest Node concept, if we wanted to transform data before it was stored in Elasticsearch, we had to rely on Logstash or a custom plugin. Starting with Elasticsearch 5.x, part of Logstash's functionality was integrated into Elasticsearch itself. This capability is called Ingest, and a node with Ingest capability is called an Ingest Node.
See Filter plugins to learn more about Logstash's parsers.
To transform documents before they are written to Elasticsearch without relying on Logstash — for example, to set a default value for a field, rename a field, or run a script for more complex transformation logic — we must understand two basic concepts: Pipelines and Processors.
See Ingest Node in the official documentation for more on these two concepts; the text below is only a brief overview.
Introduction to Pipeline and Processors
A pipeline is a widely known concept, and Elasticsearch adopted the term precisely because it is immediately familiar to anyone with engineering experience. As a Java developer, I like to compare a Pipeline to a Java Stream: the two are very similar in both function and concept. We routinely transform the data flowing through a Stream with operations such as map, peek, reduce, and count. A Pipeline behaves the same way: it transforms the data passing through it (generally documents), for example by changing the value of a document field or changing a field's type, as mentioned above. Elasticsearch abstracts and packages each such transformation step and calls it a Processor. Elasticsearch defines many Processor types to standardize document operations, e.g. set, append, date, join, json, and kv. These Processor types are covered later in this article.
Defining a Pipeline
Defining a Pipeline is straightforward; the official documentation gives this example:
```
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
```

This request to the "_ingest/pipeline" endpoint defines a pipeline with the ID "my-pipeline-id". The request body contains two required elements:
- description — describes what the pipeline does
- processors — defines the list of processors; here there is just one simple set operation that sets the field named "foo" to the value "bar"
For more on defining Pipelines, see: Ingest APIs
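Once a pipeline is stored, it is applied by naming it in the pipeline query parameter of an index or bulk request. A small sketch reusing my-pipeline-id (the index and type names are placeholders):

```
PUT my-index/my-type/1?pipeline=my-pipeline-id
{
  "foo": "initial"
}
```

The stored document will contain "foo": "bar", because the set processor overwrites the field before the document is indexed.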
Introduction to the Simulate Pipeline API
Since Elasticsearch offers preprocessing, it cannot be a black box. To help developers understand and use preprocessing, the official documentation provides an API for testing these preprocessing operations, called the Simulate Pipeline API. For any serious work with pipelines and processors, the Simulate Pipeline API is nearly indispensable.
Here is a simple example of the Simulate Pipeline API:
POST _ingest/pipeline/_simulate {"pipeline" : {// pipeline definition here},"docs" : [{ "_source": {/** first document **/} },{ "_source": {/** second document **/} },// ...] }上面的例子中,在請(qǐng)求 URL 中并沒(méi)有明確指定使用哪個(gè) pipeline,這種情況下需要我們?cè)谡?qǐng)求體中即時(shí)定義一個(gè),當(dāng)然我們也可以在請(qǐng)求 URL 中指定一個(gè) pipeline 的 ID,如下面例子:
```
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
```

This way, the documents listed under "docs" in the request body are run through that pipeline and its processors. Here is a more concrete example:
POST _ingest/pipeline/_simulate {"pipeline" :{"description": "_description","processors": [{"set" : {"field" : "field2","value" : "_value"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"foo": "bar"}},{"_index": "index","_type": "type","_id": "id","_source": {"foo": "rab"}}] }在上面具體的例子中,我們并沒(méi)有在請(qǐng)求 URL 中指定使用哪個(gè) pipeline,因此我們不得不在請(qǐng)求體中即時(shí)定義一個(gè) pipeline 和對(duì)應(yīng)的 processors,例子很簡(jiǎn)單,只是讓通過(guò)該 pipeline 的的文檔中的字段名稱為"field2"的字段值為"_value",文檔我們也指定了,就是"docs"中定義的文檔,該請(qǐng)求的響應(yīng)信息如下:
{"docs": [{"doc": {"_id": "id","_index": "index","_type": "type","_source": {"field2": "_value","foo": "bar"},"_ingest": {"timestamp": "2017-05-04T22:30:03.187Z"}}},{"doc": {"_id": "id","_index": "index","_type": "type","_source": {"field2": "_value","foo": "rab"},"_ingest": {"timestamp": "2017-05-04T22:30:03.188Z"}}}] }從具體的響應(yīng)結(jié)果中看到,在文檔通過(guò) pipeline 時(shí)(或理解為被 pipeline 中的 processors 加工后),新的文檔與原有的文檔產(chǎn)生了差異,這些差異體現(xiàn)為:
Accessing Content in a Pipeline
If we could only define a Pipeline but never access its context, designing Pipelines would be pointless. Unlike Logstash, whose runtime is isolated from the Elasticsearch runtime so that no shared context can form between the two, a Pipeline is natively embedded in Elasticsearch and therefore naturally shares a context with it. That context is what makes Pipelines truly useful, and the most direct piece of it is the document itself: inside a Pipeline (strictly speaking, inside its processors) we can directly access the documents flowing through, including their fields and metadata.
The following examples show how to access this context from within processors.
Accessing source document fields
{"set": {"field": "my_field""value": 582.1} }這個(gè)例子直接引用了通過(guò)該 Pipeline 內(nèi)的文檔字為"my_field"的字段,并將所有文檔該字段的值設(shè)置為 582.1.
Alternatively, we can access source document fields through the _source prefix, as follows:
{"set": {"field": "_source.my_field""value": 582.1} }訪問(wèn)文檔的元數(shù)據(jù)字段
Every document carries metadata fields such as _id, _index, and _type, and these too can be accessed directly from processors, as in the following example:
{"set": {"field": "_id""value": "1"} }直接訪問(wèn)文檔的_id 字段,并設(shè)置該值為 1
Accessing transient meta fields (ingest metadata fields)
The official term for these is "ingest metadata fields"; I call them transient meta fields here because that captures how they behave: they exist only while the document is being processed, and once processing finishes they are not returned with the response to the corresponding bulk or index request. Here is an example of how to access them:
{"set": {"field": "received""value": "{{_ingest.timestamp}}"} }該例子會(huì)在文檔中臨時(shí)加入一個(gè)名稱為"received"的字段,值為瞬態(tài)元字段中的 timestamp,以{{_ingest.timestamp}}的格式來(lái)進(jìn)行訪問(wèn),看到{{}}是不是有種很熟悉的感覺(jué),因?yàn)橹挥性谏婕吧舷挛牡沫h(huán)境中一般才會(huì)使用這種表達(dá)式.
Processor Types in Detail
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Fail Processor
- Foreach Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Remove Processor
- Rename Processor
- Script Processor
- Set Processor
- Split Processor
- Sort Processor
- Trim Processor
- Uppercase Processor
- Dot Expander Processor
Append Processor
As the name suggests, this processor appends values. When loading source documents into Elasticsearch, we sometimes need to enrich certain fields with extra values.
It is used as follows:
{"append": {"field": "field1","value": ["item2", "item3", "item4"]} }比如我們需要從第三方新導(dǎo)入一批商品,因?yàn)槟撤N原因這批新商品必須要打上一個(gè)"家居"的標(biāo)簽來(lái)表示該批商品是"家居分類",當(dāng)然一個(gè)商品也可能不止一個(gè)分類,這個(gè)時(shí)候要求我們?cè)趯?dǎo)入這批文檔數(shù)據(jù)的時(shí)候,需要在原有的標(biāo)簽字段中新增一個(gè)"家居"標(biāo)簽,這個(gè)時(shí)候我們就通過(guò)追加處理器來(lái)完成.
The following example uses the Simulate Pipeline API:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "新增一個(gè)家居標(biāo)簽","processors": [{"append": {"field": "tag","value": ["家居"]}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"code":"000001","tag": "衣柜"}},{"_index": "index","_type": "type","_id": "id","_source": {"code":"000002","tag": "沙發(fā)"}},{"_index": "index","_type": "type","_id": "id","_source": {"code":"000003"}}] }返回結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"code": "000001","tag": ["衣柜","家居"]},"_ingest": {"timestamp": "2017-11-24T13:19:55.555Z"}}},{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"code": "000002","tag": ["沙發(fā)","家居"]},"_ingest": {"timestamp": "2017-11-24T13:19:55.555Z"}}},{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"code": "000003","tag": ["家居"]},"_ingest": {"timestamp": "2017-11-24T13:19:55.555Z"}}}] }我們可以看到,通過(guò)該處理器處理之后,新的文檔中的 tag 字段都新增了一個(gè)"家居"標(biāo)簽,同時(shí)如果原文檔沒(méi)有該字段,則會(huì)新建該字段.
Convert Processor
This processor converts the type of a document field before the document is written. For example, if a document has a string "price" field that we want stored as a float, the Convert Processor does the job:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "修改字符串類型為double類型","processors": [{"convert": {"field": "price","type": "float"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"code":"000001","tag": "衣柜","price":"999.8"}},{"_index": "index","_type": "type","_id": "id","_source": {"code":"000002","tag": "沙發(fā)","price":"899.8"}},{"_index": "index","_type": "type","_id": "id","_source": {"code":"000003","price":"799.8"}}] }測(cè)試結(jié)果如下:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"code": "000001","tag": "衣柜","price": 999.8},"_ingest": {"timestamp": "2017-11-26T10:56:47.663Z"}}},{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"code": "000002","tag": "沙發(fā)","price": 899.8},"_ingest": {"timestamp": "2017-11-26T10:56:47.664Z"}}},{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"code": "000003","price": 799.8},"_ingest": {"timestamp": "2017-11-26T10:56:47.664Z"}}}] }可以看到,新的文檔,price 字段將不再是字符串類型了,而是 float 類型.
Note that the Convert Processor currently supports only the following target types: integer, float, string, boolean, and auto.
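Convert can also write the converted value into a separate field instead of overwriting the original, via the target_field option. A hedged sketch (price_float is a made-up field name for illustration):

```
{
  "convert": {
    "field": "price",
    "type": "float",
    "target_field": "price_float"
  }
}
```

With this configuration the original string price is kept and the float value lands in price_float.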
For more on this processor, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/convert-processor.html
Date Processor
The Date Processor, as the name suggests, parses a date field from the source document into a timestamp field that Elasticsearch understands (named @timestamp by default) and adds that timestamp field to the document. (In my experience this feature is somewhat limited, because the output can only be in ISO 8601 format and cannot be rendered in other formats, so I rarely use it.)
Here is an example using the Date Processor:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "轉(zhuǎn)換成指定格式的日期格式字段","processors": [{"date": {"field": "date","formats": ["UNIX"],"timezone": "Asia/Shanghai"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"code":"000001","tag": "衣柜","price":"999.8","date":"1511696379"}}] }就是將原有的字符串表示的日期格式進(jìn)行格式轉(zhuǎn)換(最終轉(zhuǎn)換成的是 ISO 時(shí)間格式),結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"date": "1511696379","code": "000001","@timestamp": "2017-11-26T19:39:39.000+08:00","price": "999.8","tag": "衣柜"},"_ingest": {"timestamp": "2017-11-26T12:01:16.787Z"}}}] }我們可以看到多了一個(gè)默認(rèn)字段@timestamp,該字段名稱可以通過(guò) target_field 字段來(lái)指定該字段名稱.
Why do I say it can only produce ISO-format times? The official documentation does not cover this, so the only way to find out is to read the source. Here it is:
```java
public void execute(IngestDocument ingestDocument) {
    String value = ingestDocument.getFieldValue(field, String.class);
    DateTime dateTime = null;
    Exception lastException = null;
    for (Function<String, DateTime> dateParser : dateParsers) {
        try {
            dateTime = dateParser.apply(value);
        } catch (Exception e) {
            // try the next parser and keep track of the exceptions
            lastException = ExceptionsHelper.useOrSuppress(lastException, e);
        }
    }
    if (dateTime == null) {
        throw new IllegalArgumentException("unable to parse date [" + value + "]", lastException);
    }
    ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
}
```

In the last line, the output format is hard-coded to ISODateTimeFormat when the target field is set. My guess is that a configurable pattern will be supported officially at some point.
Date Index Name Processor
The Date Index Name Processor is one of the more powerful processors: its purpose is to route each document passing through it into an index whose name matches a given date pattern, provided it is used as described in the official documentation.
First we create a pipeline of this kind:
```
curl -XPUT 'localhost:9200/_ingest/pipeline/monthlyindex?pretty' -H 'Content-Type: application/json' -d'
{
  "description": "monthly date-time index naming",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "date1",
        "index_name_prefix" : "myindex-",
        "date_rounding" : "M"
      }
    }
  ]
}
'
```

Then we index a document that uses this pipeline:
```
curl -XPUT 'localhost:9200/myindex/type/1?pipeline=monthlyindex&pretty' -H 'Content-Type: application/json' -d'
{
  "date1" : "2016-04-25T12:02:01.789Z"
}
'
```

The result:
{"_index" : "myindex-2016-04-01","_type" : "type","_id" : "1","_version" : 1,"result" : "created","_shards" : {"total" : 2,"successful" : 1,"failed" : 0},"created" : true }以上請(qǐng)求將不會(huì)將此 document(文檔)放入 myindex 索引,而是放入到 myindex-2016-04-01 索引中.
This is what makes the Date Index Name Processor powerful: a document can be routed to an index chosen by one of its own date fields. Combined with index templates, this enables very capable log collection setups, e.g. writing logs into Elasticsearch partitioned by month or by day.
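The shape of the date in the index name is also configurable via index_name_format. A hedged sketch producing daily indices such as myindex-2016-04-25 (assuming the same date1 field as above):

```
{
  "date_index_name": {
    "field": "date1",
    "index_name_prefix": "myindex-",
    "date_rounding": "d",
    "index_name_format": "yyyy-MM-dd"
  }
}
```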
To learn more about the Date Index Name Processor, see: Date Index Name Processor
Fail Processor
This processor is simple: when a document reaches it, it raises an exception, and the error message configured in the pipeline is returned to the requester.
POST _ingest/pipeline/_simulate {"pipeline": {"description": "monthly date-time index naming","processors" : [{"fail": {"message": "an error message"}}]}, "docs": [{"_index": "myindex","_type": "type","_id": "id","_source": {"id":"1"}}] }結(jié)果如下:
{"docs": [{"error": {"root_cause": [{"type": "exception","reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message","header": {"processor_type": "fail"}}],"type": "exception","reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message","caused_by": {"type": "illegal_argument_exception","reason": "org.elasticsearch.ingest.common.FailProcessorException: an error message","caused_by": {"type": "fail_processor_exception","reason": "an error message"}},"header": {"processor_type": "fail"}}}] }Foreach Processor
Foreach Processor

The Foreach Processor handles array fields: every element of the array is run through the same inner processor. For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "foreach processor","processors" : [{"foreach": {"field": "values","processor": {"uppercase": {"field": "_ingest._value"}}}}]}, "docs": [{"_index": "index","_type": "type","_id": "id","_source": {"id":"1","values":["hello","world","felayman"]}}] }上面的例子中,文檔中的 values 字段是一個(gè)數(shù)組類型的字段,其中每個(gè)元素都需要使用 Foreach Processor 中的一個(gè)共同的 uppercase Processor 來(lái)保證該字段中的每個(gè)元素都能執(zhí)行相同的操作
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"id": "1","values": ["HELLO","WORLD","FELAYMAN"]},"_ingest": {"_value": null,"timestamp": "2017-11-27T08:55:17.496Z"}}}] }關(guān)于每個(gè) Processor,深入下去都有許多需要說(shuō)明的,這里沒(méi)有深入,只是讓大家了解其基本使用。
For details, see: Foreach Processor
Grok Processor
The Grok Processor is one of the most practical processors and is frequently used for splitting log lines into fields; anyone who has used Logstash knows how powerful Grok is. We will not go into Grok's syntax in detail here; the point is simply that a pipeline on an Elasticsearch ingest node can provide the same Grok capability as Logstash.
An example of its use:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "grok processor","processors" : [{"grok": {"field": "message","patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]}}]}, "docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message": "55.3.244.1 GET /index.html 15824 0.043"}}] }上面例子中的日志可能是 nginx 的日志,格式為:
```
55.3.244.1 GET /index.html 15824 0.043
```

The corresponding Grok pattern is:
```
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
```

After passing through the Grok Processor, this plain-text log line is parsed into a structured document that can take full advantage of Elasticsearch's search and aggregation features — something the raw text form cannot do.
The response:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"duration": "0.043","request": "/index.html","method": "GET","bytes": "15824","client": "55.3.244.1","message": "55.3.244.1 GET /index.html 15824 0.043"},"_ingest": {"timestamp": "2017-11-27T09:07:40.782Z"}}}] }可以看到,對(duì)應(yīng)的切割點(diǎn)都轉(zhuǎn)換成文檔中的一個(gè)字段.
For a detailed introduction to the Grok Processor, see: Grok Processor
Gsub Processor
The Gsub Processor solves string-specific problems. Suppose we want to rewrite a string date such as "yyyy-MM-dd HH:mm:ss" into the "yyyy/MM/dd HH:mm:ss" format: the Gsub Processor can do this, and it does so using regular expressions.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "gsub processor","processors" : [{"gsub": {"field": "message","pattern": "-","replacement": "/"}}]}, "docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message": "2017-11-27 00:00:00"}}] }返回結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": "2017/11/27 00:00:00"},"_ingest": {"timestamp": "2017-11-27T09:15:05.502Z"}}}] }就將 message 字段中的日期格式修改成我們想要的格式了.
For more on the Gsub Processor, see: Gsub Processor
Join Processor
Join Processor 能夠?qū)⒃疽粋€(gè)數(shù)組類型的字段值,分解成以指定分隔符分割的一個(gè)字符串.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "join processor","processors": [{"join": {"field": "message","separator": "、、、、"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message": ["hello","world","felayman"]}}] }結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": "hello、、、、world、、、、felayman"},"_ingest": {"timestamp": "2017-11-27T09:25:41.260Z"}}}] }可以看到,原文檔中數(shù)組格式的的 message 字段,經(jīng)過(guò) Join Processor 處理后,變成了字符串類型的字段"hello、、、、world、、、、felayman".
For more on the Join Processor, see: Join Processor
JSON Processor
The JSON Processor also works on string fields: text that is valid JSON (i.e. a JSON-serialized value) is parsed by the JSON Processor into the corresponding JSON structure. For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "join processor","processors": [{"json": {"field": "message"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message":"{\"foo\": 2000}"}}] }結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": {"foo": 2000}},"_ingest": {"timestamp": "2017-11-27T09:30:28.546Z"}}}] }可以看到,原本是字符串格式的字段 message,在處理之后變成了一個(gè)標(biāo)準(zhǔn)的 JSON 格式.
For more on the JSON Processor, see: JSON Processor
KV Processor
The KV Processor parses key-value strings, e.g. in K=V, K:V, or K->V form.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "kv processor","processors": [{"kv": {"field": "message","field_split":" ","value_split": ":"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message":"ip:127.0.0.1"}}] }其中字段 message 的原始值為"“ip:127.0.0.1"”,我們想將該格式的內(nèi)容新增一個(gè)獨(dú)立的字段如 ip:127.0.0.1 這種格式.
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": "ip:107.0.0.1","ip": "107.0.0.1"},"_ingest": {"timestamp": "2017-11-27T09:46:48.715Z"}}}] }更多關(guān)于 KV Processor 的內(nèi)容,請(qǐng)參考:KV Processor
Lowercase Processor
The Lowercase Processor is another string-only processor; as the name says, it converts a string to lowercase.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "lowercase processor","processors": [{"lowercase": {"field": "message"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message":"HELLO,WORLD,FELAYMAN."}}] }結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": "hello,world,felayman."},"_ingest": {"timestamp": "2017-11-27T09:49:58.564Z"}}}] }可以看到原文檔中的 message 字段的值,在使用 Lowercase Processor 之后,都變成大寫(xiě)的了.
For more on the Lowercase Processor, see: Lowercase Processor
Remove Processor
The Remove Processor deletes fields from the source document before it is written.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "remove processor","processors": [{"remove": {"field": "extra_field"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"extra_field":"extra_field","message":" hello, felayman."}}] }在經(jīng)過(guò) Remove Processor 處理后,extra_field 字段將不存在了.
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": " hello, felayman."},"_ingest": {"timestamp": "2017-11-27T09:58:46.038Z"}}}] }Rename Processor
The Rename Processor renames a document field before the document is written to Elasticsearch.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "remove processor","processors": [{"rename": {"field": "old_name","target_field": "new_name"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"old_name":"old_name"}}] }會(huì)發(fā)現(xiàn)原文檔的字段 old_name 被重新命名為 new_name 字段
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"new_name": "old_name"},"_ingest": {"timestamp": "2017-11-27T10:00:48.694Z"}}}] }Script Processor
The Script Processor is arguably the most powerful of all, because it taps directly into Elasticsearch's scripting capabilities.
We will not cover Elasticsearch scripting in detail here; for that, see: Script
Let's look at an example of using a script in the Script Processor:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "script processor","processors": [{"script": {"lang": "painless","inline": "ctx.viewCount = (ctx.viewCount) *10"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"viewCount": 100}}] }這里我們通過(guò)腳本讓原文檔中的 viewCount 字段的值擴(kuò)大十倍,結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"viewCount": 1000},"_ingest": {"timestamp": "2017-11-27T10:07:56.112Z"}}}] }Set Processor
Set Processor

The Set Processor sets a field to a given value and covers two different situations: if the field already exists, its value is overwritten; if it does not exist, the field is created with the given value.
An example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "set processor","processors": [{"set": {"field": "category","value": "家居"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"id":"0001"}}] }這里我們?yōu)槊總€(gè)使用 Set Processor 的文檔新增一個(gè)分類"家居"
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"id": "0001","category": "家居"},"_ingest": {"timestamp": "2017-11-27T10:12:05.306Z"}}}] }Split Processor
Split Processor

The Split Processor turns a string delimited by a given separator into an array field.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "Split processor","processors": [{"split": {"field": "message","separator": "-"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message":"hello-world"}}] }其中原文檔的 message 字段值將會(huì)有"hello-world"轉(zhuǎn)換為[“hello”,“world”]
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": ["hell","world"]},"_ingest": {"timestamp": "2017-11-27T10:13:35.982Z"}}}] }Sort Processor
The Sort Processor works on array fields: it sorts the elements stored in an array field of the source document in ascending or descending order.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "sort processor","processors": [{"sort": {"field": "category","order": "asc"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"category":[2,3,4,1]}}] }我們使用升序來(lái)修改原文檔 category 字段的值的存儲(chǔ)排序
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"category": [1,2,3,4]},"_ingest": {"timestamp": "2017-11-27T10:16:16.409Z"}}}] }Trim Processor
You have to hand it to the Elasticsearch developers: they have ported practically every method of the String class over as a processor.
The Trim Processor is dedicated to stripping whitespace from both ends of a string field, as follows:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "trim processor","processors": [{"trim": {"field": "message"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message":" hello, felayman."}}] }請(qǐng)注意,是去除字符串兩端的空格.
The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": "hello, felayman."},"_ingest": {"timestamp": "2017-11-27T09:56:17.946Z"}}}] }Uppercase Processor
This processor is the counterpart of the Lowercase Processor: it converts string text to uppercase.
For example:
POST _ingest/pipeline/_simulate {"pipeline": {"description": "uppercase processor","processors": [{"uppercase": {"field": "message"}}]},"docs": [{"_index": "index","_type": "type","_id": "id","_source": {"message":"hello,world,felayman."}}] }結(jié)果為:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message": "HELLO,WORLD,FELAYMAN."},"_ingest": {"timestamp": "2017-11-27T09:53:33.672Z"}}}] }更多關(guān)于 Uppercase Processor 的內(nèi)容,請(qǐng)參考:Uppercase Processor
Dot Expander Processor
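The Dot Expander Processor expands a field whose name contains dots into an object field, so that the nested parts become addressable by other processors. A minimal sketch based on the official example:

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "dot expander processor",
    "processors": [
      {
        "dot_expander": {
          "field": "foo.bar"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index", "_type": "type", "_id": "id",
      "_source": { "foo.bar": "value" }
    }
  ]
}
```

After processing, "foo.bar": "value" becomes "foo": { "bar": "value" }. For more, see: Dot Expander Processor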
Custom Processor
What if none of the official Processors fit our transformation logic? No need to worry: Elasticsearch provides a solid plugin mechanism for extension. Implementing a custom Processor takes two steps:
- implement the IngestPlugin interface and its getProcessors method
- implement the custom Processor itself
Below is a concrete example: a processor that capitalizes the first letter of a configured field's value (a deliberately minimal model, which we test with the _ingest/pipeline/_simulate API).
Create a project named elasticsearch-help, and in it a FirstUpperPlugin class:
```java
package org.elasticsearch.help;

import org.elasticsearch.help.processor.FirstUpperProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.util.Collections;
import java.util.Map;

/**
 * @author lei.fang@shijue.me
 * @since 2017-11-26
 */
public class FirstUpperPlugin extends Plugin implements IngestPlugin {

    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        // register our custom processor under its type name
        return Collections.singletonMap(FirstUpperProcessor.TYPE, new FirstUpperProcessor.Factory());
    }
}
```

The getProcessors method we implement here registers our custom Processor. The FirstUpperProcessor source is as follows:
```java
package org.elasticsearch.help.processor;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Map;

/**
 * @author lei.fang@shijue.me
 * @since 2017-11-26
 */
public class FirstUpperProcessor extends AbstractProcessor {

    public static final String TYPE = "firstUpper";

    private final String field;

    public FirstUpperProcessor(String tag, String field) {
        super(tag);
        this.field = field;
    }

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
        // read the current value of the configured field and capitalize its first letter
        String value = ingestDocument.getFieldValue(field, String.class);
        if (value != null && !value.isEmpty()) {
            ingestDocument.setFieldValue(field, Character.toUpperCase(value.charAt(0)) + value.substring(1));
        }
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {
        public FirstUpperProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                          Map<String, Object> config) throws Exception {
            // "field" is a required config option of this processor
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            return new FirstUpperProcessor(processorTag, field);
        }
    }
}
```

The core method is execute: it reads the value of the configured field and capitalizes its first character.
We test it with the _ingest/pipeline/_simulate API:
```
curl -XGET 'http://localhost:9200/_ingest/pipeline/_simulate?pretty' -d '
{
  "pipeline": {
    "description": "capitalize the first letter of a string",
    "processors": [
      {
        "firstUpper": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index", "_type": "type", "_id": "id",
      "_source": { "message": "hello,world." }
    }
  ]
}
'
```

The result:
{"docs": [{"doc": {"_index": "index","_type": "type","_id": "id","_source": {"message":"Hello,world."},"_ingest": {"timestamp": "2017-11-27T10:16:16.409Z"}}}] }可以看到,自定義的 Ingest node 插件成功了.
The end.