使用SynchronousQueue实现生产者/消费者

Java提供了许多用于并发支持的有用类中,有一个我想谈一谈: SynchronousQueue 。 特别是,我想通过使用方便的SynchronousQueue作为交换机制来完成Producer / Consumer实现。

除非我们了解SynchronousQueue实现的内幕,否则可能不清楚为什么要使用这种类型的队列进行生产者/消费者通信。 事实证明,这并不是我们过去通常考虑的队列。 这个类比只是一个最多包含一个元素的集合。

为什么有用? 好吧,有几个原因。 从生产者的角度来看,只能将一个元素(或消息)存储到队列中。 为了继续进行下一个元素(或消息),生产者应等到消费者使用队列中的当前元素。 从使用者的角度来看,它只是轮询队列以查找下一个可用的元素(或消息)。 很简单,但是最大的好处是:生产者发送消息的速度不能超过消费者处理消息的速度。

这是我最近遇到的用例之一:比较两个数据库表(可能只是巨大的),并检测其中包含不同数据或数据是否相同(副本)。 SynchronousQueue是解决此问题的便捷工具:它允许在自己的线程中处理每个表,并在从两个不同的数据库读取数据时补偿可能的超时/延迟。

让我们从定义比较功能开始,该功能接受源数据源和目标数据源以及表名(进行比较)。 我正在使用Spring框架中非常有用的JdbcTemplate类,因为它非常好地抽象了处理连接和准备好的语句的所有无聊的细节。

public boolean compare( final DataSource source, final DataSource destination, final String table )  {final JdbcTemplate from = new  JdbcTemplate( source );final JdbcTemplate to = new JdbcTemplate( destination );
}

在进行任何实际数据比较之前,最好比较一下源数据库和目标数据库的表行数:

if( from.queryForLong('SELECT count(1) FROM ' + table ) != to.queryForLong('SELECT count(1) FROM ' + table ) ) {return false;
}

现在,至少知道表在两个数据库中包含相同数量的行,我们可以开始进行数据比较。 该算法非常简单:

  • 为源(生产者)和目标(消费者)数据库创建一个单独的线程
  • 生产者线程从表中读取单行并将其放入SynchronousQueue
  • 使用者线程还从表中读取单行,然后向队列询问要比较的可用行(必要时等待),最后比较两个结果集

使用另一大部分Java并发实用程序进行线程池,让我们定义一个具有固定线程数量的线程池(2)。

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();

按照描述的算法,生产者功能可以表示为单个可调用项:

