自定义注解 实现自定义消息_实现自定义的未来

自定义注解 实现自定义消息

上一次我们学习了java.util.concurrent.Future<T>背后的原理 。 我们还发现, Future<T>通常由库或框架返回。 但是,没有什么可以阻止我们在有意义的情况下自行实现所有功能。 它不是特别复杂,可以显着改善您的设计。 我尽力为我们的示例选择有趣的用例。

JMS(Java消息服务)是用于发送异步消息的标准Java API。 当我们想到JMS时​​,我们立即看到客户端以一发不可收拾的方式向服务器(经纪人)发送消息。 但是在JMS之上实现请求-答复消息传递模式同样普遍。 实现非常简单:您将请求消息(当然是异步地)发送到另一侧的MDB。

MDB处理该请求,然后将答复发送回硬编码的答复队列或客户机选择的任意队列,并与JMSReplyTo属性中的消息一起发送。 第二种情况更有趣。 客户端可以创建一个临时队列,并在发送请求时将其用作回复队列。 这样,每个请求/答复对使用不同的答复队列,因此不需要关联ID,选择器等。

但是有一个问题。 向JMS代理发送消息是简单且异步的。 但是,收到答复要麻烦得多。 您可以实现MessageListener以使用一条消息,也可以使用阻塞MessageConsumer.receive() 。 第一种方法非常重,很难在实践中使用。 第二个失败了异步消息传递的目的。 您还可以按一定间隔轮询回复队列,这听起来更糟。

到现在为止,了解Future抽象您应该有一些设计想法。 如果我们可以发送请求消息并取回Future<T> (代表尚未发送的答复消息)怎么办? Future抽象应该处理所有逻辑,我们可以放心地将其用作未来结果的句柄。 这是用于创建临时队列和发送请求的管道代码:

private <T extends Serializable> Future<T> asynchRequest(ConnectionFactory connectionFactory, Serializable request, String queue) throws JMSException {Connection connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);final Queue tempReplyQueue = session.createTemporaryQueue();final ObjectMessage requestMsg = session.createObjectMessage(request);requestMsg.setJMSReplyTo(tempReplyQueue);sendRequest(session.createQueue(queue), session, requestMsg);return new JmsReplyFuture<T>(connection, session, tempReplyQueue);
}

asynchRequest()方法仅将ConnectionFactory带到JMS代理和任意数据。 该对象将使用ObjectMessage发送到queue 。 最后一行至关重要–我们返回自定义的JmsReplyFuture<T> ,它将表示尚未收到的回复。 注意我们如何将临时JMS队列传递给JMSReplyTo属性和Future 。 MDB方面的实现并不那么重要。 不用说是将回复发送回指定队列:

final ObjectMessage reply = session.createObjectMessage(...);
session.createProducer(request.getJMSReplyTo()).send(reply);

因此,让我们深入研究JmsReplyFuture<T> 。 我假设请求和答复都是ObjectMessage 。 使用不同类型的消息不是很困难。 首先让我们看看如何设置从回复通道接收消息:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {//...public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic void onMessage(Message message) {//...}}

如您所见, JmsReplyFuture实现了Future<T> (其中T是包装在ObjectMessage的对象的预期类型)和JMS MessageListener 。 在构造函数中,我们只是开始侦听replyQueue 。 根据我们的设计假设,我们知道那里最多会有一条消息,因为回复队列是临时丢弃队列。 在上一篇文章中,我们了解到Future.get()应该在等待结果时阻塞。 另一方面, onMessage()是从某些内部JMS客户端线程/库调用的回调方法。 显然,我们需要一些共享变量/锁,以使等待中的get()知道答复已到达。 最好我们的解决方案应该是轻量级的,并且不引入任何延迟,因此忙于等待volatile变量是一个坏主意。 最初,我虽然使用了Semaphore ,但我将使用它来从onMessage()取消阻塞get() onMessage() 。 但是我仍然需要一些共享变量来保存实际的回复对象。 因此,我想到了使用ArrayBlockingQueue的想法。 当我们知道不会再有一个项目时,使用队列听起来可能很奇怪。 但是,它利用旧的生产者-消费者模式很好地工作: Future.get()是一个消费者,它阻塞了空队列的poll()方法。 另一方面, onMessage()是生产者,将回复消息放入该队列中并立即取消阻塞消费者。 外观如下:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);//...@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);//...}}

