如何限制生产者和消费者读取消息?

如何限制生产者和消费者读取消息?

php小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰时段的请求。本文将介绍一些限制生产者和消费者读取消息的方法,帮助开发者优化系统性能和提高应用的稳定性。

问题内容

我想用 go 获得应用程序生产者-消费者(通过信号关闭)。

生产者不断在队列中生成消息,限制为 10 条。一些消费者阅读并处理该频道。如果队列中的消息数为0,生产者再次生成10条消息。当收到停止信号时,生产者停止生成新消息,消费者处理通道中的所有内容。

我找到了一段代码,但无法理解它是否正常工作,因为发现了奇怪的东西:

为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。 (在屏幕截图中,发送了 15 条消息,但处理了 5 条消息)如何正确地将队列限制为10条消息,即必须写入10条消息,等待队列计数器变为0时处理,然后再写入10条?是否可以在停止信号后通知生产者,以便他不再向通道生成新消息? (在屏幕截图中,生产者成功写入队列 – 12,13,14,15)

结果:

代码示例:

package mainimport (    "context"    "fmt"    "math/rand"    "os"    "os/signal"    "sync"    "syscall"    "time")func main() {    const nConsumers = 2    in := make(chan int, 10)    p := Producer{&in}    c := Consumer{&in, make(chan int, nConsumers)}    go p.Produce()    ctx, cancelFunc := context.WithCancel(context.Background())    go c.Consume(ctx)    wg := &sync.WaitGroup{}    wg.Add(nConsumers)    for i := 1; i 

解决方法

为什么停止程序后,队列中的消息并没有全部处理完,好像丢失了部分数据。

这是因为当 ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 in 通道。

下面的演示解决了这个问题并简化了源代码。

注释

  1. produce 在 ctx 完成后停止。并且它关闭了 in 通道。

  2. 字段 jobs 已从 consumer 中删除,工作人员直接从 in 通道读取。

  3. 以下要求被忽略,因为它很奇怪。常见的行为是,当作业产生时,如果 in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in 通道读取作业为止。

    如果队列中的消息数为0,生产者再次生成10条消息

package mainimport (    "context"    "fmt"    "math/rand"    "os/signal"    "sync"    "syscall"    "time")func main() {    const nConsumers = 2    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)    defer stop()    in := make(chan int, 10)    p := Producer{in}    c := Consumer{in}    go p.Produce(ctx)    var wg sync.WaitGroup    wg.Add(nConsumers)    for i := 1; i 

登录后复制

以上就是如何限制生产者和消费者读取消息?的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2355056.html

(0)
上一篇 2025年3月1日 15:05:45
下一篇 2025年3月1日 15:06:10

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • 15个Java线程并发面试题和答案

    1. 现在有线程 T1、T2 和 T3。你如何确保 T2 线程在 T1 之后执行,并且 T3 线程在 T2 之后执行? 这个线程面试题通常在第一轮面试或电话面试时被问到,这道多线程问题为了测试面试者是否熟悉 join 方法的概念。答案也非常…

    2025年5月3日
    000
  • java面试题最容易犯错的8道

    1. static 和 final 的用法 static 的作用从三个方面来谈,分别是静态变量、静态方法、静态类。 静态变量:声明为 static 的静态变量实质上就是全局变量,当声明一个对象时,并不产生static 变量的拷贝,而是该类所…

    2025年5月3日
    000
  • Person p = new Person();在内存中做了哪些事情?

    Person p = new Person();在内存中做了哪些事情? 将Person.class文件加载进内存中。 如果p定义在主方法中,那么,就会在栈空间开辟一个变量空间p。 在堆内存给对象分配空间。 对对象中的成员进行默认初始化。 对…

    2025年5月3日
    000
  • 2020年JAVA最常见面试题汇总(收藏)

    java基础以及多个“比较” 1.collections.sort排序内部原理 在Java 6中Arrays.sort()和Collections.sort()使用的是MergeSort,而在Java 7中,内部实现换成了TimSort,其…

    2025年5月3日
    000
  • 9道常见的java笔试选择题

    1.关于Java编译,下面哪一个正确()(选择一项) A.Java程序经编译后产生machine code B.Java程序经编译后会生产byte code C.Java程序经编译后会产生DLL 立即学习“Java免费学习笔记(深入)”; …

    2025年5月3日
    000
  • java最新基础知识面试题

    java最新基础知识面试题 1、static关键字什么意思?java中是否可以覆盖一个private或者是static的方法? “static”表明一个成员变量或者是成员方法可以在没有所属类的实例变量的情况下访问。 java中static方…

    2025年5月3日
    000
  • java内存溢出面试题

    引起内存溢出的原因有很多种,常见的有以下几种: 内存中加载的数据量过于庞大,如一次从数据库取出过多数据; 集合类中有对对象的引用,使用完后未清空,使得JVM不能回收; 代码中存在死循环或循环产生过多重复的对象实体; 立即学习“Java免费学…

    2025年5月3日
    000
  • Java中高级面试题(附答案)

    1、redis如何跟本地数据同步? 程序实现mysql更新、添加、删除就会同步操作redis 程序查询redis,不存在就查询mysql,自动保存redis 2、redis几种数据结构? set、list、hash、string、zset …

    2025年5月3日
    000
  • java gc 面试题及答案(1~5题)

    1、既然有GC机制,为什么还会有内存泄露的情况? 理论上Java因为有垃圾回收机制(GC)不会存在内存泄露问题(这也是Java被广泛使用于服务器端编程的一个重要原因)。然而在实际开发中,可能会存在无用但可达的对象,这些对象不能被GC回收,因…

    2025年5月3日
    000
  • java笔试常见的选择题

    1.已知表达式int m[] = {0,1,2,3,4,5,6}; 下面那个表达式的值与数组的长度相等() A m.length() B. m.length C. m.length()+1 立即学习“Java免费学习笔记(深入)”; D. …

    2025年5月3日
    000

发表回复

登录后才能评论