Callable< Void > producer = new Callable< Void >() {@Overridepublic Void call() throws Exception {from.query( 'SELECT * FROM ' + table,new RowCallbackHandler() {@Overridepublic void processRow(ResultSet rs) throws SQLException {try {                   List< ? > row = ...; // convert ResultSet to Listif( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {throw new SQLException( 'Having more data but consumer has already completed' );}} catch( InterruptedException ex ) {throw new SQLException( 'Having more data but producer has been interrupted' );}}});return  null;}
};

由于Java语法,该代码有点冗长,但实际上并没有做很多事情。 从表生成器读取的每个结果集都将转换为一个列表(由于是样板,因此省略了实现),并将其放入队列( offer )。 如果队列不为空,则生产者将被阻止等待消费者完成工作。 使用者可以分别表示为以下可调用对象:

Callable< Void > consumer = new Callable< Void >() {@Overridepublic Void call() throws Exception {to.query( 'SELECT * FROM ' + table,new RowCallbackHandler() {@Overridepublic void processRow(ResultSet rs) throws SQLException {try {List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );if( source == null ) {throw new SQLException( 'Having more data but producer has already completed' );}                                     List< ? > destination = ...; // convert ResultSet to Listif( !source.equals( destination ) ) {throw new SQLException( 'Row data is not the same' );}} catch ( InterruptedException ex ) {throw new SQLException( 'Having more data but consumer has been interrupted' );}}});return  null;}
};

使用者对队列执行反向操作:与其放入数据,不如将数据从队列中拉出( poll )。 如果队列为空,则阻止消费者,等待生产者发布下一行。 剩下的部分只是提交那些可调用对象以执行。 Future的get方法返回的任何异常都表明表不包含相同的数据(或者从数据库获取数据存在问题):

List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );for( final Future< Void > future: futures ) {future.get( 5, TimeUnit.MINUTES );}

参考: Andriy Redko {devmind}博客中的JCG合作伙伴 Andrey Redko 使用SynchronousQueue实现了生产者/消费者 。

翻译自: https://www.javacodegeeks.com/2013/01/implementing-producerconsumer-using-synchronousqueue.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/369747.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python含多个附件的邮件_Python发送带有多个图像附件的电子邮件

我试图用Python发送一封带有多个图像附件的电子邮件。但是通过下面的代码&#xff0c;我可以在正文中包含第一个图像&#xff0c;但是第二个图像会作为附件附加到电子邮件中。有没有办法可以在HTML的主体中同时获得这两个图像&#xff1f;下面是我当前的代码。在from email.mim…

Oracle存储过程总结

1.存储过程结构 ":"是赋值语句 如: l_name :sky;..."" 是判断是否相等. 如: if 11 then...":" 是变量绑定 如: if :P_NAME sky then... 1.1 第一个存储过程 create or replace procedure proc1( para1 varchar2, para2 out varchar2, para3 in…

图表测试点

测试点1&#xff0c;默认状态下&#xff0c;时间和时间插件还有图表显示一致2&#xff0c;看各种表&#xff08;折线图&#xff0c;柱状图&#xff0c;等&#xff09;与下表格显示一致3&#xff0c;数据库里的与页面上的数据位置显示的数据一致&#xff0c;点击页面 默认的折线…

CSS布局(五) 网页布局方式

网页实质是块与块之间的位置&#xff0c;块挨着块&#xff0c;块嵌套块&#xff0c;块叠着块。 三种关系&#xff1a;相邻&#xff0c;嵌套&#xff0c;重叠。 下面介绍网页布局的常用几种方式 1.一列布局&#xff1a; 一般都是固定的宽高&#xff0c;设置margin : 0 auto来水…

使用Mozilla Persona认证用户的指南

到目前为止&#xff0c;只有Twitter和Facebook身份验证&#xff0c;我决定将Mozilla Persona添加到我最新项目&#xff08; 计算机 &#xff0c;计算机生成的音乐&#xff09;的列表中。 为什么&#xff1f; 我喜欢尝试新事物 存储密码是一个艰巨的过程&#xff0c;尽管我知道…

python字典与json转换_python字典与json转换的方法总结

在python中json分别由列表和字典组成&#xff0c;本文主要介绍python中字典与json相互转换的方法。使用json.dumps可以把字典转成json字符串。使用json.loads可以把json字符串转为字典类型的数据。1、字典转json使用json.dumpsjson.dumps是对python对象编码成json对象&#xff…

变量声明declare,简单运算符运算,变量测试与内容替换

declare -/ 选项 变量名 - 设类型 取消类型 -i 设为整型 -x 设为环境变量 -p 显示类型属性&#xff08;property&#xff09; [rootlocalhost ~]# a1 [rootlocalhost ~]# declare -p a declare -- a"1" [rootlocalhost ~]# export a [rootlocalhost ~]# declare -p …

如何水平居中一个元素

在项目中经常会遇到居中问题&#xff0c;这里借鉴度娘的众多答案&#xff0c;做一个总结&#xff1a; 一、元素的水平居中 1、行级元素的水平居中 <div style"width: 200px;height: 100px;border: 1px solid cyan; text-align: center;"><span>行级元素…

Yammer Metrics,一种监视应用程序的新方法

当您运行诸如Web应用程序之类的长期应用程序时&#xff0c;最好了解一些关于它们的统计信息&#xff0c;例如&#xff0c;服务的请求数&#xff0c;请求持续时间或活动请求数。 但是还有一些更通用的信息&#xff0c;例如内部集合的状态&#xff0c;代码的某些部分被执行了多少…

mysql教程目录_MySql目录(二)

MySql索引(二) 转自&#xff1a; http://www.cnblogs.com/dreamhome/archive/2013/04/16/3025304.html 所有MySQL列类型可以被索引。根据存储引擎定义每个表的最大索引数和最大索引长度。 所有存储引擎支持每个表至少16个索引&#xff0c;总索引长度至少为256字节。大多数存储引…

solr和Lucene的配置方式和应用

solr字段类型 类说明BinaryField二进制数据BoolField布尔值&#xff0c;其中’t’/’T’/’1’都是trueCollationFiled支持Unicode排序CurrencyField支持货币和汇率DateRangeFiled支持索引date rangeExternamFiledFiledpull磁盘上的文件EnumField支持定义枚举值ICUCollationFie…

PostgreSQL 9.6 keepalived主从部署

## 环境&#xff1a; PostgreSQL版&#xff1a;9.6 角色 OS IPmaster CentOS7   10.100.12.73 slave CentOS7 10.100.12.74 vIP 10.1…

CSS——清除浮动的六种解决方案

内容的高度撑起父元素容器的高度&#xff0c;效果图如下HTML和CSS代码如下给&#xff50;标签加上浮动以后&#xff0c;&#xff50;&#xff5b;float&#xff1a;left&#xff1b;&#xff5d;&#xff0c;此时DIV塌陷&#xff0c;两段内容同行显示&#xff0c;效果如下&…

40个Java Collections面试问答

Java Collections Framework是Java编程语言的基本方面。 这是Java面试问题的重要主题之一。 在这里&#xff0c;我列出了Java集合框架的一些重要问题和解答。 什么是Java Collections Framework&#xff1f; 列出Collections框架的一些好处&#xff1f; 集合框架中泛型的好处…

vs mysql iss_MySQL5.7与8.0的连接问题(vs2015\2017)

1.MySQL8.0 root密码忘记重置与5.7不同&#xff0c;绝大多数经验帖不适用8.0https://dev.mysql.com/doc/refman/8.0/en/resetting-permissions.html8.0 重置密码的方式2.MySQL连接vs2015时报错提示&#xff1a;Authentication method ‘caching_sha2_password‘ not supported …

191. Number of 1 Bits

Write a function that takes an unsigned integer and returns the number of ’1 bits it has (also known as the Hamming weight). For example, the 32-bit integer ’11 has binary representation 00000000000000000000000000001011, so the function should return 3. …

AtCoder Beginner Contest 084(AB)

A - New Year 题目链接&#xff1a;https://abc084.contest.atcoder.jp/tasks/abc084_a Time limit : 2sec / Memory limit : 256MB Score : 100 points Problem Statement How many hours do we have until New Year at M oclock (24-hour notation) on 30th, December? Cons…

打开就随机生长的树

今天接触了一个新东西&#xff0c;感觉很酷炫的样子。不是我写的&#xff0c;拿给大家看一看&#xff0c;喜欢的可以直接拿走不谢。树的形状和树枝多少都是随机的&#xff0c;每刷新一次就有一次的惊喜哦&#xff0c;无聊的亲们可以多刷几次&#xff0c;当动画来看哦。2017年又…

等待正确的时刻–集成测试

当您必须测试多线程程序时&#xff0c;总是需要等到系统达到特定状态后&#xff0c;测试才能验证是否达到了正确的状态。 这样做的通常方法是在系统中插入一个“探针”&#xff0c;该探针将向同步原语发出信号 &#xff08;例如Semaphore &#xff09;&#xff0c;并且测试将一…

网络编程---黏包

基于UDP协议的socket udp的server 不需要进行监听也不需要建立连接&#xff0c;在启动服务之后只能被动的等待客户端发送消息过来。 客户端发送消息的同时还会 自带地址信息&#xff0c;消息回复的时候 不仅需要发送消息 还需把对方的地址填上。 udp的client 不需要connect 因为…