实施仍未完成,但涵盖了最重要的概念。 请注意, BlockingQueue.poll(long, TimeUnit)方法非常适合Future.get(long, TimeUnit) 。 不幸的是,即使它们来自相同的程序包并且在相同的时间内或多或少地被开发,一种方法在超时时返回null ,而另一种方法应引发异常。 易于修复。

还要注意onMessage()的实现变得多么容易。 我们只是将新收到的消息放在BlockingQueue reply ,而集合将为我们完成所有同步。 我们仍然缺少一些不太重要但仍然重要的细节–取消和清理。 无需赘述,下面是一个完整的实现:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private static enum State {WAITING, DONE, CANCELLED}private final Connection connection;private final Session session;private final MessageConsumer replyConsumer;private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);private volatile State state = State.WAITING;public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {try {state = State.CANCELLED;cleanUp();return true;} catch (JMSException e) {throw Throwables.propagate(e);}}@Overridepublic boolean isCancelled() {return state == State.CANCELLED;}@Overridepublic boolean isDone() {return state == State.DONE;}@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {try {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);state = State.DONE;cleanUp();} catch (Exception e) {throw Throwables.propagate(e);}}private void cleanUp() throws JMSException {replyConsumer.close();session.close();connection.close();}
}

我使用特殊的State枚举来保存有关状态的信息。 与基于多个标志, null检查等的复杂条件相比,我发现它更具可读性。要记住的第二件事是取消。 幸运的是,它非常简单。 我们基本上关闭了基础会话/连接。 在整个请求/答复消息交换的整个过程中,它必须保持打开状态,否则临时JMS答复队列将消失。 请注意,我们不能轻易通知经纪人/ MDB我们对答复不再感兴趣。 我们只是停止监听它,但是MDB仍将处理请求并尝试将答复发送到不再存在的临时队列。

那么这一切在实践中看起来如何? 假设我们有一个MDB可以接收一个数字并返回一个平方。 假设计算需要一点时间,所以我们提前开始计算,同时做一些工作,然后再取回结果。 这样的设计如下所示:

final Future<Double> replyFuture = asynchRequest(connectionFactory, 7, "square");
//do some more work
final double resp = replyFuture.get();      //49

其中"square"是请求队列的名称。 如果我们重构它并使用依赖注入,我们可以将其进一步简化为:

final Future<Double> replyFuture = calculator.square(7);
//do some more work
final double resp = replyFuture.get();      //49

您知道该设计的最佳选择吗? 即使我们正在利用相当先进的JMS功能,此处也没有JMS代码。 此外,我们稍后可以使用SOAP或GPU将calculator替换为其他实现。 就客户端代码而言,我们仍然使用Future<Double>抽象。 尚未提供计算结果。 根本的机制无关紧要。 那就是抽象之美。

显然,此实现尚未准备好生产(到目前为止)。 但更糟糕的是,它缺少一些基本功能。 我们仍然在某个时候调用阻塞Future.get() 。 而且,无法组成/链接期货(例如, 当响应到达时,发送另一条消息 )或等待最快的期货完成。 耐心一点!

参考: NoBlogDefFound博客中的JCG合作伙伴 Tomasz Nurkiewicz 实现了自定义Future 。

翻译自: https://www.javacodegeeks.com/2013/02/implementing-custom-future.html

自定义注解 实现自定义消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350024.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

菜单 java_java 菜单

