activitemq与spring的整合

image.png

activitemq整合spring

一.activmq的点对点模型

image.png

pom.xml:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.demo</groupId><artifactId>aq-test</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>aq-test Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>javax.jms</groupId><artifactId>jms-api</artifactId><version>1.1-rev-1</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.14.5</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.34</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework/spring-aop --><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.1.3.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version></dependency></dependencies><build><finalName>aq-test</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>
ActiviteMq.class:(发送端)
package com.demo;import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;public class ActiviteMq {@Testpublic void testQueueProducer() throws JMSException {//1.创建connectinfactory对象,需要指定服务的IP以及端口号//brokerURL服务器的ip以及端口号ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");//2.使用ConnectionFactory创建Connection connection = connectionFactory.createConnection();//3.开启链接,调用connection对象的start的方法connection.start();//4.使用connection对创建一个session对象//[4.1] 第一参数:是否开启事务//[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式//1.自动应答2.手动应答 一般为自动Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象//参数:队列名称Queue queue = session.createQueue("test-queue2");//第六步 使用session创建一个producer对象MessageProducer producer = session.createProducer(queue);//第七步 创建一个message对象 创建一个textmessage对象TextMessage textMessage = session.createTextMessage("风风光光");//第八步 使用producer对象发送消息producer.send(textMessage);//第九步 关闭资源producer.close();session.close();connection.close();}
}
ReceiveMsf.class:(接收端)
package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;
import java.io.IOException;public class ReceiveMsf {@Testpublic void testQueueConsumer() throws JMSException, IOException {//1.创建connectinfactory对象,需要指定服务的IP以及端口号//brokerURL服务器的ip以及端口号ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");//2.使用ConnectionFactory创建Connection connection = connectionFactory.createConnection();//3.开启链接,调用connection对象的start的方法connection.start();//4.使用connection对创建一个session对象//[4.1] 第一参数:是否开启事务//[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式//1.自动应答2.手动应答 一般为自动Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象//参数:队列名称Queue queue = session.createQueue("test-queue2");// 第六步:使用Session对象创建一个Consumer对象。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;//取消的内容text = textMessage.getText();//第八步 打印消息System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//等待键盘输入 阻塞System.in.read();//第九步 关闭资源consumer.close();session.close();connection.close();}
}

二.activmq的发布订阅模型

image.png

TopicProducer.class
package com.demo.dingyue;import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;public class TopicProducer {@Testpublic void testTopicProducer() throws JMSException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("huaYuanBaoBao");MessageProducer producer = session.createProducer(topic);TextMessage textMessage = session.createTextMessage("这个是发布订阅的");producer.send(textMessage);producer.close();session.close();connection.close();}}
TopicCustomer.class:
package com.demo.dingyue;import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;
import java.io.IOException;public class TopicCustomer {@Testpublic void testTopicCustomer() throws JMSException, IOException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("huaYuanBaoBao");MessageConsumer consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try{TextMessage textMessage = (TextMessage) message;String text = null;//取出消息的内容text= textMessage.getText();System.out.println(text);}catch (Exception e){e.printStackTrace();}}});System.out.println("消费端03");System.in.read();//关闭资源connection.close();consumer.close();session.close();}
}

和Spring整合:

spring-amq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-3.0.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-3.0.xsd"><context:component-scan base-package="com.demo.spring"/><bean id="amqSenderService" class="com.demo.spring.AMQSenderServiceImpl"><!--<bean id="user" class="com.demo.spring.User">--></bean><bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="trustAllPackages" value="true"/><property name="brokerURL"><value>tcp://192.168.1.20:61616</value></property></bean></property><property name="maxConnections" value="100"></property></bean><!--使用缓存可以提升效率--><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="jmsFactory"/><property name="sessionCacheSize" value="1"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean><!--测试Queue,队列的名字是spring-queue--><bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"><!--<constructor-arg index="0" value="spring-queue"/>--><constructor-arg name="name" value="spring-queue"/></bean><!--测试Topic--><bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-topic"/></bean></beans>
AMQSenderServiceImpl:
package com.demo.spring;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;@Service
public class AMQSenderServiceImpl  {private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);@Resource(name = "jmsTemplate")private JmsTemplate jmsTemplate;//目的地队列的明证,我们要向这个队列发送消息@Resource(name = "destinationQueue")private Destination destination;//向特定的队列发送消息public void sendMsg(final User user) {
//        final String msg = JSON.toJSONString(mqParamDto);user.setEmail("javaceshi@aa.com");user.setPassword("123456");user.setPhone("123456");user.setSex("M");user.setUsername("javaceshi");try {logger.info("将要向队列{}发送的消息msg:{}", destination, user);jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {
//                    return session.createObjectMessage(user);return  session.createTextMessage("2019/1/18message");}});} catch (Exception ex) {logger.error("向队列{}发送消息失败,消息为:{}", destination, user);}}
}
AMQReceiverServiceImpl:
package com.demo.spring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;@Service
public class AMQReceiverServiceImpl {private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);@Resource(name = "jmsTemplate")private JmsTemplate jmsTemplate;//目的地队列的明证,我们要向这个队列接收消息@Resource(name = "destinationQueue")private Destination destination;//向特定的队列接收消息public void receiverMsg(final User user) {
//try {Object object = jmsTemplate.receive(destination);User msg = (User) object;System.out.println(msg);} catch (Exception ex) {ex.printStackTrace();}}
}

