在我先前關于復雜事件處理的博客文章中 ,我演示了使用Esper,開源CEP軟件和Twitter4J API處理來自Twitter的推文流的方法。 但是,CEP產品不僅僅只處理一個數據流。 單個數據流可以通過標準的異步消息傳遞平臺輕松處理,并且不會帶來非常具有挑戰性的可伸縮性或延遲問題。 但是,當涉及消費多個實時數據流并進行實時分析時,并且當數據流之間的相關性很重要時,沒有什么比CEP平臺更勝一籌了。 源饋送流媒體平臺的速度,數量和復雜性可能會有所不同。 真正的企業級CEP應該輕松有效地處理各種實時高速數據,例如股票行情自動收錄器和速度較慢但數量眾多的脫機批量上傳。 除了提供標準接口之外,CEP還應該提供一種更簡單的編程語言來查詢流數據并通過諸如模式匹配和快照查詢之類的功能來生成連續的情報。
![]() |
Sybase交易平臺– RAP版本。 引用網址 |
為了保持簡單性和高水平,CEP可以分為三個基本部分。 第一種是獲取/使用源數據的機制。 接下來是調查數據,識別事件和模式,然后通過為目標系統提供可操作的項與目標系統進行交互的過程。 可執行事件采用不同的形式和格式,具體取決于您使用CEP的應用程序。 一個行動項目可能是–根據風險監控應用程序中計算的風險出售股票頭寸。 通過讀取化工廠中的數千個傳感器來指示洗錢應用程序中的潛在欺詐事件或監視系統中的災難性事件。 從字面上看,有成千上萬種情況是無法手動和離線檢查數據的。 在完成以下部分之后,您可能需要自己嘗試Aleri。 此鏈接http://www.sybase.com/aleriform可以直接將您帶到Aleri下載頁面。 可從Sybase的官方網站免費獲得有效期為90天的評估副本。 大量的文檔,出色的教程和網站上的一些示例代碼應該可以幫助您快速入門。
如果您是任何CEP產品的現有用戶,我建議您將Aleri與該產品進行比較,并與社區共享或在此博客上發表評論。 根據一些過時的估計,Tibco CEP是市場上最大的CEP供應商。 我不確定StreamBase另一個領先產品有多少市場份額。 您還可以在Youtube.com上觀看 網絡研討會 ,該研討會總體上介紹了CEP的好處,以及具體介紹了Streambase的一些關鍵功能。 對于新手來說,這是CEP和資本市場用例的絕佳介紹。
通過使用Studio(gui)或使用Splash(語言)或通過使用Aleri Modeling語言(ML)創建模型來構建Aleri CEP上的應用程序,這是部署之前的最后階段。
以下是Splash的主要功能列表。
- 數據類型 –支持標準數據類型和XML。 還為用戶定義的數據類型支持'Typedef'。
- 訪問控制 –粒度級別的訪問控制,允許訪問一個或多個流(包含許多流)
- SQL –建立模型的另一種方式。 由于其視覺范式,構建Aleri工作室模型可能需要更長的時間。 精通SQL的人應該可以使用Aleri SQL更快地完成它,這與眾所周知的常規SQL非常相似。
- 聯接 –支持的聯接為內部,左側,右側和完全聯接
- 過濾器表達式 –包括何處,擁有,分組擁有
- ML – Aleri SQL以Aleri建模語言(ML)生成數據模型–熟練的ML用戶可能僅使用ML(代替Aleri Studio和Aleri SQL)來構建模型。
- 模式匹配語言 –包括諸如“內部”以指示間隔(滑動窗口),“從”以指示數據流和有趣的“ fby”以指示序列的結構(其后為)
- 用戶定義的函數 – Splash中提供的用戶定義的函數接口使您可以用C ++或Java創建函數,并在模型的Splash表達式中使用它們。
高級模式匹配–功能在此處通過示例進行說明。 –以下三個代碼段及其說明直接取自Sybase有關Aleri的文檔。
第一個示例檢查以查看經紀人是否發送與其他的客戶之一相同的股票的買單,然后為該客戶插入買單,然后出售該股票。 當這些動作按順序發生時,它將創建一個“ buyahead”事件。
within 5 minutes
from
BuyStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Buy1,
BuyStock[Symbol=sym; Shares=n2; Broker=b; Customer=c1] as Buy2,
SellStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Sell
on Buy1 fby Buy2 fby Sell
{
if ((b = c0) and (b != c1)) {
output [Symbol=sym; Shares=n1; Broker=b];
}
}
本示例使用fby關系檢查三個事件,一個接一個。 因為在三個模式中使用了相同的變量sym,所以三個事件中的值必須相同。 但是,不同的變量可能具有相同的值(例如n1和n2。),如果Buy1和Sell事件中的Broker和Customer相同,而Buy2事件中的Customer不同,則它將輸出一個事件。
下一個示例顯示對事件的布爾運算。 該規則描述了一種可能的盜竊情況,即在架子上有商品讀取時(可能通過RFID),然后沒有對該商品進行結帳,然后在門附近的掃描儀上讀取了該商品。
within 12 hours
from
ShelfReading[TagId=tag; ProductName=pname] as onShelf,
CounterReading[TagId=tag] as checkout,
ExitReading[TagId=tag; AreaId=area] as exit
on onShelf fby not(checkout) fby exit
output [TagId=t; ProductName=pname; AreaId=area];
下一個示例顯示了如果用戶嘗試在5分鐘內三次未成功登錄帳戶,則如何發出警報。
from
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login1,
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login2,
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login3,
LoginAttempt[IpAddress=ip; Account=acct; Result=1] as login4
on (login1 fby login2 fby login3) and not(login4)
output [Account=acct];
希望闖入計算機系統的人們經常掃描多個TCP / IP端口以查找開放的端口,并嘗試利用偵聽這些端口的程序中的漏洞。 這是一條規則,檢查是否單個IP地址嘗試在三個端口上進行連接,以及是否使用“ sendmail”程序進行了連接。
within 30 minutes
from
Connect[Source=ip; Port=22] as c1,
Connect[Source=ip; Port=23] as c2,
Connect[Source=ip; Port=25] as c3
SendMail[Source=ip] as send
on (c1 and c2 and c3) fby send
output [Source=ip];
Aleri提供了許多現成的接口,可輕松與源系統和目標系統集成。 通過這些接口/適配器,Aleri平臺可以與標準關系數據庫,消息傳遞框架(如IBM MQ),套接字和文件系統文件進行通信。 Aleri可以通過標準化接口輕松使用csv,FIX,路透社市場數據,SOAP,http,SMTP等各種格式的數據。
以下是將Aleri與其他系統集成的可用技術。
- Java,C ++和點網提供了發布/訂閱API-一種標準的發布/訂閱機制
- 通過ODBC和JDBC連接使用帶有SELECT,UPDATE,DELETE和INSERT語句的SQL接口 。
- 內置用于市場數據和FIX的適配器
在本系列的下一部分中,我們將介紹Aleri Studio,它是可以幫助我們輕松構建CEP應用程序的gui。
在我的上一篇文章中,對Sybase的復雜事件處理平臺Aleri進行了高級概述。
本周,讓我們回顧一下Aleri Studio,Aleri平臺的用戶界面以及pub / sub api的使用,這是與Aleri平臺進行交互的多種方式之一。 該工作室是平臺不可或缺的一部分,并隨附免費的評估版。 如果您尚未這樣做,請從此處下載副本。 Aleri產品的安裝過程非常簡單,幾分鐘即可啟動并運行。
aleri工作室是用于構建模型的創作平臺,該模型定義了各種數據流之間的交互作用和排序。 它還可以合并多個流以形成一個或多個流。 使用這個基于Eclipse的工作室,您可以通過向其提供測試數據來測試所構建的模型,并實時監控流中的活動。 讓我們看一下您可以在Aleri中定義的各種流及其功能。
源流 –只有這種類型的流才能處理傳入的數據。 傳入數據可以執行的操作是插入,更新,刪除和向上插入。 Upsert,顧名思義,如果流中已經存在定義行的鍵,則更新數據。 否則,它將在流中插入一條記錄。
聚合流 –此流為由特定屬性定義的每個組創建摘要記錄。 這提供了與ANSI SQL中的“分組依據”等效的功能。
復制流 –通過復制另一個流但使用不同的保留規則來創建此流。
計算流 –該流允許您在數據的每一行上使用一個函數來為數據流的每一行獲取新的計算元素。
擴展流 –該流是通過其他列表達式從另一個流派生的
過濾流 –您可以為此流定義過濾條件。 就像擴展和計算流一樣,此流在其他流上應用過濾條件以派生新流。
Flex Stream –通過自定義編碼方法,在處理流數據方面具有顯著的靈活性。 只有此流允許您編寫自己的方法以滿足特殊需求。
加入流 –通過在某些條件下加入兩個或多個流來創建新流。 內連接和外連接均可用于連接流。
模式流 –模式匹配規則與此流一起應用
聯合流 –顧名思義,這將連接具有相同行數據結構的兩個或多個流。 與加入流不同,此流包含來自所有參與流的所有數據。
通過使用其中一些流和Aeri的pub api,我將演示將Twitter實時提要隔離到兩個不同的流中。 Twitter實時提要由Twitter4j庫中的偵聽器使用。 如果您只想先嘗試使用Twitter4j庫,請按照我之前的文章“ 在Twitter上跟蹤用戶情緒 ”。 通過使用Aleri的發布API,將twitter4j偵聽器接收的數據饋送到我們模型中的源流。 在本練習中,我們將嘗試根據推文的內容將其分離出來。 基于我之前的帖子中的示例,我們將根據內容將傳入流分為兩個流。 一個流將獲取任何包含'lol'的推文,而另一個流將在文本中顯示帶有笑臉“ :)”的推文。 首先,讓我們列出使它成為一個可行示例所需執行的任務。
- 創建具有三個流的模型
- 驗證模型沒有錯誤
- 創建一個靜態數據文件
- 啟動Aleri服務器,并將靜態數據文件手動輸入到流中,以確認模型正確工作。
- 編寫Java代碼以使用Twitter提要。 使用發布API將推文發布到Aleri平臺。
- 運行演示并觀看實時數據流經各種流的過程。

