SpringBoot怎么整合RabbitMq自定义消息监听容器来实现消息批量处理

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

前言

rabbitmq是一种常用的消息队列,spring boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在rabbitmq中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。spring boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一些特殊的需求,比如批量获取数据。

在本文中,我们将使用Spring Boot来整合RabbitMQ,并自定义一个监听器容器,实现批量获取数据的功能。
前置条件:
在开始之前,您需要具备以下条件:

已经安装好RabbitMQ服务器并启动。

已经创建好要使用的队列。

已经熟悉了Spring Boot和RabbitMQ的基本知识。

环境准备:
在开始之前,我们需要准备好以下环境:

JDK 1.8或以上版本

Spring Boot 2.5.0或以上版本

RabbitMQ 3.8.0或以上版本

添加依赖

首先,在pom.xml文件中添加以下依赖:

    org.springframework.boot    spring-boot-starter-amqp

登录后复制

配置文件

接下来,在application.properties文件中添加以下配置:

spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/# 队列名称spring.rabbitmq.listener.simple.queue-name=myQueue# 最大并发消费者数量spring.rabbitmq.listener.simple.concurrency=5# 最小数量spring.rabbitmq.listener.simple.min-concurrency=1# 最大数量spring.rabbitmq.listener.simple.max-concurrency=10# 批量处理消息的大小spring.rabbitmq.listener.simple.batch-size=50

登录后复制

spring:  rabbitmq:    host: localhost    listener:      simple:        batch-size: 50        concurrency: 5        max-concurrency: 10        min-concurrency: 1        queue-name: myQueue    password: guest    port: 5672    username: guest    virtual-host: /

登录后复制

编写监听器

然后,我们需要创建一个监听器类,以便处理从队列中接收到的消息。以下是一个简单的示例:

@Componentpublic class MyListener {        @RabbitListener(queues = "myQueue", containerFactory = "myFactory")    public void handleMessage(List messages, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)            throws IOException {        try {            // 处理消息            System.out.println("Received " + messages.size() + " messages");            for (Message message : messages) {           // 处理消息            System.out.println("Received message: " + new String(message.getBody()));        }        channel.basicAck(messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), true);        } finally {            // 手动确认消息            channel.basicAck(deliveryTag, true);        }    }}

登录后复制

在上面的代码中,我们使用了@RabbitListener注解来指定要监听的队列名称,同时也指定了使用myFactory工厂来创建监听容器。在这个监听器中,我们简单地打印了接收到的消息。

创建SimpleRabbitListenerContainerFactory

接下来,我们需要创建一个SimpleRabbitListenerContainerFactory工厂,以便能够自定义监听容器的行为。以下是一个简单的示例:

@Configurationpublic class RabbitMQConfig {//    @Bean//    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//        factory.setConnectionFactory(connectionFactory);//        factory.setConcurrentConsumers(1);//        factory.setMaxConcurrentConsumers(10);//        factory.setBatchListener(true);//        factory.setBatchSize(50);//        return factory;//    }@Bean    public SimpleRabbitListenerContainerFactory myFactory(            ConnectionFactory connectionFactory,            PlatformTransactionManager transactionManager,            MessageConverter messageConverter) {                SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();                // 并发消费者数,默认为 1        factory.setConcurrentConsumers(5);                // 最大并发消费者数,默认为 1        factory.setMaxConcurrentConsumers(10);                // 拒绝未确认的消息并重新将它们放回队列,默认为 true        factory.setDefaultRequeueRejected(false);                // 容器启动时是否自动启动,默认为 true        factory.setAutoStartup(true);                // 消息确认模式,默认为 AUTO        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);                // 每个消费者在一次请求中预获取的消息数,默认为 1        factory.setPrefetchCount(5);                // 从队列中接收消息的超时时间,默认为 0,表示没有超时限制        factory.setReceiveTimeout(1000);                // 与容器一起使用的事务管理器。默认情况下,容器不会使用事务        factory.setTransactionManager(transactionManager);                // 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息        factory.setMessageConverter(messageConverter);                // 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutor        factory.setTaskExecutor(new SimpleAsyncTaskExecutor());                // 在关闭容器时等待活动线程终止的时间,默认为 5000 毫秒        factory.setShutdownTimeout(10000);                // 重试失败的消息之前等待的时间,默认为 5000 毫秒        factory.setRecoveryInterval(5000);                // 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 true        factory.setMissingQueuesFatal(false);                // 监听器容器连接工厂        factory.setConnectionFactory(connectionFactory);        return factory;    }}

登录后复制

这些属性中的大多数都是可选的,可以根据需要进行设置。根据应用程序的需求,我们可以自由地调整这些属性,以提高应用程序的性能和可靠性。

发送消息

最后,我们可以编写一个简单的发送消息的代码来向队列中发送一些消息。以下是一个简单的示例:

@Componentpublic class MySender {    @Autowired    private RabbitTemplate rabbitTemplate;    public void sendMessage(String message) {        for (int i = 0; i 

登录后复制

以上就是SpringBoot怎么整合RabbitMq自定义消息监听容器来实现消息批量处理的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2590851.html

(0)
上一篇 2025年3月6日 16:43:33
下一篇 2025年3月6日 16:43:40

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • Springboot怎么整合RabbitMQ消息队列

    生产者工程 POM依赖 可以在创建工程时直接选择添加依赖。 application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置。 RabbitMQ配置文件: 在使用相关交换机及队列时,我…

    2025年3月6日 编程技术
    200
  • SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列

    简介 rabbitmq消息简介 RabbitMQ的消息默认不会超时。  什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成…

    编程技术 2025年3月6日
    200
  • SpringBoot整合消息队列RabbitMQ的方法是什么

    简介 在spring项目中,可以使用spring-rabbit去操作rabbitmq 尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。 一般在开发…

    编程技术 2025年3月6日
    200
  • SpringBoot怎么整合RabbitMQ实现延迟队列

    如何保证消息不丢失 rabbitmq消息投递路径 生产者->交换机->队列->消费者 总的来说分为三个阶段。 1.生产者保证消息投递可靠性。 2.mq内部消息不丢失。 3.消费者消费成功。 什么是消息投递可靠性 简单点说就…

    编程技术 2025年3月6日
    200
  • SpringBoot怎么集成Redisson实现延迟队列

    使用场景 1、下单成功,30分钟未支付。支付超时,自动取消订单 2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评 3、下单成功,商家5分钟未接单,订单取消 4、配送超时,推送短信提醒 …… 对于延时比较长的场景、实时性不高的…

    2025年3月6日
    200
  • PHP开发:使用 RabbitMQ 实现任务队列

    随着互联网的不断发展,网站的流量越来越大,访问量的增长带来的问题也越来越多。当用户量过大时,服务器负载会增大,这时就需要使用一些技术手段来解决这些问题。任务队列就是其中的一种方式,可以将一些耗时的操作异步执行,从而缓解服务器压力。本文将介绍…

    编程技术 2025年3月6日
    200
  • Java API 开发中使用 RabbitMQ 进行异步消息处理

    随着互联网的快速发展,异步消息处理在分布式系统中扮演着重要的角色,可以提高系统的可靠性和并发性。rabbitmq是一种开源的消息队列系统,可以快速可靠地传递消息,被广泛应用于互联网领域。本文将介绍在java api开发中如何使用rabbit…

    编程技术 2025年3月6日
    200
  • PHP实现开源RabbitMQ SDK

    rabbitmq是一种基于amqp(advanced message queuing protocol)协议的消息队列,被广泛用于解耦、异步处理等场景。而php作为一种高度流行的动态语言,也有众多开源的rabbitmq sdk可供选择。本文…

    编程技术 2025年3月6日
    200
  • PHP和RabbitMQ集成实现消息队列处理

    随着应用系统的不断扩大,消息处理已经成为了一个非常重要的工作。而为了更好地处理消息,许多应用系统都采用了消息队列技术。消息队列技术可以实现异步处理消息,将消息缓存在队列中,实现“生产者-消费者”模式,可以提高系统的可伸缩性和容错性。 而在实…

    编程技术 2025年3月6日
    200
  • PHP开发中如何使用RabbitMQ实现消息传递

    rabbitmq是一种消息队列软件,可用于在应用程序之间进行消息传递。在php开发中,使用rabbitmq可以实现异步处理任务、实现分布式系统等。本篇文章将介绍如何在php开发中使用rabbitmq实现消息传递。 一、安装RabbitMQ服…

    编程技术 2025年3月6日
    200

发表回复

登录后才能评论