package com.java1234.producer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/*** direct交换机名称*/public static final String DIRECT_EXCHANGE="directExchange";/*** direct交换机名称1*/public static final String DIRECT_EXCHANGE1="directExchange1";/*** fanout交换机名称*/public static final String FANOUT_EXCHANGE="fanoutExchange";/*** direct队列名称*/public static final String DIRECT_QUEUE="directQueue";/*** direct1队列名称*/public static final String DIRECT_QUEUE1="directQueue1";/*** direct2队列名称*/public static final String DIRECT_QUEUE2="directQueue2";/*** 订阅队列1名称*/public static final String SUB_QUEUE1="subQueue1";/*** 订阅队列2名称*/public static final String SUB_QUEUE2="subQueue2";/*** direct路由Key*/public static final String DIRECT_ROUTINGKEY="directRoutingKey";/*** topic队列名称1**/public static final String TOPIC_QUEUE1="topicQueue1";/*** topic队列名称2**/public static final String TOPIC_QUEUE2="topicQueue2";/*** direct交换机名称*/public static final String TOPIC_EXCHANGE="topicExchange";/*** 定义一个direct交换机* @return*/@Beanpublic DirectExchange directExchange(){return new DirectExchange(DIRECT_EXCHANGE);}/*** 定义一个direct交换机1* @return*/@Beanpublic DirectExchange directExchange1(){return new DirectExchange(DIRECT_EXCHANGE1);}/*** 定义一个direct交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}/*** 定义一个direct队列* @return*/@Beanpublic Queue directQueue(){return new Queue(DIRECT_QUEUE);}/*** 定义一个direct1队列* @return*/@Beanpublic Queue directQueue1(){return new Queue(DIRECT_QUEUE1);}/*** 定义一个direct2队列* @return*/@Beanpublic Queue directQueue2(){return new Queue(DIRECT_QUEUE2);}/*** 定义一个订阅队列1* @return*/@Beanpublic Queue subQueue1(){return new Queue(SUB_QUEUE1);}/*** 定义一个订阅队列2* @return*/@Beanpublic Queue subQueue2(){return new Queue(SUB_QUEUE2);}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding(){return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding fanoutBinding1(){return BindingBuilder.bind(subQueue1()).to(fanoutExchange());}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(subQueue2()).to(fanoutExchange());}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding1(){return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding2(){return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding3(){return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding4(){return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");}/*** 定义一个topic队列1*/@Beanpublic Queue topicQueue1(){return new Queue(TOPIC_QUEUE1);}/*** 定义一个topic队列2*/@Beanpublic Queue topicQueue2(){return new Queue(TOPIC_QUEUE2);}/*** 定义一个direct交换机* @return*/@Beanpublic TopicExchange topicExchange(){return new TopicExchange(TOPIC_EXCHANGE);}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding topicBinding1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding topicBinding2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding topicBinding3(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");}
}
package com.java1234.consumer.service.impl;import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {@Autowiredprivate AmqpTemplate amqpTemplate;@Overridepublic void receiveMessage() {String message=(String) amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);System.out.println("接受到的mq消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE1})public void receiveMessage2(String message) {
// System.out.println("消费者1:接收到的mq消息:"+message);System.out.println("队列1接收日志消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})public void receiveMessage3(String message) {
// System.out.println("消费者2:接收到的mq消息:"+message);System.out.println("队列2接收日志消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})public void receiveSubMessage1(String message) {System.out.println("订阅者1:接收到的mq消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})public void receiveSubMessage2(String message) {System.out.println("订阅者2:接收到的mq消息:"+message);}}
package com.java1234.consumer.service;public interface RabbitMqService {/*** 接受消息*/public void receiveMessage();/*** 接受消息*/public void receiveMessage2(String message);/*** 接受消息*/public void receiveMessage3(String message);/*** 接受订阅消息1*/public void receiveSubMessage1(String message);/*** 接受订阅消息2*/public void receiveSubMessage2(String message);
}
package com.java1234.consumer;import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
// RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.receiveMessage();}
}
package com.java1234.producer.service.impl;import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/*** String exchange 交换机名称* String routingKey 路由Key* Object object 具体发送的消息* @param message*/@Overridepublic void sendMessage(String message) {
// amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);CorrelationData correlationData=new CorrelationData("3453");rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);}@Overridepublic void sendFanoutMessage(String message) {amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);}@Overridepublic void sendRoutingMessage() {amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning2","发送warning2级别的消息");}@Overridepublic void sendTopicMessage() {
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.rabbit","飞快的橘色兔子");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.orange.elephant","慢腾腾的橘色大象");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.fox","quick.orange.fox");
// amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.brown.fox","lazy.brown.fox");amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.brown.fox","quick.brown.fox");}/**** @param correlationData 消息唯一标识* @param ack 交换机是否成功收到消息 true成功 false失败* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm方法被执行了..."+correlationData);if(ack){System.out.println("交换机,消息接收成功"+cause);}else{System.out.println("交换机,消息接收失败"+cause);//我们这里要做一些消息补发的措施System.out.println("id="+correlationData.getId());}}
}
package com.java1234.producer.service;public interface RabbitMqService {/*** 发送消息* @param message*/public void sendMessage(String message);/*** 发送消息* @param message*/public void sendFanoutMessage(String message);/*** 发送路由模式消息*/public void sendRoutingMessage();/*** 发送Topic模式消息*/public void sendTopicMessage();
}
package com.java1234.producer;import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.sendRoutingMessage();
// rabbitMqService.sendTopicMessage();rabbitMqService.sendMessage("confirm确认测试消息");// for(int i=0;i < 10;i++){rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
// rabbitMqService.sendFanoutMessage(i+"用户欠费了");
// }}
}
server:port: 80
spring:rabbitmq:host: 192.168.30.113port: 5672username: pzypassword: 123456virtual-host: /publisher-confirm-type: correlated