springboot rabbitmq reply消息直接回复模式怎么实现

一、使用场景

mq的作用包括了解耦、异步等。

通常生产者只负责生产消息,而不关心消息谁去获取,或者消费结果如何;消费者只负责接收指定的消息进行业务处理而不关心消息从哪里来一级回复业务处理情况。但我们项目中有特殊的业务存在,我们作为消息生产者在生产消息后需要接收消费者的响应结果(说白了就是类似同步调用 请求响应的MQ使用),经过研究,MQ的Reply模式(直接回复模式)就是为此种业务模式而产生。

二、Reply实战

(1)依赖与YML配置

依赖:

我这里只列出最核心的rabbitMq所需依赖

    org.springframework.boot    spring-boot-starter-amqp

登录后复制

配置:

无其余特殊配置,因为reply就是rabbitmq的一种交互方式而已

spring:  rabbitmq:    host: 10.50.40.116    port: 5673    username: admin    password: admin

登录后复制

(2)RabbitMq bean配置

package com.leilei.demo;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author lei * @create 2022-09-19 21:44 * @desc mq配置 **/@Configurationpublic class RabbitMqConfig {    @Bean    public Queue bizQueue() {        return new Queue("bizQueue");    }    @Bean    public Queue replyQueue() {        return new Queue("replyQueue");    }    @Bean    FanoutExchange bizExchange() {        return new FanoutExchange("bizExchange");    }}

登录后复制

业务类:

@Data@NoArgsConstructor@AllArgsConstructorpublic class Vehicle implements Serializable {    private Integer id;    private String name;}

登录后复制

(3)消息生产端

消息生产端需要做的事情:有生产消息、接受消息消费响应

(1)生产消息

1、生产消息,看业务场景选择是否生成全局唯一自定义的消息ID

2、指定消息消费后响应的队列(Reply)

    /**     * 生产消息     *     * @param     * @return void     * @author lei     * @date 2022-09-19 21:59:18     */    public void replySend() {        MessageProperties messageProperties = new MessageProperties();        messageProperties.setReplyTo("replyQueue");        //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID        String correlationId = UUID.randomUUID().toString();        // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理        messageProperties.setCorrelationId(correlationId);        Vehicle vehicle = new Vehicle(1, "川A0001");        Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);        rabbitTemplate.convertAndSend("bizExchange","",message);        System.out.println("生产者发送消息,自定义消息ID为:" + correlationId);    }

登录后复制

(2)接受Reply响应

消费者消费消息后会将处理结果进行发送到一个队列,我们读取这里队列就可以拿到对应消息的响应结果进行业务处理了

    /**     * 接收消息响应     *     * @param message     * @return void     * @author lei     * @date 2022-09-19 21:59:27     */    @RabbitListener(queues = "replyQueue")    public void replyResponse(Message message) {        String s = new String(message.getBody());        String correlationId = message.getMessageProperties().getCorrelationId();        System.out.println("收到客户端响应消息ID:" + correlationId);        //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作        System.out.println("收到客户端响应消息:" + s);    }

登录后复制

(4)消息消费端

消息消费端需要做的事有:接受消息然后进行业务处理、响应消息

(1)方法一:sendTo注解+方法返回值

一般来说,我们mq消费者监听方法不需要返回值,我们这里使用sendTo注解,则需要将要响应的消息定义为返回值,sendTo注解中指定要响应到哪个队列

重点:

1、sendTo注解指定要相应的队列(注意和生产端保持一致)

2、方法定义的返回值内容就是要响应的消息,最终会发送到sendTo注解指定要相应的队列

3、这种方法的缺点是消费端的主关性很高,因为sendTo指定的目标队列可以自己瞎写,导致生产者端无法正确收到消息响应,但我相信一般项目中也不会这么干

    /**     * 方式1   SendTo指定响应队列     *     * @param message     * @return String     * @author lei     * @date 2022-09-19 16:17:52     */    @RabbitListener(queues ="bizQueue")    @SendTo("replyQueue")    public String handleEmailMessage(Message message) {        try {            String msg=new String(message.getBody(), StandardCharsets.UTF_8);            log.info("---consumer接收到消息----{}",msg);            return "客户端响应消息:"+msg+"处理完成!";        } catch (Exception e) {            log.error("处理业务消息失败",e);        }        return null;    }

登录后复制

(2)方法二:读取生产端的消息使用模板发送

与普通的消费者方法一样,只需要RabbitListener注解监听业务队列;但还需要根据消息获取出ReplyTo地址,然后自己消费者方法内部手动发送消息

1、优点,更强烈的感受到消息请求 响应的交互性,流程看起来更清晰

2、缺点,代码不雅

    /**     * 方式2  message消息获取内部reply rabbitmq手动发送     *     * @param message     * @return String     * @author lei     * @date 2022-09-19 16:17:52     */    @RabbitListener(queues = "bizQueue")    public void handleEmailMessage2(Message message) {        try {            String msg = new String(message.getBody(), StandardCharsets.UTF_8);            log.info("---consumer接收到消息----{}", msg);            String replyTo = message.getMessageProperties().getReplyTo();            System.out.println("接收到的reply:" + replyTo);            rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> {                x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());                return x;            });        } catch (Exception e) {            log.error("处理业务消息失败",e);        }    }

登录后复制

(3)方法三:方法返回值

这种方式与1其实是一致的,但我经过测试,因为生产者消息指定了ReplyTo的地址,消费者端无需自己再次手动指定,即生产消息到哪里,是否响应以及响应消息发送到哪里全由生产端自己空,消费者只需要处理自身业务以及返回结果

   /**     * 方式三  方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理)     *     * @param message     * @return String     * @author lei     * @date 2022-09-19 23:17:47     */    @RabbitListener(queues ="bizQueue")    public String handleEmailMessage3(Message message) {        try {            String msg=new String(message.getBody(), StandardCharsets.UTF_8);            log.info("---consumer接收到消息----{}",msg);            return "客户端响应消息:"+msg+"处理完成!";        }        catch (Exception e) {            log.error("处理业务消息失败",e);        }        return null;    }

登录后复制

(4)测试

生产消息:

springboot rabbitmq reply消息直接回复模式怎么实现

消费消息与响应:

springboot rabbitmq reply消息直接回复模式怎么实现

收到的响应:

springboot rabbitmq reply消息直接回复模式怎么实现

链路:

springboot rabbitmq reply消息直接回复模式怎么实现

以上就是springboot rabbitmq reply消息直接回复模式怎么实现的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2590867.html

(0)
上一篇 2025年3月6日 16:43:52
下一篇 2025年2月28日 12:39:53

AD推荐 黄金广告位招租... 更多推荐

相关推荐

发表回复

登录后才能评论