目录
引入依赖
配置文件
不同模式下使用springboot收发消息
直连模式
生产者
消费者
Fanout模式
生产者
消费者
Topic主题模式
生产者
消费者
Headers模式
生产者
消费者
补充Quorum队列
生产者
消费者
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
注意下版本。不同版本下的配置方式会有变化。
配置文件
所有的基础运行环境都在application.properties中进行配置。所有配置以spring.rabbitmq开头。通常按照示例进行一些基础的必要配置就可以跑了。
server.port=8080
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/mirror
# 单次推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10
# 消息确认模式:none 表示不做确认(消息投递后即视为已确认);如需手动确认请改为 manual
spring.rabbitmq.listener.simple.acknowledge-mode=none
不同模式下使用springboot收发消息
工具类
/**
 * Names of the RabbitMQ queues and exchanges shared by the configuration
 * classes, producers and consumers in this demo.
 *
 * <p>Pure constant holder: it is final and cannot be instantiated.
 */
public final class MyConstants {

    public static final String QUEUE_QUORUM = "quorumQueue";
    public static final String QUEUE_STREAM = "streamQueue";

    // Direct mode: messages are sent straight to the queue.
    public static final String QUEUE_DIRECT = "directqueue";

    // Fanout mode.
    public static final String EXCHANGE_FANOUT = "fanoutExchange";
    public static final String QUEUE_FANOUT_Q1 = "fanout.q1";
    public static final String QUEUE_FANOUT_Q2 = "fanout.q2";
    public static final String QUEUE_FANOUT_Q3 = "fanout.q3";
    public static final String QUEUE_FANOUT_Q4 = "fanout.q4";

    // Topic mode.
    public static final String EXCHANGE_TOPIC = "topicExchange";
    public static final String QUEUE_TOPIC1 = "hunan.eco";
    public static final String QUEUE_TOPIC2 = "hunan.IT";
    public static final String QUEUE_TOPIC3 = "hebei.eco";
    public static final String QUEUE_TOPIC4 = "hebei.IT";

    // Headers mode.
    public static final String EXCHANGE_HEADER = "headerExchange";
    public static final String QUEUE_TXTYP1 = "txTyp1";
    public static final String QUEUE_BUSTYP1 = "busTyp1";
    public static final String QUEUE_TXBUSTYP1 = "txbusTyp1";

    private MyConstants() {
        // Constant holder; not meant to be instantiated.
    }
}
直连模式
/**
 * Direct mode configuration: only a queue is declared here (no exchange and
 * no binding); all messages are forwarded through that queue.
 */
@Configuration
public class DirectConfig {