测试类:App

package com.demo.spring;import com.demo.spring.User;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 主发送类**/
public class App
{public static void main( String[] args ){final  User user = new User();user.setEmail("javaceshi@aa.com");user.setPassword("123456");user.setPhone("123456");user.setSex("M");user.setUsername("javaceshi");ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-amq.xml");AMQSenderServiceImpl sendService = (AMQSenderServiceImpl)context.getBean("amqSenderService");sendService.sendMsg(user);
//        sendService.send(user);System.out.println("send successfully, please visit http://192.168.1.20:8161/admin to see it");}
}

转载于:https://www.cnblogs.com/charlypage/p/10493286.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python遍历字典的四种方法对比

#!/usr/bin/python from time import clockl [(x,x) for x in xrange (10000000)] d dict(l) t0 clock() # 方法一 for i in d: n d[i]t1 clock() # 方法二&#xff1a;最慢 for k,v in d.items(): n vt2 clock() # 方法三: 最快&#xff0c;推荐方法 for k,v in d.ite…

jboss启动初始页面_JBoss BRMS最佳实践– BPM流程初始化层的提示

jboss启动初始页面我过去发布过一些有关迁移策略的文章&#xff0c;仔细研究了流程层&#xff0c;并提供了一些有关jBPM的最佳实践 &#xff0c;它们都涉及到BPM策略的非常具体的部分。 我想重新讨论最佳实践的主题&#xff0c;然后在智能集成企业级别上&#xff0c;我们讨论使…

Python 学习笔记 多进程 multiprocessing

Python 解释器有一个全局解释器锁(PIL)&#xff0c;导致每个 Python 进程中最多同时运行一个线程&#xff0c;因此 Python 多线程程序并不能改善程序性能&#xff0c;不能发挥多核系统的优势&#xff0c;可以通过这篇文章了解。 但是多进程程序不受此影响&#xff0c; Python 2…

Django 学习笔记第一课

Django web 框架介绍&#xff1a; MVC框架的核心思想 核心思想&#xff1a;解耦&#xff1b; 好处&#xff1a;可扩展性&#xff0c;向后兼容&#xff0c;低耦合&#xff0c;高内聚&#xff1b; 普通web结构框架MVC框架 M:model 主要用于数据库层次的封装&#xff1b; V:view…

记住要重置线程上下文类加载器

我很难思考与Java 加载有关的东西&#xff0c;而不是与类加载器有关的东西。 在使用应用程序服务器或OSGi的情况下尤其如此&#xff0c;在这些应用程序服务器或OSGi中&#xff0c;经常使用多个类加载器&#xff0c;并且透明地使用类加载器的能力降低了。 我同意OSGI Alliance B…

EntityFramework Code-First—领域类配置之DataAnnotations

本文出自&#xff1a;https://www.cnblogs.com/tang-tang/p/5510574.html 一、摘要 EF Code-First提供了一个可以用在领域类或其属性上的DataAnnotation特性集合&#xff0c;DataAnnotation特性会覆盖默认的EF约定。 DataAnnotation存在于两个命名空间里&#xff1a; System.Co…

Python 调试工具 PDB(Linux 环境下调试)

转载&#xff1a;http://blog.163.com/gjx0619126/blog/static/12740839320114995947700/ 在python中使用pdb模块可以进行调试 import pdb pdb.set_trace() 也可以使用python -m pdb mysqcript.py这样的方式 (Pdb) 会自动停在第一行&#xff0c;等待调试,这时你可以看看 帮助…

Ubuntu 更新源方法

安装完Ubuntu系统之后&#xff0c;面临的最主要的一个问题就是将apt安装源进行更新&#xff0c;因为在国内直接利用Ubuntu默认的安装源下载安装包速度慢&#xff0c;并且有的时候软件版本也比较旧。今天小编对Ubuntu更新源进行介绍&#xff1a;&#xff08;这里针对阿里源和清华…

PAT Basic 1002

1002 写出这个数 &#xff08;20 分&#xff09;读入一个正整数 n&#xff0c;计算其各位数字之和&#xff0c;用汉语拼音写出和的每一位数字。 输入格式&#xff1a; 每个测试输入包含 1 个测试用例&#xff0c;即给出自然数 n 的值。这里保证 n 小于 10​100​​。 输出格式&…

mybatis crud_MyBatis教程– CRUD操作和映射关系–第2部分

mybatis crud为了说明这一点&#xff0c;我们正在考虑以下示例域模型&#xff1a; 会有用户&#xff0c;每个用户可能都有一个博客&#xff0c;每个博客可以包含零个或多个帖子。 这三个表的数据库结构如下&#xff1a; CREATE TABLE user (user_id int(10) unsigned NOT NU…

MATLAB 排序函数(先按第一列排序(主排序)然后再按第二列排序(次排序))

利用 sortrows 函数实现MATLAB 先按第一列排序&#xff08;主排序&#xff09;然后再按第二列排序&#xff08;次排序&#xff09; A [8,9,6;5,5,2;2,5,8] sortrows(A)A 8 9 65 5 22 5 8ans 2 5 85 5 28 9 6

用PDB库调试Python程序

Python自带的pdb库&#xff0c;发现用pdb来调试程序还是很方便的&#xff0c;当然了&#xff0c;什么远程调试&#xff0c;多线程之类&#xff0c;pdb是搞不定的。用pdb调试有多种方式可选&#xff1a;1. 命令行启动目标程序&#xff0c;加上-m参数&#xff0c;这样调用myscrip…

数据操作

mysql> create table employee(-> id int primary key auto_increment,-> emp_name char(12) not null,-> sex enum(male,female) not null default male, #大部分是男的-> age int(3) unsigned not null default 28,-> hire_date date not null,-> post …

/usr/bin/ld: cannot find -l*** 这里***可以指lapack等

在Linux安装编译过程中有时会出现在如下形式的错误&#xff1a; /usr/bin/ld: cannot find -l***这里表示编译过程中找不到以下库名&#xff1a; lib库名(即***).so会发生这样的原因有以下三种情形&#xff1a; 系统没有安装相对应的lib 相对应的lib版本不对 lib&#xff0…

通过分区在Kafka中实现订单保证人

Kafka最重要的功能之一是实现消息的负载平衡&#xff0c;并保证分布式集群中的排序&#xff0c;否则传统队列中将无法实现。 首先让我们尝试了解问题陈述 让我们假设我们有一个主题&#xff0c;其中发送消息&#xff0c;并且有一个消费者正在使用这些消息。 如果只有一个使用…

破解栅栏密码python脚本

今天遇到一个要破解的栅栏密码&#xff0c;写了个通用的脚本 1 #!/usr/bin/env python2 # -*- coding: gbk -*-3 # -*- coding: utf_8 -*-4 # Author: 蔚蓝行5 # http://www.cnblogs.com/duanv6 e raw_input(请输入要解密的字符串\n)7 elen len(e)8 field[]9 for i in range(…

水稻已知os基因号,利用DAVIA进行GO功能富集分析

第1-5步&#xff1a; 已知水稻的基因&#xff08;os&#xff09;&#xff0c;进行功能注释 第6步 第七步&#xff1a; 第八步&#xff1a; 第九步&#xff1a; 第十步&#xff1a; 第十一步&#xff1a;

二,八,十,十六进制之间转换的相应方法

int num1 Integer.valueOf(n,16); //16进制转换成10进制 Integer.toHexString(Integer i); //10进制转换成16进制 补充&#xff1a;Integer.toHexString(Integer i);该方法得出的字符默认为小写&#xff0c;如果想得到大写结果&#xff0c;则变为Integer.toHexString(Integer i…

IDF实验室-图片里的英语

原题&#xff1a; 一恒河沙中有三千世界&#xff0c;一张图里也可以有很多东西。 不多说了&#xff0c;答案是这个图片包含的那句英文的所有单词的首字母。 首字母中的首字母要大写&#xff0c;答案格式是wctf{一坨首字母} 加油吧少年&#xff01;看好你哦~ writeup&#xff…

linux 终端调用MATLAB程序

linux 终端调用MATLAB程序 路径&#xff1a;/A/B/C/ 程序名称&#xff1a;xxx.m linux 终端调用MATLAB函数方法 cd /A/B/C/ matlab -nodisplay -nosplash -nodesktop -r "xxx;exit;"