(mq)rabbitmq安装延时队列插件实现延时消息 2
上一篇《(mq)rabbitmq安装延时队列插件实现延时消息 1》文章中介绍了rabbitmq安装延时队列插件。本编将继续结合代码来实现延时队列(基于springboot项目)。
下方所有源代码均已上传到github: https://github.com/18438301593/rabbitmq ,注意看README.md
文档
配置文件
生产者消费者都一样
spring:
#给项目来个名字
application:
name: rabbitmq-consumer
#配置rabbitMq 服务器
rabbitmq:
host: 192.168.159.128
port: 5672
username: guest
password: guest
配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author: Jiajiajia
* @Date: 2021/8/26
* @Description: 延时交换机(需要安装插件 rabbitmq_delayed_message_exchange)
*/
@Configuration
public class DelayExchangeConfig {
public DelayExchangeConfig(){
System.out.println("DelayExchangeConfig init");
}
/**
* 延时队列
* @return
*/
@Bean
public Queue delayQueue(){
return new Queue("delayQueue",true);
}
/**
* 延时队列交换机
* 注意这里的交换机类型:CustomExchange
* @return
*/
@Bean
public CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayExchange","x-delayed-message",true, false,args);
}
/**
* 给延时队列绑定交换机
* @return
*/
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
}
}
生产者
/**
* 测试 延时交换机
* @return
*/
@GetMapping("delay")
private String delay(@RequestParam("time") Integer time){
Map map = new HashMap();
map.put("id","1");
map.put("name","delay");
rabbitTemplate.convertAndSend("delayExchange","delay", map,
message -> {
message.getMessageProperties().setHeader("x-delay",time);
return message;
});
return "ok";
}
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author: Jiajiajia
* @Date: 2021/8/26
* @Description: 延迟交换机 监听器
*/
@Component
public class DelayListener {
public DelayListener(){
System.out.println("DelayListener init ");
}
@RabbitListener(queues = "delayQueue")
public void receive(Map map){
System.out.println("delayQueue监听接收到消息:"+map);
}
}
fixed
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。