日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Aleri –复杂事件处理

發(fā)布時(shí)間:2023/12/3 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Aleri –复杂事件处理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
Sybase的Aleri流媒體平臺(tái)是CEP市場(chǎng)中最受歡迎的產(chǎn)品之一。 它在Sybase的交易平臺(tái)RAP版本中使用,該版本在資本市場(chǎng)中廣泛用于管理投資組合中的頭寸。 今天,在這個(gè)由多個(gè)部分組成的系列文章的第一個(gè)部分中,我希望提供Aleri平臺(tái)的概述,并在需要時(shí)提供一些代碼示例。 在第二部分中,我將介紹Aleri Studio,它是基于Eclipse的GUI,可簡(jiǎn)化CEP工作流程建模任務(wù)并通過(guò)儀表板監(jiān)視Aleri服務(wù)器。

在我先前關(guān)于復(fù)雜事件處理的博客文章中 ,我演示了使用Esper,開源CEP軟件和Twitter4J API處理來(lái)自Twitter的推文流的方法。 但是,CEP產(chǎn)品不僅僅只處理一個(gè)數(shù)據(jù)流。 單個(gè)數(shù)據(jù)流可以通過(guò)標(biāo)準(zhǔn)的異步消息傳遞平臺(tái)輕松處理,并且不會(huì)帶來(lái)非常具有挑戰(zhàn)性的可伸縮性或延遲問(wèn)題。 但是,當(dāng)涉及消費(fèi)多個(gè)實(shí)時(shí)數(shù)據(jù)流并進(jìn)行實(shí)時(shí)分析時(shí),并且當(dāng)數(shù)據(jù)流之間的相關(guān)性很重要時(shí),沒有什么比CEP平臺(tái)更勝一籌了。 源饋送流媒體平臺(tái)的速度,數(shù)量和復(fù)雜性可能會(huì)有所不同。 真正的企業(yè)級(jí)CEP應(yīng)該輕松有效地處理各種實(shí)時(shí)高速數(shù)據(jù),例如股票行情自動(dòng)收錄器和速度較慢但數(shù)量眾多的脫機(jī)批量上傳。 除了提供標(biāo)準(zhǔn)接口之外,CEP還應(yīng)該提供一種更簡(jiǎn)單的編程語(yǔ)言來(lái)查詢流數(shù)據(jù)并通過(guò)諸如模式匹配和快照查詢之類的功能來(lái)生成連續(xù)的情報(bào)。

Sybase交易平臺(tái)– RAP版本。 引用網(wǎng)址

為了保持簡(jiǎn)單性和高水平,CEP可以分為三個(gè)基本部分。 第一種是獲取/使用源數(shù)據(jù)的機(jī)制。 接下來(lái)是調(diào)查數(shù)據(jù),識(shí)別事件和模式,然后通過(guò)為目標(biāo)系統(tǒng)提供可操作的項(xiàng)與目標(biāo)系統(tǒng)進(jìn)行交互的過(guò)程。 可執(zhí)行事件采用不同的形式和格式,具體取決于您使用CEP的應(yīng)用程序。 一個(gè)行動(dòng)項(xiàng)目可能是–根據(jù)風(fēng)險(xiǎn)監(jiān)控應(yīng)用程序中計(jì)算的風(fēng)險(xiǎn)出售股票頭寸。 通過(guò)讀取化工廠中的數(shù)千個(gè)傳感器來(lái)指示洗錢應(yīng)用程序中的潛在欺詐事件或監(jiān)視系統(tǒng)中的災(zāi)難性事件。 從字面上看,有成千上萬(wàn)種情況是無(wú)法手動(dòng)和離線檢查數(shù)據(jù)的。 在完成以下部分之后,您可能需要自己嘗試Aleri。 此鏈接http://www.sybase.com/aleriform可以直接將您帶到Aleri下載頁(yè)面。 可從Sybase的官方網(wǎng)站免費(fèi)獲得有效期為90天的評(píng)估副本。 大量的文檔,出色的教程和網(wǎng)站上的一些示例代碼應(yīng)該可以幫助您快速入門。

如果您是任何CEP產(chǎn)品的現(xiàn)有用戶,我建議您將Aleri與該產(chǎn)品進(jìn)行比較,并與社區(qū)共享或在此博客上發(fā)表評(píng)論。 根據(jù)一些過(guò)時(shí)的估計(jì),Tibco CEP是市場(chǎng)上最大的CEP供應(yīng)商。 我不確定StreamBase另一個(gè)領(lǐng)先產(chǎn)品有多少市場(chǎng)份額。 您還可以在Youtube.com上觀看 網(wǎng)絡(luò)研討會(huì) ,該研討會(huì)總體上介紹了CEP的好處,以及具體介紹了Streambase的一些關(guān)鍵功能。 對(duì)于新手來(lái)說(shuō),這是CEP和資本市場(chǎng)用例的絕佳介紹。

