SpringBoot怎么整合RabbitMQ实现延迟队列

如何保证消息不丢失

rabbitmq消息投递路径

生产者->交换机->队列->消费者

总的来说分为三个阶段。

1.生产者保证消息投递可靠性。

2.mq内部消息不丢失。

3.消费者消费成功。

什么是消息投递可靠性

简单点说就是消息百分百发送到消息队列中。

我们可以开启confirmCallback

生产者投递消息后,mq会给生产者一个ack.根据ack,生产者就可以确认这条消息是否发送到mq.

开启confirmCallback

修改配置文件

#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法spring:  rabbitmq:    publisher-confirm-type: correlated

登录后复制

测试代码

@Test  public void testConfirmCallback() throws InterruptedException {      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {      /**      *      * @param correlationData 配置      * @param ack 交换机是否收到消息,true是成功,false是失败      * @param cause 失败的原因      */      @Override      public void confirm(CorrelationData correlationData, boolean ack, String cause) {          System.out.println("confirm=====>");          System.out.println("confirm==== ack="+ack);          System.out.println("confirm==== cause="+cause);          //根据ACK状态做对应的消息更新操作 TODO      }      });      rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");      Thread.sleep(10000);  }

登录后复制

通过returnCallback保证消息从交换器发送到队列成功。 修改配置文件

spring:  rabbitmq:    #开启returnCallback    publisher-returns: true    #交换机处理消息到路由失败,则会返回给生产者    template:      mandatory: true

登录后复制

测试代码

@Test  void testReturnCallback() {      //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定      rabbitTemplate.setMandatory(true);      //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息      rabbitTemplate.setReturnsCallback(returned -> {          int code = returned.getReplyCode();          System.out.println("code="+code);          System.out.println("returned="+ returned);      });      rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback");  }

登录后复制

消费者消费消息时需要通过ack手动确认消息已消费。

修改配置文件

spring:  rabbitmq:    listener:        simple:          acknowledge-mode: manual

登录后复制

编写测试代码

@RabbitHandler  public void consumer(String body, Message message, Channel channel) throws IOException {      long msgTag = message.getMessageProperties().getDeliveryTag();      System.out.println("msgTag="+msgTag);      System.out.println("message="+ message);      System.out.println("body="+body);      //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除      channel.basicAck(msgTag,false);      // channel.basicNack(msgTag,false,true);    }

登录后复制

deliveryTags是消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

ttl死信队列

什么是死信队列

没有被及时消费的消息存放的队列

消息有哪几种情况成为死信

消费者拒收消息 (basic.reject/ basic.nack) ,并且没有重新入队 requeue=false

消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)

队列的消息长度达到极限

结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

死信队列经常用来做延迟队列消费。

延迟队列

生产者投递到mq中并不希望这条消息立马被消费,而是等待一段时间后再去消费。

springboot整合rabbitmq实现订单超时自动关闭

package com.fandf.test.rabbit;    import org.springframework.amqp.core.*;  import org.springframework.beans.factory.annotation.Qualifier;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;    import java.util.HashMap;  import java.util.Map;    /**  * @author fandongfeng  * @date 2023/4/15 15:38  */  @Configuration  public class RabbitMQConfig {        /**      * 订单交换机      */      public static final String ORDER_EXCHANGE = "order_exchange";      /**      * 订单队列      */      public static final String ORDER_QUEUE = "order_queue";      /**      * 订单路由key      */      public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";      /**      * 死信交换机      */      public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";      /**      * 死信队列 routingKey      */      public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";      /**      * 死信队列      */      public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";      /**      * 创建死信交换机      */      @Bean("orderDeadLetterExchange")      public Exchange orderDeadLetterExchange() {          return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);      }      /**      * 创建死信队列      */      @Bean("orderDeadLetterQueue")      public Queue orderDeadLetterQueue() {          return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();      }      /**      * 绑定死信交换机和死信队列      */      @Bean("orderDeadLetterBinding")      public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {          return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();      }      /**      * 创建订单交换机      */      @Bean("orderExchange")      public Exchange orderExchange() {          return new TopicExchange(ORDER_EXCHANGE, true, false);      }      /**      * 创建订单队列      */      @Bean("orderQueue")      public Queue orderQueue() {          Map args = new HashMap(3);          //消息过期后,进入到死信交换机          args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);          //消息过期后,进入到死信交换机的路由key          args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);          //过期时间,单位毫秒          args.put("x-message-ttl", 10000);          return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();      }      /**      * 绑定订单交换机和队列      */      @Bean("orderBinding")      public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {          return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();      }      }

登录后复制

消费者

package com.fandf.test.rabbit;    import cn.hutool.core.date.DateUtil;  import com.rabbitmq.client.Channel;  import org.springframework.amqp.core.Message;  import org.springframework.amqp.rabbit.annotation.RabbitHandler;  import org.springframework.amqp.rabbit.annotation.RabbitListener;  import org.springframework.stereotype.Component;    import java.io.IOException;    /**  * @author fandongfeng  * @date 2023/4/15 15:42  */  @Component  @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)  public class OrderMQListener {            @RabbitHandler      public void consumer(String body, Message message, Channel channel) throws IOException {          System.out.println("收到消息:" + DateUtil.now());          long msgTag = message.getMessageProperties().getDeliveryTag();          System.out.println("msgTag=" + msgTag);          System.out.println("message=" + message);          System.out.println("body=" + body);          channel.basicAck(msgTag, false);      }    }

登录后复制

测试类

@Test  void testOrder() throws InterruptedException {  //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定      rabbitTemplate.setMandatory(true);      //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息      rabbitTemplate.setReturnsCallback(returned -> {      int code = returned.getReplyCode();      System.out.println("code=" + code);      System.out.println("returned=" + returned);      });      rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");      System.out.println("发送消息:" + DateUtil.now());      Thread.sleep(20000);  }

登录后复制

程序输出

发送消息:2023-04-16 15:14:34
收到消息:2023-04-16 15:14:44
msgTag=1
message=(Body:’测试订单延迟’ MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=测试订单延迟

以上就是SpringBoot怎么整合RabbitMQ实现延迟队列的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2590829.html

(0)
上一篇 2025年3月6日 16:43:09
下一篇 2025年3月6日 16:43:17

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • SpringBoot整合消息队列RabbitMQ的方法是什么

    简介 在spring项目中,可以使用spring-rabbit去操作rabbitmq 尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。 一般在开发…

    编程技术 2025年3月6日
    000
  • SpringBoot怎么集成Redisson实现延迟队列

    使用场景 1、下单成功,30分钟未支付。支付超时,自动取消订单 2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评 3、下单成功,商家5分钟未接单,订单取消 4、配送超时,推送短信提醒 …… 对于延时比较长的场景、实时性不高的…

    2025年3月6日
    200
  • PHP开发:使用 RabbitMQ 实现任务队列

    随着互联网的不断发展,网站的流量越来越大,访问量的增长带来的问题也越来越多。当用户量过大时,服务器负载会增大,这时就需要使用一些技术手段来解决这些问题。任务队列就是其中的一种方式,可以将一些耗时的操作异步执行,从而缓解服务器压力。本文将介绍…

    编程技术 2025年3月6日
    200
  • Java API 开发中使用 RabbitMQ 进行异步消息处理

    随着互联网的快速发展,异步消息处理在分布式系统中扮演着重要的角色,可以提高系统的可靠性和并发性。rabbitmq是一种开源的消息队列系统,可以快速可靠地传递消息,被广泛应用于互联网领域。本文将介绍在java api开发中如何使用rabbit…

    编程技术 2025年3月6日
    200
  • PHP实现开源RabbitMQ SDK

    rabbitmq是一种基于amqp(advanced message queuing protocol)协议的消息队列,被广泛用于解耦、异步处理等场景。而php作为一种高度流行的动态语言,也有众多开源的rabbitmq sdk可供选择。本文…

    编程技术 2025年3月6日
    200
  • PHP和RabbitMQ集成实现消息队列处理

    随着应用系统的不断扩大,消息处理已经成为了一个非常重要的工作。而为了更好地处理消息,许多应用系统都采用了消息队列技术。消息队列技术可以实现异步处理消息,将消息缓存在队列中,实现“生产者-消费者”模式,可以提高系统的可伸缩性和容错性。 而在实…

    编程技术 2025年3月6日
    200
  • PHP开发中如何使用RabbitMQ实现消息传递

    rabbitmq是一种消息队列软件,可用于在应用程序之间进行消息传递。在php开发中,使用rabbitmq可以实现异步处理任务、实现分布式系统等。本篇文章将介绍如何在php开发中使用rabbitmq实现消息传递。 一、安装RabbitMQ服…

    编程技术 2025年3月6日
    200
  • 如何使用Java开发一个基于RabbitMQ的消息队列应用

    如何使用Java开发一个基于RabbitMQ的消息队列应用 引言:消息队列是现代分布式系统中常用的一种通信机制,它可以解耦发送者和接收者,提高系统的健壮性和可扩展性。RabbitMQ是一个轻量级的、可靠的开源消息队列系统,它基于AMQP协议…

    2025年3月6日
    200
  • 如何利用React和RabbitMQ构建可靠的消息传递应用

    如何利用React和RabbitMQ构建可靠的消息传递应用 引言:现代化的应用程序需要支持可靠的消息传递,以实现实时更新和数据同步等功能。React是一种流行的JavaScript库,用于构建用户界面,而RabbitMQ是一种可靠的消息传递…

    2025年3月6日
    200
  • rabbitmq和kafka有什么区别

    rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。详细介绍:1、语言与平台,Rabbit…

    2025年3月6日
    200

发表回复

登录后才能评论