    /** Declares the queue named by {@link MyConstants#QUEUE_DIRECT}. */
    @Bean
    public Queue directQueue() {
        final String queueName = MyConstants.QUEUE_DIRECT;
        return new Queue(queueName);
    }
}
生产者
@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")@GetMapping(value="/directSend")public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {//设置部分请求参数MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setPriority(2);//设置消息转换器,如jsonrabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//将对象转换成json再发送。
// rabbitTemplate.convertandsend("",Object);//发消息rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : "+message;}
消费者
//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos@RabbitListener(queues=MyConstants.QUEUE_DIRECT,containerFactory="qos_4")public void directReceive22(Message message, Channel channel, String messageStr) {System.out.println("consumer1 received message : " +messageStr);}@RabbitListener(queues=MyConstants.QUEUE_DIRECT)public void directReceive2(String message) {System.out.println("consumer2 received message : " +message);}
@Configuration
public class RabbitmqConfig {@Bean(name="qos_4")public SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(4);factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动确认return factory;}
}
Fanout模式
/*** Fanout模式需要声明exchange,并绑定queue,由exchange负责转发到queue上。*/
@Configuration
public class FanoutConfig {//声明队列@Beanpublic Queue fanoutQ1() {return new Queue(MyConstants.QUEUE_FANOUT_Q1);}@Beanpublic Queue fanoutQ2() {return new Queue(MyConstants.QUEUE_FANOUT_Q2);}@Beanpublic Queue fanoutQ3() {return new Queue(MyConstants.QUEUE_FANOUT_Q3);}@Beanpublic Queue fanoutQ4() {return new Queue(MyConstants.QUEUE_FANOUT_Q4);}//声明exchange@Beanpublic FanoutExchange setFanoutExchange() {return new FanoutExchange(MyConstants.EXCHANGE_FANOUT);}//声明Binding,exchange与queue的绑定关系@Beanpublic Binding bindQ1() {return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());}@Beanpublic Binding bindQ2() {return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());}@Beanpublic Binding bindQ3() {return BindingBuilder.bind(fanoutQ3()).to(setFanoutExchange());}@Beanpublic Binding bindQ4() {return BindingBuilder.bind(fanoutQ4()).to(setFanoutExchange());}}
生产者
@ApiOperation(value="fanout发送接口",notes="发送到fanoutExchange。消息将往该exchange下的所有queue转发")@GetMapping(value="/fanoutSend")public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send(MyConstants.EXCHANGE_FANOUT, "", new Message(message.getBytes("UTF-8"),messageProperties));Message message2 = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();rabbitTemplate.send(message2);return "message sended : "+message;}
消费者
//fanout模式接收还是只指定队列@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q1)public void fanoutReceiveq1(String message) {System.out.println("fanoutReceive q1 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q2)public void fanoutReceiveq2(String message) {System.out.println("fanoutReceive q2 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q3)public void fanoutReceiveq3(String message) {System.out.println("fanoutReceive q3 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q4)public void fanoutReceiveq4(String message) {System.out.println("fanoutReceive q4 received message : " +message);}
Topic主题模式
@Configuration
public class TopicConfig {//声明队列@Beanpublic Queue topicQ1() {return new Queue(MyConstants.QUEUE_TOPIC1);}@Beanpublic Queue topicQ2() {return new Queue(MyConstants.QUEUE_TOPIC2);}@Beanpublic Queue topicQ3() {return new Queue(MyConstants.QUEUE_TOPIC3);}@Beanpublic Queue topicQ4() {return new Queue(MyConstants.QUEUE_TOPIC4);}//声明exchange@Beanpublic TopicExchange setTopicExchange() {return new TopicExchange(MyConstants.EXCHANGE_TOPIC);}//声明binding,需要声明一个roytingKey@Beanpublic Binding bindTopicHebei1() {return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("hunan.*");}@Beanpublic Binding bindTopicHebei2() {return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("*.IT");}@Beanpublic Binding bindTopicHebei3() {return BindingBuilder.bind(topicQ3()).to(setTopicExchange()).with("*.eco");}@Beanpublic Binding bindTopicHebei4() {return BindingBuilder.bind(topicQ4()).to(setTopicExchange()).with("hebei.*");}}
生产者
@ApiOperation(value="topic发送接口",notes="发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。")@ApiImplicitParam(name="routingKey",value="路由关键字")@GetMapping(value="/topicSendHunanIT")public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {if(null == routingKey) {routingKey="hebei.IT";}MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : routingKey >"+routingKey+";message > "+message;}
消费者
//topic Receiver//注意这个模式会有优先匹配原则。例如发送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不会再去匹配*.IT(hebei.IT)@RabbitListener(queues=MyConstants.QUEUE_TOPIC1)public void topicReceiveq1(String message) {System.out.println("topic hunan.eco received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TOPIC2)public void topicReceiveq2(String message) {System.out.println("topic hunan.IT received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TOPIC3)public void topicReceiveq3(String message) {System.out.println("topic hebei.eco received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TOPIC4)public void topicReceiveq4(String message) {System.out.println("topic hebei.IT received message : " +message);}
Headers模式
/**
 * Headers mode configuration: three queues, one headers exchange, and
 * bindings that select messages by header values instead of routing key.
 */
@Configuration
public class HeaderConfig {

    // ---- queues ----

    /** Queue {@link MyConstants#QUEUE_TXTYP1}. */
    @Bean
    public Queue headQueueTxTyp1() {
        final String queueName = MyConstants.QUEUE_TXTYP1;
        return new Queue(queueName);
    }

    /** Queue {@link MyConstants#QUEUE_BUSTYP1}. */
    @Bean
    public Queue headQueueBusTyp1() {
        final String queueName = MyConstants.QUEUE_BUSTYP1;
        return new Queue(queueName);
    }

    /** Queue {@link MyConstants#QUEUE_TXBUSTYP1}. */
    @Bean
    public Queue headQueueTxBusTyp() {
        final String queueName = MyConstants.QUEUE_TXBUSTYP1;
        return new Queue(queueName);
    }

    // ---- exchange ----

    /** Headers exchange {@link MyConstants#EXCHANGE_HEADER}. */
    @Bean
    public HeadersExchange setHeaderExchange() {
        final String exchangeName = MyConstants.EXCHANGE_HEADER;
        return new HeadersExchange(exchangeName);
    }

    // ---- bindings ----
    // Header bindings can be expressed with either matches(...) or exists().

    /** Binds the txTyp queue for messages whose header {@code txTyp} is "1". */
    @Bean
    public Binding bindHeaderTxTyp1() {
        Queue queue = headQueueTxTyp1();
        HeadersExchange exchange = setHeaderExchange();
        return BindingBuilder.bind(queue).to(exchange).where("txTyp").matches("1");
    }

    /** Binds the busTyp queue for messages whose header {@code busTyp} is "1". */
    @Bean
    public Binding bindHeaderBusTyp1() {
        Queue queue = headQueueBusTyp1();
        HeadersExchange exchange = setHeaderExchange();
        return BindingBuilder.bind(queue).to(exchange).where("busTyp").matches("1");
    }

    /**
     * Binds the combined queue for messages whose header {@code txTyp} is "1"
     * or whose header {@code busTyp} is "1".
     */
    @Bean
    public Binding bindHeaderTxBusTyp1() {
        Map<String,Object> anyOf = new HashMap<>();
        anyOf.put("txTyp", "1");
        anyOf.put("busTyp", "1");

        Queue queue = headQueueTxBusTyp();
        HeadersExchange exchange = setHeaderExchange();
        // Alternative kept from the original as a reference:
        // whereAny(new String[] {"txTyp","busTyp"}).exist()
        return BindingBuilder.bind(queue).to(exchange).whereAny(anyOf).match();
    }
}
生产者
@ApiOperation(value="header发送接口",notes="发送到headerExchange。exchange转发消息时,不再管routingKey,而是根据header条件进行转发。")@GetMapping(value="/headerSend")public Object headerSend(String txTyp,String busTyp,String message) throws AmqpException, UnsupportedEncodingException {if(null == txTyp) {txTyp="0";}if(null == busTyp) {busTyp="0";}MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setHeader("txTyp", txTyp);messageProperties.setHeader("busTyp", busTyp);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send("headerExchange", "uselessRoutingKey", new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : txTyp >"+txTyp+";busTyp > "+busTyp;}
消费者
//header receiver//这个模式不再根据routingKey转发,而是根据header中的匹配条件进行转发@RabbitListener(queues=MyConstants.QUEUE_TXTYP1)public void headerReceiveq1(String message) {System.out.println("header txTyp1 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_BUSTYP1)public void headerReceiveq2(String message) {System.out.println("header busTyp1 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TXBUSTYP1)public void headerReceiveq3(String message) {System.out.println("header txbusTyp1 received message : " +message);}
补充Quorum队列
/**
 * Declares a quorum queue.
 */
@Configuration
public class QuorumConfig {

    /**
     * Declares the queue {@link MyConstants#QUEUE_QUORUM} with the queue
     * argument {@code x-queue-type=quorum}.
     */
    @Bean
    public Queue quorumQueue() {
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-queue-type", "quorum");
        // Constructor flags, in order: durable, exclusive, autoDelete.
        return new Queue(MyConstants.QUEUE_QUORUM, true, false, false, arguments);
    }
}
生产者
@ApiOperation(value="quorum队列发送接口",notes="直接发送到队列。Quorum队列")@GetMapping(value="/directQuorum")public Object directQuorum(String message) throws AmqpException, UnsupportedEncodingException {//设置部分请求参数MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setPriority(2);//设置消息转换器,如jsonrabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//将对象转换成json再发送。
// rabbitTemplate.convertandsend("",Object);//发消息rabbitTemplate.send(MyConstants.QUEUE_QUORUM,new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : "+message;}
消费者
/** Prints every message received from {@link MyConstants#QUEUE_QUORUM}. */
@RabbitListener(queues = MyConstants.QUEUE_QUORUM)
public void quorumReceiver(String message) {
    String line = "quorumReceiver received message : " + message;
    System.out.println(line);
}