SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列

简介

rabbitmq消息简介

RabbitMQ的消息默认不会超时。 

什么是死信队列?什么是延迟队列?

死信队列:

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

以下几种情况会导致消息变成死信:

消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;

消息过期;

队列达到最大长度。

延迟队列:

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

相关网址

详解RabbitMQ中死信队列和延迟队列的使用详解

实例代码

路由配置

package com.example.config; import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class RabbitRouterConfig {    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";    public static final String EXCHANGE_FANOUT_UNROUTE  = "Exchange@fanout.unroute";    public static final String EXCHANGE_TOPIC_DELAY     = "Exchange@topic.delay";     public static final String ROUTINGKEY_HELLOS        = "hello.#";    public static final String ROUTINGKEY_DELAY         = "delay.#";     public static final String QUEUE_HELLO              = "Queue@hello";    public static final String QUEUE_HI                 = "Queue@hi";    public static final String QUEUE_UNROUTE            = "Queue@unroute";    public static final String QUEUE_DELAY              = "Queue@delay";     public static final Integer TTL_QUEUE_MESSAGE       = 5000;     @Autowired    AmqpAdmin amqpAdmin;     @Bean    Object initBindingTest() {        amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build());        amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build());        amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)                .durable(true)                .autoDelete()                .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)                 .build());         amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)                .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)                .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)                .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)                .build());        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());         amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));        amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,                EXCHANGE_FANOUT_UNROUTE, "", null));        amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,                EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null));         return new Object();    }}

登录后复制

控制器

package com.example.controller; import com.example.config.RabbitRouterConfig;import com.example.mq.Sender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestControllerpublic class HelloController {    @Autowired    private Sender sender;     @PostMapping("/hi")    public void hi() {        sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());    }     @PostMapping("/hello1")    public void hello1() {        sender.send("hello.a", "hello1 message:" + LocalDateTime.now());    }     @PostMapping("/hello2")    public void hello2() {        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());    }     @PostMapping("/ae")    public void aeTest() {        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());    }}

登录后复制

发送器

package com.example.mq; import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; import java.util.Date; @Componentpublic class Sender {    @Autowired    private AmqpTemplate rabbitTemplate;     public void send(String routingKey, String message) {        this.rabbitTemplate.convertAndSend(routingKey, message);    }     public void send(String exchange, String routingKey, String message) {        this.rabbitTemplate.convertAndSend(exchange, routingKey, message);    }}

登录后复制

接收器

package com.example.mq; import com.example.config.RabbitRouterConfig;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class Receiver {    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)    public void hi(String payload) {        System.out.println ("Receiver(hi) : "  + payload);    }     // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)    // public void hello(String hello) throws InterruptedException {    //     System.out.println ("Receiver(hello) : "  + hello);    //     Thread.sleep(5 * 1000);    //     System.out.println("(hello):sleep over");    // }    //    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)    // public void unroute(String hello) throws InterruptedException {    //     System.out.println ("Receiver(unroute) : "  + hello);    //     Thread.sleep(5 * 1000);    //     System.out.println("(unroute):sleep over");    // }     @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)    public void delay(String hello) throws InterruptedException {        System.out.println ("Receiver(delay) : "  + hello);        Thread.sleep(5 * 1000);        System.out.println("(delay):sleep over");    }}

登录后复制

application.yml

server:#  port: 9100  port: 9101spring:  application:#    name: demo-rabbitmq-sender    name: demo-rabbitmq-receiver  rabbitmq:    host: localhost    port: 5672    username: admin    password: 123456#    virtualHost: /    publisher-confirms: true    publisher-returns: true#    listener:#      simple:#        acknowledge-mode: manual#      direct:#        acknowledge-mode: manual

登录后复制

实例测试

分别启动发送者和接收者。

访问:http://localhost:9100/hello2

五秒钟后输出:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548(delay):sleep over

以上就是SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2590835.html

