实时批处理

高效批处理:实时数据处理的优雅方案

批处理是优化数据库操作的常用技术,广泛应用于数据库、Redis和各种批量API中。其优势在于速度更快、成本更低且速度限制更低,但代价是代码复杂度略有提升。本文探讨如何优雅地处理实时到达的数据批处理问题。

现实场景示例

假设一个应用需要在每次用户交互时更新数据库中的用户活跃时间戳last_active_at,且每次HTTP请求都会触发此更新。如果并发请求量巨大,频繁的数据库更新会造成不必要的压力。理想情况下,我们希望将这些更新批量处理:

UPDATE users SET last_active_at = NOW() WHERE id IN (17, 25, 31);

登录后复制

挑战在于,更新函数每次只处理一个用户ID,无法直接进行批量操作。

解决方案:基于通道的实时批处理

我们可以通过一个共享队列(使用Go通道实现)来解决这个问题。后台工作进程从队列中读取用户ID,累积到一定数量后批量更新数据库。为了保持开发友好性,更新函数updateUserTimestamp应保持原有的接口:接受单个用户ID,支持上下文取消,并返回相应的错误信息。

为了实现错误处理,每个请求都创建一个专用回复通道,工作进程通过该通道返回结果,确保每个函数调用都能等待其自身的结果。

下图展示了两个并发updateUserTimestamp调用的流程:

实时批处理

代码实现(部分)

首先定义请求结构体:

type updateUserTimestampRequest struct {    userid  int    replyto chan error}var updateUserTimestampQueue = make(chan updateUserTimestampRequest)

登录后复制

updateUserTimestamp函数:

func updateUserTimestamp(ctx context.Context, userid int) error {    req := updateUserTimestampRequest{        userid:  userid,        replyto: make(chan error, 1), // 必须缓冲    }    select {    case updateUserTimestampQueue <- req:        select {        case err := <-req.replyto:            return err        case <-ctx.Done():            return ctx.Err()        }    case <-ctx.Done():        return ctx.Err()    }}

登录后复制

实时批处理策略

为了避免在低负载情况下批处理时间过长,我们需要引入超时机制。如果批次在一定时间内未满,则立即发送到数据库。超时时间可以设置为几毫秒,在高负载时立即处理,低负载时最多引入几毫秒的延迟。

工作进程实现

工作进程使用rill并发工具包简化实现:

func updateUserTimestampWorker(batchSize int, batchTimeout time.Duration, concurrency int, dbTimeout time.Duration) {    requests := rill.FromChan(updateUserTimestampQueue, nil)    requestBatches := rill.Batch(requests, batchSize, batchTimeout)    _ = rill.ForEach(requestBatches, concurrency, func(batch []updateUserTimestampRequest) error {        // ... (批量更新数据库和返回结果) ...    })}

登录后复制

完整代码及测试

完整的代码可以在Go Playground上找到(链接略)。 主函数模拟多个并发goroutine调用updateUserTimestamp函数。

总结

本文介绍了一种基于Go通道的实时数据批处理方案,它结合了批处理的效率和实时响应的特性。通过使用通道、goroutine和rill工具包,我们可以构建一个高效、简洁且易于维护的系统。 当然,其他方法例如使用Redis缓存或外部队列也适用于此场景,选择何种方案取决于具体的应用需求。

以上就是实时批处理的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2477067.html

(0)
上一篇 2025年3月4日 19:18:50
下一篇 2025年2月24日 23:38:07

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • Golang文本/模板中的SQL查询

    在使用Go的text/template包动态生成SQL查询构建后端API时,提高开发效率的同时,务必注意SQL注入的风险。本文将演示如何避免这种风险。 text/template的SQL注入漏洞示例 以下代码片段展示了如何通过字符串插值构建…

    2025年3月4日
    200
  • 如何用缓存优化高频时间范围数据查询以减轻数据库压力?

    缓解数据库压力:优化高频时间范围数据查询的缓存策略 频繁的时间范围数据查询会给数据库带来巨大的压力。本文介绍两种有效的缓存策略来优化此类查询,从而减轻数据库负担。 策略一:SQL 查询优化 通过优化SQL语句,减少数据库的访问次数和数据处理…

    2025年3月4日
    200
  • LNMP中PHP-FPM配置优化

    LNMP架构(Linux、Nginx、MySQL、PHP)是Web应用部署的热门选择。提升LNMP性能的关键在于优化各个组件,特别是PHP-FPM。本文将探讨如何优化PHP-FPM以及其他相关组件,以增强整体Web应用性能。 PHP-FPM…

    2025年3月4日
    200
  • 如何设计和实现高效的接口限流机制?

    构建高效的API限流机制 本文探讨如何设计一个高效的API频率限制器,并分析PHP生态中缺乏独立、通用的开源限流组件的原因。 许多开发者都面临着API访问频率控制的需求,以应对恶意请求或突发流量。 实现限流的核心在于记录每个请求的访问次数和…

    2025年3月4日
    200
  • 轻量级PHP接口限流方案如何设计?为什么PHP缺乏独立的限流类开源项目?

    高效PHP接口限流方案设计与实现 本文探讨如何构建一个轻量级的PHP接口限流机制,并分析PHP生态中缺乏独立的、单一功能限流类开源项目的原因。 核心目标是创建一个高效的接口频率控制系统。理想方案应避免引入大型框架,保持轻量级特性。 Redi…

    2025年3月4日
    200
  • LNMP中MySQL如何优化查询

    提升LNMP架构下MySQL数据库查询效率是至关重要的性能优化环节。本文将介绍一系列实用技巧,助您显著改善数据库性能及响应速度。 一、索引策略 索引创建: 针对频繁用于查询条件的字段创建索引,例如:CREATE INDEX idx_colu…

    2025年3月4日
    200
  • 如何优化Linux LAMP性能

    提升Linux LAMP(Linux, Apache, MySQL, PHP)服务器性能,需要从系统、网络、数据库及应用软件等多方面入手。本文提供一系列优化建议,助您打造高效稳定的LAMP架构。 一、系统内核与操作系统优化 保持系统更新: …

    2025年3月4日
    200
  • java框架的运用趋势与新技术结合

    java 框架应用趋势:微服务架构:提高可扩展性和灵活性。响应式编程:处理大量并发请求而不阻塞线程。云原生框架:在云环境中构建和部署应用程序。nosql 数据库集成:支持大量非结构化数据。人工智能和机器学习:增加智能和自动化功能。 Java…

    2025年3月4日
    200
  • golang框架如何与NoSQL数据库交互?

    go 框架通过标准库和第三方框架,为与 nosql 数据库交互提供了强大的功能。这些框架包括:1. mongodb 库:mgo;2. redis 库:redigo;3. cassandra 库:gocql。这些框架提供了与 mongodb、…

    2025年3月4日
    200
  • java框架在高吞吐量项目开发中的性能调优

    java框架在高吞吐量应用程序开发中可通过优化请求类型分析、数据库查询优化、http请求处理优化等技巧提升性能。如spring boot应用,可考虑使用webflux、graphql、缓存、优化查询、对象池等方法进行性能调优,持续监控和调整…

    2025年3月4日
    200

发表回复

登录后才能评论