近日,私密社交网络 Nextdoor 在其官方博客发表了一篇文章,介绍其分布式任务队列系统的演进过程。该系统每天要处理数以百万计的异步任务,包括向数以百万计的邻居发送内容通知、创建搜索索引、以及其它应该从交互式Web 和移动应用程序解耦的耗时的处理过程。它由两部分组成:消息代理(队列)和一组任务工作进程。像其它许多系统一样,他们使用 RabbitMQ 作为消息代理,使用 Celery 作为任务工作进程。在公司规模较小的时候,这些开源项目提供了很大的帮助。但随着用户数的增多,不久前,他们在 Celery 的稳定性方面遇到了问题。即使得到了 Celery 创建者 Ask Solem 本人的支持,但他们仍然会遇到一些问题。最终,他们决定用他们自己开发的项目 Taskworker 替换 Celery。同时,为了减少运维开销,他们用 Amazon SQS 替换了 RabbitMQ。他们的理由是,Amazon SQS 容易理解,具有高可扩展性,而且完全由 Amazon 管理。
文章首先列出了他们在使用 Celery 时面临的三个主要问题:
- Celery 工作进程在他们系统的现有规模下不稳定。工作进程经常莫名其妙地宕掉,而且由于其代码库很复杂,很难进行故障排除。
- Celery 工作进程无法有效利用系统的计算资源。由于 Celery 不支持优先级队列,所以许多工作进程节点要么未充分利用,要么出现了过载。
- Celery 工作进程处理任务的延时经常非常高。
由于上述问题的存在,他们为 Taskworker 设定了三个目标:
- 简单:故障排除要简单。
- 高效:计算资源的利用要尽可能的高效。
- 可扩展:系统应该是完全分布式的,并可横向扩展。
文章接下来详细介绍了 Taskworker 设计及应用到生产环境过程中的一些关键点。
设计决策
基于上述三个目标,他们提出了一种很简单的设计,用 Python 伪代码表示(不包括错误处理和重试逻辑)如下:
def run_taskworker(): while True: queue = select_queue() tasks = queue.get_tasks() for task in tasks: task.run()
在底层,他们会在每个工作进程节点上运行一组 Taskworker 进程,每个进程都运行上面所示的循环。所有进程都是完全独立的。select_queue()函数根据队列的优先级决定从哪个队列获取任务。它既要能优先处理高优先级队列的任务,又要能避免低优先级队列挨饿。
在通过模拟生产负载进行了十多次基准测试后,他们最终选用了一个彩票算法的变体,如下所示:
def select_queue(): candidate_queues = get_all_queues() while not candidate_queues.empty(): queue = run_lottery(candidate_queues) if queue.empty(): candidate_queues.remove(queue) else: return queue return run_lottery(get_all_queues())
文中还提到,他们要管理十几个或更多不同种类的队列,每个队列包含的任务具有相同的优先级和相似的运行时间。他们在队列层面进行配置设定,包括优先级、SQS 可见性超时以及一次任务处理循环获取的任务数。另外,SQS 在向工作进程发送任务时遵循“至少一次”的语义,这就需要任务必须是幂等的。
应用到生产环境
在这一部分,文章介绍了以下三个方面:
- 发布过程:为了保持兼容,SQS 队列和 Taskworker 的版本总是相同。
- 能力计划:他们使用 Taskworker 模拟生产负载,以决定在一天中的不同时段如何设置工作进程的能力。
- 任务迁移:他们基于每个任务增加了自己开发的开关功能,用于决定是将任务发布到 RabbitMQ 还是 SQS。当开始迁移的时候,他们只需要简单地、一个任务接一个任务地开启开关功能。
结论
截止博文发表时,Taskworker 已经在生产环境中运行了三个多月。他们没有再遇到稳定性问题。在运行相同数量的工作进程节点的情况下, Celery 系统队列中的任务忙时平均延时是 Taskworker 系统的 40 倍。
文章最后指出,Taskworker 还有许多可以改进的地方,而且正在准备开源。
感谢郭蕾对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。
评论