(mq)rabbitmq消息消费确认(ack模式)

weblog 785 0 0

上一篇文章《(mq)rabbitmq消息发送确认》介绍了消息发布时的确认方案,本篇文章将介绍,消息消费确认的方法。

和确认发布一样,消费者有时也需要确认,rabbitmq有三种确认模式:

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

rabbitmq默认使用的自动确认消息。如果业务代码中出现了异常,则该消息将会丢失,所以就需要手动确认消息。

消费者修改配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 手动确认

确认方法(Channel提供)

成功确认:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

  • deliveryTag:该消息的index
  • multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。

说明:

当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

multiple是为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

失败确认(方法1)

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

  • deliveryTag:该消息的index。
  • multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
  • requeue:被拒绝的是否重新入队列。

失败确认(方法2)

void basicReject(long deliveryTag, boolean requeue) throws IOException;

  • deliveryTag:该消息的index。
  • requeue:被拒绝的是否重新入队列。

channel.basicNack channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

重新发布

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

  • exchange:要将消息发送到的Exchange(交换器)
  • routingKey:路由Key
  • mandatory:如果为true, 消息不能路由到指定的队列时,会触发channel.BasicReturn事件,如果为false,则broker会直接将消息丢弃。(channel.BasicReturn += Channel_BasicReturn;)
  • basicProperties:其它的一些属性
  • body:消息内

代码案例:


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;


/**
* @author: Jiajiajia
* @Date: 2021/8/26
* @Description: 直接交换机 监听器
*/
@Component
public class DirectListener {
    @RabbitListener(queues = "directQueue")
    public void receive(Map map, Message message, Channel channel) throws IOException {
        try{
            //todo
            System.out.println("监听接收到消息"+map);
            int a = 1/0;
            // 手动确认 成功消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }catch (Exception e){
            e.printStackTrace();
            // 失败确认
             channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
            // 失败确认后重新发布到 directRouting2 队列
            channel.basicPublish("directExchange","directRouting2",true,null,message.getBody());
        }
    }
}

猜你喜欢
official 829 发送在使用mq发送的时候,由于一些不定因素,可能会导致发送失败,比如网络的问题,服务器问题,或mq本身的问题都可能会导致发送失败。那么当发送成功或失败后程序如何感知呢?那就
java基础 1293 java线程通讯之生产者生产者是并发、多线程编程中经典的设计,生产者和者通过分离的执行工作解耦,简化了开发,生产者和者可以以不同的速度生产和数据。一个生产和
数据结构与算法 1338 在生产中我们可能会遇到在处理正常业务的过程中,其中会夹杂着一些非必需或不是特别重要的业务,而且这些业务还比较耗时,这个时候为了不影响正常业务性能,我们可以这些不是特别重要而且还比较耗时的业务独立出来,放入后台的一个执行队列中,后台可以慢慢执行,当队列中没有业务数据时,使该执行线程进入等待状态。当业务数据添加进队列中后唤醒处于等待状态的执行线程,继续处理业务。一、阻塞队列的实现packagecom.
official 878 上一篇《(mq)rabbitmq安装延时队列插件实现延时1》文章中介绍了rabbitmq安装延时队列插件。本编将继续结合代码来实现延时队列(基于springboot项目)。下方所有源代码均已上传
weblog 2555 某些情形中如果想要实现前端页面刷新,那么一个比较好的办法就是用websocket实现。应该是比ajax轮询要好吧~。既然是websocket主动推送,那么服务端查询和推送的时机就很重要,也就
official 743 什么是死信队列?死信队列:DLX,dead-letter-exchange利用DLX,当在一个队列中变成死信(deadmessage)之后,它能被重新publish到另一个Exchange(死信
weblog 724 pom依赖!--activemq所需要的jar包--dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-all/artifactIdversion5.15.9/version/dependency!--activemq和spring整合的基础包--dependencygroupIdorg.apache.xbean/group
框架 1279 activemq下载地址:http://activemq.apache.org/activemq-5140-release.html把下载的tar.gz文件放在linux系统的/opt/文件夹下,解压。服务启动,重启和关闭到activemq解压后的bin目录下执行:./activemqstart#启动服务./activemqrestart#重新启动服务./activemqstop#关闭服务检查是否
目录
没有一个冬天不可逾越,没有一个春天不会来临。最慢的步伐不是跬步,而是徘徊,最快的脚步不是冲刺,而是坚持。