【README】
本文使用java 连接rabbitmq,模拟生产者,消费者场景
【1】项目搭建
1)maven项目,依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hello</groupId><artifactId>rabbitmqtest</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.11.0</version></dependency></dependencies>
</project>
2)生产者
package com.hello.rabbitmqtest.simple;import com.hello.trong.rabbitmqtest.util.MyDateUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {/*** 队列名称 */static final String QUEUE_NAME = "simple_queue2";public static void main(String[] args) throws Exception {// 创建连接工厂 ConnectionFactory connFactory = new ConnectionFactory();// 主机地址,默认为 localhostconnFactory.setHost("192.168.163.201");;connFactory.setPort(5672);// 设置虚拟主机
// connFactory.setVirtualHost("/hello");// 设置账号密码 connFactory.setUsername("guest");connFactory.setPassword("guest");// 创建连接Connection conn = connFactory.newConnection();// 创建频道Channel channel = conn.createChannel();/*** 创建队列* 队列名称, 是否持久化队列,是否独占本次连接,是否在不使用的时候自动删除队列, 队列其他参数; */channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 发送消息 String message = "【simple_queue2】 hello rabbitmq now is " + MyDateUtil.getNow();channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("生产者发送消息" + message);// 关闭通道和连接channel.close();conn.close(); }
}
// 日期工具类
public class MyDateUtil {public static void main(String[] args) {}public static String getNow() {return getNow("yyyy-MM-dd hh:mm:ss.SSS");}public static String getNow(String format) {SimpleDateFormat formater = new SimpleDateFormat(format);String dateFormated = formater.format(new Date());return dateFormated; }
}
3)消费者
package com.hello.rabbitmqtest.simple;import java.io.IOException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;/*** rabbitmq 消费者*/
public class MyConsumer {/*** 队列名称 */static final String QUEUE_NAME = "simple_queue2";public static void main(String[] args) throws Exception {// 创建连接工厂 ConnectionFactory connFactory = new ConnectionFactory();// 主机地址,默认为 localhostconnFactory.setHost("192.168.163.201");;connFactory.setPort(5672);// 设置虚拟主机connFactory.setVirtualHost("/");// 设置账号密码 connFactory.setUsername("guest");connFactory.setPassword("guest");// 创建连接Connection conn = connFactory.newConnection();// 创建频道Channel channel = conn.createChannel();// 声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("customer wait to receive message");// 告诉服务器,我们需要哪个频道的角色,如果频道中有消息,就会执行回调函数 handleDeliveryConsumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费者标签,在 channel.basicConsume 可以指定 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) * @param properties 基本属性* @param body 消息字节数组 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交换机=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消费者收到的消息【%s】", message)); System.out.println("============================================="); } };// 自动恢复队列应答 -- rabbitmq中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }
}
【2】 测试效果
// 生产者
生产者发送消息【simple_queue2】 hello rabbitmq now is 2021-02-28 12:01:23.926// 消费者
路由key=simple_queue2
交换机=
消息id=3
消费者收到的消息【【simple_queue2】 hello rabbitmq now is 2021-02-28 12:01:23.926】
=============================================