activitemq整合spring
一.activmq的点对点模型
pom.xml:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.demo</groupId><artifactId>aq-test</artifactId><version>1.0-SNAPSHOT</version><packaging>war</packaging><name>aq-test Maven Webapp</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>javax.jms</groupId><artifactId>jms-api</artifactId><version>1.1-rev-1</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.14.5</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.34</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework/spring-aop --><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.1.3.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version></dependency></dependencies><build><finalName>aq-test</finalName><pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --><plugins><plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin><!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging --><plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin><plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin><plugin><artifactId>maven-war-plugin</artifactId><version>3.2.2</version></plugin><plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin><plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin></plugins></pluginManagement></build>
</project>
ActiviteMq.class:(发送端)
package com.demo;import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;public class ActiviteMq {@Testpublic void testQueueProducer() throws JMSException {//1.创建connectinfactory对象,需要指定服务的IP以及端口号//brokerURL服务器的ip以及端口号ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");//2.使用ConnectionFactory创建Connection connection = connectionFactory.createConnection();//3.开启链接,调用connection对象的start的方法connection.start();//4.使用connection对创建一个session对象//[4.1] 第一参数:是否开启事务//[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式//1.自动应答2.手动应答 一般为自动Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象//参数:队列名称Queue queue = session.createQueue("test-queue2");//第六步 使用session创建一个producer对象MessageProducer producer = session.createProducer(queue);//第七步 创建一个message对象 创建一个textmessage对象TextMessage textMessage = session.createTextMessage("风风光光");//第八步 使用producer对象发送消息producer.send(textMessage);//第九步 关闭资源producer.close();session.close();connection.close();}
}
ReceiveMsf.class:(接收端)
package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;
import java.io.IOException;public class ReceiveMsf {@Testpublic void testQueueConsumer() throws JMSException, IOException {//1.创建connectinfactory对象,需要指定服务的IP以及端口号//brokerURL服务器的ip以及端口号ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");//2.使用ConnectionFactory创建Connection connection = connectionFactory.createConnection();//3.开启链接,调用connection对象的start的方法connection.start();//4.使用connection对创建一个session对象//[4.1] 第一参数:是否开启事务//[4.2] 第二参数:当第一个参数为false的时候 才有意义 消息的应答模式//1.自动应答2.手动应答 一般为自动Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//第五步:使用session对象创建一个destination对象(topic,queue) 此处创建一个queue对象//参数:队列名称Queue queue = session.createQueue("test-queue2");// 第六步:使用Session对象创建一个Consumer对象。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = null;//取消的内容text = textMessage.getText();//第八步 打印消息System.out.println(text);} catch (JMSException e) {e.printStackTrace();}}});//等待键盘输入 阻塞System.in.read();//第九步 关闭资源consumer.close();session.close();connection.close();}
}
二.activmq的发布订阅模型
TopicProducer.class
package com.demo.dingyue;import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;public class TopicProducer {@Testpublic void testTopicProducer() throws JMSException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("huaYuanBaoBao");MessageProducer producer = session.createProducer(topic);TextMessage textMessage = session.createTextMessage("这个是发布订阅的");producer.send(textMessage);producer.close();session.close();connection.close();}}
TopicCustomer.class:
package com.demo.dingyue;import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;import javax.jms.*;
import java.io.IOException;public class TopicCustomer {@Testpublic void testTopicCustomer() throws JMSException, IOException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.20:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("huaYuanBaoBao");MessageConsumer consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try{TextMessage textMessage = (TextMessage) message;String text = null;//取出消息的内容text= textMessage.getText();System.out.println(text);}catch (Exception e){e.printStackTrace();}}});System.out.println("消费端03");System.in.read();//关闭资源connection.close();consumer.close();session.close();}
}
和Spring整合:
spring-amq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-3.0.xsdhttp://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-3.0.xsd"><context:component-scan base-package="com.demo.spring"/><bean id="amqSenderService" class="com.demo.spring.AMQSenderServiceImpl"><!--<bean id="user" class="com.demo.spring.User">--></bean><bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="trustAllPackages" value="true"/><property name="brokerURL"><value>tcp://192.168.1.20:61616</value></property></bean></property><property name="maxConnections" value="100"></property></bean><!--使用缓存可以提升效率--><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="jmsFactory"/><property name="sessionCacheSize" value="1"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean><!--测试Queue,队列的名字是spring-queue--><bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"><!--<constructor-arg index="0" value="spring-queue"/>--><constructor-arg name="name" value="spring-queue"/></bean><!--测试Topic--><bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-topic"/></bean></beans>
AMQSenderServiceImpl:
package com.demo.spring;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;@Service
public class AMQSenderServiceImpl {private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);@Resource(name = "jmsTemplate")private JmsTemplate jmsTemplate;//目的地队列的明证,我们要向这个队列发送消息@Resource(name = "destinationQueue")private Destination destination;//向特定的队列发送消息public void sendMsg(final User user) {
// final String msg = JSON.toJSONString(mqParamDto);user.setEmail("javaceshi@aa.com");user.setPassword("123456");user.setPhone("123456");user.setSex("M");user.setUsername("javaceshi");try {logger.info("将要向队列{}发送的消息msg:{}", destination, user);jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {
// return session.createObjectMessage(user);return session.createTextMessage("2019/1/18message");}});} catch (Exception ex) {logger.error("向队列{}发送消息失败,消息为:{}", destination, user);}}
}
AMQReceiverServiceImpl:
package com.demo.spring;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;@Service
public class AMQReceiverServiceImpl {private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);@Resource(name = "jmsTemplate")private JmsTemplate jmsTemplate;//目的地队列的明证,我们要向这个队列接收消息@Resource(name = "destinationQueue")private Destination destination;//向特定的队列接收消息public void receiverMsg(final User user) {
//try {Object object = jmsTemplate.receive(destination);User msg = (User) object;System.out.println(msg);} catch (Exception ex) {ex.printStackTrace();}}
}
测试类:App
package com.demo.spring;import com.demo.spring.User;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;/*** 主发送类**/
public class App
{public static void main( String[] args ){final User user = new User();user.setEmail("javaceshi@aa.com");user.setPassword("123456");user.setPhone("123456");user.setSex("M");user.setUsername("javaceshi");ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-amq.xml");AMQSenderServiceImpl sendService = (AMQSenderServiceImpl)context.getBean("amqSenderService");sendService.sendMsg(user);
// sendService.send(user);System.out.println("send successfully, please visit http://192.168.1.20:8161/admin to see it");}
}