【原】StreamInsight 浅入浅出(四)—— 例子
對于StreamInsight這種不是很線性的架構,最好還是直接拿出來一個例子,簡單但完整的把流程走過一遍,更能看清所謂“流”、“事件”、“適配器”之類到底是什么東西,有什么關系。
官方例子下載地址:http://go.microsoft.com/fwlink/?LinkId=180356,這里就理一遍其中最簡單的例子:TrafficJoinQuery
場景描述
這個例子的場景可以描述為:有九個測速器,編號為1001~1009,分別放置在3個地點。每個測速器每20s會記錄下這20s內通過的車輛數以及它們的平均速度。現在要統計出每個測速器記錄的一分鐘內車輛數的平均數:
比如1001號測速器,10:00:00~10:00:20記錄了20輛車,10:00:20~10:00:40記錄了15輛車,10:00:40~10:01:00記錄了25輛車,10:01:00~10:01:20記錄了5輛車,那么1001號測速器在10:00:00~10:01:00這一分鐘內車輛數的平均數就是(20+15+25)/3=20,而在10:00:20~10:01:20這一分鐘內車輛數的平均數就是(15+25+5)/3=15。
這里最重要的就是搞清每一次計數的時候,哪些數據是包括其中的。
提供的數據是兩個csv文件,一個是包含了時間、測速器編號、車數、車速的日志文件,另一個是測速器編號與所在地點(1,2,3)對應的表。最終的結果在對第一張表的聚合計算的基礎上,再把這兩張表連接起來。
準備工作
當然要先安裝StreamInsight http://msdn.microsoft.com/zh-cn/library/ee378749.aspx 。然后注意把下載下來的例子里的
using (Server server = Server.Create("Default"))改成
using (Server server = Server.Create("XXXXX"))其中XXXXX就是你的StreamInsight的實例名。 如果想使用 Connect的方法的話,需要先開啟一個 Host,提供一個 EndPoint :
Server serverInsight = Server.Create("StreamInsight"); ServiceHost host = new ServiceHost(serverInsight.CreateManagementService()); WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message); binding.HostNameComparisonMode = HostNameComparisonMode.Exact; host.AddServiceEndpoint(typeof(IManagementService),binding,"http://localhost:80/StreamInsight/StreamInsight"); host.Open();然后在程序中通過
using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress(@http://localhost/StreamInsight/StreamInsight)))連接到EndPoint。
適配器
例子的Solution下包括三個項目,其中“SimpleTextFileReader”和“SimpleTextFileWriter”是兩個適配器項目,分別對應輸出、輸入適配器。從例子中可以看出,推薦的做法是適配器項目與主程序項目獨立,這樣能很容易的切換適配器。
查看這兩個項目,可以看出輸入適配器與輸出適配器的結構是類似的,都包含一個工廠 Factory 類,一個提供配置信息的 Config 類,三個分別對應三種事件模型的適配器。
Factory
對于輸出適配器,Factory類要完成的就是用Create方法,根據輸入的事件模型(EventShape)來返回對應的適配器。而輸入適配器的Factory類由于應用了 IDeclareAdvanceTimeProperties 接口,還要額外實現 DeclareAdvanceTimeProperties 方法來進行一些配置,主要是CTI事件的生成頻率、延遲時長以及超時事件的處理策略的配置。具體可參見代碼中的注釋和 AdvanceTimeGenerationSettings 以及 AdapterAdvanceTimeSettings 這兩個類的構造函數在 MSDN 中的解釋。
Config
雖然一般 Config 類都帶有"Config"的后綴,但事實上 Config 類并沒有統一的基類或者接口。它的作用就是由外部傳遞一些配置信息給 Factory 并進一步傳遞到適配器中。
一般來說 Config 類中不包含公開的方法,而是由一些基本類型的屬性構成。
在這個例子中,TextFileReaderConfig 類中配置了輸入文件的名稱(InputFileName),列的分隔符(Delimiter),文件的文化屬性(CultureName),各列的順序(InputFieldOrders),它們的用處可以在適配器中看到。而 CtiFrequency 則指明了 CTI 事件的頻率,作用于 TextFileReaderFactory 。
Adapter
不同的事件模型對應的適配器,其代碼往往是類似的。比照 SimpleTextFileReader 工程下的三個適配器類,我們會發現除了 CreateEventFromLine 方法內部有不同,其他都是近似甚至一樣的。
這里關鍵的方法是 ProduceEvents,Start 方法和 Resume 方法都調用了這個方法:
/// <summary> /// Main driver to read events from the CSV file and enqueue them. /// </summary> private void ProduceEvents() {IntervalEvent currentEvent = default(IntervalEvent);try{// Keep reading lines from the file.while (true){if (AdapterState.Stopping == AdapterState){Stopped();return;}// Did we enqueue the previous line successfully?if (this.currentLine == null){this.currentLine = this.streamReader.ReadLine();if (this.currentLine == null){// Stop adapter (and hence the query) at the end of the file.Stopped();return;}}try{// Create and fill event structure with data from text file line.currentEvent = this.CreateEventFromLine(this.currentLine);// In case we just went into the stopping state.if (currentEvent == null){continue;}}catch (Exception e){// The line couldn't be transformed into an event.// Just ignore it, and release the event's memory.ReleaseEvent(ref currentEvent);this.consoleTracer.WriteLine(this.currentLine + " could not be read into a CEP event: " + e.Message);// Make sure we read a new line next time.this.currentLine = null;continue;}if (EnqueueOperationResult.Full == Enqueue(ref currentEvent)){// If the enqueue was not successful, we keep the event.// It is good practice to release the event right away and// not hold on to it.ReleaseEvent(ref currentEvent);// We are suspended now. Tell the engine we are ready to be resumed.Ready();// Leave thread to wait for call into Resume().return;}// Enqueue was successful, so we can read a new line again.this.currentLine = null;}}catch (AdapterException e){this.consoleTracer.WriteLine("ProduceEvents - " + e.Message + e.StackTrace);} }在 While 循環中每次從日志文件中讀取一行記錄,然后利用 CreateEventFromLine 方法將該行記錄轉化為相應的事件 currentEvent,最后通過 Enqueue 方法,把新的事件插入隊列中。如果理解了上一篇文章中的適配器的狀態機,就會注意在每次讀取日志前先判斷適配器的狀態是否為 Stopping ,并在日志讀取空行(日志讀完)后停止適配器運行。
當 Enqueue 的結果為 Full 時,說明隊列已滿,這次插入是失敗的,而且當前的狀態是 Suspended(由輸出適配器或者其他的適配器導致)。所以一方面通過 Ready 方法將狀態重置為 Running 好進行下一次的插入。同時為了節省內存,釋放 currentEvent 。
這里要注意幾個 return ,因為在這里說明直接退出了方法,循環中止,日志讀取中止。直到再次調用 ProduceEvents 方法,也就是外部調用 Resume 方法(在整個Query過程中,Start 方法只會在初始時調用一次),才會再次啟動循環,讀取日志。
至于 CreateEventFromLine 方法,就是通過一行日志生成對應的事件。對于非類型化的適配器,事件負載要通過 SetField 方法賦值,這里通過 Config 中的 InputFieldOrders,將 csv 日志的各列分別對應到事件負載的各字段中。
主程序
主項目 TrafficJoinQuery 中的三個文件,在 EventTypes 中的兩個類對應兩種事件負載——測量日志與地理信息。這就體現了非類型化的適配器的優勢——對于兩種事件負載,只需要同一個適配器就可以了,負載字段在運行時根據配置信息動態確定。
查詢模板
Program中,最復雜的是 QueryTemplate 的創建。所謂 QueryTemplate,顧名思義,就是查詢模板,通過預先設定一套計算方法和規則,將輸入流轉化為輸出流。這里有兩段 Linq 代碼:
// Extend duration of each sensor reading, so that they fall in // a one-minute sliding window. Group by sensor ID and calculate the // average vehicular count per group within each window. // Include the grouping key in the aggregation result. var avgCount = from oneMinReading in sensorStream.AlterEventDuration(e => TimeSpan.FromMinutes(1))group oneMinReading by oneMinReading.SensorId into oneGroupfrom eventWindow in oneGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)select new { avgCount = eventWindow.Avg(e => e.VehicularCount), SensorId = oneGroup.Key };// Join sensors and locations. Moreover, filter the count // result by a threshold, which is looked up based on the // sensor location through a user-defined function. var joined = from averageEvent in avgCountjoin locationData in locationStreamon averageEvent.SensorId equals locationData.SensorIdwhere averageEvent.avgCount > UserFunctions.LocationCountThreshold(locationData.LocationId)select new{SensorId = locationData.SensorId,LocationID = locationData.LocationId,VehicularCount = averageEvent.avgCount};在第一段中先利用 AlterEventDuration 方法將每條記錄的有效時間延續至一分鐘——因為我們要統計的是一分鐘的平均值。之后對 SensorId 做聚合分組,最后用 SnapshotWindow 方法截取每組每個時間段的平均值。這里 SnapshotWindow 可以認為是給事件流的橫切面拍了一個快照,獲取的是一個時間點上的數據。
而第二段就是將第一段獲得的事件流與地點數據做連接,而且還利用 UserFunctions 提供的 LocationCountThreshold 方法過濾了一部分數據。最終我們得到的事件負載包含了 SensorId 、LocationID 、VehicularCount 三個字段。
關于聚合、連接、時間窗口以及其他的 Linq 語法,具體會在以后介紹。
查詢綁定
有了查詢模板,也只是打了一個空架子,只有連上輸入、輸出適配器,才能得到一個能實際運作的系統。在 BindQuery 方法中就將兩個輸入適配器和一個輸出適配器與查詢模板綁定在了一起。
兩個輸入適配器一個是邊緣事件適配器,一個是時間段事件適配器。前者對應的是地理數據,因為邊緣事件在沒有接收到結束邊緣事件時,它的結束時間是無窮大,也就是在整個查詢過程中是有效的,正適合需要一直有效的地理數據。而時間段事件在生成時就明確了開始時間和結束時間,符合這里車數日志記錄的情況。
輸出適配器是點事件,說明我們要得到的結果是每個時間點意義上的值。
查詢啟動、停止與診斷
// Start the query query.Start();// Wait for the query to be suspended - that is the state // it will be in as soon as the output adapter stops due to // the end of the stream. DiagnosticView dv = server.GetDiagnosticView(query.Name);while ((string)dv[DiagnosticViewProperty.QueryState] == "Running") {// Sleep for 1s and check againThread.Sleep(1000);dv = server.GetDiagnosticView(query.Name); }// Retrieve some diagnostic information from the CEP server // about the query. Console.WriteLine(string.Empty); RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/EventManager")), Console.Out); RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/PlanManager")), Console.Out); RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/Application/TrafficJoinSample/Query/TrafficSensorQuery")), Console.Out);query.Stop();啟動、停止不需細說。由于 query.Start() 后實際是適配器用另外的線程執行相應的方法(ProduceEvents),主線程需要等待適配器線程執行結束。所以這里用 DiagnosticView 獲得當前查詢的狀態。直到不為 Running,才輸出查詢的診斷報告。最后停止查詢。
這里的診斷報告會列出一些查詢數據,比如總事件數、查詢時間等。但從中很難看出查詢的具體流程是怎樣的,即使你進行調試,由于具體的查詢實際是在各個線程中執行的,無法順序跟蹤事件的產生、計算、輸出。所以,StreamInsight 提供了一個圖形化的調試工具,StreamInsight Event Flow Debugger。關于這個工具的使用,會在下一篇文章詳細介紹。
轉載于:https://www.cnblogs.com/smjack/archive/2010/10/29/1864429.html
總結
以上是生活随笔為你收集整理的【原】StreamInsight 浅入浅出(四)—— 例子的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kernel部分数据结构列表三(inod
- 下一篇: 使用delphi 开发多层应用(十三)使