在优先级队列中,针对生产者给消息设置的优先级进行排序,优先级越大的越先被消费者消费。如果在消费者的消费速度大于生产者的速度,且Broker中没有消息堆积的情况下,对发送的消息设置优先级没有意义。
在生产者,按如下方式给消息设置 priority 属性
1 2 3
| AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .priority(5).build();
|
在消费者,按如下方式声明优先级队列,优先级的取值范围是0-255,推荐使用 10。
1 2 3
| Map<String,Object> arguments = new HashMap<>(1); arguments.put("x-max-priority",10); channel.queueDeclare("queue_name", true, false, false, arguments);
|
在RabbitMQ的控制台添加优先级队列

代码示例
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class MessagePriorityProducer { static final String QUEUE_NAME = "priority";
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.19.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
AMQP.BasicProperties properties1 = new AMQP.BasicProperties() .builder() .priority(5) .build(); AMQP.BasicProperties properties2 = new AMQP.BasicProperties() .builder() .priority(3) .build();
for (int i = 1; i <= 10; i++) { String message = "message_" + i; if (i == 5) { channel.basicPublish("", QUEUE_NAME, properties1, message.getBytes()); } else if (i == 7) { channel.basicPublish("", QUEUE_NAME, properties2, message.getBytes()); } else { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } System.out.println("发送消息完成:" + message); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class MessagePriorityConsumer { static final String QUEUE_NAME = "priority";
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.19.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
Map<String,Object> arguments = new HashMap<>(1); arguments.put("x-max-priority",10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
System.out.println("等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> { byte[] msgBytes = delivery.getBody(); System.out.println("接收到消息:消息ID=" + consumerTag + "===>" + new String(msgBytes)); }, consumerTag -> { System.out.println("消息被中断:消息ID=" + consumerTag); }); } }
|