RabbitMQ优先级队列的使用
生产者
public class PriorityQueue
{public static void Send(){string path = AppDomain.CurrentDomain.BaseDirectory;string tag = path.Split('/', '\\').Last(s => !string.IsNullOrEmpty(s));Console.WriteLine($"这里是 {tag} 启动了。。");ConnectionFactory factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (IConnection connection = factory.CreateConnection()){using (IModel channel = connection.CreateModel()){//创建队列的时候,指定队列的优先级;x-max-priority:最大的优先级是10channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { {"x-max-priority",10 } //指定队列要支持优先级设置;});channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey");string[] questionList = { "vip学员1 来请教", "甲 同学来请教问题", "乙 同学来请教问题", "丙 同学来请教问题", "丁 同学来请教问题", "vip学员2 来请教" };//设置消息优先级//VIP学员和公开课学员同时来请教问题解答,当然是优先VIP学员;IBasicProperties props = channel.CreateBasicProperties();foreach (string questionMsg in questionList){if (questionMsg.StartsWith("vip")){props.Priority = 9;channel.BasicPublish(exchange: "PriorityQueueExchange",routingKey: "PriorityKey",basicProperties: props,body: Encoding.UTF8.GetBytes(questionMsg));}else{props.Priority = 1;channel.BasicPublish(exchange: "PriorityQueueExchange",routingKey: "PriorityKey",basicProperties: props,body: Encoding.UTF8.GetBytes(questionMsg));}Console.WriteLine($"{questionMsg} 已发送~~");}Console.Read();}}}
}
消费者
public class PriorityQueue
{public static void Consumption(){var factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()){using (IModel channel = connection.CreateModel()){//定义消费者 var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{string msg = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine(msg);channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);};Console.WriteLine("消费者准备就绪....");//处理消息channel.BasicConsume(queue: "PriorityQueue", autoAck: false, consumer: consumer);Console.ReadKey();}}}
}