該圖像是Aleri Studio的三個流的快照-左側的一個名為“ tweets”是源流,右側的兩個名為“ lolFilter”和“ smileyFilter”屬于過濾器類型。 源流接受傳入的數據,而過濾器流接收已過濾的數據。 這是我定義過濾條件的方式-例如(tweets.text,'%lol%')。 tweets是流的名稱,text是我們感興趣的流中的字段。%lol%表示,選擇內容中帶有“ lol”字符串的任何tweet。 每個流只有2個字段-id和text。 ID和文本映射到Twitter發送的ID和文本消息。 定義模型后,您可以通過單擊頂部功能區中的復選標記來檢查是否有任何錯誤。 如果出現任何錯誤,則會在圖像右下方的面板中顯示。 一旦您的模型沒有錯誤,就可以進行測試了。
下圖顯示了Studio的測試界面。 首先嘗試使用靜態數據文件運行模型。 頂部的紅色小方塊表示Aleri服務器當前正在運行。 右下角的控制臺窗口顯示服務器消息,例如成功啟動和停止等。在左窗格中的“運行測試”選項卡上,您可以在其中選擇靜態數據文件來饋送源流。 右側窗格顯示所有當前正在運行的流以及由流處理的實時數據。

下圖顯示了用于測試模型的數據文件的格式
tweets ALERI_OPS="i" id="1" text="324test 1234" ;
tweets ALERI_OPS="i" id="2" text="test 12345";
tweets ALERI_OPS="i" id="3" text="test 1234666" ;
tweets ALERI_OPS="i" id="4" text="test 1234888" ;
tweets ALERI_OPS="i" id="5" text="test 1234999" ;