通過(guò)使用Studio(gui)或使用Splash(語(yǔ)言)或通過(guò)使用Aleri Modeling語(yǔ)言(ML)創(chuàng)建模型來(lái)構(gòu)建Aleri CEP上的應(yīng)用程序,這是部署之前的最后階段。

以下是Splash的主要功能列表。

  • 數(shù)據(jù)類型 –支持標(biāo)準(zhǔn)數(shù)據(jù)類型和XML。 還為用戶定義的數(shù)據(jù)類型支持'Typedef'。
  • 訪問(wèn)控制 –粒度級(jí)別的訪問(wèn)控制,允許訪問(wèn)一個(gè)或多個(gè)流(包含許多流)
  • SQL –建立模型的另一種方式。 由于其視覺范式,構(gòu)建Aleri工作室模型可能需要更長(zhǎng)的時(shí)間。 精通SQL的人應(yīng)該可以使用Aleri SQL更快地完成它,這與眾所周知的常規(guī)SQL非常相似。
  • 聯(lián)接 –支持的聯(lián)接為內(nèi)部,左側(cè),右側(cè)和完全聯(lián)接
  • 過(guò)濾器表達(dá)式 –包括何處,擁有,分組擁有
  • ML – Aleri SQL以Aleri建模語(yǔ)言(ML)生成數(shù)據(jù)模型–熟練的ML用戶可能僅使用ML(代替Aleri Studio和Aleri SQL)來(lái)構(gòu)建模型。
  • 模式匹配語(yǔ)言 –包括諸如“內(nèi)部”以指示間隔(滑動(dòng)窗口),“從”以指示數(shù)據(jù)流和有趣的“ fby”以指示序列的結(jié)構(gòu)(其后為)
  • 用戶定義的函數(shù) – Splash中提供的用戶定義的函數(shù)接口使您可以用C ++或Java創(chuàng)建函數(shù),并在模型的Splash表達(dá)式中使用它們。

高級(jí)模式匹配–功能在此處通過(guò)示例進(jìn)行說(shuō)明。 –以下三個(gè)代碼段及其說(shuō)明直接取自Sybase有關(guān)Aleri的文檔。
第一個(gè)示例檢查以查看經(jīng)紀(jì)人是否發(fā)送與其他的客戶之一相同的股票的買單,然后為該客戶插入買單,然后出售該股票。 當(dāng)這些動(dòng)作按順序發(fā)生時(shí),它將創(chuàng)建一個(gè)“ buyahead”事件。

within 5 minutes from BuyStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Buy1, BuyStock[Symbol=sym; Shares=n2; Broker=b; Customer=c1] as Buy2, SellStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Sell on Buy1 fby Buy2 fby Sell { if ((b = c0) and (b != c1)) { output [Symbol=sym; Shares=n1; Broker=b]; } }

本示例使用fby關(guān)系檢查三個(gè)事件,一個(gè)接一個(gè)。 因?yàn)樵谌齻€(gè)模式中使用了相同的變量sym,所以三個(gè)事件中的值必須相同。 但是,不同的變量可能具有相同的值(例如n1和n2。),如果Buy1和Sell事件中的Broker和Customer相同,而Buy2事件中的Customer不同,則它將輸出一個(gè)事件。

下一個(gè)示例顯示對(duì)事件的布爾運(yùn)算。 該規(guī)則描述了一種可能的盜竊情況,即在架子上有商品讀取時(shí)(可能通過(guò)RFID),然后沒有對(duì)該商品進(jìn)行結(jié)帳,然后在門附近的掃描儀上讀取了該商品。

within 12 hours from ShelfReading[TagId=tag; ProductName=pname] as onShelf, CounterReading[TagId=tag] as checkout, ExitReading[TagId=tag; AreaId=area] as exit on onShelf fby not(checkout) fby exit output [TagId=t; ProductName=pname; AreaId=area];

下一個(gè)示例顯示了如果用戶嘗試在5分鐘內(nèi)三次未成功登錄帳戶,則如何發(fā)出警報(bào)。

from LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login1, LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login2, LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login3, LoginAttempt[IpAddress=ip; Account=acct; Result=1] as login4 on (login1 fby login2 fby login3) and not(login4) output [Account=acct];

