(mq)rabbitmq死信队列与延时消息
什么是死信队列?
死信队列:DLX,dead-letter-exchange
利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange(死信交换机),这个Exchange就是DLX
死信交换机本质上也是一个普通交换机,和一般的Exchange没有区别,只不过它处理消息的特殊性,所以称之为死信。它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中的消息做相应的处理。

什么样的消息会成为死信?
大致有三种情况
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期
- 队列达到最大长度
延时队列测试代码:
配置类:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Jiajiajia
* @Date: 2021/8/16
* @Description:
*/
@Configuration
public class MqConfig {
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue() {
return new Queue("dlxQueue",true);
}
/**
* 直连交换机(死信交换机DLX)
* @return
*/
@Bean
DirectExchange dlxExchange() {
return new DirectExchange("dlxExchange",true,false);
}
/**
* 队列和交换机绑定
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(dlxQueue()).
to(dlxExchange()).with("dlxRouting");
}
/**
* 普通交换机
* @return
*/
@Bean
DirectExchange exchange() {
return (DirectExchange) ExchangeBuilder
.directExchange("exchange") .durable(true) .build();
}
/**
* 普通队列
* @return
*/
@Bean
Queue queue() {
return QueueBuilder.durable("queue")
// 限制队列中最大消息数量
.withArgument("x-max-length", 5)
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange","dlxExchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "dlxRouting")
.build();
}
/**
* 队列和交换机绑定
* @return
*/
@Bean
public Binding Binding(Queue queue, DirectExchange exchange) {
return BindingBuilder .bind(queue) .to(exchange)
.with("routing");
}
}
生产者
/**
* 发送延迟消息
* @return
*/
@GetMapping("testTtl")
private String testTtl(){
rabbitTemplate.convertAndSend("exchange","routing","message info",
message ->{
//延迟3秒处理
message.getMessageProperties().setExpiration(String.valueOf(3*1000));
return message;
});
return "ok";
}
消费者:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DelayMessageReceiver {
@RabbitListener(queues = "dlxQueue")
public void cfgUserReceiveDealy(String message) throws IOException {
System.out.println("消息:"+message);
}
}
当queue队列中消息数量超过5,或者queue队列中的消息超时的时候,都会将消息提交到dlxQueue队列。
fixed
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。