SpringBoot+RabbitMQ实现手动Consumer Ack

Lewis
2021-03-12 / 0 评论 / 235 阅读 / 正在检测是否收录...

一、Consumer Ack的三种方式

  1. 自动确认:acknowledge = “none”,这是默认的方式,如果不配置的话,默认就是自动确认,消费方从消息队列中拿出消息后,消息队列中都会清除掉这条消息(不安全).
  2. 手动确认:acknowledge = “manual”,手动确认就是当消费者取出来消息其后的操作正常执行后,返回给消息队列,让其清除该条消息;如果后续执行有异常,可以设置requeue=true返回其消息队列,再让其消息队列重新给消费者发送消息.
  3. 根据异常情况确认(很麻烦):acknowledge = “auto”.

二、SpringBoot+RabbitMQ实现手动Consumer Ack

1.pom文件中导入依赖坐标
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在生产者与消费者工程yml配置文件中开启手动Ack
spring:
  rabbitmq:
    host: 192.168.253.128 #ip
    username: guest
    password: guest
    virtual-host: /
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual #开启手动Ack
3.在生产者工程中创建一个配置类声明队列与交换机的关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {

    //交换机的名称;
    public static final String DIRECT_EXCHANGE_NAME = "direct_boot_exchange";
    //队列名称;
    public static final String DIRECT_QUEUE_NAME = "direct_boot_queue";


    /**
     * 声明交换机,在以后我们会定义多个交换机,
     * 所以给这个注入的Bean起一个名字,同理在绑定的时候用@Qualifier注解;
     * durablie:持久化
     */

    @Bean("directExchange")
    public Exchange directExchange(){
        return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
    }

    //声明队列;
    @Bean("directQueue")
    public Queue testQueue(){
        return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
    }

    //绑定交换机和队列,把上述声明的交换机、队列作为参数传入进来;
    @Bean
    public Binding bindDirectExchangeQueue(@Qualifier("directQueue") Queue queue,
                                           @Qualifier("directExchange") Exchange exchange){
                                           
        return BindingBuilder.bind(queue).to(exchange).with("info").noargs();

    }

}
4.在消费者工程中创建一个组件监听在生产者声明的队列
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class MyAckListener {

    /**
     *
     * @param message 队列中的消息;
     * @param channel 当前的消息队列;
     * @param tag 取出来当前消息在队列中的的索引,
     * 用这个@Header(AmqpHeaders.DELIVERY_TAG)注解可以拿到;
     * @throws IOException
     */

    @RabbitListener(queues = "direct_boot_queue")
    public void myAckListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

        System.out.println(message);
        try {

            /**
             * 无异常就确认消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:取出来当前消息在队列中的的索引;
             * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
             * deliveryTag为5及其以下的消息;一般设置为false
             */
            channel.basicAck(tag, false);
        }catch (Exception e){
            /**
             * 有异常就绝收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
             *         false:将消息丢弃
             */
            channel.basicNack(tag,false,true);
        }
        
    }

}
5.在生产者中创建一个测试类来发送消息
import com.itlw.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducedTest {

    //从IOC容器中拿模板类;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        //发送消息;
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME,
                "info","这是一条测试消息....");
    }
}
6.启动消费者工程来接收此队列的消息
可以看到控制台输出了接收到的消息,并且因为已经被确认,所以队列中消息已经为0,要测出效果,手动添加一个异常.
km5m73o2.png
km5m7gcz.png
7.手动添加一个异常
try {
            /**
             * 无异常就确认消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:取出来当前消息在队列中的的索引;
             * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
             * deliveryTag为5及其以下的消息;一般设置为false
             */
            int i = 3 / 0;//手动添加异常
            channel.basicAck(tag, false);
        } catch (Exception e) {
            /**
             * 有异常就绝收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
             *         false:将消息丢弃
             */
            channel.basicNack(tag, false, true);
        }
8.再次运行看结果
我设置了 channel.basicNack(tag, false, true);第三个requeue属性为true由队列又重新发送给消费者,消费者接收到消息后确认之前遇到了错误又重新拒收消息…所以进入了一个死循环
km5m8bei.png
等暂停运行后,可以看到消息队列中还剩一条消息,就是消费者绝收的这条消息,如果把requeue设置为false,那么这个队列中将没有这条消息.
km5m8xbd.png
km5m969n.png
1

评论

博主关闭了当前页面的评论