【README】
本文po出 rabbitmq路由模式;
【1】intro to 路由模式
特点1)队列与交换机的绑定,不能是任意绑定, 而是指定一个路由key-routingkey;
特点2)消息的发送方向在向 exchange-交换机发送消息时,也必须指定消息的routingkey;
特点3)exchange-交换机不再把消息发送给每一个绑定的队列,而是根据消息的routingkey发送到对应的队列;
与发布订阅模式不同,路由模式的交换机类型是 Direct,还有队列绑定交换机的时候需要指定routingkey;
【2】代码
生产者
/*** 路由模式生产者*/
public class RouteProducer {/* 交换机名称 */static final String DIRECT_EXCHANGE = "direct_exchange"; /*队列名称1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*队列名称2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*获取连接*/Connection conn = RBConnectionUtil.getConn();// 创建频道 Channel channel = conn.createChannel();/*** 声明交换机* 参数1-交换机名称 * 参数2-交换机类型(fanout, topic, direct, headers)*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); /*** 声明队列* 参数1 队列名称 * 参数2 是否定义持久化队列 * 参数3 是否独占本次连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(ROUTE_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(ROUTE_QUEUE_UPDATE, true, false, false, null);/*** routingkey-路由键*/String insertRoutingKey = "insert";String updateRoutingKey = "update";/*** 队列绑定交换机* 参数1 队列名称 * 参数2 交换机 * 参数3 routingkey-路由键 */channel.queueBind(ROUTE_QUEUE_INSERT, DIRECT_EXCHANGE, insertRoutingKey);channel.queueBind(ROUTE_QUEUE_UPDATE, DIRECT_EXCHANGE, updateRoutingKey);/* 发送消息-insert */ String insertMsg = "我是消息,路由模式routingkey=" + insertRoutingKey + MyDateUtil.getNow();/*** 参数1 交换机名称 如果没有指定则使用默认 default exchange * 参数2 routingkey-路由key, 简单模式可以传递队列名称 * 参数3 消息其他属性* 参数4 消息内容 */channel.basicPublish(DIRECT_EXCHANGE, insertRoutingKey, null, insertMsg.getBytes());System.out.println("已发送消息=" + insertMsg); /* 发送消息-update */String updateMsg = "我是消息,路由模式routingkey=" + updateRoutingKey + MyDateUtil.getNow();channel.basicPublish(DIRECT_EXCHANGE, updateRoutingKey, null, updateMsg.getBytes());System.out.println("已发送消息=" + updateMsg); /* 关闭连接和信道 */ channel.close();conn.close(); }
}
消费者-insert
/*** 路由模式消费者-routingkey */
public class RouteConsumerInsert {/* 交换机名称 */static final String DIRECT_EXCHANGE = "direct_exchange"; /*队列名称1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*队列名称2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*创建连接 */Connection conn = RBConnectionUtil.getConn();/*创建队列*/Channel channel = conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** routingkey-路由键*/String insertRoutingKey = "insert";String updateRoutingKey = "update";/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(ROUTE_QUEUE_INSERT, true, false, false, null);/*** 队列绑定交换机* 参数1 队列名称* 参数2 交换机* 参数3 routingkey-路由键 */channel.queueBind(ROUTE_QUEUE_INSERT, DIRECT_EXCHANGE, insertRoutingKey);/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息 * 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(ROUTE_QUEUE_INSERT, true, consumer); }
}
消费者-update
/*** 路由模式消费者-routingkey */
public class RouteConsumerUpdate {/* 交换机名称 */static final String DIRECT_EXCHANGE = "topic_exchange"; /*队列名称1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*队列名称2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*创建连接 */Connection conn = RBConnectionUtil.getConn();/*创建队列*/Channel channel = conn.createChannel(); /*声明交换机*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** routingkey-路由键*/String updateRoutingKey = "update";/*** 声明/创建队列 * 参数1 队列名称 * 参数2 是否持久化* 参数3 是否独占本连接 * 参数4 是否在不使用的时候自动删除队列* 参数5 队列其他参数 */channel.queueDeclare(ROUTE_QUEUE_UPDATE, true, false, false, null);/*** 队列绑定交换机* 参数1 队列名称* 参数2 交换机* 参数3 routingkey-路由键 */channel.queueBind(ROUTE_QUEUE_UPDATE, DIRECT_EXCHANGE, updateRoutingKey);/* 创建消费者,设置消息处理逻辑 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消费者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("=== 消费者1 end ===\n"); } };/*** 监听消息 * 参数1 队列名称 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; * 参数3 消息接收后的回调 */channel.basicConsume(ROUTE_QUEUE_UPDATE, true, consumer); }
}