继承体系MenuBar,Menu,MenuItem之间的关系&#xff1a;先创建菜单条&#xff0c;再创建菜单&#xff0c;每一个菜单中建立菜单项。也可以菜单添加到菜单中&#xff0c;作为子菜单。通过setMenuBar()方法&#xff0c;将菜单添加到Frame中。package june610;import java.awt.File…

Jsp、Servlet

1 forward、redirect forward 转发是服务器行为&#xff0c;浏览器根本不知道服务器发送的内容是从哪儿来&#xff0c;所以它的地址栏中还是原来的地址。 redirect 重定向是客户端行为。redirect就是服务端根据逻辑,发送一个状态码,告诉浏览器重新去请求那个地址&#xff0c;一…

Java Finalizer和Java文件输入/输出流

在与主题直接合作或花时间学习它们之后&#xff0c;我经常会发现自己在网上注意到更多主题。 最近的Stephen Connolly &#xff08; CloudBees &#xff09;发表FileInputStream / FileOutputStream被认为有害的消息引起了我的注意&#xff0c;因为我最近在Java的finalizer中遇…

java 运行main_使用maven运行Java Main的三种方法解析

maven使用exec插件运行java main方法&#xff0c;以下是3种不同的操作方式。一、从命令行运行1、运行前先编译代码&#xff0c;exec&#xff1a;java不会自动编译代码&#xff0c;你需要手动执行mvn compile来完成编译。mvn compile2、编译完成后&#xff0c;执行exec运行main方…

CentOS7 修复boot目录

这里为了达到实验目的&#xff0c;首先删除boot目录下所有内容 重启后发现系统进不去了&#xff0c;这正是我们想要的 进入系统救援模式&#xff0c;以重新引导系统 进入救援模式后&#xff0c;输入以下命令进行修复boot目录 重启后&#xff0c;能正常引导系统了 转载于:https:…

java corepoolsize_理解ThreadPoolExecutor线程池的corePoolSize、maximumPoolSize和poolSize

我们知道&#xff0c;受限于硬件、内存和性能&#xff0c;我们不可能无限制的创建任意数量的线程&#xff0c;因为每一台机器允许的最大线程是一个有界值。也就是说ThreadPoolExecutor管理的线程数量是有界的。线程池就是用这些有限个数的线程&#xff0c;去执行提交的任务。然…

开式蓄冷罐与闭式蓄冷罐_一罐来统治所有人

开式蓄冷罐与闭式蓄冷罐跳下内存通道 早在1998年&#xff0c;当我是一名C / C 开发人员时&#xff0c;尝试使用Java时&#xff0c;有关该语言的一些内容对我来说就显得有些恼火了。 我记得很担心这些 为什么没有合适的编辑器呢&#xff1f; C / C 有很多。 我为Java拥有的只是…

嵊州D5T2 折纸 folding

折纸 folding 【问题描述】 在非常紧张的 NOIP 考试中&#xff0c;有人喜欢啃指甲&#xff0c;有人喜欢转铅笔&#xff0c;有人喜欢撕 纸条&#xff0c;……而小 x 喜欢迷折纸。 现有一个 W * H 的矩形纸张&#xff0c;监考老师想知道&#xff0c;小 x 至少要折多少次才能使 矩…

使用Portworx和Couchbase的有状态容器

容器本应是短暂的&#xff0c;因此可以很好地扩展以用于无状态应用程序。 有状态的容器&#xff08;例如Couchbase&#xff09;需要区别对待。 管理Docker容器的持久性概述了如何管理有状态容器的持久性。 该博客将说明如何使用Docker Volume Plugins和Portworx创建有状态的容…

java和jvm_java 和 JVM

C和Java的区别指针&#xff1a;java中不存在指针的概念&#xff0c;编程者无法直接通过指针来直接访问内存&#xff0c;有利于维护java程序的安全多重继承&#xff1a;C支持多重继承&#xff0c;java不支持多重继承&#xff0c;但是允许一个类继承多个接口来实现多重继承的问题…

