我告诉几百位用户他们的款项已经到账了,可实际上并没有!
本文最初发布于 hakibenita.com 网站,经原作者授权由 InfoQ 中文站翻译并分享。
你有没有想过错误都是怎么来的呢?我说的不是那种用简单的单元测试就能捕获的普通错误。我说的是第一眼看上去好像没什么问题,但回头想起来却会觉得很明显的那种错误。
本文讲的是我不小心向几百位用户发送了付款到账的消息,实际上他们的钱还没到手的故事!
当你意识到自己犯了错误时是什么感觉
故事
我们系统中有一个付款流程,是向商家和其他类型的用户付款用的。对于大多数用户来说,付款流程是一件非常重要的事情,因为这就是他们获得报酬的途径。
创建一个付款操作
为了完成付款流程,我们有一个名为 PayoutProcess 的 Django 模型。要创建一个新的付款操作时,我们会使用一个大概长成下面这样的函数:
from __future__ import annotations
from django.db import model, transaction as db_transaction
class PayoutProcess(models.Model):
#... fields
@classmethod
def create(cls, to: User, amount: int) -> PayoutProcess:
# ... Validate input ...
with db_transaction.atomic()
payout = cls.objects.create(
to=user,
amount=amount,
status='pending',
)
# Create related objects etc...
return payout
复制代码
这个函数的简化版本可以创建一个付款流程的新实例并返回它。在现实应用中,这个函数会验证输入并创建几个相关的对象。为了确保所有相关对象都能和付款流程实例一同创建,我们使用了一个数据库事务。
新创建的这个实例现在代表系统中的一个付款流程,其中付款模块负责完成付款操作。完成付款操作的方法多种多样,例如通过银行转账、信用卡或其他方式。并非所有的付款方式都是即时到账的,因此付款操作是一个异步流程,可能需要一些时间才能完成。
当款项到账,付款操作完成时,这个模块会更新实例的状态:
class PayoutProcess(models.Model):
@classmethod
def mark_paid(cls, pk: int) -> PayoutProcess:
with db_transaction.atomic():
payout = cls.objects.select_for_update().get(pk=pk)
if payout.status != 'pending':
raise StateError()
payout.status = 'paid'
payout.save()
return payout
复制代码
这个函数获取付款实例,检查其状态并将其标记为已到账。目前为止都没什么问题!
发送通知
有一天,我们的员工来找我们提出了一个想法。他们说,如果系统能够通知用户,告诉他们款项已经到账就太好了。我们认为这是个好主意!谁不想看到一条消息说自己收到了一些 dollar 呢?
付款模块是我们系统的核心模块。我们针对不同类型的用户都有各自的付款操作,顶级应用使用这个模块在不同的上下文中创建付款流程。例如,一个应用向商家发送佣金付款操作,另一个应用向业务合作伙伴付款。
为了让付款模块保持独立,与使用它的应用解耦,我们要让顶级应用来向用户发送到账通知。问题是顶级应用创建付款流程后,付款模块是在内部处理实际的付款操作的,顶级应用没法知道流程走到了哪一步,除非它不断监控付款模块的状态。
顶级应用创建一个付款操作
为了让顶级应用响应付款模块中的各项更改,我们需要有一种机制来让顶级应用知道某些事情发生了变化。这里棘手的一点是顶级应用依赖付款模块,因此付款模块不能反过来依赖它们,否则就会导致循环依赖。
付款到账时,顶级应用会收到通知
在 Django 中,使用信号(signal)是避免循环依赖并保持模块解耦的一种方法:
# payouts/signals.py
from django.dispatch import Signal
payout_paid = Signal()
复制代码
在声明这个信号后,我们会在一笔付款到账时发送它。这是由下面的模型完成的:
from . import signals
class PayoutProcess(models.Model):
@classmethod
def mark_paid(cls, pk: int) -> PayoutProcess:
with db_transaction.atomic():
payout = cls.objects.select_for_update().get(pk=pk)
if payout.status != 'pending':
raise StateError()
payout.status = 'paid'
payout.save()
signals.payout_paid.send_robust(sender=PayoutProcess, payout=payout)
return payout
复制代码
现在顶级应用可以收听这个信号,并向用户发送通知:
from django.dispatch import receiver
import payout.signals
import payout.models
from .models import MerchantPayoutProcess
@receiver(payout.signals.payout_paid)
def on_merchant_was_paid(sender, payout: payout.models.PayoutProcess) -> None:
try:
p = MerchantPayoutProcess.objects.get(payout_id=payout.id)
except MerchantPayoutProcess.DoesNotExist:
# Not a merchant payout
return
p.user.email_user(f'Dear merchant, you got paid {payout.amount}$!')
复制代码
当信号接收器被触发时,它会首先检查这是否属于它自己的付款操作。如果答案是肯定的,接收器会获取相关对象(在这里就是给商家的一笔付款),并向用户发送通知。
N 个接收者
使用这个模式时,如果你有 N 个接收者,那么每次调度都会导致 N-1 个无用的查询。可以向信号添加一些上下文来避免这种情况。
DISPATCH_UID
在信号接收器上设置 dispatch_uid 是个好主意。这份文档给出了很好的解释。
以这种方式使用信号的好处是,底层付款模块可以与依赖它的应用通信,而不会形成对应用的依赖。这种模式消除了循环依赖,并让底层模块保持独立和解耦。
批量处理
这种设计效果很好,每次付款到账时用户都很高兴。
又有一天,工作人员带着另一个想法回来了。他们说工作量越来越多,所以他们现在想要自动化和简化其中一些任务。他们问我们是否可以将多个付款操作标记为批量付款。经过简短的讨论,我们决定最好让批量流程“要么都成要么都败”,也就是说,哪怕批量付款中有一项操作失败,其他操作也都不能通过。
我们认为这会是一项简单的任务,我们要做的就是对一个数据库事务中的所有给定付款操作执行以下命令:
from django.db import transaction as db_transaction
def mark_paid_in_bulk(payout_ids: list[int]) -> None:
with db_transaction.atomic():
for pk in payout_ids:
PayoutProcess.mark_paid(pk)
复制代码
这个批量处理流程简单地遍历付款操作 ID,并将每个 ID 标记为已到账。为了确保这个流程是原子的,或者“全部成功或全部失败”,我们将循环包装在了一个数据库事务中。
很简单,对吧?从这里开始就是一堆麻烦事了。
Bug
这个批量流程也正常用了一段时间。工作人员会上传 Excel 文件(之类的东西),系统会检查付款操作并将它们全部标记为已到账。
有一天,平时负责这件事的人在放假,就请了其他人代班。代班的员工准备好了 Excel 文件并将其上传到系统。这位新人不熟悉这个流程,所以在付款金额上犯了一些错误。结果,系统拒绝了一些付款操作。
现在系统报告了一个错误,正常人会怎么反应呢?他们开始一次又一次地尝试......
过了一阵儿,我们开始收到用户的投诉,说他们收到了大量的到账消息。有些人很高兴,但还有些用户打开应用查看详情,发现他们实际上没有收到钱,并意识到了这一定是一个错误。
这时候已经有数百名用户收到了这些消息,但没人拿到了哪怕一分钱!那么是什么引发了这个问题呢?当所有付款仍标记为待处理时,为什么系统就把通知发出去了?我们仔细查看批量流程的实现,终于发现了问题。
嵌套事务
将付款标记为已到账的那个函数是在数据库事务内执行的。为了确保信号只在付款状态提交到数据库时才发送,信号会在事务完成后发送:
@classmethod
def mark_paid(cls, pk: int) -> PayoutProcess:
with db_transaction.atomic():
payout = cls.objects.select_for_update().get(pk=pk)
if payout.status != 'pending':
raise StateError()
payout.status = 'paid'
payout.save()
signals.payout_paid.send_robust(sender=PayoutProcess, payout=payout)
return payout
复制代码
当这个函数在单次付款操作上执行时,它会按预期正常工作。但是如果我们添加批量流程就会变成这样:
with db_transaction.atomic():
for pk in payout_ids:
# inline `mark_paid()`
with db_transaction.atomic():
payout = cls.objects.select_for_update().get(pk=pk)
if payout.status != 'pending':
raise StateError()
payout.status = 'paid'
payout.save()
signals.payout_paid.send_robust(sender=PayoutProcess, payout=payout)
复制代码
哈!这个批量流程正在使用它自己的数据库事务!信号发送后,如果批量付款中后面的一次付款失败,付款操作还可以回滚。
这里说明一下,如果我们要批量标记三笔付款,而第三笔未能成功标记,那么所有三笔付款操作都会回滚,但前两笔的通知已经发送出去了:
>>> from django.db import transaction as db_transaction
>>> with db_transaction.atomic():
... for fail in [False, False, True]:
... with db_transaction.atomic():
... if fail:
... raise Exception('Failed!')
... print('Message sent!')
...
Message sent!
Message sent!
Exception: Failed!
复制代码
注意代码中,即便第三笔付款失败导致外部事务回滚所有三笔付款,前两笔付款的成功通知还是会发送出去。
补救措施
那个 mark_paid 函数会假设它自己不在某个数据库事务内执行,但它不会以任何方式检查或避开这种情况。这就是个问题。
断言原子块
在 Django 3.2 之前,我们有一些用例需要确保某个函数在一个数据库事务中执行或者不执行。我们最后实现了两个函数:
# common/db.py
from django.db import connection
def assert_is_in_atomic_block() -> None:
assert connection.in_atomic_block, (
'This function must be run inside of a DB transaction.'
)
def assert_is_not_in_atomic_block() -> None:
assert not connection.in_atomic_block, (
'This function must not be run inside of a DB transaction.'
)
复制代码
有了这些实用函数后,我们就可以避免某些代码在数据库事务中执行:
import common.db
def do_not_run_inside_a_db_transaction():
common.db.assert_is_not_in_atomic_block()
# Rest of function goes here...
复制代码
现在在原子块内运行这段代码块时,将在运行时触发一个断言错误:
>>> from django.db import transaction as db_transaction
>>> with db_transaction.atomic():
... do_not_run_inside_a_db_transaction()
AssertionError: This function must not be run inside of a DB transaction.
复制代码
这种方法的主要缺点是,除非另有明确说明,否则测试将在一个数据库事务中运行。这将导致使用事务的测试全部失败。为了克服这个问题,我们最后在测试中修补了这些函数:
@pytest.fixture(scope='session', autouse=True)
def patch_is_in_db_transaction():
# Patch atomic transaction check in tests.
# The checks can't be run in tests because tests are always wrapped in a transaction.
patch_in = mock.patch('common.db.assert_is_in_atomic_block')
patch_not_in = mock.patch('common.db.assert_is_not_in_atomic_block')
with patch_in, patch_not_in:
yield
复制代码
这个函数创建了一个自动应用于整个测试会话的修复,可以模拟这两个函数并禁用它们的功能。
持久事务
从 Django 3.2 开始,还有另一种方法可以将事务标记为“持久(durable)”,来防止事务在另一个事务内部执行:
with db_transaction.atomic(durable=True):
payout = cls.objects.select_for_update().get(pk=pk)
if payout.status != 'pending':
raise StateError()
payout.status = 'paid'
payout.save()
signals.payout_paid.send_robust(sender=PayoutProcess, payout=payout)
复制代码
如果你尝试在另一个事务中打开一个持久事务,则会引发一个 RuntimeError:
>>> from django.db import transaction as db_transaction
>>> with db_transaction.atomic():
... with db_transaction.atomic(durable=True):
... pass
...
RuntimeError: A durable atomic block cannot be nested within another atomic block.
复制代码
使用持久事务可能会避免这个问题,但它也会让批量处理功能做不出来,或者至少实现起来非常复杂!
提交时发送信号
解决这个问题的另一种方法是试着确保只在整个事务成功提交时才发送信号。一种做法是使用on_commit。
使用 on_commit 时,我们可以注册一个仅在事务实际提交时才执行的函数。为了说明 on_commit 是如何解决问题的,请考虑以下示例:
>>> from django.db import transaction as db_transaction
... with db_transaction.atomic():
... for i, fail in enumerate([False, False, True], 1):
... with db_transaction.atomic():
... print(f'processing {i}...')
... if fail:
... raise Exception('Failed!')
... db_transaction.on_commit(lambda: print(f'Message sent!'))
processing 1...
processing 2...
processing 3...
Exception: Failed!
复制代码
在这个示例中我们循环遍历三个值,其中第三个值会失败。为了仅在事务成功提交时打印消息,我们使用 on_commit。注意在输出中处理了三个项目,但由于第三个项目失败,整个过程失败并且没有发送任何消息。
为了说明当所有项目都成功时会发生什么,请考虑以下示例:
>>> from django.db import transaction as db_transaction
... with db_transaction.atomic():
... for i, fail in enumerate([False, False, False], 1):
... with db_transaction.atomic():
... print(f'processing {i}...')
... if fail:
... raise Exception('Failed!')
... db_transaction.on_commit(lambda: print(f'Message sent!'))
processing 1...
processing 2...
processing 3...
Message sent!
Message sent!
Message sent!
复制代码
太棒了!系统处理了三个项目并发送了三个消息。我们现在可以对付款模块应用类似的修复了:
from . import signals
class PayoutProcess(models.Model):
@classmethod
def mark_paid(cls, pk: int) -> PayoutProcess:
with db_transaction.atomic():
payout = cls.objects.select_for_update().get(pk=pk)
if payout.status != 'pending':
raise StateError()
payout.status = 'paid'
payout.save()
db_transaction.on_commit(lambda signals.payout_paid.send_robust(PayoutProcess, payout))
return payout
复制代码
当一笔付款操作被标记为已到账时,这个函数现在只在事务提交时才会发送信号。这样这个函数就可以安全地在另一个事务中执行了!
使用一个队列
在处理此类问题时,人们很容易马上想到用队列来解决问题。作为思考练习,我们来看看两种通常称为“队列”的常见模式。
异步任务
诸如Celery之类的异步任务运行器非常受欢迎。你可以用它们在现在、稍后或预定时间异步执行任务。但在我们的情况下使用异步任务不会解决问题:
如果我们不管发送消息的不是付款模块这一事实,这里的结果与在 on_commit 中发送信号并从接收器触发异步任务是完全相同的(这就是我们所做的)。
这会遇到与信号相同的问题。批量流程失败时任务已被触发,消息将被发送出去。
这在某些情况下可能是可行的,但还有其他问题:
使用异步任务运行器的另一个缺点是,现在你需要有一个异步任务运行器。如果你已经有了一个,那么问题可能不大,但如果你没有,那么设置和操作这么个工具可能会是很繁琐的工作。
事务队列
如果你决定在数据库中实现一个队列,你可能离正确的解决方案又近了一步。你可以将任务暂存到充当队列的数据库表中,这样就不用信号了。
在数据库中使用队列表的主要好处是,只有在提交事务时才会添加任务。这与流程的整体事务管理非常相称,并能确保任务只在应该添加时才添加上来。
有挑战性的部分是如何确保任务在添加到队列后不久就被接收到。如果你使用一个 cron 作业来处理任务,发送操作可能会延迟到 cron 作业的重复间隔时。如果你使用数据库触发器、LISTEN/NOTIFY 或类似触发器的东西处理任务,则延迟可以更短一些。
测试
我们最后实现了 on_commit 解决方案,因为它只需要对现有代码进行很少的更改即可。然而,在我们完成对代码的更改之后,我们又面临了另一个挑战——测试!
用 Django 测试
我们的测试包括了确保在付款到账时发送通知的场景:
def test_should_send_notification(db, mailoutbox, merchant_user: User) -> None:
comm = MerchantCommission.create_payout(merchant_user, amount=100_00)
PayoutProcess.mark_paid(comm.payout_process_id)
assert len(mailoutbox) == 1
复制代码
在我们改为在 on_commit 中发送信号后,所有这些测试都失败了。经过一些调试,我们发现为信号注册的接收器函数没有被执行,但只在测试中才是这样!
如果你知道测试是怎么执行的,那么 on_commit 处理程序没有被触发这一事实也就不足为奇了。为了加快速度,Django 会在每次测试开始时启动一个数据库事务,然后立即回滚它。以这种方式执行测试是防止更改数据库中数据的各个测试相互影响的快速方法。
为了不使用较慢的事务测试,又能测试在 on_commit 中触发的事物,Django 3.2 添加了一个新的名为captureOnCommitCallbacks的上下文管理器(Ticket#30457):
from django.core import mail
from django.test import TestCase
class TestPayoutProcess(TestCase):
def test_should_send_notification(self, merchant_user: User) -> None:
comm = MerchantCommission.create_payout(merchant_user, amount=100_00)
with self.captureOnCommitCallbacks(execute=True):
PayoutProcess.mark_paid(comm.payout_process_id)
assert len(mail.outbox) == 1
复制代码
这个上下文管理器可用于 TestCase 实例,当 execute=True 时,任何 on_commit 处理程序都将被执行,而不仅仅是被捕获。
使用 Pytest 进行测试
不幸的是,我们不再直接使用 Django 的 TestCase 了,我们使用的是 pytest,而且我们没条件去重写内容。还好 pytest-django 实现了等效的功能。我们快速升级到了 pytest-django 4.4版,准备就绪:
def test_should_send_notification(
db,
mailoutbox,
django_capture_on_commit_callbacks,
merchant_user: User,
) -> None:
comm = MerchantCommission.create_payout(merchant_user, amount=100_00)
with django_capture_on_commit_callbacks(execute=True):
PayoutProcess.mark_paid(comm.payout_process_id)
assert len(mailoutbox) == 1
复制代码
django_capture_on_commit_callbacks这个修复基于 Django 函数。一旦你注入它,你就可以像使用 Django 一样使用它。
由这个嵌套事务引起的“bug”最后导致一些用户收到了多条付款到账消息,不过所有这些用户最后都拿到了钱。
对 Django 信号的思考
就像这个故事里提到的,Django 信号可用于实现模块之间的交互,而无需在它们之间创建显式依赖项。关于信号的官方文档也将这一点作为使用信号的主要原因:
Django 包含一个“信号调度器”,它允许互相解耦的应用在框架中的其他地方发生动作时得到通知。简而言之,信号允许某些发送者通知一组接收者,告诉后者某些动作已经发生。当许多代码段可能对同一事件感兴趣时,它们特别好用。
如果你看看信号是如何在Django中实现的,你会发现幕后并没有太多魔法可言。函数 connect 将一个函数添加到接收器列表中,并且当一个信号被 send(或 send_robust)时,信号对象会遍历接收器函数列表,并逐一执行它们。
这与 pub-sub 模式非常相似,但它缺乏更高级实现所有的一些保障。Django 信号的主要缺点之一是无法保证“消息”会到达目的地。例如,如果服务器在广播信号时崩溃,则某些接收器可能不会执行,并且在服务再次启动时也不会尝试执行。如果你完全依赖信号来触发系统中的某些动作,这可能会成为一个问题。
原文链接:https://hakibenita.com/django-nested-transaction
评论