【原】StreamInsight 浅入浅出(四)—— 例子

对于StreamInsight这种不是很线性的架构,最好还是直接拿出来一个例子,简单但完整的把流程走过一遍,更能看清所谓“流”、“事件”、“适配器”之类到底是什么东西,有什么关系。

官方例子下载地址:http://go.microsoft.com/fwlink/?LinkId=180356,这里就理一遍其中最简单的例子:TrafficJoinQuery

场景描述

这个例子的场景可以描述为:有九个测速器,编号为1001~1009,分别放置在3个地点。每个测速器每20s会记录下这20s内通过的车辆数以及它们的平均速度。现在要统计出每个测速器记录的一分钟内车辆数的平均数:

比如1001号测速器,10:00:00~10:00:20记录了20辆车,10:00:20~10:00:40记录了15辆车,10:00:40~10:01:00记录了25辆车,10:01:00~10:01:20记录了5辆车,那么1001号测速器在10:00:00~10:01:00这一分钟内车辆数的平均数就是(20+15+25)/3=20,而在10:00:20~10:01:20这一分钟内车辆数的平均数就是(15+25+5)/3=15。

这里最重要的就是搞清每一次计数的时候,哪些数据是包括其中的

提供的数据是两个csv文件,一个是包含了时间、测速器编号、车数、车速的日志文件,另一个是测速器编号与所在地点(1,2,3)对应的表。最终的结果在对第一张表的聚合计算的基础上,再把这两张表连接起来。

准备工作

当然要先安装StreamInsight http://msdn.microsoft.com/zh-cn/library/ee378749.aspx 。然后注意把下载下来的例子里的

using (Server server = Server.Create("Default"))

改成

using (Server server = Server.Create("XXXXX"))

其中XXXXX就是你的StreamInsight的实例名。 如果想使用 Connect的方法的话,需要先开启一个 Host,提供一个 EndPoint :

Server serverInsight = Server.Create("StreamInsight");
ServiceHost host = new ServiceHost(serverInsight.CreateManagementService());
WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);
binding.HostNameComparisonMode = HostNameComparisonMode.Exact;
host.AddServiceEndpoint(typeof(IManagementService),binding,"http://localhost:80/StreamInsight/StreamInsight");
host.Open();

然后在程序中通过