希望闖入計(jì)算機(jī)系統(tǒng)的人們經(jīng)常掃描多個(gè)TCP / IP端口以查找開放的端口,并嘗試?yán)脗陕犨@些端口的程序中的漏洞。 這是一條規(guī)則,檢查是否單個(gè)IP地址嘗試在三個(gè)端口上進(jìn)行連接,以及是否使用“ sendmail”程序進(jìn)行了連接。

within 30 minutes from Connect[Source=ip; Port=22] as c1, Connect[Source=ip; Port=23] as c2, Connect[Source=ip; Port=25] as c3 SendMail[Source=ip] as send on (c1 and c2 and c3) fby send output [Source=ip];

Aleri提供了許多現(xiàn)成的接口,可輕松與源系統(tǒng)和目標(biāo)系統(tǒng)集成。 通過(guò)這些接口/適配器,Aleri平臺(tái)可以與標(biāo)準(zhǔn)關(guān)系數(shù)據(jù)庫(kù),消息傳遞框架(如IBM MQ),套接字和文件系統(tǒng)文件進(jìn)行通信。 Aleri可以通過(guò)標(biāo)準(zhǔn)化接口輕松使用csv,FIX,路透社市場(chǎng)數(shù)據(jù),SOAP,http,SMTP等各種格式的數(shù)據(jù)。

以下是將Aleri與其他系統(tǒng)集成的可用技術(shù)。

  • Java,C ++和點(diǎn)網(wǎng)提供了發(fā)布/訂閱API-一種標(biāo)準(zhǔn)的發(fā)布/訂閱機(jī)制
  • 通過(guò)ODBC和JDBC連接使用帶有SELECT,UPDATE,DELETE和INSERT語(yǔ)句的SQL接口
  • 內(nèi)置用于市場(chǎng)數(shù)據(jù)和FIX的適配器

在本系列的下一部分中,我們將介紹Aleri Studio,它是可以幫助我們輕松構(gòu)建CEP應(yīng)用程序的gui。

在我的上一篇文章中,對(duì)Sybase的復(fù)雜事件處理平臺(tái)Aleri進(jìn)行了高級(jí)概述。

本周,讓我們回顧一下Aleri Studio,Aleri平臺(tái)的用戶界面以及pub / sub api的使用,這是與Aleri平臺(tái)進(jìn)行交互的多種方式之一。 該工作室是平臺(tái)不可或缺的一部分,并隨附免費(fèi)的評(píng)估版。 如果您尚未這樣做,請(qǐng)從此處下載副本。 Aleri產(chǎn)品的安裝過(guò)程非常簡(jiǎn)單,幾分鐘即可啟動(dòng)并運(yùn)行。

aleri工作室是用于構(gòu)建模型的創(chuàng)作平臺(tái),該模型定義了各種數(shù)據(jù)流之間的交互作用和排序。 它還可以合并多個(gè)流以形成一個(gè)或多個(gè)流。 使用這個(gè)基于Eclipse的工作室,您可以通過(guò)向其提供測(cè)試數(shù)據(jù)來(lái)測(cè)試所構(gòu)建的模型,并實(shí)時(shí)監(jiān)控流中的活動(dòng)。 讓我們看一下您可以在Aleri中定義的各種流及其功能。

源流 –只有這種類型的流才能處理傳入的數(shù)據(jù)。 傳入數(shù)據(jù)可以執(zhí)行的操作是插入,更新,刪除和向上插入。 Upsert,顧名思義,如果流中已經(jīng)存在定義行的鍵,則更新數(shù)據(jù)。 否則,它將在流中插入一條記錄。

聚合流 –此流為由特定屬性定義的每個(gè)組創(chuàng)建摘要記錄。 這提供了與ANSI SQL中的“分組依據(jù)”等效的功能。

復(fù)制流 –通過(guò)復(fù)制另一個(gè)流但使用不同的保留規(guī)則來(lái)創(chuàng)建此流。

計(jì)算流 –該流允許您在數(shù)據(jù)的每一行上使用一個(gè)函數(shù)來(lái)為數(shù)據(jù)流的每一行獲取新的計(jì)算元素。

擴(kuò)展流 –該流是通過(guò)其他列表達(dá)式從另一個(gè)流派生的

過(guò)濾流 –您可以為此流定義過(guò)濾條件。 就像擴(kuò)展和計(jì)算流一樣,此流在其他流上應(yīng)用過(guò)濾條件以派生新流。

Flex Stream –通過(guò)自定義編碼方法,在處理流數(shù)據(jù)方面具有顯著的靈活性。 只有此流允許您編寫自己的方法以滿足特殊需求。

