本文要点
与 10 年前的消息传递解决方案相比,基于云的消息传递服务提供了更为自动化的事务保证。
在处理多队列交互时,可能会意外地进入不一致的状态。
我们可以通过收件箱和发件箱模式将队列与数据库事务连接在一起。
为了便于去重、保证幂等性和事务,要求每个消息都有唯一的标识符。
不要将基于日志的事件处理器与队列混淆起来,要在这二者中选择一个解决方案,请先确定自己需要什么。
与之前的消息服务相比,当今基于云的消息服务是否提供了不一样的事务支持?如果是这样,这意味着什么?在与分布式系统专家 Udi Dahan 的交谈中,我们探讨了这个问题。
InfoQ:请先介绍一下你自己。
Udi Dahan:我叫 Udi Dahan。我是 NServiceBus 的创始人和 Particular Software 的首席执行官。我们主要开发消息中间件,帮助人们更容易、更快速、更可靠地构建复杂的业务系统。
InfoQ:定义一下什么是”事务“吧。
Dahan:关于事务,有两种解释。一种从技术方面来解释,比如 ACID(原子性、一致性、隔离性和持久性)。如果从业务方面来解释,我认为最好的解释是这样的:事务是一种可以让你的系统处于一致性状态的工具,不会让系统留有不准确的垃圾数据,这些垃圾数据会让系统变得无用和不符合监管规范。
InfoQ:请讲讲事务处理从 10 年前的中间件到现如今基于云的消息代理的发展过程。
Dahan:据我所知,事务首先是从数据库领域开始出现的。正如我之前提到的,事务主要与数据一致性有关。
现在有必要说明一下为什么将隔离性也包含在事务属性中。主要的一个考虑是当多个用户或参与者(可能是不同的系统)并行操作数据集时,系统是否能够继续正确地执行操作?我们希望系统不只是在单个用户操作单条特定的记录时能够正常运作,在现今的世界里,会有很多事情同时发生。
我们需要面对的一个事实是,所有的事物都是相互关联的,用户希望连接到他们的数据,并在全球范围内与其他用户实时协作操作任意一组数据。
消息传递系统提供了长距离传递可靠消息的一些元素。试想这样的一个场景:你将钱从一个帐户转移到另一个帐户。任何银行都不可能、也不希望将记录锁定在任何其他一家银行的数据库中。
消息传递是作为一个临时位置引入的,既不在你的数据库中,也不在我的数据库中。然后我们可以把钱通过这些高度可靠的管道进行转移。转移过程的每一步都可以是一个事务:从我的数据库到一个传出队列,从传出队列到一个中间队列,从一个中间队列到另一个中间队列,再到对方的传入队列,从对方的传入队列到对方的数据库。只要这些步骤中的每一个都是可靠并且具备事务性,就可以从业务角度保证整个流程是安全的。
我认为消息传递系统已经开始参与更大规模的事务业务处理流程,也是为什么从一开始事务就被认为是消息传递基础设施的一个非常重要的组成部分。
InfoQ:在云端会有不一样吗?你有没有注意到,Azure Service Bus 或 Amazon SQS 提供的保证机制与十几二十年前使用的企业消息总线是不一样的?
Dahan:我们确实看到了它们的不同之处,而且不仅仅是因为它们在云端。在 SQS 和 Azure Service Bus 之前,一些开源的消息系统(比如 RabbitMQ)都不支持事务。ActiveMQ 虽然声称支持事务,但它的实现不稳定,我们不能完全相信它。
我认为 NoSQL 数据库的崛起是一件非常重要的事情。它催生了业界对事务的需求,而首当其冲的是 MongoDB。
在云端,我们看到了缺乏事务支持所造成的连锁反应。现在,当我们迁移到云端(有成千上万台分布在全球各地的机器)时,需要适当地考虑事务问题。否则的话,就又回到了之前银行开户的例子,那样是不行的。所以,如果云供应商说他们不支持事务,那是因为它们无法灵活提供某种全局性的事务保证。
如果有人说:“我不知道,反正数据会有最终一致性,这样就可以了”,那是因为他们没有意识到,只是心中念着“最终一致性”的咒语,奇迹并不会自己发生。
你可能冒着出现最终“不一致性”的风险,因为你并没有为缺乏事务支持提供有效的补偿。
InfoQ:能举一个意外出现不一致状态的例子吗?
Dahan:实际上,这样的例子有很多,系统最终都会处于不一致的状态,具体取决于你所使用的基础设施以及如何使用它们。
有一些消息队列系统(包括非云端和云端的)不支持多队列事务。假设你从一个队列接收消息,然后向多个其他队列发送消息,那么就会出现这样的情况:你把这些消息发送给其他队列,然后回头对收到的第一条消息进行确认。但也可能出现部分故障,其中有一些消息发出去了,另一些则没有,具体取决于如何与消息代理通信。例如,一些消息代理与客户端使用了异步的通信模式。RabbitMQ 就提供了这种模式,叫作“发布者去人”,这个特性默认情况下是关闭的。
因此,可能会出现这样的情况:我发送了消息,但客户端并没有立即与消息代理发起通信,而代码继续执行。所以有些消息没有被发送出去,但代码可能不知道。我可能会遇到这样的一种情况:我与多个系统集成,但其中一个系统没有接收到信息。客户端代码不会接收到异常,或者通过异步的方式接收到异常。后面的事情就由业务开发人员来决定该怎么处理。业务开发人员一般的反应是:“哦,有一个异常,我把它记录下来,稍后会有人来查看的”。
这个场景很简单。我将介绍另一个涉及数据库通信的例子,这个例子涉及到一些业务系统与消息系统的组合。假设你往数据表格中插入一个实体,数据库会返回给你一个增量标识符,用在你稍后将要发布出去的消息中。现在有一个零售场景,我想买一些东西,你在订单表中插入一些数据,得到订单 ID 12345,然后你发布了一个事件,说我下了一个 DI 为 12345 的订单。现在,你将事务提交到数据库,这个时候可能会出现死锁,因为其他人正在同时处理这笔数据,你不得不回滚并重试。
你的代码支持回滚和重试,而我会对最初收到的信息进行确认,并再次处理重试的消息。我发布的事件(即我收到客户订单,ID 是由数据库生成的 12345)不在事务内,因为传入队列、传出队列和数据库之间并不存在分布式事务。
现在你有一个下游系统,它接收带有客户信息的事件,处理计费、市场营销、会计等方面的业务。这些下游系统与实体标识符之间的连接是错误的,但没有人知道,直到很久以后,等到客户打电话向我们解释出了问题,我们才知道发生了什么。支持人员开始查找原因,但到了那个时候,真的很难找出哪里出了问题,什么样的状态才是正确的,以及下游系统有哪些东西可能出现了不一致。
这些东西在测试过程中通常不容易被发现。因为大多数人不会在并发场景下测试系统,也不会尝试模拟事务失败和回滚等场景。
假设 Amazon SQS 或 Azure 服务总线的客户端解决了这个问题,换句话说,开发人员相信这些库的供应商为他们提供了足够用来构建业务代码的构建块。我们所说的这些代码其实都很简单,从队列中读取消息,将记录插入数据库并发布事件,很多代码都类似这样,但还是存在出现不一致性的风险。
InfoQ:开发人员要怎样做?怎样才能更安全?
Dahan:事实证明,有很多相对简单的模式可用于队列系统和数据库之间的事务。有两个基本的模式,分别是收件箱和发件箱模式。
要使用这两个模式,需要为消息提供一个标识符,用于惟一地标识消息,然后根据实际需要进行重试或去重。
大多数队列系统不一定会提供基于消息标识符的去重机制,有些队列系统甚至不强制使用消息标识符,只是可选的。所以,首先是确保为所有消息提供惟一标识符。
当你的业务代码需要发出消息时,你可以将消息发送到发件箱,而不是直接与消息代理发生通信。发件箱会替你把消息发送给消息代理。现在,发件箱可以作为数据库事务的一部分。
因此,本质上,我们在消息系统中引入了半持久性级别,无论用户要求发送什么消息,都可以作为相同数据库事务的一部分。这意味着如果数据库事务因为发生故障而回滚,不仅可以回滚业务数据,而且可以回滚所有要发送出去的消息,这样就可以防止错误的业务数据脱离事务边界。
这就是发件箱模式。另一个是收件箱模式。假设你要从消息队列中读取一条消息。这个消息有唯一标识符,你的业务逻辑对业务实体进行了更新,并向发件箱发布了一个事件。现在,代码已经准备好发布消息了,但端点却在事务提交后发生了崩溃。然后会发生什么?这个时候需要用到收件箱,在进行消息重试时,收件箱会拿到消息的标识符,与发件箱中的消息进行比对,发现消息已经被处理过了,而且不会重新调用同样业务逻辑,因为实际上已经成功处理过了。
因此,收件箱模式为我们提供了一种内置的幂等性,开发人员不需要在程序中处理幂等性问题。巧合的是,这也是我与 REST 社区一直在争论的一个问题。他们喜欢耍纸上谈兵,好像实现幂等性是一件很简单的事情一样。这有点像最终一致性,如果你说得次数多了,好像就会变成真的一样!不,实现等幂性实际上并不是一件简单的事情。“你只要检查一下之前是否已经处理过那个消息“。但如果是更新操作呢?当然,如果是一个插入操作,你可以检查一下之前是否插入过,但你怎么检查一个更新操作?”只要检查一下业务数据,看看是不是没有发生变化“。这样真的可以吗?如果你在一个并发环境中操作,会有多个用户同时更新相同的数据,会发生什么情况?你成功地执行了更新操作,但又因为某种原因回滚了,然后其他的进程执行了后续的更新。在你进行重试时,发现它的状态与之前看到的不同,所以你会重复之前的处理过程,但并没有意识到实际上覆盖了一些错误的数据。
因此,在并发环境中,与更新相关的等幂性并不是一件容易实现的事情,无论是 REST 还是消息传递系统。这就是为什么我们从一开始就要有事务。你有收件箱,有发件箱,还有消息 ID。然后,在你发送消息时,可能会发生崩溃。因此,在重试发生发件箱中的消息时,需要保留相同的消息标识符。这一点很重要,因为可能有一些下游系统会重复接收到这些消息。如果你保留了相同的消息标识符,那么它们的收件箱就可以成功地删除重复的内容。
因此,这里有很多东西与消息 ID 管理、消息去重、幂等性、捕获和存储消息以及数据存储层的事务管理有关。如果单独看这些东西,它们并不难。但要以正确的方式将它们串在一起,将它们用于 RabbitMQ、Amazon SQS、Azure 服务总线和现有的数据库技术,可能就很棘手了。这正是我为什么要创建 NServiceBus!一般的业务开发人员应该专注于他们的业务代码,而不是花时间找出如何实现这些中间件消息传递模式来获得最终的一致性,而不是最终不一致性。
InfoQ:那么,NServiceBus 是否采用了这些模式并将它们嵌入到库中,这样开发人员就不用处理这些问题了?NServiceBus 如何解决这个问题?
Dahan:NServiceBus 提供了一个框架,所有这些东西都已经包含在里面了。
这也是我对行业发展方向存在异议的地方。我们是基准导向型的。我们会说“RabbitMQ 每秒可以处理 60000 条消息,而 ZeroMQ 每秒可以发送 120,000 条信息“。但如果做不到安全,速度快又有什么用?如果一辆汽车里没有安全带或安全气囊,你愿意开着这样的车带着你的家人四处转吗,尽管它可以在两秒内加速到 60 码?
我希望安全第一,速度第二。所以,NServiceBus 首先提供的是安全性,让你不会丢失任何消息。
我们已经基于所有的技术栈和所有不同的故障模式对它进行了测试,能够保证你不会丢失任何消息,或者不会让你的业务数据变得不一致。如果你想要更快的速度,可以来找我们,我们可以一起讨论如何做出调整。但在涉及业务数据、业务工作流这类的东西时,我想你会希望它们始终处于正确的状态。
InfoQ:你有没有注意到现在出现了很多类似 Kafka 的分布式日志系统?可以使用消息 ID,可以重放数据,这些会改变你对系统集成的想法吗?
Dahan:Kafka 是一个很棒的数据流平台。他们确实构建了一些令人感到惊叹的平台,但我认为它所呈现的远远不止这些。它可以用来解决之前我所描述的场景中。消息代理和流式平台之间最主要的区别是:它们适用于不同的环境。
假设在物联网环境中,我从火车或卡车上的传感器获取信息,这些传感器可以告诉我火车或卡车的速度、航向和当前的燃料情况。如果无法成功读取其中的一个消息,可以读取下一个,然后旧消息就变得无关紧要了。因此,你不需要高度一致的一次处理语义。实际上,以每次处理一条消息的方式来处理这种洪水似的消息流是非常低效的!
有很多这样的情况,你需要处理更多的数据流,你希望能够尽可能快地处理,而且同时需要处理很多数据流。
现在,在这些域之外,大多数时候业务事件可以通过业务逻辑浮出出来。“我想我们的卡车坏了,我们需要采取行动“。这是一个业务事件,你可不想错过这个事件,所以你会在数据流和业务事件处理之间来回切换。
至于支持 Kafka,我想说的是,很多技术都是这样的。你可以就这样使用它,也可以让它支持更多的场景。问题是,要想取得成功,你需要在核心平台上做多少额外的工作?
对于 Kafka 来说,需要做相当多的工作。实际上,我们也试图这样做,因为客户会问我们为什么 NServiceBus 不支持 Kafka。我们讨论了刚才介绍的整个架构。但核心的技术结论是,我们首先需要在 Kafka 的基础上构建队列,然后在队列基础上构建 NServiceBus。
首先,这很困难。其次,它的可伸缩性不好。Kafka 之所以可以做出这样的设计,是因为他们不需要支持排队语义。如果他们知道需要支持队列语义,他们就会以不同的方式来设计它。
InfoQ:最后还有什么想说的吗?
Dahan:对于那些从未使用过 NServiceBus 的人,不要被这个名字搞迷糊!有时候,当人们听到“服务总线”这个名字时,他们会认为它是 ESB 之类的东西,而他们可能只需要一些轻量级的东西。实际上,它是给.NET 开发人员准备的几个 NuGet 包而已。你可以去试试入门示例,你可以会在 15 分钟或更短的时间内启动和运行它,真的很容易。我们可以支持任意的工作负载。
受访者介绍
Udi Dahan 是著名的面向服务架构和领域驱动设计专家,也是.NET 服务总线 NServiceBus 的作者。
原文链接:
How Do We Think about Transactions in (Cloud) Messaging Systems? An Interview with Udi Dahan
评论