上一篇文章《(mq)rabbitmq消息发送确认》介绍了消息发布时的确认方案,本篇文章将介绍,消息消费确认的方法。
和确认发布一样,消费者有时也需要确认,rabbitmq有三种确认模式:
- AcknowledgeMode.NONE:不确认
- AcknowledgeMode.AUTO:自动确认
- AcknowledgeMode.MANUAL:手动确认
rabbitmq默认使用的自动确认消息。如果业务代码中出现了异常,则该消息将会丢失,所以就需要手动确认消息。
消费者修改配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认
确认方法(Channel提供)
成功确认:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
- deliveryTag:该消息的index
- multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。
说明:
当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
multiple是为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
失败确认(方法1)
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag:该消息的index。
- multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
- requeue:被拒绝的是否重新入队列。
失败确认(方法2)
void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:该消息的index。
- requeue:被拒绝的是否重新入队列。
channel.basicNack
与 channel.basicReject
的区别在于basicNack
可以批量拒绝多条消息,而basicReject
一次只能拒绝一条消息。
重新发布
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- exchange:要将消息发送到的Exchange(交换器)
- routingKey:路由Key
- mandatory:如果为true, 消息不能路由到指定的队列时,会触发channel.BasicReturn事件,如果为false,则broker会直接将消息丢弃。(channel.BasicReturn += Channel_BasicReturn;)
- basicProperties:其它的一些属性
- body:消息内
代码案例:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* @author: Jiajiajia
* @Date: 2021/8/26
* @Description: 直接交换机 监听器
*/
@Component
public class DirectListener {
@RabbitListener(queues = "directQueue")
public void receive(Map map, Message message, Channel channel) throws IOException {
try{
//todo
System.out.println("监听接收到消息"+map);
int a = 1/0;
// 手动确认 成功消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
e.printStackTrace();
// 失败确认
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
// 失败确认后重新发布到 directRouting2 队列
channel.basicPublish("directExchange","directRouting2",true,null,message.getBody());
}
}
}