using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress(@http://localhost/StreamInsight/StreamInsight))) 

连接到EndPoint。

适配器

例子的Solution下包括三个项目,其中“SimpleTextFileReader”和“SimpleTextFileWriter”是两个适配器项目,分别对应输出、输入适配器。从例子中可以看出,推荐的做法是适配器项目与主程序项目独立,这样能很容易的切换适配器

查看这两个项目,可以看出输入适配器与输出适配器的结构是类似的,都包含一个工厂 Factory 类,一个提供配置信息的 Config 类,三个分别对应三种事件模型的适配器。

Factory

对于输出适配器,Factory类要完成的就是用Create方法,根据输入的事件模型(EventShape)来返回对应的适配器。而输入适配器的Factory类由于应用了 IDeclareAdvanceTimeProperties 接口,还要额外实现 DeclareAdvanceTimeProperties 方法来进行一些配置,主要是CTI事件的生成频率、延迟时长以及超时事件的处理策略的配置。具体可参见代码中的注释和 AdvanceTimeGenerationSettings 以及 AdapterAdvanceTimeSettings 这两个类的构造函数在 MSDN 中的解释。

Config

虽然一般 Config 类都带有"Config"的后缀,但事实上 Config 类并没有统一的基类或者接口。它的作用就是由外部传递一些配置信息给 Factory 并进一步传递到适配器中。

一般来说 Config 类中不包含公开的方法,而是由一些基本类型的属性构成。

在这个例子中,TextFileReaderConfig 类中配置了输入文件的名称(InputFileName),列的分隔符(Delimiter),文件的文化属性(CultureName),各列的顺序(InputFieldOrders),它们的用处可以在适配器中看到。而 CtiFrequency 则指明了 CTI 事件的频率,作用于 TextFileReaderFactory 。

Adapter

不同的事件模型对应的适配器,其代码往往是类似的。比照 SimpleTextFileReader 工程下的三个适配器类,我们会发现除了 CreateEventFromLine 方法内部有不同,其他都是近似甚至一样的。

这里关键的方法是 ProduceEvents,Start 方法和 Resume 方法都调用了这个方法:

/// <summary>
/// Main driver to read events from the CSV file and enqueue them.
/// </summary>
private void ProduceEvents()
{IntervalEvent currentEvent = default(IntervalEvent);try{// Keep reading lines from the file.while (true){if (AdapterState.Stopping == AdapterState){Stopped();return;}// Did we enqueue the previous line successfully?if (this.currentLine == null){this.currentLine = this.streamReader.ReadLine();if (this.currentLine == null){// Stop adapter (and hence the query) at the end of the file.Stopped();return;}}try{// Create and fill event structure with data from text file line.currentEvent = this.CreateEventFromLine(this.currentLine);// In case we just went into the stopping state.if (currentEvent == null){continue;}}catch (Exception e){// The line couldn't be transformed into an event.// Just ignore it, and release the event's memory.ReleaseEvent(ref currentEvent);this.consoleTracer.WriteLine(this.currentLine + " could not be read into a CEP event: " + e.Message);// Make sure we read a new line next time.this.currentLine = null;continue;}if (EnqueueOperationResult.Full == Enqueue(ref currentEvent)){// If the enqueue was not successful, we keep the event.// It is good practice to release the event right away and// not hold on to it.ReleaseEvent(ref currentEvent);// We are suspended now. Tell the engine we are ready to be resumed.Ready();// Leave thread to wait for call into Resume().return;}// Enqueue was successful, so we can read a new line again.this.currentLine = null;}}catch (AdapterException e){this.consoleTracer.WriteLine("ProduceEvents - " + e.Message + e.StackTrace);}
}

在 While 循环中每次从日志文件中读取一行记录,然后利用 CreateEventFromLine 方法将该行记录转化为相应的事件 currentEvent,最后通过 Enqueue 方法,把新的事件插入队列中。如果理解了上一篇文章中的适配器的状态机,就会注意在每次读取日志前先判断适配器的状态是否为 Stopping ,并在日志读取空行(日志读完)后停止适配器运行。

Enqueue 的结果为 Full 时,说明队列已满,这次插入是失败的,而且当前的状态是 Suspended(由输出适配器或者其他的适配器导致)。所以一方面通过 Ready 方法将状态重置为 Running 好进行下一次的插入。同时为了节省内存,释放 currentEvent 。

这里要注意几个 return ,因为在这里说明直接退出了方法,循环中止,日志读取中止。直到再次调用 ProduceEvents 方法,也就是外部调用 Resume 方法(在整个Query过程中,Start 方法只会在初始时调用一次),才会再次启动循环,读取日志。

至于 CreateEventFromLine 方法,就是通过一行日志生成对应的事件。对于非类型化的适配器,事件负载要通过 SetField 方法赋值,这里通过 Config 中的 InputFieldOrders,将 csv 日志的各列分别对应到事件负载的各字段中。

主程序

主项目 TrafficJoinQuery 中的三个文件,在 EventTypes 中的两个类对应两种事件负载——测量日志与地理信息。这就体现了非类型化的适配器的优势——对于两种事件负载,只需要同一个适配器就可以了,负载字段在运行时根据配置信息动态确定

查询模板

Program中,最复杂的是 QueryTemplate 的创建。所谓 QueryTemplate,顾名思义,就是查询模板,通过预先设定一套计算方法和规则,将输入流转化为输出流。这里有两段 Linq 代码:

// Extend duration of each sensor reading, so that they fall in
// a one-minute sliding window. Group by sensor ID and calculate the 
// average vehicular count per group within each window.
// Include the grouping key in the aggregation result.
var avgCount = from oneMinReading in sensorStream.AlterEventDuration(e => TimeSpan.FromMinutes(1))group oneMinReading by oneMinReading.SensorId into oneGroupfrom eventWindow in oneGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)select new { avgCount = eventWindow.Avg(e => e.VehicularCount), SensorId = oneGroup.Key };// Join sensors and locations. Moreover, filter the count 
// result by a threshold, which is looked up based on the
// sensor location through a user-defined function.
var joined = from averageEvent in avgCountjoin locationData in locationStreamon averageEvent.SensorId equals locationData.SensorIdwhere averageEvent.avgCount > UserFunctions.LocationCountThreshold(locationData.LocationId)select new{SensorId = locationData.SensorId,LocationID = locationData.LocationId,VehicularCount = averageEvent.avgCount};

在第一段中先利用 AlterEventDuration 方法将每条记录的有效时间延续至一分钟——因为我们要统计的是一分钟的平均值。之后对 SensorId 做聚合分组,最后用 SnapshotWindow 方法截取每组每个时间段的平均值。这里 SnapshotWindow 可以认为是给事件流的横切面拍了一个快照,获取的是一个时间点上的数据。

而第二段就是将第一段获得的事件流与地点数据做连接,而且还利用 UserFunctions 提供的 LocationCountThreshold 方法过滤了一部分数据。最终我们得到的事件负载包含了 SensorId 、LocationID 、VehicularCount 三个字段。

关于聚合、连接、时间窗口以及其他的 Linq 语法,具体会在以后介绍。

查询绑定

有了查询模板,也只是打了一个空架子,只有连上输入、输出适配器,才能得到一个能实际运作的系统。在 BindQuery 方法中就将两个输入适配器和一个输出适配器与查询模板绑定在了一起。

两个输入适配器一个是边缘事件适配器,一个是时间段事件适配器。前者对应的是地理数据,因为边缘事件在没有接收到结束边缘事件时,它的结束时间是无穷大,也就是在整个查询过程中是有效的,正适合需要一直有效的地理数据。而时间段事件在生成时就明确了开始时间和结束时间,符合这里车数日志记录的情况。

输出适配器是点事件,说明我们要得到的结果是每个时间点意义上的值。

查询启动、停止与诊断

// Start the query
query.Start();// Wait for the query to be suspended - that is the state
// it will be in as soon as the output adapter stops due to
// the end of the stream.
DiagnosticView dv = server.GetDiagnosticView(query.Name);while ((string)dv[DiagnosticViewProperty.QueryState] == "Running")
{// Sleep for 1s and check againThread.Sleep(1000);dv = server.GetDiagnosticView(query.Name);
}// Retrieve some diagnostic information from the CEP server
// about the query.
Console.WriteLine(string.Empty);
RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/EventManager")), Console.Out);
RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/PlanManager")), Console.Out);
RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/Application/TrafficJoinSample/Query/TrafficSensorQuery")), Console.Out);query.Stop();

启动、停止不需细说。由于 query.Start() 后实际是适配器用另外的线程执行相应的方法(ProduceEvents),主线程需要等待适配器线程执行结束。所以这里用 DiagnosticView 获得当前查询的状态。直到不为 Running,才输出查询的诊断报告。最后停止查询。

这里的诊断报告会列出一些查询数据,比如总事件数、查询时间等。但从中很难看出查询的具体流程是怎样的,即使你进行调试,由于具体的查询实际是在各个线程中执行的,无法顺序跟踪事件的产生、计算、输出。所以,StreamInsight 提供了一个图形化的调试工具,StreamInsight Event Flow Debugger。关于这个工具的使用,会在下一篇文章详细介绍。

转载于:https://www.cnblogs.com/smjack/archive/2010/10/29/1864429.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/495711.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cheat Engine 教程( 1 - 9 通关 )

工具包&#xff1a;https://down.52pojie.cn/Tools/Debuggers/ Cheat Engine 官网&#xff1a;https://www.cheatengine.org/ ce 修改器绿色版(cheat engine) v7.4 官方最新版&#xff1a;http://www.downcc.com/soft/21673.html 这个教程全部是来自 Cheat Engine 软件的中的…

人工智能元老痛批IBM:沃森是个骗局,这根本不是认知

作者 Roger Schank李林 编译整理量子位 出品 | 公众号 QbitAI一篇质疑IBM的旧文今天在HackerNews上火了起来&#xff0c;虽已时隔两年&#xff0c;这篇文章还是引起了网友们的强烈共鸣。文章作者Roger Schank是AI领域元老人物&#xff0c;AAAI Fellow&#xff0c;曾任耶鲁大学人…

判断字符串中是否包含中文

判断字符串中是否包含中文 selectcasewheny我like%[啊-座]%then包含中文 else不包含中文 end转载于:https://www.cnblogs.com/stublue/archive/2010/11/05/1869916.html

OllyDBG完美教程(超强入门级)

OllyDBG 视频教程&#xff1a;https://www.bilibili.com/video/av6889190 动态调试工具之OllyDbg(OD)教程&#xff1a;https://www.bilibili.com/video/av70600053 使用 OllyDbg 从零开始 Cracking.chm ( 58章 )&#xff1a;https://pan.baidu.com/s/18iXvF5I_No4-a1DK3jKrbg …

12种Bean转换

来源&#xff1a;再见 BeanUtils&#xff01;性能真拉跨&#xff01; 一、前言 二、性能测试对比 三、12种转换案例 1. get\set 2. json2Json 3. Apache copyProperties 4. Spring copyProperties 5. Bean Mapping 6. Bean Mapping ASM 7. BeanCopier 8. Orika 9. Do…

上海人工智能再出重磅!寒武纪科技发布新一代云端AI芯片,联想、曙光、科大讯飞发布相关应用...

来源&#xff1a;文汇网 作者&#xff1a;许琦敏、郭超豪峰值功耗不超过110瓦&#xff0c;等效理论峰值速度可达每秒166.4万亿次定点运算。寒武纪科技在上海发布了中国第一款云端智能芯片——Cambricon MLU100芯片和板卡产品、寒武纪1M终端智能处理器IP产品。联想、曙光和科大…

OD 快捷键使用大全。非常详细( 游戏逆向分析必看 )+ OD 断点 使用大全

From&#xff1a;https://www.cnblogs.com/YiShen/p/9742872.html OllyDBG 快捷键 OllyDbg 窗口通用快捷键 快捷键    功能      Ctrl F2重启程序&#xff0c;即重新启动被调试程序&#xff08; 重新载入程序 &#xff09;。如果当前没有调试的程序&#xff0c;Oll…

电子发票中数字签名的提取解析

前言 随着电子信息技术的发展与成熟&#xff0c;加上国家的大力推广&#xff0c;电子发票已经开始慢慢取代纸质发票。相比传统的纸质发票&#xff0c;电子发票除了绿色环保&#xff0c;节约成本之外&#xff0c;更重要的是电子发票采取电子签章实现发票签名、电子盖章&#xff…

主流流媒体软件pplive和ppstream的分析

现有P2P流媒体软件开发新的流媒体系统&#xff0c;充分了解现有的流媒体软件的优劣得失是必不可少的。主流的软件pplive和ppstream就是分析的对象。以下分析全部基于Sockmon5的数据包拦截。手上资源有限&#xff0c;对协议的分析不很充分。 一、pplive&#xff1a;这款…

无人车、超级高铁、智慧城市......这是一份来自未来的出行报告

来源&#xff1a;机器之能 作者&#xff1a;Charles McLellan 编译&#xff1a;王宇欣在新兴技术的驱动下&#xff0c;运输业即将迎来第二春。虽然个中细节还需打磨&#xff0c;但未来的运输系统必然会实现数据驱动、彼此相关联、高度自动化。有关技术与运输业未来的文章比比皆…

在 VC++ 中使用 内联汇编

From&#xff1a;https://blog.csdn.net/root19881111/article/details/8450266 VC内联汇编(MSDN相关内容完整翻译)&#xff1a;http://www.cppblog.com/xingkongyun/archive/2008/12/21/70003.html 调 call 和 偷功能 时&#xff0c;VC中内联汇编容易产生的错误&#xff1a;…

OFD 版式技术解析系列(一):开篇

在版式电子文件领域&#xff0c;大家比较熟悉的就是 PDF(Portable Document Format)格式&#xff0c;该格式由 Adobe 公司在 1992 年发布&#xff0c;迄今已经有 28 个年头&#xff0c;2008 年 7 月 1 日&#xff0c;IS 组织正式发布 PDF 的国际标准&#xff0c;PDF 成为了独立…

无人驾驶急需解决:规划控制和传感器价格高两大问题

来源&#xff1a;AI科技大本营 作者 &#xff1a;Mavis2017 年百度 AI 开发者大会上&#xff0c;现场视频连线了正乘坐无人驾驶汽车行驶在五环上朝会场赶来的李彦宏&#xff0c;他坐在副驾驶上解说&#xff0c;身边司机的双手并没有触碰方向盘&#xff0c;也正是因为这句话&am…

CString 与 LPCWSTR、LPSTR、char*、LPWSTR 等类型的转换

From&#xff1a;https://www.cnblogs.com/leanee/articles/2940088.html char [] 到 LPWSTR转换的一个具体应用&#xff1a;http://www.cppblog.com/lateCpp/articles/153358.html CString详细讲解&#xff1a;https://blog.csdn.net/qq_41786318/article/details/81989217 …

ofd电子文档内容分析工具(分析文档、签章和证书)

前言 ofd是国家文档标准&#xff0c;其对标的文档格式是pdf。ofd文档是容器格式文件&#xff0c;ofd其实就是压缩包。将ofd文件后缀改为.zip&#xff0c;解压后可看到文件包含的内容。 加入QQ交流群&#xff1a;618168615。获取下载程序。 ofd文件解压后&#xff0c;可以看到…

关于信任

[caption id"attachment_349" align"alignnone" width"374" caption"Trust is the most important thing to the team!"][/caption] 偶然间看到一张截图&#xff0c;是杭州小马哥不知何年何月何地做的show&#xff0c;这句话从他嘴里讲…

ES的安装和RestClient的操作

目录 初识elasticsearch 什么是elasticsearch elasticsearch的发展 Lucene的优缺点 elasticsearch的优势 倒排索引 es与mysql的概念对比 文档 索引 概念对比 架构 安装es 安装kibana 安装ik分词器 分词器 安装ik分词器 ik分词器的拓展和停用词典 操作索引库…

深度 | 智慧•城市,基于国际视野下的思考

来源&#xff1a;智慧城市决策参考智慧城市的兴起&#xff0c;得益于ICT技术的迅猛发展。经过这些年国内外诸多城市的探索和实践&#xff0c;智慧城市已经从最初的营销概念&#xff0c;逐渐发展成为一种支持城市发展的新理念。然而在实际应用中&#xff0c;智慧城市的内涵仍然是…

__cdecl、__stdcall、__fastcall 与 __pascal 浅析

X86调用约定 calling convention&#xff1a;https://www.cnblogs.com/shangdawei/p/3323252.html__cdecl、__stdcall、__fastcall 与 __pascal 浅析&#xff1a;https://www.cnblogs.com/yenyuloong/p/9626658.html王爽 汇编语言第三版 第9章 转移指令的原理&#xff1a;https…

适配ofd签章SES_CertList

import org.bouncycastle.asn1.*;import java.io.IOException;/*** 签章者证书信息列表** author 权观宇* since 2020-04-19 17:19:36*/ public class SES_CertList extends ASN1Objectimplements ASN1Choice {/*** 签章者证书列表*/private final CertInfoList certs;/*** 签章…