使用MongoDB进行事件流

MongoDB是一个非常出色的“ NoSQL”数据库,具有广泛的应用程序。 在SoftwareMill开发的一个项目中,我们将其用作复制的事件存储,然后将事件从事件流传输到其他组件。

介绍

基本思想非常简单(另请参阅Martin Fowler关于Event Sourcing的文章)。 我们的系统生成一系列事件。 这些事件将保留在事件存储中。 系统中的其他组件遵循事件流并对其进行“处理”。 例如,可以将它们汇总并写入报告数据库(另一方面,它类似于CQRS )。 这种方法有很多优点:

  • 事件的读取和写入是解耦的(异步的)
  • 鉴于它没有死得太久,任何后续组件都可能死亡,然后“追赶”
  • 可能有多个关注者。 跟随者可以从从属副本读取数据,以获得更好的可伸缩性
  • 事件活动的爆发对事件接收器的影响减少; 最坏的情况下,报告生成速度会变慢

这里的关键组件当然是快速可靠的事件存储。 我们用来实现一个的MongoDB的三个关键功能是:

  • 上限集合和尾部游标
  • 快速收集附件
  • 复制集


采集

作为基础,我们使用有上限的集合 ,根据定义,该集合受大小限制。 如果编写新事件将导致集合超出大小限制,则最早的事件将被覆盖。 这给了我们类似于事件的循环缓冲区的功能。 (此外,我们也很安全地避免了磁盘空间不足错误。)

在2.2版之前,默认情况下,上限集合没有_id字段(因此没有索引)。 但是,由于我们希望事件能够在整个副本集上可靠地写入,因此_id字段及其上的索引都是必需的。

写作活动

编写事件是一个简单的Mongo插入操作; 插入也可以分批完成。 根据我们对事件丢失的容忍度,我们可能会使用各种Mongo 写入问题 (例如,等待来自单节点或多个节点的写入确认)。

所有事件都是不可变的。 除了更好的,线程安全的Java代码外,这是事件流的必要条件。 如果事件是可变的,事件接收器将如何知道更新的内容? 而且,这对Mongo的性能有很好的影响。 由于永远不会更改数据,因此写入磁盘的文档永远不会缩小或扩展,因此无需在磁盘上移动块。 实际上,在具有上限的集合中,Mongo不允许增长曾经编写的文档。

阅读活动

读取事件流要复杂一些。 首先,可能有多个阅读器,每个阅读器在流中具有不同的进度。 其次,如果流中没有事件,我们希望读者等待一些事件可用,并避免主动轮询。 最后,我们想分批处理事件,以提高性能。

有尾游标可以解决这些问题。 要创建这样的游标,我们必须提供一个起点–事件的ID,我们将从该事件开始读取; 如果未提供ID,则光标将返回最早的可用事件。 因此,每个读取器必须存储它已读取和处理的最后一个事件。

更重要的是,如果没有新数据可用,可尾光标可以有选择地阻塞一段时间,从而解决了主动轮询问题。

(顺便说一下,mongo用于在副本集之间复制数据的oplog集合也是一个有上限的集合。从属Mongo实例在该集合后面尾随,流式传输“事件”(即数据库操作),并按顺序在本地应用它们。 )

读取Java中的事件

使用Mongo Java驱动程序时 ,有一些“问题”。 首先,您需要初始化游标。 为此,我们需要提供(1)最后一个事件ID(如果存在); (2)我们要读取事件的顺序(此处为自然顺序,即插入顺序); (3)两个关键的游标选项,我们希望游标是可拖尾的,并且如果没有新数据,我们希望将其阻止:

DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);

您可能想知道为什么我们使用>= last_id而不是> 。 由于生成Mongo ObjectId的方式在这里需要。 如果使用一个简单的> last_id我们可能会错过一些与last_id事件在同一秒之后但之后发生的事件。 这也意味着我们的Java代码必须处理这一事实,并丢弃收到的第一个事件。

游标的类扩展了基本的Java Iterator接口,因此非常易于使用。 因此,现在我们可以进行批处理了。 在游标上进行迭代时,驱动程序将批量从Mongo服务器接收数据; 因此我们可以像调用其他迭代器一样简单地调用hasNext()next()来接收后续元素,并且只有某些调用会真正导致与服务器的网络通信。

在Mongo Java驱动程序中,实际上可能阻塞的hasNext()hasNext() 。 如果我们要分批处理事件,我们需要(a)只要有可用的元素就读取它们,并且(b)在被阻止没有更多事件之前有某种了解的方式,并且我们可以处理事件已经批处理。 由于hasNext()可以阻止,因此我们无法直接执行此操作。