此練習的源代碼在底部。
請記住,您需要在構建路徑中具有twitter4j庫,并在運行程序之前運行Aleri服務器。 因為我沒有在執行線程中添加任何計時器,所以停止執行的唯一方法是中止執行。 為了簡潔起見,并且為了使代碼行簡短,我刪除了所有異常處理和日志記錄。 該代碼僅利用Aleri的pub / sub api的發布部分。 我將在我的下一篇博文中演示api的sub side的用法。
package com.sybase.aleri;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;import com.aleri.pubsub.SpGatewayConstants;
import com.aleri.pubsub.SpObserver;
import com.aleri.pubsub.SpPlatform;
import com.aleri.pubsub.SpPlatformParms;
import com.aleri.pubsub.SpPlatformStatus;
import com.aleri.pubsub.SpPublication;
import com.aleri.pubsub.SpStream;
import com.aleri.pubsub.SpStreamDataRecord;
import com.aleri.pubsub.SpStreamDefinition;
import com.aleri.pubsub.SpSubscription;
import com.aleri.pubsub.SpSubscriptionCommon;
import com.aleri.pubsub.impl.SpFactory;
import com.aleri.pubsub.impl.SpUtils;
import com.aleri.pubsub.test.ClientSpObserver;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Vector;
import java.util.TimeZone;public class TwitterTest_2 {//make sure that Aleri server is running prior to running this programstatic {//creates the publishing platformcreatePlatform();}// Important objects from the publish APIstatic SpStream stream;static SpPlatformStatus platformStatus;static SpPublication pub;public static void main(String[] args) throws TwitterException, IOException {TwitterTest_2 tt2 = new TwitterTest_2();ConfigurationBuilder cb = new ConfigurationBuilder();cb.setDebugEnabled(true);//use your twitter id and passcodecb.setUser("Your user name");cb.setPassword("Your Password");// creating the twitter4j listenerConfiguration cfg = cb.build();TwitterStream twitterStream = new TwitterStreamFactory(cfg).getInstance();StatusListener_1 listener;listener = new StatusListener_1();twitterStream.addListener(listener);//runs the sample that comes with twitter4jtwitterStream.sample();}private static int createPlatform() {int rc = 0;//Aleri platform configuration - better alternative is to your properties fileString host = "localhost";int port = 22000;//aleri configured to run with empty userid and pwd stringsString user = "";String password = "";//name of the source stream - the one that gets the data from the twitter4jString streamName = "tweets";String name = "TwitterTest_2";SpPlatformParms parms = SpFactory.createPlatformParms(host, port, user,password, false, false);platformStatus = SpFactory.createPlatformStatus();SpPlatform sp = SpFactory.createPlatform(parms, platformStatus);stream = sp.getStream(streamName);pub = sp.createPublication(name, platformStatus);// Then get the stream definition containing the schema information.SpStreamDefinition sdef = stream.getDefinition();
/*int numFieldsInRecord = sdef.getNumColumns();Vector colTypes = sdef.getColumnTypes();Vector colNames = sdef.getColumnNames();*/return 0;}static SpStream getStream() {return stream;}static SpPlatformStatus getPlatformStatus() {return platformStatus;}static SpPublication getPublication() {return pub;}static int publish(SpStream stream, SpPlatformStatus platformStatus,SpPublication pub, Collection fieldData) {int rc = 0;int i = pub.start();SpStreamDataRecord sdr = SpFactory.createStreamDataRecord(stream,fieldData, SpGatewayConstants.SO_UPSERT,SpGatewayConstants.SF_NULLFLAG, platformStatus);Collection dataSet = new Vector();dataSet.add(sdr);System.out.println("\nAttempting to publish the data set to the Platform for stream <"+ stream.getName() + ">.");rc = pub.publishTransaction(dataSet, SpGatewayConstants.SO_UPSERT,SpGatewayConstants.SF_NULLFLAG, 1);// commit blocks the thread until data is consumed by the platformSystem.out.println("before commit() call to the Platform.");rc = pub.commit();return 0;}}
參考: Aleri –復雜事件處理–第一部分 , 理解Aleri –復雜事件處理–第二部分來自我們JCG合作伙伴 Mahesh Gadgil在“ 簡單而實用”的博客上。
翻譯自: https://www.javacodegeeks.com/2012/04/aleri-complex-event-processing.html