您的位置 首页 知识分享

实时批处理

高效批处理:实时数据处理的优雅方案 批处理是优化数据库操作的常用技术,广泛应用于数据库、Redis和各种批量A…

高效批处理:实时数据处理的优雅方案

批处理是优化数据库操作的常用技术,广泛应用于数据库、Redis和各种批量API中。其优势在于速度更快、成本更低且速度限制更低,但代价是代码复杂度略有提升。本文探讨如何优雅地处理实时到达的数据批处理问题。

现实场景示例

假设一个应用需要在每次用户交互时更新数据库中的用户活跃时间戳last_active_at,且每次HTTP请求都会触发此更新。如果量巨大,频繁的数据库更新会造成不必要的压力。理想情况下,我们希望将这些更新批量处理:

UPDATE users SET last_active_at = NOW() WHERE id IN (17, 25, 31);
登录后复制

挑战在于,更新函数每次只处理一个用户ID,无法直接进行批量操作。

解决方案:基于通道的实时批处理

我们可以通过一个共享队列(使用Go通道实现)来解决这个问题。后台工作进程从队列中读取用户ID,累积到一定数量后批量更新数据库。为了保持开发友好性,更新函数updateUserTimestamp应保持原有的接口:接受单个用户ID,支持上下文取消,并返回相应的错误信息。

为了实现错误处理,每个请求都创建一个专用回复通道,工作进程通过该通道返回结果,确保每个函数调用都能等待其自身的结果。

下图展示了两个并发updateUserTimestamp调用的流程:

实时批处理

代码实现(部分)

首先定义请求结构体:

type updateUserTimestampRequest struct {     userid  int     replyto chan error }  var updateUserTimestampQueue = make(chan updateUserTimestampRequest)
登录后复制

updateUserTimestamp函数:

func updateUserTimestamp(ctx context.Context, userid int) error {     req := updateUserTimestampRequest{         userid:  userid,         replyto: make(chan error, 1), // 必须缓冲     }      select {     case updateUserTimestampQueue <- req:         select {         case err := <-req.replyto:             return err         case <-ctx.Done():             return ctx.Err()         }     case <-ctx.Done():         return ctx.Err()     } }
登录后复制

实时批处理策略

为了避免在低负载情况下批处理时间过长,我们需要引入超时机制。如果批次在一定时间内未满,则立即发送到数据库。超时时间可以设置为几毫秒,在高负载时立即处理,低负载时最多引入几毫秒的延迟。

工作进程实现

工作进程使用rill并发工具包简化实现:

func updateUserTimestampWorker(batchSize int, batchTimeout time.Duration, concurrency int, dbTimeout time.Duration) {     requests := rill.FromChan(updateUserTimestampQueue, nil)     requestBatches := rill.Batch(requests, batchSize, batchTimeout)     _ = rill.ForEach(requestBatches, concurrency, func(batch []updateUserTimestampRequest) error {         // ... (批量更新数据库和返回结果) ...     }) }
登录后复制

完整代码及测试

完整的代码可以在Go Playground上找到(链接略)。 主函数模拟多个并发goroutine调用updateUserTimestamp函数。

总结

本文介绍了一种基于Go通道的实时数据批处理方案,它结合了批处理的效率和实时响应的特性。通过使用通道、goroutine和rill工具包,我们可以构建一个高效、简洁且易于维护的系统。 当然,其他方法例如使用Redis缓存或外部队列也适用于此场景,选择何种方案取决于具体的应用需求。

以上就是实时批处理的详细内容,更多请关注php中文网其它相关文章!

本文来自网络,不代表甲倪知识立场,转载请注明出处:http://www.spjiani.cn/wp/8785.html

作者: nijia

发表评论

您的电子邮箱地址不会被公开。

联系我们

联系我们

0898-88881688

在线咨询: QQ交谈

邮箱: email@wangzhan.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部