这就是为什么我们引入了中间队列( LinkedBlockingQueue )的原因。 在单独的线程中,从游标读取的事件在到达时即被放入队列中。 如果没有事件,则线程将在cursor.hasNext()cursor.hasNext() 。 阻塞队列有一个可选的大小限制,因此,如果队列已满,则放置一个元素也将阻塞,直到有可用空间为止。 在事件消费者线程中,我们首先尝试以阻塞方式(使用.poll从队列中读取单个元素,因此我们在这里等待所有事件可用。 然后,我们尝试将队列的全部内容消耗到一个临时集合中(使用.drainTo ,构建批处理,并可能获取0个元素,但我们始终拥有第一个)。

值得一提的是,如果集合为空,则Mongo不会阻止,因此我们必须回到主动轮询。 我们还必须考虑到游标可能会在等待期间死亡的事实。 要对此进行检查,我们应该验证cursor.getCursorId() != 0 ,其中0是“死光标”的ID。 在这种情况下,我们只需要重新实例化游标即可。

加起来

综上所述,我们得到了一个非常快速的事件源/流解决方案。 从某种意义上说,这是“自我调节”,即如果事件活动达到高峰,则事件接收器将大批量读取这些事件。 如果事件活动少,则将分批快速处理它们。

我们还将同一个Mongo实例用于其他目的。 从操作角度来看,拥有一个数据库系统来聚簇和维护常规数据和事件肯定是一件好事。

参考: Adam Warski博客的Blog中来自我们的JCG合作伙伴 Adam Warski的MongoDB事件流 。

翻译自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/370872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hihocoder-Week173--A Game

hihocoder-Week173--A Game A Game 时间限制:10000ms单点时限:1000ms内存限制:256MB描述 Little Hi and Little Ho are playing a game. There is an integer array in front of them. They take turns (Little Ho goes first) to select a number from either the beginning …

php打乱数组二维数组、多维数组

//这个是针对二维数组的!下面针对多维数组的乱序方法<?php function shuffle_assoc($list) { if (!is_array($list)) return $list; $keys array_keys($list); shuffle($keys); $random array(); foreach ($keys as $key) $random[$key] $list[$key]; ret…

明明一样的程序为啥有的系统就报错有的就正常运行呢_SurfaceGo Android系统折腾笔记...

Surface Go平板在Win10系统下的表现我认为还是比较出色的&#xff0c;x86架构CPU意味着不考虑性能的情况下&#xff0c;台式机上能跑的程序&#xff0c;这台平板也能跑&#xff0c;新Galgame一出就能直接安装上躺床上玩&#xff0c;妙哉。但遗憾的是现实世界还是要考虑性能问题…

c语言实训作业总结,c语言程序设计上机实践心得报告

c语言程序设计上机实践心得报告 班级:11 电信 2 姓名:莫金波 学号:1107032242012.12.28 惠州学院 HUIZHOU UNIVERSITY 我们专业的学生在专业老师的带领下进行了 c 语言设计基础教程的 实践学习。在这之前&#xff0c;我们已经对 c 语言这门课程学习了差不多一 个学期&#xff0…

JavaOne 2012:在JVM上诊断应用程序

值得参加Staffan Larsen &#xff08;Oracle Java Serviceability Architect&#xff09;的演讲“ 在JVM上诊断应用程序 ”&#xff08;Hilton Plaza A / B&#xff09;&#xff0c;只是为了学习Oracle JVM 7随附的新jcmd命令行工具。该演示对我来说是“奖金”&#xff0c;这对…

mysql慢查询工具

GeorgeHao安装过程&#xff1a; [rootlocalhost-centos6 ~]# wget percona.com/get/pt-query-digest [rootlocalhost-centos6 ~]# chmod ux pt-query-digest [rootlocalhost-centos6 ~]# mv /root/pt-query-digest /usr/bin/ 今天有在阿里云服务器跑分的时候出现"Cant loc…

python字符串转date,在Python上将字符串转换为Date类型

I have this string:2012-02-10 # (year-month-day)and I need it to be as date type for me to use the date function isoweekday().Does anyone know how I can convert this string into a date?解决方案You can do that with datetime.strptime()Example:>>> f…

文档词频矩阵_论文理解:从词嵌入到文档距离

论文作者简介本论文第一作者Matt J. Kusner是牛津大学的副教授&#xff0c;致力于设计适应现实世界问题需求的新机器学习模型&#xff08;例如&#xff0c;fair algorithms, discrete generative models, document distances, privacy, dataset compression, budgeted learning…

C# 线程理解

概念引用&#xff1a;http://blog.csdn.net/yujie_yang/article/details/53173752 多线程和多进程的区别&#xff1a;任务管理器里各种不同的进程就是多进程&#xff0c;或者是你同时运行多个”.exe’程序就可以理解为多进程&#xff0c;多进程是要更多消耗CPU资源的。 多线程是…

c语言主调函数和被调函数,在C语言中,何为主调函数和被调函数,他们之 – 手机爱问...

2007-08-30请详细一些~最好举出例子你好。评价宝宝的标准基本上是&#xff1a;技能>资质>成长因为宝宝的评价是一项 仁者见仁的活儿&#xff0c;但其中有些规律我想是可以具体话的&#xff0c;希望能对你有帮助&#xff1a;1&#xff1a;技能&#xff1a;技能的意义有多大…

学习关于display :flex 布局问题!

很多人不明白这个display:flex是到底是什么东西&#xff0c;如何使用的 。 1.什么是display&#xff1a;flex呢&#xff1f; 答&#xff1a;flex是 flexible box的缩写&#xff0c;意为弹性布局 &#xff1b;这个东西的引入&#xff0c;为盒模型提供了最大的灵活性&#xf…

QT信号和槽函数学习笔记

//connect 函数有4个参数 分别是 发送者 信号。接受者 &#xff0c;槽 //connect(sender,signal,receiver,slot) /* * 信号和槽 * 信号 就是一个普通的函数 定义信号的时候需要在函数前面加上signals: &#xff0c;不需要实现 * 槽 函数 在QT5中科院是类的任意成员函数&#xf…

数据库和Webapp安全

威胁模型 这是根据我网站上的快速参考页松散地讨论数据库和Webapp安全的问题。 该页面变得笨拙&#xff0c;并且使读者无法轻松地与我或其他人进行交互。 威胁模型 所有安全分析都必须从检查威胁模型开始。 威胁模型要求您回答四个问题&#xff1a; 我要保护的是什么&#…

note同步不及时 one_一辆理想ONE又“跪了”?理想官方紧急发文回应

汽车行业关注(autochat.com.cn)10月16日报道——10月15日&#xff0c;有网友在社交媒体上发布视频&#xff0c;从视频可以看到&#xff0c;一辆理想ONE在遭遇事故后&#xff0c;左前轮脱落在车外疑似断轴,从视频未能判定是断轴引起的事故&#xff0c;还是事故引起的断轴。针对该…

C语言连续多个空格合并一个,C语言合并连续空格

一开始自己写的&#xff1a;a&#xff1a;#includemain(){int c;int state0;while (( cgetchar()) ! EOF) {if (c ){state1;continue;}if (state){state0;putchar( );putchar(c);}elseputchar(c);}}网上搜的&#xff1a;b:#include #define NONBLANK avoid main(){int c , last…

Skywalking 中 Agent 自动同步配置源码解析

文章目录 前言正文实现架构实现模型OAP 同步 ApolloConfigWatcherRegisterConfigChangeWatcher Agent 侧 前言 本文代码 OAP 基于 v9.7&#xff0c;Java Agent 基于 v9.1&#xff0c;配置中心使用 apollo。 看本文需要配合代码“食用”。 正文 Skywalking 中就使用这种模型…

华为5720设置静态路由不通_【干货分享】交换机与路由器在环路中的处理机制了解一下!...

点击蓝字关注我们-今天小盟带大家来讨论一下交换机与路由器在环路中的处理机制-01基础配置1---如图配置路由器各接口地址&#xff0c;AR-2为PC-1的网关路由器2---AR-1配置静态默认路由&#xff0c;下一跳地址指向AR-2&#xff1b;[AR-1]ip route-static 0.0.0.0 0 12.1.1.2AR-2…

IPC 进程间通信方式——信号量

信号量 本质上是共享资源的数目&#xff0c;用来控制对共享资源的访问。用于进程间的互斥和同步每种共享资源对应一个信号量&#xff0c;为了便于大量共享资源的操作引入了信号量集&#xff0c;可对多对信号量一次性操作。对信号量集中所有的操作可以要求全部成功&#xff0c;也…

css选择器的优先级

选择器的优先级表述为4个部分&#xff0c;用0,0,0,0表示。 !important--1,0,0,0行内样式ID选择器--0,1,0,0类选择器(例如,.example)、属性选择器&#xff08;例如, [type"radio"]&#xff09;或伪类&#xff08;例如, :hover&#xff09;--0,0,1,0元素&#xff08;例…

VisualVM介绍使用

1 打开VisualVM&#xff08;这个工具放在JDK安装目录的bin目录下&#xff0c;双击jvisualvm.exe即可打开&#xff09;&#xff0c;如下图所示 以VisualVM自身为例&#xff0c;VisualVM本身也是一个java程序&#xff0c;当然也而已用VisualVM来分析 2 概述页面主要显示程序…