Fanout消息模型
 
* 广播模型:*  一个交换机绑定多个队列*  每个队列都有一个消费者*  每个消费者消费自己队列中的消息,每个队列的信息是一样的
 
生产者
 
package com.example.demo02.mq.fanout;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
public class FanoutSender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);String msg = "fanout message";channel.basicPublish("fanout.exchange", "", null, msg.getBytes());channel.close();connection.close();}
}
 
消费者1
 
package com.example.demo02.mq.fanout;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
public class FanoutReceiver1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);channel.queueDeclare("fanout.queue1", false, false, false, null);channel.queueBind("fanout.queue1", "fanout.exchange", "");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Fanout1接收到的消息是:" + new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("fanout.queue1",false,consumer);}
}
 
消费者2
 
package com.example.demo02.mq.fanout;import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
public class FanoutReceiver2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("fanout.exchange", BuiltinExchangeType.FANOUT,false);channel.queueDeclare("fanout.queue2", false, false, false, null);channel.queueBind("fanout.queue2", "fanout.exchange", "");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Fanout2接收到的消息是:" + new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("fanout.queue2",false,consumer);}
}
 
结果
 
