NiFi 常用处理器(Processor)介绍
生活随笔
收集整理的這篇文章主要介紹了
NiFi 常用处理器(Processor)介绍
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
常用處理器(Processor)介紹
處理器的種類
為了創(chuàng)建有效的數(shù)據(jù)流處理流程, 用戶必須了解可用的處理器類型。NiFi 包含許多不同的處理器, 這些處理器提供了可從眾多不同系統(tǒng)中提取數(shù)據(jù), 路由, 轉(zhuǎn)換, 處理, 拆分和聚合數(shù)據(jù)以及將數(shù)據(jù)分發(fā)到多個系統(tǒng)的功能。
下面將重點介紹一些最常用的處理器, 按功能對它們進行分類。
數(shù)據(jù)轉(zhuǎn)換
- CompressContent:壓縮或解壓
- ConvertCharacterSet:將用于編碼內(nèi)容的字符集從一個字符集轉(zhuǎn)換為另一個字符集
- EncryptContent:加密或解密
- ReplaceText:使用正則表達式修改文本內(nèi)容
- TransformXml:應(yīng)用XSLT轉(zhuǎn)換XML內(nèi)容
- JoltTransformJSON:應(yīng)用JOLT規(guī)范來轉(zhuǎn)換JSON內(nèi)容
路由和調(diào)解
- ControlRate:限制流程中數(shù)據(jù)流經(jīng)某部分的速率
- DetectDuplicate:根據(jù)一些用戶定義的標(biāo)準(zhǔn)去監(jiān)視發(fā)現(xiàn)重復(fù)的FlowFiles。通常與HashContent一起使用
- DistributeLoad:通過只將一部分?jǐn)?shù)據(jù)分發(fā)給每個用戶定義的關(guān)系來實現(xiàn)負(fù)載平衡或數(shù)據(jù)抽樣
- MonitorActivity:當(dāng)用戶定義的時間段過去而沒有任何數(shù)據(jù)流經(jīng)此節(jié)點時發(fā)送通知。(可選)在數(shù)據(jù)流恢復(fù)時發(fā)送通知。
- RouteOnAttribute:根據(jù)FlowFile包含的屬性路由FlowFile。
- ScanAttribute:掃描FlowFile上用戶定義的屬性集,檢查是否有任何屬性與用戶定義的字典匹配。
- RouteOnContent:根據(jù)FlowFile的內(nèi)容是否與用戶自定義的正則表達式匹配。如果匹配,則FlowFile將路由到已配置的關(guān)系。
- ScanContent:在流文件的內(nèi)容中搜索用戶定義字典中存在的術(shù)語,并根據(jù)這些術(shù)語的存在或不存在來路由。字典可以由文本條目或二進制條目組成。
- ValidateXml:以XML模式驗證XML內(nèi)容; 根據(jù)用戶定義的XML Schema,判斷FlowFile的內(nèi)容是否有效,進而來路由FlowFile。
數(shù)據(jù)庫訪問
- ConvertJSONToSQL:將JSON文檔轉(zhuǎn)換為SQL INSERT或UPDATE命令,然后可以將其傳遞給PutSQL Processor
- ExecuteSQL:執(zhí)行用戶定義的SQL SELECT命令,結(jié)果為Avro格式的FlowFile
- PutSQL:通過執(zhí)行FlowFile內(nèi)容定義的SQL DDM語句來更新數(shù)據(jù)庫
- SelectHiveQL:對Apache Hive數(shù)據(jù)庫執(zhí)行用戶定義的HiveQL SELECT命令,結(jié)果為Avro或CSV格式的FlowFile
- PutHiveQL:通過執(zhí)行FlowFile內(nèi)容定義的HiveQL DDM語句來更新Hive數(shù)據(jù)庫
屬性提取
- EvaluateJsonPath:用戶提供JSONPath表達式(類似于XPath,用于XML解析/提取), 然后根據(jù)JSON內(nèi)容評估這些表達式,用結(jié)果值替換FlowFile內(nèi)容或?qū)⒔Y(jié)果值提取到用戶自己命名的Attribute中。
- EvaluateXPath:用戶提供XPath表達式,然后根據(jù)XML內(nèi)容評估這些表達式, 用結(jié)果值替換FlowFile內(nèi)容或?qū)⒔Y(jié)果值提取到用戶自己命名的Attribute中。
- EvaluateXQuery:用戶提供XQuery查詢,然后根據(jù)XML內(nèi)容評估此查詢, 用結(jié)果值替換FlowFile內(nèi)容或?qū)⒔Y(jié)果值提取到用戶自己命名的Attribute中。
- ExtractText:用戶提供一個或多個正則表達式,然后根據(jù)FlowFile的文本內(nèi)容對其進行評估, 然后將結(jié)果值提取到用戶自己命名的Attribute中。
- HashAttribute:對用戶定義的現(xiàn)有屬性列表的串聯(lián)進行hash。
- HashContent:對FlowFile的內(nèi)容進行hash, 并將得到的hash值添加到Attribute中。
- IdentifyMimeType:評估FlowFile的內(nèi)容, 以確定FlowFile封裝的文件類型。此處理器能夠檢測許多不同的MIME類型, 例如圖像, 文字處理器文檔, 文本和壓縮格式, 僅舉幾例。
- UpdateAttribute:向FlowFile添加或更新任意數(shù)量的用戶定義的屬性。這對于添加靜態(tài)的屬性值以及使用表達式語言動態(tài)計算出來的屬性值非常有用。該處理器還提供"高級用戶界面(Advanced User Interface)",允許用戶根據(jù)用戶提供的規(guī)則有條件地去更新屬性。
系統(tǒng)交互
- ExecuteProcess:運行用戶自定義的操作系統(tǒng)命令。進程的StdOut被重定向,以便StdOut的內(nèi)容輸出為FlowFile的內(nèi)容。此處理器是源處理器(不接受數(shù)據(jù)流輸入,沒有上游組件) - 其輸出預(yù)計會生成新的FlowFile,并且系統(tǒng)調(diào)用不會接收任何輸入。如果要為進程提供輸入,請使用ExecuteStreamCommand Processor。
- ExecuteStreamCommand:運行用戶定義的操作系統(tǒng)命令。FlowFile的內(nèi)容可選地流式傳輸?shù)竭M程的StdIn。StdOut的內(nèi)容輸出為FlowFile的內(nèi)容。此處理器不能用作源處理器 - 必須傳入FlowFiles才能執(zhí)行。
數(shù)據(jù)提取
- GetFile:將文件內(nèi)容從本地磁盤(或網(wǎng)絡(luò)連接的磁盤)流式傳輸?shù)絅iFi,然后刪除原始文件。此處理器應(yīng)將文件從一個位置移動到另一個位置,而不是用于復(fù)制數(shù)據(jù)。
- GetFTP:通過FTP將遠(yuǎn)程文件的內(nèi)容下載到NiFi中,然后刪除原始文件。此處理器應(yīng)將文件從一個位置移動到另一個位置,而不是用于復(fù)制數(shù)據(jù)。
- GetSFTP:通過SFTP將遠(yuǎn)程文件的內(nèi)容下載到NiFi中,然后刪除原始文件。此處理器應(yīng)將文件從一個位置移動到另一個位置,而不是用于復(fù)制數(shù)據(jù)。
- GetJMSQueue:從JMS隊列下載消息,并根據(jù)JMS消息的內(nèi)容創(chuàng)建FlowFile。可選地,JMS屬性也可以作為屬性復(fù)制。
- GetJMSTopic:從JMS主題下載消息,并根據(jù)JMS消息的內(nèi)容創(chuàng)建FlowFile。可選地,JMS屬性也可以作為屬性復(fù)制。此處理器支持持久訂閱和非持久訂閱。
- GetHTTP:將基于HTTP或HTTPS的遠(yuǎn)程URL的請求內(nèi)容下載到NiFi中。處理器將記住ETag和Last-Modified Date,以確保不會持續(xù)攝取數(shù)據(jù)。
- ListenHTTP:啟動HTTP(或HTTPS)服務(wù)器并偵聽傳入連接。對于任何傳入的POST請求,請求的內(nèi)容將作為FlowFile寫出,并返回200響應(yīng)。
- ListenUDP:偵聽傳入的UDP數(shù)據(jù)包,并為每個數(shù)據(jù)包或每個數(shù)據(jù)包創(chuàng)建一個FlowFile(取決于配置),并將FlowFile發(fā)送到"success"。
- GetHDFS:監(jiān)視HDFS中用戶指定的目錄。每當(dāng)新文件進入HDFS時,它將被復(fù)制到NiFi并從HDFS中刪除。此處理器應(yīng)將文件從一個位置移動到另一個位置,而不是用于復(fù)制數(shù)據(jù)。如果在集群中運行,此處理器需僅在主節(jié)點上運行。要從HDFS復(fù)制數(shù)據(jù)并使其保持原狀,或者從群集中的多個節(jié)點流式傳輸數(shù)據(jù),請參閱ListHDFS處理器。
- ListHDFS / FetchHDFS:ListHDFS監(jiān)視HDFS中用戶指定的目錄,并發(fā)出一個FlowFile,其中包含它遇到的每個文件的文件名。然后,它通過分布式緩存在整個NiFi集群中保持此狀態(tài)。然后可以在集群中,將其發(fā)送到FetchHDFS處理器,后者獲取這些文件的實際內(nèi)容并發(fā)出包含從HDFS獲取的內(nèi)容的FlowFiles。
- GetKafka:從Apache Kafka獲取消息,特別是0.8.x版本。消息可以作為每個消息的FlowFile發(fā)出,也可以使用用戶指定的分隔符一起進行批處理。
- GetMongo:對MongoDB執(zhí)行用戶指定的查詢,并將內(nèi)容寫入新的FlowFile。
數(shù)據(jù)出口/發(fā)送數(shù)據(jù)
- PutEmail:向配置的收件人發(fā)送電子郵件。FlowFile的內(nèi)容可選擇作為附件發(fā)送。
- PutFile:將FlowFile的內(nèi)容寫入本地(或網(wǎng)絡(luò)連接)文件系統(tǒng)上的目錄。
- PutFTP:將FlowFile的內(nèi)容復(fù)制到遠(yuǎn)程FTP服務(wù)器。
- PutSFTP:將FlowFile的內(nèi)容復(fù)制到遠(yuǎn)程SFTP服務(wù)器。
- PutJMS:將FlowFile的內(nèi)容作為JMS消息發(fā)送到JMS代理,可選擇將Attributes添加JMS屬性。
- PutSQL:將FlowFile的內(nèi)容作為SQL DDL語句(INSERT,UPDATE或DELETE)執(zhí)行。FlowFile的內(nèi)容必須是有效的SQL語句。屬性可以用作參數(shù),FlowFile的內(nèi)容可以是參數(shù)化的SQL語句,以避免SQL注入攻擊。
- PutKafka:將FlowFile的內(nèi)容作為消息發(fā)送到Apache Kafka,特別是0.8.x版本。FlowFile可以作為單個消息或分隔符發(fā)送,例如可以指定換行符,以便為單個FlowFile發(fā)送許多消息。
- PutMongo:將FlowFile的內(nèi)容作為INSERT或UPDATE發(fā)送到Mongo。
分裂和聚合
- SplitText:SplitText接收單個FlowFile,其內(nèi)容為文本,并根據(jù)配置的行數(shù)將其拆分為1個或多個FlowFiles。例如,可以將處理器配置為將FlowFile拆分為多個FlowFile,每個FlowFile只有一行。
- SplitJson:允許用戶將包含數(shù)組或許多子對象的JSON對象拆分為每個JSON元素的FlowFile。
- SplitXml:允許用戶將XML消息拆分為多個FlowFiles,每個FlowFiles包含原始段。這通常在多個XML元素與"wrapper"元素連接在一起時使用。然后,此處理器允許將這些元素拆分為單獨的XML元素。
- UnpackContent:解壓縮不同類型的存檔格式,例如ZIP和TAR。然后,歸檔中的每個文件都作為單個FlowFile傳輸。
- SegmentContent:根據(jù)某些已配置的數(shù)據(jù)大小將FlowFile劃分為可能的許多較小的FlowFile。不對任何類型的分界符執(zhí)行拆分,而是僅基于字節(jié)偏移執(zhí)行拆分。這是在傳輸FlowFiles之前使用的,以便通過并行發(fā)送許多不同的部分來提供更低的延遲。而另一方面,MergeContent處理器可以使用碎片整理模式重新組裝這些FlowFiles。
- MergeContent:此處理器負(fù)責(zé)將許多FlowFiles合并到一個FlowFile中。可以通過將其內(nèi)容與可選的頁眉,頁腳和分界符連接在一起,或者通過指定存檔格式(如ZIP或TAR)來合并FlowFiles。FlowFiles可以根據(jù)公共屬性進行分箱(binned),或者如果這些流是被其他組件拆分的,則可以進行"碎片整理(defragmented)"。根據(jù)元素的數(shù)量或FlowFiles內(nèi)容的總大小(每個bin的最小和最大大小是用戶指定的)并且還可以配置可選的Timeout屬性,即FlowFiles等待其bin變?yōu)榕渲玫纳舷拗底畲髸r間。
- SplitContent:將單個FlowFile拆分為可能的許多FlowFile,類似于SegmentContent。但是,對于SplitContent,不會在任意字節(jié)邊界上執(zhí)行拆分,而是指定要拆分內(nèi)容的字節(jié)序列。
HTTP
- GetHTTP:將基于HTTP或HTTPS的遠(yuǎn)程URL的內(nèi)容下載到NiFi中。處理器將記住ETag和Last-Modified Date,以確保不會持續(xù)攝取數(shù)據(jù)。
- ListenHTTP:啟動HTTP(或HTTPS)服務(wù)器并偵聽傳入連接。對于任何傳入的POST請求,請求的內(nèi)容將作為FlowFile寫出,并返回200響應(yīng)。
- InvokeHTTP:執(zhí)行用戶配置的HTTP請求。此處理器比GetHTTP和PostHTTP更通用,但需要更多配置。此處理器不能用作源處理器,并且需要具有傳入的FlowFiles才能被觸發(fā)以執(zhí)行其任務(wù)。
- PostHTTP:執(zhí)行HTTP POST請求,將FlowFile的內(nèi)容作為消息正文發(fā)送。這通常與ListenHTTP結(jié)合使用,以便在無法使用s2s的情況下在兩個不同的NiFi實例之間傳輸數(shù)據(jù)(例如,節(jié)點無法直接訪問并且能夠通過HTTP進行通信時代理)。 注意:除了現(xiàn)有的RAW套接字傳輸之外,HTTP還可用作s2s傳輸協(xié)議。它還支持HTTP代理。建議使用HTTP s2s,因為它更具可擴展性,并且可以使用具有更好用戶身份驗證和授權(quán)的輸入/輸出端口的方式來提供雙向數(shù)據(jù)傳輸。
- HandleHttpRequest / HandleHttpResponse:HandleHttpRequest Processor是一個源處理器,與ListenHTTP類似,啟動嵌入式HTTP(S)服務(wù)器。但是,它不會向客戶端發(fā)送響應(yīng)(比如200響應(yīng))。相反,流文件是以HTTP請求的主體作為其內(nèi)容發(fā)送的,所有典型servlet參數(shù)、頭文件等的屬性作為屬性。然后,HandleHttpResponse能夠在FlowFile完成處理后將響應(yīng)發(fā)送回客戶端。這些處理器總是彼此結(jié)合使用,并允許用戶在NiFi中可視化地創(chuàng)建Web服務(wù)。這對于將前端添加到非基于Web的協(xié)議或圍繞已經(jīng)由NiFi執(zhí)行的某些功能(例如數(shù)據(jù)格式轉(zhuǎn)換)添加簡單的Web服務(wù)特別有用。
總結(jié)
以上是生活随笔為你收集整理的NiFi 常用处理器(Processor)介绍的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql 事件跟踪_ORACLE 事件
- 下一篇: send返回值