apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)
apache ignite
在本文中, “使用Apache Ignite進(jìn)行高性能內(nèi)存計(jì)算”一書的作者將討論使用Apache Strom和Apache Ignite進(jìn)行復(fù)雜的事件處理。 本文的一部分摘自
書 。
術(shù)語(yǔ)“復(fù)雜事件處理”或CEP沒(méi)有廣泛或高度接受的定義。 Wikipedia的以下引用可以簡(jiǎn)要描述什么是復(fù)雜事件處理:
“復(fù)雜事件處理(CEP)主要是一個(gè)事件處理概念,用于處理多個(gè)事件,以識(shí)別事件云中有意義的事件為目標(biāo)。 CEP采用的技術(shù)包括檢測(cè)許多事件的復(fù)雜模式,事件相關(guān)性和抽象性,事件層次結(jié)構(gòu)以及事件之間的因果關(guān)系,成員關(guān)系和時(shí)間安排以及事件驅(qū)動(dòng)過(guò)程。
為簡(jiǎn)單起見(jiàn),復(fù)雜事件處理(CEP)是一種用于在真實(shí)世界中永不停止或流式傳輸事件數(shù)據(jù)的低延遲過(guò)濾,聚合和計(jì)算的技術(shù)。 在IT環(huán)境中,原始基礎(chǔ)結(jié)構(gòu)和業(yè)務(wù)事件的數(shù)量和速度均呈指數(shù)級(jí)增長(zhǎng)。 此外,移動(dòng)設(shè)備的爆炸式增長(zhǎng)和高速連接的普遍性加劇了移動(dòng)數(shù)據(jù)的爆炸式增長(zhǎng)。 同時(shí),對(duì)業(yè)務(wù)流程敏捷性和執(zhí)行的需求僅在增長(zhǎng)。 這兩個(gè)趨勢(shì)給組織施加了壓力,要求它們提高其能力以支持事件驅(qū)動(dòng)的實(shí)施架構(gòu)模式。 實(shí)時(shí)事件處理需要基礎(chǔ)架構(gòu)和應(yīng)用程序開(kāi)發(fā)環(huán)境來(lái)執(zhí)行事件處理要求。 這些要求通常包括從日常使用案例擴(kuò)展到極高的速度或各種數(shù)據(jù)和事件吞吐量的需求,潛在的延遲時(shí)間以微秒為單位,而不是響應(yīng)時(shí)間的秒數(shù)。
Apache Ignite允許在內(nèi)存中以可伸縮且容錯(cuò)的方式處理連續(xù)不斷的數(shù)據(jù)流,而不是在數(shù)據(jù)到達(dá)數(shù)據(jù)庫(kù)后對(duì)其進(jìn)行分析。 這不僅使您能夠關(guān)聯(lián)關(guān)系并從大量數(shù)據(jù)中檢測(cè)有意義的模式,還可以更快,更高效地完成此操作。 事件歷史記錄可以在內(nèi)存中保留任何時(shí)間長(zhǎng)度(對(duì)于長(zhǎng)時(shí)間運(yùn)行的事件序列至關(guān)重要),也可以作為事務(wù)記錄在存儲(chǔ)的數(shù)據(jù)庫(kù)中。
Apache Ignite CEP可以在眾多行業(yè)中使用,以下是一些一流的用例:
還有更多的工業(yè)或功能領(lǐng)域,您可以在其中使用Apache Ignite處理流事件數(shù)據(jù),例如保險(xiǎn),運(yùn)輸和公共部門。 復(fù)雜事件處理或CEP包含其過(guò)程的三個(gè)主要部分:
如上圖所示,數(shù)據(jù)是從不同來(lái)源獲取的。 源可以是任何傳感器(IoT),Web應(yīng)用程序或行業(yè)應(yīng)用程序。 可以直接在Ignite群集上以收集方式并發(fā)處理流數(shù)據(jù)。 另外,數(shù)據(jù)可以從其他來(lái)源豐富或過(guò)濾掉。 計(jì)算數(shù)據(jù)后,可以將計(jì)算或匯總的數(shù)據(jù)導(dǎo)出到其他系統(tǒng)以進(jìn)行可視化或采取措施。
Apache Ignite Storm Streamer模塊提供了通過(guò)Storm到Ignite緩存的流傳輸。 在開(kāi)始使用Ignite流媒體之前,讓我們看一下Apache Storm,以獲取有關(guān)Apache Storm的一些基礎(chǔ)知識(shí)。
Apache Storm是一個(gè)分布式容錯(cuò)實(shí)時(shí)計(jì)算系統(tǒng)。 在短時(shí)間內(nèi),Apache Storm成為分布式實(shí)時(shí)處理系統(tǒng)的標(biāo)準(zhǔn),該系統(tǒng)使您可以處理大量數(shù)據(jù)。 Apache Storm項(xiàng)目是開(kāi)源的,用Java和Clojure編寫。 它成為實(shí)時(shí)分析的首選。 Apache Ignite Storm流媒體模塊提供了一種方便的方法,可通過(guò)Storm將數(shù)據(jù)流傳輸?shù)絀gnite緩存。
關(guān)鍵概念:
Apache Storm從一端讀取??原始數(shù)據(jù)流,并將其通過(guò)一系列小型處理單元,然后在另一端輸出處理后的信息。 讓我們?cè)敿?xì)了解Apache Storm的主要組件–
元組 –它是Storm的主要數(shù)據(jù)結(jié)構(gòu)。 這是元素的有序列表。 通常,元組支持所有基本數(shù)據(jù)類型。
流 –這是一個(gè)無(wú)界且無(wú)序的元組序列。
嘴 -流的來(lái)源,簡(jiǎn)單來(lái)說(shuō),壺嘴從拓?fù)渲械脑醋x取數(shù)據(jù)。 噴嘴可以是可靠的或不可靠的。 噴口可以與隊(duì)列,Web日志,事件數(shù)據(jù)等對(duì)話。
螺栓 –螺栓是邏輯處理單元,它負(fù)責(zé)處理數(shù)據(jù)和創(chuàng)建新的流。 螺栓可以執(zhí)行過(guò)濾,聚合,聯(lián)接,與文件/數(shù)據(jù)庫(kù)交互等操作。 螺栓從噴嘴接收數(shù)據(jù),然后發(fā)射到一個(gè)或多個(gè)螺栓。
拓?fù)?–拓?fù)涫恰皣娍诤吐菟ā钡挠邢驁D,該圖的每個(gè)節(jié)點(diǎn)都包含數(shù)據(jù)處理邏輯(螺栓),而連接邊定義數(shù)據(jù)(流)的流。
與Hadoop不同,Storm可使拓?fù)溆谰眠\(yùn)行直到您將其殺死。 一個(gè)簡(jiǎn)單的拓?fù)浣Y(jié)構(gòu)從噴口開(kāi)始,從源頭發(fā)射流到螺栓以處理數(shù)據(jù)。 Apache Storm的主要工作是運(yùn)行拓?fù)?#xff0c;并將在給定的時(shí)間運(yùn)行任意數(shù)量的拓?fù)洹?
開(kāi)箱即用的Ignite提供了Storm Bolt(StormStreamer)的實(shí)現(xiàn),以將計(jì)算的數(shù)據(jù)流式傳輸?shù)絀gnite緩存中。 另一方面,您可以記下自定義的Strom Bolt,以將流數(shù)據(jù)提取到Ignite中。 要開(kāi)發(fā)自定義的Storm Bolt,只需實(shí)現(xiàn)* BaseBasicBolt *或* IRichBolt * Storm接口。 但是,如果決定使用StormStreamer,則必須配置一些屬性才能正確運(yùn)行Ignite Bolt。 所有必填屬性如下所示:
| 1個(gè) | 快取名稱 | 將在其中存儲(chǔ)數(shù)據(jù)的Ignite緩存的緩存名稱。 |
| 2 | IgniteTupleField | 命名“點(diǎn)燃元組”字段,通過(guò)它在拓?fù)渲蝎@取元組數(shù)據(jù)。 默認(rèn)情況下,該值為ignite。 |
| 3 | IgniteConfigFile | 此屬性將設(shè)置Ignite彈簧配置 文件。 允許您向和發(fā)送消息和使用消息 從點(diǎn)燃主題。 |
| 4 | 允許覆蓋 | 它將啟用覆蓋緩存中的現(xiàn)有值,默認(rèn)值為false。 |
| 5 | 自動(dòng)刷新頻率 | 自動(dòng)刷新頻率(以毫秒為單位)。 從本質(zhì)上講,這是拖纜將在 嘗試將到目前為止添加的所有數(shù)據(jù)提交到遠(yuǎn)程 節(jié)點(diǎn)。 默認(rèn)值為10秒。 |
掌握了基礎(chǔ)知識(shí)之后,我們來(lái)構(gòu)建一些有用的工具來(lái)檢查Ignite StormStreamer的工作方式。 該應(yīng)用程序的基本思想是設(shè)計(jì)噴嘴和螺栓的一種拓?fù)?#xff0c;該拓?fù)淇梢蕴幚韥?lái)自交通日志文件的大量數(shù)據(jù),并在特定值超過(guò)預(yù)定義閾值時(shí)觸發(fā)警報(bào)。 使用拓?fù)?#xff0c;可以逐行讀取日志文件,并且該拓?fù)渲荚诒O(jiān)視傳入的數(shù)據(jù)。 在我們的例子中,日志文件將包含數(shù)據(jù),例如車輛注冊(cè)號(hào),速度和來(lái)自高速公路交通攝像頭的高速公路名稱。 如果車輛超過(guò)速度限制(例如120km / h),Storm拓?fù)鋾?huì)將數(shù)據(jù)發(fā)送到Ignite緩存。
接下來(lái)的清單將顯示我們將在示例中使用的CSV文件類型,其中包含車輛數(shù)據(jù)信息,例如車輛注冊(cè)號(hào),車輛行駛的速度和高速公路的位置。
AB 123, 160, North city BC 123, 170, South city CD 234, 40, South city DE 123, 40, East city EF 123, 190, South city GH 123, 150, West city XY 123, 110, North city GF 123, 100, South city PO 234, 140, South city XX 123, 110, East city YY 123, 120, South city ZQ 123, 100, West city 以上示例的思想取自Dobbs博士的期刊。 由于本書不是為了研究Apache Storm,因此我將使示例盡可能簡(jiǎn)單。 另外,我還添加了著名的Storm單詞計(jì)數(shù)示例,該示例通過(guò)StormStreamer模塊將單詞計(jì)數(shù)值提取到Ignite緩存中。 如果您對(duì)代碼感到好奇,可以通過(guò)以下網(wǎng)址獲得
Chapter-cep / storm 。 上面的CSV文件將成為Storm拓?fù)涞膩?lái)源。
如上圖所示, FileSourceSpout接受輸入的CSV日志文件,逐行讀取數(shù)據(jù),并將數(shù)據(jù)發(fā)送到SpeedLimitBolt以進(jìn)行進(jìn)一步的閾值處理。 處理完成后,如果發(fā)現(xiàn)有任何超過(guò)速度限制的汽車,則將數(shù)據(jù)發(fā)送到Ignite StormStreamer螺栓,然后將其提取到緩存中。 讓我們深入了解Storm拓?fù)洹?
第1步:
因?yàn)檫@是一個(gè)Storm拓?fù)?#xff0c;所以必須在maven項(xiàng)目中添加Storm和Ignite StormStreamer依賴項(xiàng)。
<dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-storm</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-core</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-spring</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>0.10.0</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>log4j-over-slf4j</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions> </dependency>在撰寫本書時(shí),僅支持Apache Storm版本0.10.0。 請(qǐng)注意,按照Ignite文檔中的描述,您不需要任何Kafka模塊即可運(yùn)行或執(zhí)行此示例。
第2步:
創(chuàng)建的Ignite配置文件(見(jiàn)例如,ignite.xml文件/chapter-cep/storm/src/resources/example-ignite.xml ),并確保它是可以從類路徑。 Ignite配置的內(nèi)容與本章的上一部分相同。
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:util="http://www.springframework.org/schema/util"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/utilhttp://www.springframework.org/schema/util/spring-util.xsd"><bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"><!-- Enable client mode. --><property name="clientMode" value="true"/><!-- Cache accessed from IgniteSink. --><property name="cacheConfiguration"><list><!-- Partitioned cache example configuration with configurations adjusted to server nodes'. --><bean class="org.apache.ignite.configuration.CacheConfiguration"><property name="atomicityMode" value="ATOMIC"/><property name="name" value="testCache"/></bean></list></property><!-- Enable cache events. --><property name="includeEventTypes"><list><!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). --><util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/></list></property><!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --><property name="discoverySpi"><bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"><property name="ipFinder"><bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"><property name="addresses"><list><value>127.0.0.1:47500</value></list></property></bean></property></bean></property></bean> </beans>第三步:
創(chuàng)建一個(gè)ignite-storm.properties文件,以添加緩存名稱,元組名稱和Ignite配置的名稱,如下所示。
cache.name=testCache tuple.name=ignite ignite.spring.xml=example-ignite.xml第4步:
接下來(lái),創(chuàng)建FileSourceSpout Java類,如下所示,
public class FileSourceSpout extends BaseRichSpout {private static final Logger LOGGER = LogManager.getLogger(FileSourceSpout.class);private SpoutOutputCollector outputCollector;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.outputCollector = spoutOutputCollector;} @Overridepublic void nextTuple() {try {Path filePath = Paths.get(this.getClass().getClassLoader().getResource("source.csv").toURI());try(Stream<String> lines = Files.lines(filePath)){lines.forEach(line ->{outputCollector.emit(new Values(line));});} catch(IOException e){LOGGER.error(e.getMessage());}} catch (URISyntaxException e) {LOGGER.error(e.getMessage());}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("trafficLog"));} }FileSourceSpout代碼具有三種重要方法
- open():此方法將在spout的開(kāi)頭被調(diào)用,并為您提供上下文信息。
- nextTuple():此方法將允許您一次將一個(gè)元組傳遞給Storm拓?fù)湟赃M(jìn)行處理,在這種方法中,我逐行讀取CSV文件,并將該行作為元組發(fā)出給螺栓。
- defineOutputFields():此方法聲明輸出元組的名稱,在本例中,名稱應(yīng)為trafficLog。
步驟5:
現(xiàn)在創(chuàng)建實(shí)現(xiàn)BaseBasicBolt接口的SpeedLimitBolt.java類。
public class SpeedLimitBolt extends BaseBasicBolt {private static final String IGNITE_FIELD = "ignite";private static final int SPEED_THRESHOLD = 120;private static final Logger LOGGER = LogManager.getLogger(SpeedLimitBolt.class);@Overridepublic void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {String line = (String)tuple.getValue(0);if(!line.isEmpty()){String[] elements = line.split(",");// we are interested in speed and the car registration numberint speed = Integer.valueOf((elements[1]).trim());String car = elements[0];if(speed > SPEED_THRESHOLD){TreeMap<String, Integer> carValue = new TreeMap<String, Integer>();carValue.put(car, speed);basicOutputCollector.emit(new Values(carValue));LOGGER.info("Speed violation found:"+ car + " speed:" + speed);}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields(IGNITE_FIELD));} }讓我們?cè)俅沃鹦羞M(jìn)行。
- execute():這是實(shí)現(xiàn)螺栓的業(yè)務(wù)邏輯的方法,在這種情況下,我用逗號(hào)分隔行并檢查汽車的速度限制。 如果給定汽車的速度限制高于閾值,則我們將從該元組創(chuàng)建新的樹(shù)圖數(shù)據(jù)類型,并將該元組發(fā)送到下一個(gè)螺栓,在本例中,下一個(gè)螺栓將是StormStreamer。
- defineOutputFields():此方法類似于FileSourceSpout中的clarifyOutputFields()方法,它聲明將返回Ignite元組以進(jìn)行進(jìn)一步處理。
請(qǐng)注意,元組名稱IGNITE在這里很重要, StormStreamer將僅處理名稱為Ignite的元組。
步驟6:
現(xiàn)在是時(shí)候創(chuàng)建我們的拓?fù)鋪?lái)運(yùn)行我們的示例了。 拓?fù)鋵娍诤吐菟ń壴谝粡垐D中,該圖定義了數(shù)據(jù)如何在組件之間流動(dòng)。 它還提供了Storm在創(chuàng)建集群中組件實(shí)例時(shí)使用的并行提示。 要實(shí)現(xiàn)拓?fù)?#xff0c;請(qǐng)?jiān)趕rc \ main \ java \ com \ blu \ imdg \ storm \ topology目錄中創(chuàng)建一個(gè)名為SpeedViolationTopology.java的新文件。 使用以下內(nèi)容作為文件的內(nèi)容:
public class SpeedViolationTopology {private static final int STORM_EXECUTORS = 2;public static void main(String[] args) throws Exception {if (getProperties() == null || getProperties().isEmpty()) {System.out.println("Property file <ignite-storm.property> is not found or empty");return;}// Ignite Stream Iboltfinal StormStreamer<String, String> stormStreamer = new StormStreamer<>();stormStreamer.setAutoFlushFrequency(10L);stormStreamer.setAllowOverwrite(true);stormStreamer.setCacheName(getProperties().getProperty("cache.name"));stormStreamer.setIgniteTupleField(getProperties().getProperty("tuple.name"));stormStreamer.setIgniteConfigFile(getProperties().getProperty("ignite.spring.xml"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new FileSourceSpout(), 1);builder.setBolt("limit", new SpeedLimitBolt(), 1).fieldsGrouping("spout", new Fields("trafficLog"));// set ignite boltbuilder.setBolt("ignite-bolt", stormStreamer, STORM_EXECUTORS).shuffleGrouping("limit");Config conf = new Config();conf.setDebug(false);conf.setMaxTaskParallelism(1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("speed-violation", conf, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}private static Properties getProperties() {Properties properties = new Properties();InputStream ins = SpeedViolationTopology.class.getClassLoader().getResourceAsStream("ignite-storm.properties");try {properties.load(ins);} catch (IOException e) {e.printStackTrace();properties = null;}return properties;} }讓我們?cè)俅沃鹦羞M(jìn)行。 首先,我們閱讀ignite-strom.properties文件以獲取所有必要的參數(shù),然后再配置StormStreamer螺栓。 風(fēng)暴拓?fù)浠旧鲜且粋€(gè)Thrift結(jié)構(gòu)。 TopologyBuilder類提供了一種簡(jiǎn)單而優(yōu)雅的方法來(lái)構(gòu)建復(fù)雜的Storm拓?fù)洹?TopologyBuilder類具有setSpout和setBolt的方法。 接下來(lái),我們使用“拓?fù)洹睒?gòu)建器構(gòu)建Storm拓?fù)?#xff0c;并添加了帶有名稱spout和1執(zhí)行程序的并行提示的spout。
我們還將SpeedLimitBolt定義為具有1個(gè)執(zhí)行程序的并行提示的拓?fù)洹?接下來(lái),我們使用shufflegrouping設(shè)置StormStreamer螺栓, shufflegrouping訂閱該螺栓,并在StormStreamer螺栓的各個(gè)實(shí)例之間平均分配元組(限制)。
出于開(kāi)發(fā)目的,我們使用LocalCluster實(shí)例創(chuàng)建本地集群,并使用SubmitTopology方法提交拓?fù)洹?將拓?fù)涮峤坏郊汉?#xff0c;我們將等待10秒鐘,等待集群計(jì)算提交的拓?fù)?#xff0c;然后使用LocalCluster的 shutdown方法關(guān)閉集群。
步驟7:
接下來(lái),首先運(yùn)行Apache Ignite或集群的本地節(jié)點(diǎn)。 構(gòu)建maven項(xiàng)目后,使用以下命令在本地運(yùn)行拓?fù)洹?
mvn compile exec:java -Dstorm.topology=com.blu.imdg.storm.topology.SpeedViolationTopology該應(yīng)用程序?qū)a(chǎn)生很多系統(tǒng)日志,如下所示。
現(xiàn)在,如果我們通過(guò)ignitevisior驗(yàn)證了Ignite緩存,我們應(yīng)該將以下輸出輸出到控制臺(tái)中。
輸出顯示結(jié)果,這是我們期望的。 從我們的source.csv日志文件中,只有五輛車超過(guò)了120 km / h的速度限制。
這幾乎是對(duì)Ignite Storm Streamer的實(shí)用概述的總結(jié)。 如果您對(duì)Ignite Camel或Ignite Flume Streamer感到好奇,請(qǐng)參閱“使用Apache Ignite進(jìn)行高性能內(nèi)存計(jì)算”一書 。 您也可以與作者聯(lián)系以獲取該書的免費(fèi)副本,該書可以免費(fèi)分發(fā)給學(xué)生和教師。
翻譯自: https://www.javacodegeeks.com/2016/10/complex-event-processing-cep-apache-storm-apache-ignite.html
apache ignite
總結(jié)
以上是生活随笔為你收集整理的apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: kata_小规模流处理kata。 第2部
- 下一篇: kata_小规模流处理kata。 第1部