加入流 –通過(guò)在某些條件下加入兩個(gè)或多個(gè)流來(lái)創(chuàng)建新流。 內(nèi)連接和外連接均可用于連接流。

模式流 –模式匹配規(guī)則與此流一起應(yīng)用

聯(lián)合流 –顧名思義,這將連接具有相同行數(shù)據(jù)結(jié)構(gòu)的兩個(gè)或多個(gè)流。 與加入流不同,此流包含來(lái)自所有參與流的所有數(shù)據(jù)。

通過(guò)使用其中一些流和Aeri的pub api,我將演示將Twitter實(shí)時(shí)提要隔離到兩個(gè)不同的流中。 Twitter實(shí)時(shí)提要由Twitter4j庫(kù)中的偵聽器使用。 如果您只想先嘗試使用Twitter4j庫(kù),請(qǐng)按照我之前的文章“ 在Twitter上跟蹤用戶情緒 ”。 通過(guò)使用Aleri的發(fā)布API,將twitter4j偵聽器接收的數(shù)據(jù)饋送到我們模型中的源流。 在本練習(xí)中,我們將嘗試根據(jù)推文的內(nèi)容將其分離出來(lái)。 基于我之前的帖子中的示例,我們將根據(jù)內(nèi)容將傳入流分為兩個(gè)流。 一個(gè)流將獲取任何包含'lol'的推文,而另一個(gè)流將在文本中顯示帶有笑臉“ :)”的推文。 首先,讓我們列出使它成為一個(gè)可行示例所需執(zhí)行的任務(wù)。

  • 創(chuàng)建具有三個(gè)流的模型
  • 驗(yàn)證模型沒有錯(cuò)誤
  • 創(chuàng)建一個(gè)靜態(tài)數(shù)據(jù)文件
  • 啟動(dòng)Aleri服務(wù)器,并將靜態(tài)數(shù)據(jù)文件手動(dòng)輸入到流中,以確認(rèn)模型正確工作。
  • 編寫Java代碼以使用Twitter提要。 使用發(fā)布API將推文發(fā)布到Aleri平臺(tái)。
  • 運(yùn)行演示并觀看實(shí)時(shí)數(shù)據(jù)流經(jīng)各種流的過(guò)程。
  • 該圖像是Aleri Studio的三個(gè)流的快照-左側(cè)的一個(gè)名為“ tweets”是源流,右側(cè)的兩個(gè)名為“ lolFilter”和“ smileyFilter”屬于過(guò)濾器類型。 源流接受傳入的數(shù)據(jù),而過(guò)濾器流接收已過(guò)濾的數(shù)據(jù)。 這是我定義過(guò)濾條件的方式-例如(tweets.text,'%lol%')。 tweets是流的名稱,text是我們感興趣的流中的字段。%lol%表示,選擇內(nèi)容中帶有“ lol”字符串的任何tweet。 每個(gè)流只有2個(gè)字段-id和text。 ID和文本映射到Twitter發(fā)送的ID和文本消息。 定義模型后,您可以通過(guò)單擊頂部功能區(qū)中的復(fù)選標(biāo)記來(lái)檢查是否有任何錯(cuò)誤。 如果出現(xiàn)任何錯(cuò)誤,則會(huì)在圖像右下方的面板中顯示。 一旦您的模型沒有錯(cuò)誤,就可以進(jìn)行測(cè)試了。

    下圖顯示了Studio的測(cè)試界面。 首先嘗試使用靜態(tài)數(shù)據(jù)文件運(yùn)行模型。 頂部的紅色小方塊表示Aleri服務(wù)器當(dāng)前正在運(yùn)行。 右下角的控制臺(tái)窗口顯示服務(wù)器消息,例如成功啟動(dòng)和停止等。在左窗格中的“運(yùn)行測(cè)試”選項(xiàng)卡上,您可以在其中選擇靜態(tài)數(shù)據(jù)文件來(lái)饋送源流。 右側(cè)窗格顯示所有當(dāng)前正在運(yùn)行的流以及由流處理的實(shí)時(shí)數(shù)據(jù)。

    下圖顯示了用于測(cè)試模型的數(shù)據(jù)文件的格式

    tweets ALERI_OPS="i" id="1" text="324test 1234" ; tweets ALERI_OPS="i" id="2" text="test 12345"; tweets ALERI_OPS="i" id="3" text="test 1234666" ; tweets ALERI_OPS="i" id="4" text="test 1234888" ; tweets ALERI_OPS="i" id="5" text="test 1234999" ;

    此練習(xí)的源代碼在底部。
    請(qǐng)記住,您需要在構(gòu)建路徑中具有twitter4j庫(kù),并在運(yùn)行程序之前運(yùn)行Aleri服務(wù)器。 因?yàn)槲覜]有在執(zhí)行線程中添加任何計(jì)時(shí)器,所以停止執(zhí)行的唯一方法是中止執(zhí)行。 為了簡(jiǎn)潔起見,并且為了使代碼行簡(jiǎn)短,我刪除了所有異常處理和日志記錄。 該代碼僅利用Aleri的pub / sub api的發(fā)布部分。 我將在我的下一篇博文中演示api的sub side的用法。

    package com.sybase.aleri;import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException;import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterException; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.conf.Configuration; import twitter4j.conf.ConfigurationBuilder;import com.aleri.pubsub.SpGatewayConstants; import com.aleri.pubsub.SpObserver; import com.aleri.pubsub.SpPlatform; import com.aleri.pubsub.SpPlatformParms; import com.aleri.pubsub.SpPlatformStatus; import com.aleri.pubsub.SpPublication; import com.aleri.pubsub.SpStream; import com.aleri.pubsub.SpStreamDataRecord; import com.aleri.pubsub.SpStreamDefinition; import com.aleri.pubsub.SpSubscription; import com.aleri.pubsub.SpSubscriptionCommon; import com.aleri.pubsub.impl.SpFactory; import com.aleri.pubsub.impl.SpUtils; import com.aleri.pubsub.test.ClientSpObserver;import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Vector; import java.util.TimeZone;public class TwitterTest_2 {//make sure that Aleri server is running prior to running this programstatic {//creates the publishing platformcreatePlatform();}// Important objects from the publish APIstatic SpStream stream;static SpPlatformStatus platformStatus;static SpPublication pub;public static void main(String[] args) throws TwitterException, IOException {TwitterTest_2 tt2 = new TwitterTest_2();ConfigurationBuilder cb = new ConfigurationBuilder();cb.setDebugEnabled(true);//use your twitter id and passcodecb.setUser("Your user name");cb.setPassword("Your Password");// creating the twitter4j listenerConfiguration cfg = cb.build();TwitterStream twitterStream = new TwitterStreamFactory(cfg).getInstance();StatusListener_1 listener;listener = new StatusListener_1();twitterStream.addListener(listener);//runs the sample that comes with twitter4jtwitterStream.sample();}private static int createPlatform() {int rc = 0;//Aleri platform configuration - better alternative is to your properties fileString host = "localhost";int port = 22000;//aleri configured to run with empty userid and pwd stringsString user = "";String password = "";//name of the source stream - the one that gets the data from the twitter4jString streamName = "tweets";String name = "TwitterTest_2";SpPlatformParms parms = SpFactory.createPlatformParms(host, port, user,password, false, false);platformStatus = SpFactory.createPlatformStatus();SpPlatform sp = SpFactory.createPlatform(parms, platformStatus);stream = sp.getStream(streamName);pub = sp.createPublication(name, platformStatus);// Then get the stream definition containing the schema information.SpStreamDefinition sdef = stream.getDefinition(); /*int numFieldsInRecord = sdef.getNumColumns();Vector colTypes = sdef.getColumnTypes();Vector colNames = sdef.getColumnNames();*/return 0;}static SpStream getStream() {return stream;}static SpPlatformStatus getPlatformStatus() {return platformStatus;}static SpPublication getPublication() {return pub;}static int publish(SpStream stream, SpPlatformStatus platformStatus,SpPublication pub, Collection fieldData) {int rc = 0;int i = pub.start();SpStreamDataRecord sdr = SpFactory.createStreamDataRecord(stream,fieldData, SpGatewayConstants.SO_UPSERT,SpGatewayConstants.SF_NULLFLAG, platformStatus);Collection dataSet = new Vector();dataSet.add(sdr);System.out.println("\nAttempting to publish the data set to the Platform for stream <"+ stream.getName() + ">.");rc = pub.publishTransaction(dataSet, SpGatewayConstants.SO_UPSERT,SpGatewayConstants.SF_NULLFLAG, 1);// commit blocks the thread until data is consumed by the platformSystem.out.println("before commit() call to the Platform.");rc = pub.commit();return 0;}}

    參考: Aleri –復(fù)雜事件處理–第一部分 , 理解Aleri –復(fù)雜事件處理–第二部分來(lái)自我們JCG合作伙伴 Mahesh Gadgil在“ 簡(jiǎn)單而實(shí)用”的博客上。


    翻譯自: https://www.javacodegeeks.com/2012/04/aleri-complex-event-processing.html

    總結(jié)

    以上是生活随笔為你收集整理的Aleri –复杂事件处理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。