

Elasticsearch Pipeline in Detail


Contents

    • Ingest Node
      • About Ingest Node
      • About Pipelines and Processors
      • Defining a Pipeline
      • The Simulate Pipeline API
      • Accessing Data in a Pipeline
      • Processor Types in Detail
        • Append Processor
        • Convert Processor
        • Date Processor
        • Date Index Name Processor
        • Fail Processor
        • Foreach Processor
        • Grok Processor
        • Gsub Processor
        • Join Processor
        • JSON Processor
        • KV Processor
        • Lowercase Processor
        • Remove Processor
        • Rename Processor
        • Script Processor
        • Set Processor
        • Split Processor
        • Sort Processor
        • Trim Processor
        • Uppercase Processor
        • Dot Expander Processor
      • Custom Processors

Note

This article assumes the reader already has some Elasticsearch fundamentals and is familiar with the basic pipeline concepts.

Ingest Node

About Ingest Node

An ingest node (preprocessing node) is a node type that Elasticsearch names after its function. Whether a given node in the cluster is an ingest node can be controlled with the following setting in elasticsearch.yml:

node.ingest: false

Setting node.ingest to false declares that the current node is not an ingest node and has no preprocessing capability. By default, however, every Elasticsearch node is an ingest node, i.e. every node in the cluster can preprocess documents.
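To check which nodes currently carry the ingest role, the cat nodes API can help; a quick sketch (the node.role column abbreviates the roles, and nodes whose role string contains an i have ingest enabled — the exact column names may vary slightly across versions):

GET _cat/nodes?v&h=name,node.role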

So what is preprocessing?

Anyone who has processed logs with Logstash knows that we rarely load raw logs into an Elasticsearch cluster as-is; instead, the raw log entries are refined before being stored in the cluster. Logstash offers many parsers for this, such as json, kv, and date, the classic one being grok. We will not go into Logstash's parsers in detail here; the point is simply that we often need Logstash to transform data before it is loaded into Elasticsearch, and conceptually that transformation can be called "preprocessing". The preprocessing discussed here is essentially the same idea.

Put another way: before Elasticsearch introduced the ingest node concept, transforming data before it was stored in Elasticsearch required Logstash or a custom plugin. Starting with Elasticsearch 5.x, the team integrated part of Logstash's functionality into Elasticsearch itself. That feature is called ingest, and a node with this capability is called an ingest node.

See Filter plugins to learn more about Logstash's parsers.

To transform documents before they are written to Elasticsearch without relying on Logstash — say, to give a field a default value, rename a field, or run a script for more complex logic — two basic concepts must be understood: pipelines and processors.

See Ingest Node for more on these two concepts; below is only a brief introduction to both.

About Pipelines and Processors

A pipeline is a widely known concept, and Elasticsearch adopted the term precisely because it is intuitive for anyone with engineering experience. Since I mainly develop in Java, the Java Stream API makes a good analogy: the two are very similar in function and concept. We routinely transform the data in a Stream — map, peek, reduce, count, and so on — and each of those operations is, behaviorally, a transformation of the data. A pipeline does the same thing: it transforms the data passing through it (generally documents), for example changing the value of a document field or converting a field's type, as mentioned above. Elasticsearch abstracts each such transformation step and calls it a processor, and it names many processor types — set, append, date, join, json, kv, and so on — to standardize operations on documents. These processor types are described later in this article.

Defining a Pipeline

Defining a pipeline is straightforward; the official documentation gives this reference example:

PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

The example above defines a pipeline with the ID "my-pipeline-id" via a request to "_ingest/pipeline". The request body has two required elements:

  • description — describes what the pipeline does
  • processors — defines the list of processors; here a single set processor assigns the value "bar" to the field "foo"

For more on defining pipelines, see: Ingest APIs
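Once defined, a pipeline is applied at index time through the pipeline query parameter. A minimal sketch (the index and document here are illustrative):

PUT my-index/my-type/1?pipeline=my-pipeline-id
{
  "foo": "original value"
}

Because the set processor runs before the document is written, the stored document ends up with "foo": "bar".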

The Simulate Pipeline API

Since Elasticsearch provides preprocessing, it can hardly be a black box. To help developers understand and exercise how preprocessing works, the official API includes endpoints for testing these operations, collectively called the Simulate Pipeline API. To get any depth with pipelines and processors, you will lean on the Simulate Pipeline API constantly.

Here is a simple Simulate Pipeline API example:

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

The request above does not name a pipeline in the URL, in which case the pipeline must be defined inline in the request body. Alternatively, the ID of an existing pipeline can be given in the URL, as in:

POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

Either way, the documents under "docs" in the request body are run through that pipeline and its processors. Here is a more concrete example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "_description",
    "processors": [
      {
        "set": {
          "field": "field2",
          "value": "_value"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "foo": "bar" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "foo": "rab" } }
  ]
}

In this concrete example no pipeline is named in the request URL, so the pipeline and its processors had to be defined inline in the request body. The example simply sets the field "field2" to "_value" on every document passing through the pipeline; the documents are those listed under "docs". The response is:

{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "type",
        "_source": { "field2": "_value", "foo": "bar" },
        "_ingest": { "timestamp": "2017-05-04T22:30:03.187Z" }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "type",
        "_source": { "field2": "_value", "foo": "rab" },
        "_ingest": { "timestamp": "2017-05-04T22:30:03.188Z" }
      }
    }
  ]
}

The response shows that after passing through the pipeline (that is, after being transformed by the pipeline's processors), the new documents differ from the originals in two ways:

  • Each document gained a field2 field, visible by comparing the _source of the documents in "docs" before and after.
  • Some temporary (officially, transient) fields were added, such as timestamp, all under the _ingest key. These fields accompany the source document only while it is being processed; once the pipeline's processors finish and the document is handed back to the corresponding bulk or index operation, they are not returned with it.

Accessing Data in a Pipeline

Defining a pipeline would be pointless if the pipeline could not access its own context. Unlike Logstash, whose runtime is isolated from the Elasticsearch environment so the two cannot share a context, a pipeline is natively embedded in Elasticsearch and naturally forms a context with it, which lets a pipeline be exploited far more fully. The most direct context a pipeline has is the document itself, so within a pipeline (more precisely, within its processors) we can directly access the fields and metadata of the documents flowing through.

The following examples show how processors access this context.

Accessing source document fields

{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}

This example refers directly to the field "my_field" of the documents passing through the pipeline and sets its value to 582.1 on every document.

The source fields can also be addressed through _source explicitly:

{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}

Accessing document metadata fields

Every document carries metadata fields such as _id, _index, and _type, and processors can access these directly too, as in:

{
  "set": {
    "field": "_id",
    "value": "1"
  }
}

This accesses the document's _id field directly and sets its value to "1".

Accessing ingest metadata fields

The official term for these is "ingest metadata fields". They are held only transiently: once the documents have been processed and handed back to the corresponding bulk or index request, these fields are not returned along with them, so they can be thought of as transient metadata. Here is an example of accessing them:

{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}

This adds a field named "received" to the document, whose value is the timestamp from the ingest metadata, accessed as {{_ingest.timestamp}}. The {{ }} braces should look familiar: this kind of template expression generally appears wherever a context environment is involved.
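When a pipeline chains several processors, the simulate API also takes a verbose flag that reports the document state after each individual processor, which makes these transient fields easy to observe. A quick sketch:

POST _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "description": "two-step pipeline",
    "processors": [
      { "set": { "field": "received", "value": "{{_ingest.timestamp}}" } },
      { "uppercase": { "field": "foo" } }
    ]
  },
  "docs": [
    { "_source": { "foo": "bar" } }
  ]
}

Instead of a single doc per input document, the response then contains a processor_results array with one entry per processor.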

Processor Types in Detail

    • Append Processor
    • Convert Processor
    • Date Processor
    • Date Index Name Processor
    • Fail Processor
    • Foreach Processor
    • Grok Processor
    • Gsub Processor
    • Join Processor
    • JSON Processor
    • KV Processor
    • Lowercase Processor
    • Remove Processor
    • Rename Processor
    • Script Processor
    • Set Processor
    • Split Processor
    • Sort Processor
    • Trim Processor
    • Uppercase Processor
    • Dot Expander Processor

Append Processor

As the name suggests, this processor appends values: when loading source documents into Elasticsearch, some fields may need extra, custom information added.

Usage:

{
  "append": {
    "field": "field1",
    "value": ["item2", "item3", "item4"]
  }
}

Suppose we import a new batch of products from a third party, and for some reason every product in the batch must carry a "家居" (home furnishing) tag to mark it as belonging to that category. A product can of course belong to more than one category, so when importing these documents we need to append "家居" to the existing tag field — exactly what the append processor does.

The following example runs through the Simulate Pipeline API:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "append a 家居 tag",
    "processors": [
      {
        "append": {
          "field": "tag",
          "value": ["家居"]
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000001", "tag": "衣柜" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000002", "tag": "沙發" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000003" } }
  ]
}

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "code": "000001", "tag": ["衣柜", "家居"] },
               "_ingest": { "timestamp": "2017-11-24T13:19:55.555Z" } } },
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "code": "000002", "tag": ["沙發", "家居"] },
               "_ingest": { "timestamp": "2017-11-24T13:19:55.555Z" } } },
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "code": "000003", "tag": ["家居"] },
               "_ingest": { "timestamp": "2017-11-24T13:19:55.555Z" } } }
  ]
}

After the processor runs, every document's tag field carries the additional "家居" tag, and for the document that had no tag field at all, the field was created.

Convert Processor

This processor converts the type of a document field before the document is written. For instance, if a document has a string "price" field that we want as a float, the convert processor does the job:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "convert the price string to a float",
    "processors": [
      {
        "convert": {
          "field": "price",
          "type": "float"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000001", "tag": "衣柜", "price": "999.8" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000002", "tag": "沙發", "price": "899.8" } },
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "code": "000003", "price": "799.8" } }
  ]
}

The simulated result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "code": "000001", "tag": "衣柜", "price": 999.8 },
               "_ingest": { "timestamp": "2017-11-26T10:56:47.663Z" } } },
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "code": "000002", "tag": "沙發", "price": 899.8 },
               "_ingest": { "timestamp": "2017-11-26T10:56:47.664Z" } } },
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "code": "000003", "price": 799.8 },
               "_ingest": { "timestamp": "2017-11-26T10:56:47.664Z" } } }
  ]
}

As the result shows, price is no longer a string in the new documents but a float.

Note that the types the convert processor currently supports are: integer, float, string, boolean, and auto.

For more on this processor, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/convert-processor.html

Date Processor

The date processor, as its name implies, parses a date field in the source document into a timestamp field that Elasticsearch recognizes (by default @timestamp) and adds that field to the document. (In practice this is somewhat limited: the converted value is always rendered in ISO 8601 format and cannot be emitted in another format, so the processor sees little use.)

Here is an example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "parse a date string into a timestamp field",
    "processors": [
      {
        "date": {
          "field": "date",
          "formats": ["UNIX"],
          "timezone": "Asia/Shanghai"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id",
      "_source": { "code": "000001", "tag": "衣柜", "price": "999.8", "date": "1511696379" } }
  ]
}

This parses the original string-encoded date (ultimately into ISO format):

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": {
                 "date": "1511696379",
                 "code": "000001",
                 "@timestamp": "2017-11-26T19:39:39.000+08:00",
                 "price": "999.8",
                 "tag": "衣柜"
               },
               "_ingest": { "timestamp": "2017-11-26T12:01:16.787Z" } } }
  ]
}

The document now has an extra default field, @timestamp; its name can be changed with the target_field parameter.
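As a quick sketch of that option (the field names here are illustrative), note that several candidate formats can be listed and are tried in order:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "date processor with a custom target field",
    "processors": [
      {
        "date": {
          "field": "date",
          "target_field": "indexed_at",
          "formats": ["UNIX", "yyyy-MM-dd HH:mm:ss"],
          "timezone": "Asia/Shanghai"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "date": "2017-11-26 19:39:39" } }
  ]
}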

Why can the output only be ISO formatted? The official docs do not say, so we have to look at the source:

public void execute(IngestDocument ingestDocument) {
    String value = ingestDocument.getFieldValue(field, String.class);
    DateTime dateTime = null;
    Exception lastException = null;
    for (Function<String, DateTime> dateParser : dateParsers) {
        try {
            dateTime = dateParser.apply(value);
        } catch (Exception e) {
            // try the next parser and keep track of the exceptions
            lastException = ExceptionsHelper.useOrSuppress(lastException, e);
        }
    }
    if (dateTime == null) {
        throw new IllegalArgumentException("unable to parse date [" + value + "]", lastException);
    }
    ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
}

In the last line, where the target field is set, the output format is hardcoded to ISODateTimeFormat. Presumably an official option for a custom output pattern will come eventually.

Date Index Name Processor

The date index name processor counts as one of the more powerful processors: its purpose is to route each document passing through it into an index that matches a given time pattern, provided it is used as the official documentation describes.

First, create a pipeline of this type:

curl -XPUT 'localhost:9200/_ingest/pipeline/monthlyindex?pretty' -H 'Content-Type: application/json' -d'
{
  "description": "monthly date-time index naming",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "date1",
        "index_name_prefix" : "myindex-",
        "date_rounding" : "M"
      }
    }
  ]
}
'

Then create a document that goes through this pipeline:

curl -XPUT 'localhost:9200/myindex/type/1?pipeline=monthlyindex&pretty' -H 'Content-Type: application/json' -d'
{
  "date1" : "2016-04-25T12:02:01.789Z"
}
'

The result:

{
  "_index" : "myindex-2016-04-01",
  "_type" : "type",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "created" : true
}

The request above did not put the document into the myindex index but into myindex-2016-04-01.

That is what makes the date index name processor powerful: a written document can be routed to an index chosen from one of its own date fields. Combined with index templates, this enables very capable log collection, for example writing logs into Elasticsearch by month or by day.
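For instance, an index template can pre-define settings and mappings for every index the processor will create. A minimal sketch using the 5.x template syntax (the template name and settings are illustrative):

PUT _template/myindex-template
{
  "template": "myindex-*",
  "settings": {
    "number_of_shards": 1
  }
}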

To learn more about the date index name processor, see: Date Index Name Processor

Fail Processor

This processor is simple: when a document reaches it while passing through the pipeline, processing is aborted and the error message configured here is returned to the requester.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "fail processor example",
    "processors" : [
      {
        "fail": {
          "message": "an error message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "myindex", "_type": "type", "_id": "id", "_source": { "id": "1" } }
  ]
}

The result:

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "exception",
            "reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message",
            "header": { "processor_type": "fail" }
          }
        ],
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message",
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "org.elasticsearch.ingest.common.FailProcessorException: an error message",
          "caused_by": {
            "type": "fail_processor_exception",
            "reason": "an error message"
          }
        },
        "header": { "processor_type": "fail" }
      }
    }
  ]
}

Foreach Processor

The foreach processor handles array fields: every element in the array is run through the same processor. For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "foreach processor",
    "processors" : [
      {
        "foreach": {
          "field": "values",
          "processor": {
            "uppercase": {
              "field": "_ingest._value"
            }
          }
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id",
      "_source": { "id": "1", "values": ["hello", "world", "felayman"] } }
  ]
}

In this example the document's values field is an array, and each of its elements goes through the same uppercase processor inside the foreach processor (the current element is addressed as _ingest._value), which guarantees every element receives the same operation.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "id": "1", "values": ["HELLO", "WORLD", "FELAYMAN"] },
               "_ingest": { "_value": null, "timestamp": "2017-11-27T08:55:17.496Z" } } }
  ]
}

Every processor has more depth than is covered here; the goal is only to show basic usage.

For details, see: Foreach Processor
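Foreach is just as useful on arrays of objects, where the inner processor addresses a property of the current element through _ingest._value. A sketch (the documents and field names are illustrative):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "strip an internal id from every element",
    "processors": [
      {
        "foreach": {
          "field": "products",
          "processor": {
            "remove": {
              "field": "_ingest._value.internal_id"
            }
          }
        }
      }
    ]
  },
  "docs": [
    { "_source": { "products": [
        { "code": "000001", "internal_id": "x1" },
        { "code": "000002", "internal_id": "x2" }
      ] } }
  ]
}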

Grok Processor

The grok processor is one of the most practical processors, used all the time to split log lines into fields; anyone who has used Logstash knows how powerful grok is. Detailed grok syntax is out of scope here; the point is simply that the pipeline on an Elasticsearch ingest node offers the same grok capability as Logstash.

A usage example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "grok processor",
    "processors" : [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id",
      "_source": { "message": "55.3.244.1 GET /index.html 15824 0.043" } }
  ]
}

The log line in this example could be an nginx log, in the format:

55.3.244.1 GET /index.html 15824 0.043

The corresponding grok expression is:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

After passing through the grok processor, the plain-text log line is parsed into a standard document structure that can take full advantage of Elasticsearch search and aggregation — something the raw text log could never do.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": {
                 "duration": "0.043",
                 "request": "/index.html",
                 "method": "GET",
                 "bytes": "15824",
                 "client": "55.3.244.1",
                 "message": "55.3.244.1 GET /index.html 15824 0.043"
               },
               "_ingest": { "timestamp": "2017-11-27T09:07:40.782Z" } } }
  ]
}

Each captured segment has become its own field in the document.

For a detailed introduction to the grok processor, see: Grok Processor
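Besides the built-in patterns, the grok processor also accepts custom patterns via its pattern_definitions parameter. A small sketch (the pattern and sample line are made up):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "grok with a custom pattern",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["order %{ORDER_ID:order_id} shipped"],
          "pattern_definitions": {
            "ORDER_ID": "[A-Z]{2}-[0-9]{6}"
          }
        }
      }
    ]
  },
  "docs": [
    { "_source": { "message": "order AB-123456 shipped" } }
  ]
}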

Gsub Processor

The gsub processor solves string-specific problems. Say we want to turn string dates formatted as "yyyy-MM-dd HH:mm:ss" into "yyyy/MM/dd HH:mm:ss": gsub handles this, and it does so with regular expressions.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "gsub processor",
    "processors" : [
      {
        "gsub": {
          "field": "message",
          "pattern": "-",
          "replacement": "/"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "2017-11-27 00:00:00" } }
  ]
}

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "2017/11/27 00:00:00" },
               "_ingest": { "timestamp": "2017-11-27T09:15:05.502Z" } } }
  ]
}

The date inside the message field now has the format we wanted.

For more on the gsub processor, see: Gsub Processor

Join Processor

The join processor turns an array-typed field into a single string whose elements are joined by a given separator.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "join processor",
    "processors": [
      {
        "join": {
          "field": "message",
          "separator": "、、、、"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id",
      "_source": { "message": ["hello", "world", "felayman"] } }
  ]
}

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "hello、、、、world、、、、felayman" },
               "_ingest": { "timestamp": "2017-11-27T09:25:41.260Z" } } }
  ]
}

The array-valued message field of the original document became the string "hello、、、、world、、、、felayman" after the join processor ran.

For more on the join processor, see: Join Processor

JSON Processor

The JSON processor also operates on string fields: text that is valid (serialized) JSON is parsed back into the corresponding JSON structure after passing through it. For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "json processor",
    "processors": [
      {
        "json": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "{\"foo\": 2000}" } }
  ]
}

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": { "foo": 2000 } },
               "_ingest": { "timestamp": "2017-11-27T09:30:28.546Z" } } }
  ]
}

The message field, previously a plain string, has been parsed into proper JSON.

For more on the JSON processor, see: JSON Processor

KV Processor

The KV processor parses key-value strings such as K=V, K:V, or K->V.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "kv processor",
    "processors": [
      {
        "kv": {
          "field": "message",
          "field_split": " ",
          "value_split": ":"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "ip:127.0.0.1" } }
  ]
}

The message field's original value is "ip:127.0.0.1", and we want that pair extracted into a standalone field, i.e. ip: 127.0.0.1.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "ip:127.0.0.1", "ip": "127.0.0.1" },
               "_ingest": { "timestamp": "2017-11-27T09:46:48.715Z" } } }
  ]
}

For more on the KV processor, see: KV Processor

Lowercase Processor

The lowercase processor is another string-only processor; as the name says, it converts a string to lowercase.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "lowercase processor",
    "processors": [
      {
        "lowercase": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "HELLO,WORLD,FELAYMAN." } }
  ]
}

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "hello,world,felayman." },
               "_ingest": { "timestamp": "2017-11-27T09:49:58.564Z" } } }
  ]
}

The message field's value is entirely lowercase after the lowercase processor runs.

For more on the lowercase processor, see: Lowercase Processor

Remove Processor

The remove processor deletes fields from the source document before it is written.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "remove processor",
    "processors": [
      {
        "remove": {
          "field": "extra_field"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id",
      "_source": { "extra_field": "extra_field", "message": " hello, felayman." } }
  ]
}

After the remove processor runs, the extra_field field no longer exists.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": " hello, felayman." },
               "_ingest": { "timestamp": "2017-11-27T09:58:46.038Z" } } }
  ]
}

Rename Processor

The rename processor changes the name of a document field before the document is written to Elasticsearch.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "rename processor",
    "processors": [
      {
        "rename": {
          "field": "old_name",
          "target_field": "new_name"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "old_name": "old_name" } }
  ]
}

The original document's old_name field is renamed to new_name.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "new_name": "old_name" },
               "_ingest": { "timestamp": "2017-11-27T10:00:48.694Z" } } }
  ]
}

Script Processor

The script processor is arguably the most powerful of all, because it can draw on the full scripting capability Elasticsearch provides.

Scripting itself is not covered in detail here; for background, see: Script.

Here is an example of a script inside a script processor:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "script processor",
    "processors": [
      {
        "script": {
          "lang": "painless",
          "inline": "ctx.viewCount = (ctx.viewCount) * 10"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "viewCount": 100 } }
  ]
}

The script multiplies the document's viewCount field by ten. The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "viewCount": 1000 },
               "_ingest": { "timestamp": "2017-11-27T10:07:56.112Z" } } }
  ]
}

Set Processor

The set processor covers two situations:

  • if the target field exists, its value is overwritten
  • if the target field does not exist, the field is created with the given value

An example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "set processor",
    "processors": [
      {
        "set": {
          "field": "category",
          "value": "家居"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "id": "0001" } }
  ]
}

Here every document going through the set processor gets a "家居" category.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "id": "0001", "category": "家居" },
               "_ingest": { "timestamp": "2017-11-27T10:12:05.306Z" } } }
  ]
}

Split Processor

The split processor turns a string separated by a given delimiter into an array-typed field.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "split processor",
    "processors": [
      {
        "split": {
          "field": "message",
          "separator": "-"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "hello-world" } }
  ]
}

The original document's message value "hello-world" is converted to ["hello", "world"].

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": ["hello", "world"] },
               "_ingest": { "timestamp": "2017-11-27T10:13:35.982Z" } } }
  ]
}

Sort Processor

The sort processor works on array fields: the elements of an array field in the source document are sorted in ascending or descending order.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "sort processor",
    "processors": [
      {
        "sort": {
          "field": "category",
          "order": "asc"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "category": [2, 3, 4, 1] } }
  ]
}

Here the stored order of the category field's values is changed to ascending.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "category": [1, 2, 3, 4] },
               "_ingest": { "timestamp": "2017-11-27T10:16:16.409Z" } } }
  ]
}

Trim Processor

The Elasticsearch developers really are thorough — essentially every method on the String class gets its own processor. The trim processor deals specifically with whitespace at the ends of a string, as follows:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "trim processor",
    "processors": [
      {
        "trim": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": " hello, felayman." } }
  ]
}

Note that it removes whitespace from both ends of the string only.

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "hello, felayman." },
               "_ingest": { "timestamp": "2017-11-27T09:56:17.946Z" } } }
  ]
}

Uppercase Processor

The counterpart of the lowercase processor: it converts string text to uppercase.

For example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "uppercase processor",
    "processors": [
      {
        "uppercase": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "hello,world,felayman." } }
  ]
}

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "HELLO,WORLD,FELAYMAN." },
               "_ingest": { "timestamp": "2017-11-27T09:53:33.672Z" } } }
  ]
}

For more on the uppercase processor, see: Uppercase Processor

    Dot Expander Processor
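A brief sketch based on the official description: the dot_expander processor expands a field whose name contains dots into a nested object field, so that the field becomes addressable by the other processors in the pipeline. A minimal example (the field names are illustrative):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "dot expander processor",
    "processors": [
      {
        "dot_expander": {
          "field": "foo.bar"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "foo.bar": "value" } }
  ]
}

After processing, the flat "foo.bar" key should come back as a nested object: { "foo": { "bar": "value" } }.

For more, see: Dot Expander Processor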

Custom Processors

What if the official processors do not cover our transformation logic? No need to worry: the official plugin mechanism makes them easy to extend. Implementing a custom processor involves two steps:

    • implement the IngestPlugin interface and its getProcessors method
    • implement the custom Processor itself

Below is a concrete example that capitalizes the first letter of a given field's value (a minimal model, tested with the _ingest/pipeline/_simulate API).

Create a new project named elasticsearch-help and add a FirstUpperPlugin class:

package org.elasticsearch.help;

import org.elasticsearch.help.processor.FirstUpperProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.util.Collections;
import java.util.Map;

/**
 * @author lei.fang@shijue.me
 * @since 2017-11-26
 */
public class FirstUpperPlugin extends Plugin implements IngestPlugin {

    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        // register the custom processor under its type name ("firstUpper")
        return Collections.singletonMap(FirstUpperProcessor.TYPE, new FirstUpperProcessor.Factory());
    }
}

The getProcessors method implemented here supplies our custom processor. The FirstUpperProcessor source is:

package org.elasticsearch.help.processor;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Map;

/**
 * @author lei.fang@shijue.me
 * @since 2017-11-26
 */
public class FirstUpperProcessor extends AbstractProcessor {

    public static final String TYPE = "firstUpper";

    private final String field;

    public FirstUpperProcessor(String tag, String field) {
        super(tag);
        this.field = field;
    }

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
        // read the configured field's value and capitalize its first letter
        String value = ingestDocument.getFieldValue(field, String.class);
        if (value != null && !value.isEmpty()) {
            ingestDocument.setFieldValue(field, Character.toUpperCase(value.charAt(0)) + value.substring(1));
        }
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {
        public FirstUpperProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                          Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            return new FirstUpperProcessor(processorTag, field);
        }
    }
}

The core method is execute, which reads the incoming field's value and capitalizes its first letter.

We test it with the _ingest/pipeline/_simulate API:

curl -XGET 'http://localhost:9200/_ingest/pipeline/_simulate?pretty' -d '
{
  "pipeline": {
    "description": "capitalize the first letter of a string",
    "processors": [
      {
        "firstUpper": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    { "_index": "index", "_type": "type", "_id": "id", "_source": { "message": "hello,world." } }
  ]
}'

The result:

{
  "docs": [
    { "doc": { "_index": "index", "_type": "type", "_id": "id",
               "_source": { "message": "Hello,world." },
               "_ingest": { "timestamp": "2017-11-27T10:16:16.409Z" } } }
  ]
}

The custom ingest node plugin works.

The end.