(0)
上一篇 2025年3月6日 16:43:17
下一篇 2025年3月6日 16:43:27

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • SpringBoot整合消息队列RabbitMQ的方法是什么

    简介 在spring项目中,可以使用spring-rabbit去操作rabbitmq 尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。 一般在开发…

    编程技术 2025年3月6日
    200
  • SpringBoot怎么整合RabbitMQ实现延迟队列

    如何保证消息不丢失 rabbitmq消息投递路径 生产者->交换机->队列->消费者 总的来说分为三个阶段。 1.生产者保证消息投递可靠性。 2.mq内部消息不丢失。 3.消费者消费成功。 什么是消息投递可靠性 简单点说就…

    编程技术 2025年3月6日
    200
  • SpringBoot怎么集成Redisson实现延迟队列

    使用场景 1、下单成功,30分钟未支付。支付超时,自动取消订单 2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评 3、下单成功,商家5分钟未接单,订单取消 4、配送超时,推送短信提醒 …… 对于延时比较长的场景、实时性不高的…

    2025年3月6日
    200
  • PHP开发:使用 RabbitMQ 实现任务队列

    随着互联网的不断发展,网站的流量越来越大,访问量的增长带来的问题也越来越多。当用户量过大时,服务器负载会增大,这时就需要使用一些技术手段来解决这些问题。任务队列就是其中的一种方式,可以将一些耗时的操作异步执行,从而缓解服务器压力。本文将介绍…

    编程技术 2025年3月6日
    200
  • Java API 开发中使用 RabbitMQ 进行异步消息处理

    随着互联网的快速发展,异步消息处理在分布式系统中扮演着重要的角色,可以提高系统的可靠性和并发性。rabbitmq是一种开源的消息队列系统,可以快速可靠地传递消息,被广泛应用于互联网领域。本文将介绍在java api开发中如何使用rabbit…

    编程技术 2025年3月6日
    200
  • PHP实现开源RabbitMQ SDK

    rabbitmq是一种基于amqp(advanced message queuing protocol)协议的消息队列,被广泛用于解耦、异步处理等场景。而php作为一种高度流行的动态语言,也有众多开源的rabbitmq sdk可供选择。本文…

    编程技术 2025年3月6日
    200
  • PHP和RabbitMQ集成实现消息队列处理

    随着应用系统的不断扩大,消息处理已经成为了一个非常重要的工作。而为了更好地处理消息,许多应用系统都采用了消息队列技术。消息队列技术可以实现异步处理消息,将消息缓存在队列中,实现“生产者-消费者”模式,可以提高系统的可伸缩性和容错性。 而在实…

    编程技术 2025年3月6日
    200
  • PHP开发中如何使用RabbitMQ实现消息传递

    rabbitmq是一种消息队列软件,可用于在应用程序之间进行消息传递。在php开发中,使用rabbitmq可以实现异步处理任务、实现分布式系统等。本篇文章将介绍如何在php开发中使用rabbitmq实现消息传递。 一、安装RabbitMQ服…

    编程技术 2025年3月6日
    200
  • 如何使用Java开发一个基于RabbitMQ的消息队列应用

    如何使用Java开发一个基于RabbitMQ的消息队列应用 引言:消息队列是现代分布式系统中常用的一种通信机制,它可以解耦发送者和接收者,提高系统的健壮性和可扩展性。RabbitMQ是一个轻量级的、可靠的开源消息队列系统,它基于AMQP协议…

    2025年3月6日
    200
  • 如何利用React和RabbitMQ构建可靠的消息传递应用

    如何利用React和RabbitMQ构建可靠的消息传递应用 引言:现代化的应用程序需要支持可靠的消息传递,以实现实时更新和数据同步等功能。React是一种流行的JavaScript库,用于构建用户界面,而RabbitMQ是一种可靠的消息传递…

    2025年3月6日
    200

发表回复

登录后才能评论