Redis 集群_主从复制_哨兵模型

1 redis集群简介 1.1 集群的概念 所谓的集群&#xff0c;就是通过添加服务器的数量&#xff0c;提供相同的服务&#xff0c;从而让服务器达到一个稳定、高效的状态。 1.1.1 使用redis集群的必要性 问题&#xff1a;我们已经部署好了redis&#xff0c;并且能启动一个redis&#…

java 7 update 17_Java version 7, Update 17 is NOT recongnized by FireFox version 19.0.2

选择的解决方案Reply to my own post:Out of all the solutions posted regarding Java install problems, I gleaned one thing: A re-install of Java and a re-install of Firefox may well solve the problem.Well, for me, thus far, it has.The simple steps I followed:…

英语中的国籍,国家和地区

英语中对于国籍&#xff0c;国家的表述是不一样的&#xff0c;我时常迷糊于此。前些日子我发现了一本英语奇书《Practical English Usage》&#xff0c;这本书回答了我许多英语中的疑问。下面的内容也就是将该书的第364条抄一下。 1&#xff0c;Introduction In order to refer…

junit测试方法_JUnit测试方法订购

junit测试方法直到版本4.10为止的Junit都使用反射API返回的测试类中测试方法的顺序作为测试方法执行的顺序– Class.getMethods&#xff08;&#xff09; 。 引用getMethods&#xff08;&#xff09;api的Javadoc&#xff1a; 返回的数组中的元素未排序&#xff0c;并且没有任…

java非阻塞锁_Java并发问题的非阻塞解决方案

转自http://blog.csdn.net/u011277203/article/details/9223545在并发环境中&#xff0c;对于共享资源通常会采用显式的锁机制(比如synchronized或ReentrantLock)来保证在任意时刻只会有一条线程访问这些变量&#xff0c;并且这些变量的修改对随后获取锁的线程是可见的。无法获…

Java垃圾收集器:G1GC何时将CMS强制退出?

在针对JDK 9&#xff08;2017/4/4&#xff09;提出的JEP中 &#xff0c; Mark Reinhold写道JEP 291 &#xff08;“弃用并发标记扫描&#xff08;CMS&#xff09;垃圾收集器”&#xff09;是“已被放入“建议的在讨论和审查后&#xff0c;由所有者将其定位为目标”。 如果JEP 2…

模块简介/模块的导入/模块的查找顺序/绝对导入和相对导入/软件开发目录规范...

一.模块的简介 什么是模块: 模块就是一系列功能的结合体 模块的三种来源: 1.内置的 2.第三方的  3.自定义的 模块的四种表现形式: 1.使用python编写的py文件(也就意味着py文件也可以称之为模块:一个py文件也可以称之为一个模块)(***)    2.已被编译为共享库或DLL的C或C扩展…

java dll 乱码_java调用c++ dll出现中文乱码 | 学步园

最近的开发用到了使用java调用本机动态连接库的功能&#xff0c;将文件路径通过java调用C代码对文件进行操作。在调用中如果路径中包含有中文字符就会出现问题&#xff0c;程序运行就会中止。下面用一个小例子&#xff0c;来说明记录下解决的方法。java中传入一个字符串&#x…

每日一问:Android 滑动冲突,你们都是怎样处理的

坚持原创日更&#xff0c;短平快的 Android 进阶系列&#xff0c;敬请直接在微信公众号搜索&#xff1a;nanchen&#xff0c;直接关注并设为星标&#xff0c;精彩不容错过。 在 Android 开发中&#xff0c;滑动冲突总是我们一个无法避免的话题。而对于解决方案却是众说纷纭。比…

java多线程 cpu分配_java多线程总结(转载)

Java 多线程编程总结-------------------------------------------------------------------------------------------------下面的内容是很早之前写的&#xff0c;内容不够充实&#xff0c;而且是基于Java1.4的内容&#xff0c;Java5之后&#xff0c;线程并发部分扩展了相当多…