写点什么

Mule 的消息路由

2009 年 8 月 17 日

当谈论整合应用时,消息路由备受关注。当我们确定了各个应用,并选择 Mule 作为整合平台,也知道在 Mule 的服务中可以使用哪些 Java 类和 web services 处理消息,那么为了让消息正确地在服务间流转,该如何将所有的事情整合在一起,从而确保获得我们所需要的的结果呢?

Mule 为您的 Mule 应用中的服务间的路由消息提供了强大而灵活的可选项。本文描述了 Mule 的常用消息类型和几种可用的特殊路由器。

下面介绍消息路由器的相关核心概念:

  • 端点(Endpoints)定义了发送和接收消息的通道(channel)。例如,一个购物组件可能会接收到一个 HTTP 订单请求。一旦该组件处理完订单请求,它可能会发送一个 JMS 消息到一个主题(topic)上,以通知审计系统,并返回一个 HTTP 响应。可以通过端点监听 JMS 消息、发送 email、调用 web services 等。
  • 入站路由器(Inbound router)控制服务如何处理入站(incoming)消息,比如,有选择地只消费符合特定条件的消息,或者在将消息转发给服务处理之前,将拥有同一 group ID 的消息聚合(group)在一起。
  • 出站路由器(Outbound router)控制如何分发经服务处理过的消息,比如,将消息发送到一个接受者列表,或者将消息分解成多个部分,并将它们分别发送至不同的端点。
  • 异步回复路由器(Asynchronous reply router)常用于 request/response 场景。在这些场景中,发送一个请求会触发一个或者更多的请求,并且在返回响应之前,需要考虑这些请求的结果。典型的例子是请求发送后,会并行执行任务(task)。在返回响应之前,必须执行完每个任务,处理完结果。
  • Catch-all 策略在当前消息找不到路由路径时才被调用。入站和出站端点都可以配置 catch-all 策略,因此可以捕获到任何孤立的消息,并将这些消息路由到一个共同的位置。
  • 过滤器提供用于调用特定路由器的逻辑。通过逻辑过滤器 AndFilter、OrFilter 和 NotFilter 可以将过滤器组合在一起使用。并非所有的路由器都需要使用过滤器,但是所有的路由器都支持过滤器。

选择消息类型

当将 Mule 的服务结合在一起时,初用 Mule 的人有时会感到困惑,他们不知何时该使用出站路由器,何时可以最大程度地简化获得回复信息。下面介绍 Mule 中可使用的消息类型,可以通过一个列表查看各个传输(transport)所支持的消息类型,详细内容可以查看 Mule 用户指南中的传输特征矩阵(Transports Feature Matrix)(查看前需要先登录,但注册是免费的,只需花费一点时间就可以完成注册。)

异步

如果只想将消息以“即发即弃(fire and forget)”的方式发送给一个服务,(并不需要给调用者返回响应),那么可使用异步消息类型。如果将入站端点的 synchronous 属性设置为 false,它就不会给调用者返回响应。

例如:

复制代码
<model <span color="#800080">name</span>="<span color="#0000ff">Asynchronous_Message_Pattern</span>">
<service <span color="#800080">name</span>="<span color="#0000ff">AsynchronousService</span>">
<inbound>
<jms:inbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">test.in</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">false</span>"/>
</inbound>
<component <span color="#800080">class</span>="<span color="#0000ff">org.myorg.WidgetHandler</span>"/>
<outbound>
<pass-through-router>
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">test.out</span>">
</pass-through-router>
</outbound>
</service>
</model>

Request-Response

在简单的 Request-Response 场景中,服务在一个同步的入口端点上接收请求,并处理该请求,然后将它作为回复发送给调用者。例如,如果用户在 HTML 表单中输入一个值,想转换该值并将其结果显示在同一个页面上,那么可以在该服务上简单地配置一个同步入站端点,由该服务完成数据转换。这种场景并不需要使用出站端点。这就是 request-response 消息类型。

例如:

复制代码
<model <span color="#800080">name</span>="<span color="#0000ff">Request-Response_Message_Pattern</span>">
<service <span color="#800080">name</span>="<span color="#0000ff">SynchronousService</span>">
<span color="#000080"><!-- 为了返回 response 将 synchronous 的值设置为“true”--></span>
<inbound>
<http:inbound-endpoint <span color="#800080">host</span>="<span color="#0000ff">localhost</span>" <span color="#800080">port</span>="<span color="#0000ff">8080</span>"
<span color="#800080">path</span>="<span color="#0000ff">/mule/services</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">true</span>"/>
</inbound>
<span color="#000080"><!-- 指定处理该请求的组件 --></span>
<component <span color="#800080">class</span>="<span color="#0000ff">org.myorg.WidgetHandler</span>"/>
</service>
</model>

同步

如果为了进一步处理消息,需要将消息传递给第二个服务,那么需要在第一个服务上配置一个出站路由器将该消息传递给第二个服务。在第二个服务处理完消息后,第一个服务将它作为回复发送给调用者。值得注意的是将第一个服务设置为同步入口端点就意味着之后的所有服务都会以同步的方式处理该消息,所以无需在第二个服务上设置 synchronous 属性的值。这就是同步消息类型。

例如:

复制代码
<model <span color="#800080">name</span>="<span color="#0000ff">Synchronous_Message_Pattern</span>">
<service <span color="#800080">name</span>="<span color="#0000ff">SynchronousService</span>">
<inbound>
<span color="#000080"><!-- 为了返回 response 将 synchronous 的值设置为“true” --></span>
<jms:inbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">test.in</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">true</span>"/>
</inbound>
<component <span color="#800080">class</span>="<span color="#0000ff">org.myorg.WidgetHandler</span>"/>
<outbound>
<span color="#000080"><!-- 使用 pass-through 路由器时,如果想返回 response 必须将 synchronous 的值设置为“true”--></span>
<pass-through-router>
<!-- 设置出站端点 -->
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">test.out</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">true</span>"/>
</pass-through-router>
</outbound>
</service>
<span color="#000080"><!-- 配置第二个服务,并将它的入站端点设置为上一个服务的出站端点的路径。<br></br> 值得注意的是无需设置 synchronous 的值,因为在第一个服务中已经将消息设置为 synchronous 了。<br></br> --></span>
<service>
<inbound>
<jms:inbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">test.out</span>"/>
</inbound>
<component <span color="#800080">class</span>="<span color="#0000ff">org.myorg.WidgetProcesser</span>"/>
</service>
</model>

注意:在 Mule 的以往版本中,远程服务需要设置 remoteSync 属性的值为 true 用于配置同步处理。从 Mule 2.2 开始,remoteSync 属性已经被删除,只需要设置 synchronous 属性的值为 true 就可以创建同步流。

异步 Request-Response

在大多数复杂的场景中,可以使用 request-response 消息,并使用后端(back-end)流程调用其它的服务,并基于多个服务调用的结果异步地返回一个回复。你可以将入站端点的 synchronous 属性设置为 false,因为异步回复路由器会处理该回复,除非你想给调用者发送响应。这就是异步 request-response 消息类型。

在下面的例子中,HTTP 端点接收一个请求,并使用 Multicast 路由器将该请求广播到两个端点,再将这些结果以异步的方式发送到一个 JMS 端点。

复制代码
<model <span color="#800080">name</span>="<span color="#0000ff">Async_Request-Response_Message_Pattern</span>">
<service <span color="#800080">name</span>="<span color="#0000ff">AsyncRequestResponseService</span>">
<inbound>
<span color="#000080"><!-- <br></br> 将 synchronous 设置为“false”,因为 response 将由异步回复路由器处理 --></span>
<http:inbound-endpoint <span color="#800080">host</span>="<span color="#0000ff">localhost</span>" <span color="#800080">port</span>="<span color="#0000ff">8080</span>"
<span color="#800080">path</span>="<span color="#0000ff">/mule/services</span>" <span color="#800080">synchronous</span>synchronous="<span color="#0000ff">false</span>"/>
</inbound>
<component <span color="#800080">class</span>="<span color="#0000ff">org.myorg.WidgetHandler</span>"/>
<span color="#000080"><!-- 配置异步回复的设置。这个例子使用了收集异步回复路由器,<br></br> 在发送回复信息之前,它将所有的响应信息收集在一起。 --></span>
<async-reply <span color="#800080">timeout</span>="<span color="#0000ff">5000</span>>
<collection-async-reply-router/>
<jms:inbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">reply.queue</span>"/>
</async-reply>
<span color="#000080"><!-- 设置负责接收和处理消息的端点以及回复消息的端点 --></span>
<outbound>
<multicasting-router>
<reply-to <span color="#800080">address</span>="<span color="#0000ff">jms://reply.queue</span>"/>
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">service1</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">false</span>"/>
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">service2</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">false</span>"/>
</multicasting-router>
</outbound>
</service>
</model>

关于消息类型的全部内容可以参看 Mule 用户指南中的 Mule 的消息类型

现在我们已经理解了在不同的场景中可以使用哪种消息类型路由消息,下面让我们看看哪些路由器可以很好地控制消息路由。更多消息路由的信息可以参看 Mule 用户指南中的使用消息路由器

将消息传递到另一个端点

pass-through 路由器是为简化端点间的消息传递而设计的。比如,它对分发消息给一个队列非常有用。

也可以使用 pass-through 路由器将协议桥接到其它的出站端点。例如:

复制代码
<service <span color="#800080">name</span>="<span color="#0000ff">HttpProxyService</span>">
<inbound>
<inbound-endpoint <span color="#800080">address</span>="<span color="#0000ff">http://localhost:8888</span>" <span color="#800080">synchronous</span>="<span color="#0000ff">true</span>"/>
</inbound>
<outbound>
<pass-through-router>
<outbound-endpoint
<span color="#800080">address</span>="<span color="#0000ff">http://www.webservicex.net#[header:http.request]</span>"
<span color="#800080">synchronous</span>="<span color="#0000ff">true</span>"/>
</pass-through-router>
</outbound>
</service>

当使用 pass-through 路由器时,如果想返回一个响应,必须将出站端点的 synchronous 属性设置为 true。其它的路由器,比如 chaining 路由器并不需将出站端点的 synchronous 属性设置为 true,该路由器总会在同步的场景中返回一个响应。因此,如果将消费发送给多个服务,可能会用 chaining 路由器代替 pass-through 路由器,因为 chaining 路由器中不需要将每个端点的 synchronous 设置为 true。

过滤消息

使用过滤器可以控制服务处理哪些消息。选择性消费者路由器(Selective Consumer Router)用于入站端点,它可以控制服务处理哪些消息。过滤路由器(Filtering Router)用于出站端点,可以控制哪些消息发送到下一个服务上。可以组合使用这些过滤器来控制消息流。

例如,如果只想处理不包含错误的消息,那么可以使用选择性消费者以确保只处理结果代码为 success 的消息。并使用 Catch-all 策略将其它的消息转发到另外端点上作为错误处理:

复制代码
<inbound>
<selective-consumer-router>
<mulexml:jxpath-filter <span color="#800080">expression</span>="<span color="#0000ff">msg/header/resultcode = 'success'</span>"/>
</selective-consumer-router>
<forwarding-catch-all-strategy>
<jms:endpoint <span color="#800080">topic</span>="<span color="#0000ff">error.topic</span>"/>
</forwarding-catch-all-strategy>
</inbound>

在服务处理消息时,如果想通过指定的标准决定将消息发送到哪个端点,那么可以在出站端点上使用过滤路由器。在下面的示例中,将包含异常信息的消息发送到系统管理员的 email 邮箱,将包含特定字符串的消息发送到名为 string.queue 的队列,并使用 forwarding catch-all 路由器接收余下的所有消息,并将它们发送到名为 error.queue 的死信队列:

复制代码
<outbound>
<filtering-router>
<smtp:outbound-endpoint <span color="#800080">to</span>="<span color="#0000ff">ross@muleumo.org</span>"/>
<payload-type-filter <span color="#800080">expectedType</span>expectedType="<span color="#0000ff">java.lang.Exception</span>"/>
</filtering-router>
<filtering-router>
<jms:outbound-endpoint <span color="#800080">to</span>="<span color="#0000ff">string.queue</span>"/>
<and-filter>
<payload-type-filter <span color="#800080">expectedType</span>="<span color="#0000ff">java.lang.String</span>"/>
<regex-filter <span color="#800080">pattern</span>="<span color="#0000ff">the quick brown (.*)</span>"/>
</and-filter>
</filtering-router>
<forwarding-catch-all-strategy>
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">error.queue</span>"/>
</forwarding-catch-all-strategy>
</outbound>

与过滤路由器(filtering router)相似的路由器有转发路由器(forwarding router),它可以处理一些消息并可以选择性地将消息转发到其它路由器,还有 wiretap router,这种路由器可以处理所有的消息,并将它们发送到端点上,同时也将消息的副本发送到另外一个端点。更多信息可以参看 Mule 用户指南中的入站路由器(Inbound Routers)

将多个出站端点链接在一起

假设我们有一个验证服务,当消息没有通过验证时,想将该消息以及验证异常转发到另一个服务,并将消息和验证异常返回给调用者。那么可以使用链接路由器(chaining router),它是一个高速的、轻量级的可配置路由器,可用于将消息发送到端点,然后将该端点的输出结果发送到另一个端点。例如:

复制代码
<chaining-router>
<span color="#000080"><!-- 首先,将消息发送到这个端点,用于验证。 --></span>
<vm:outbound-endpoint path="<span color="#0000ff">ValidationService</span>" synchronous="true"/>
<span color="#000080"><!-- 接着将包含表达式的消息发送到这个端点上 --></span>
<vm:outbound-endpoint path="<span color="#0000ff">ValidationError</span>" synchronous="<span color="#0000ff">true</span>">
<exception-type-filter expectedType="<span color="#0000ff">java.lang.Exception</span>"/>
</vm:outbound-endpoint>
</chaining-router>

消息分解

消息分解器(message splitter)可用于将输出消息(outgoing message)分解成多个部分,再将他们分发到配置在路由器(router)上的不同端点。例如,在订单处理应用中,如果想将经消息分解后的不同部分分发给不同的服务去处理,那么可以使用下面的路由器:

列表消息分解器(List Message Splitter):接收一个对象列表,这些对象将被路由到不同的端点。例如:

复制代码
<outbound>
<list-message-splitter-router">
<span color="#000080"><!-- 将 order 路由到队列 order.queue --></span>
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">order.queue</span>">
<payload-type-filter <span color="#800080">expectedType</span>="<span color="#0000ff">com.foo.Order</span>"/>
</jms:outbound-endpoint>
<span color="#000080"><!-- 将 items 路由到队列 item.queue --></span>
<jms:outbound-endpoint <span color="#800080">queue</span>="<span color="#0000ff">item.queue</span>">
<payload-type-filter <span color="#800080">expectedType</span>="<span color="#0000ff">com.foo.Item</span>"/>
</jms:outbound-endpoint>
</list-message-splitter-router>
</outbound>

表达式分解路由器(Expression Splitter Router):它与列表消息分解器相似,只是它是基于表达式分解消息,将消息分解成一个或者多个部分。例如:

复制代码
<outbound>
<expression-splitter-router
<span color="#800080">evaluator</span>="<span color="#0000ff">xpath</span>"
<span color="#800080">expression</span>="<span color="#0000ff">/mule:mule/mule:model/mule:service</span>"
<span color="#800080">disableRoundRobin</span>="<span color="#0000ff">true</span>"
<span color="#800080">failIfNoMatch</span>="<span color="#0000ff">false</span>">
<outbound-endpoint <span color="#800080">ref</span>="<span color="#0000ff">service1</span>">
<expression-filter
<span color="#800080">evaluator</span>="<span color="#0000ff">xpath</span>"
<span color="#800080">expression</span>="<span color="#0000ff">/mule:service/@name = 'service splitter'</span>"/>
</outbound-endpoint>
<outbound-endpoint <span color="#800080">ref</span>="<span color="#0000ff">service2</span>">
<expression-filter
<span color="#800080">evaluator</span>="<span color="#0000ff">xpath</span>"
<span color="#800080">expression</span>="<span color="#0000ff">/mule:service/@name = 'round robin deterministic'</span>"/>
</outbound-endpoint>
</expression-splitter-router>
</outbound>

关于如何配置表达式分解路由器的更多信息,可以参看 Mule 的用户手册中的表达式配置参考(Expressions Configuration Reference)

为了提高性能也可以将消息分解成多个部分。轮叫(Round Robin)消息分解器将消息分解成多个部分,并以轮叫(round-robin)的方式将它们发送到端点。Message Chunking Router 将消息按固定长度分解成多个部分,并将它们路由到同一个端点。

消息分解之后,可以使用 Message Chunking Aggregator 重新将消息块聚合在一起。该聚合器(aggregator)通过关联 ID(correlation ID)来识别哪些消息块属于同一个消息,关联 ID(correlation ID)在出站路由器(outbound router)上设置。

复制代码
<inbound>
<message-chunking-aggregator-router>
<expression-message-info-mapping
<span color="#800080">correlationIdExpression</span>="<span color="#0000ff">#[header:correlation]</span>"/>
<payload-type-filter <span color="#800080">expectedType</span>="<span color="#0000ff">org.foo.some.Object</span>"/>
</message-chunking-aggregator-router>
</inbound>

处理消息仅有一次

幂等接收器(Idempotent Receiver)通过核对输入消息的唯一消息 ID 来保证只有拥有唯一 ID 的消息才能被服务所接收。消息 ID 可以通过使用一个表达式从消息中产生,该表达式在 idExpression 属性中定义。#[message:id] 是默认的表达式,也就是说如果要实现该功能,端点必须支持唯一性消息 ID。在下面的例子中,唯一性 ID 是由消息 ID 和消息标头中标签的内容组合而成。所有的消息 ID 都被记录到一个简单的文本文件中,用于追踪哪些消息已经处理过。

复制代码
<inbound>
<idempotent-receiver-router <span color="#800080">idExpression</span>="<span color="#0000ff">#[message:id]-#[header:label]</span>">
<simple-text-file-store <span color="#800080">directory</span>="<span color="#0000ff">./idempotent"/</span>>
</idempotent-receiver-router>
</inbound>

通过组件绑定调用外部服务

除了使用消息路由器控制服务间的消息流之外,也可以通过组件绑定(Component Bindings)调用处理消息的外部服务(External Service)。

在这个方法中,可以将 Mule 的端点绑定到 Java 接口方法。该方法的优势在于,在组件仍在处理消息时,你可以使用外部服务,而无需使用 Mule 的 API 或者修改组件的代码。相反,只需要在 XML 配置文件中配置组件绑定,从而指定外部服务的端点。例如,在下面的绑定例子中,当 sayHello 方法被调用时,HelloInterface 中的 sayHello 方法会调用外部的 HelloWeb 服务。

复制代码
<component <span color="#800080">class</span>="<span color="#0000ff">org.mule.examples.bindings.InvokerComponent</span>">
<binding <span color="#800080">interface</span>="<span color="#0000ff">org.mule.examples.bindings.HelloInterface</span>"
<span color="#800080">method</span>="<span color="#0000ff">sayHello</span>">
<cxf:outbound-endpoint
<span color="#800080">address</span>="<span color="#0000ff">http://myhost.com:81/services/HelloWeb?method=helloMethod</span>"
<span color="#800080">synchronous</span>="<span color="#0000ff">true</span>"/>
</binding>
</component>

更多信息,可以参看 Mule 用户指南中的组件绑定(Component Bindings)

总结

Mule 为控制应用中的消息如何交换提供了多种方法。本文总体上介绍了 Mule 的消息路由,以及它所支持的消息类型,一些常用的路由器和组件绑定。关于消息路由的完整信息,可以参看 Mule 的用户手册

查看英文原文: Routing Messages in Mule


译者简介:

陈义,计算机应用技术专业硕士研究生,一直专注于 SOA、BPM、ESB、EAI 和 MOM 的研究及应用。热衷于开源 SOA 项目,有志致力于 Mule、ServiceMix、ODE、ActiveBPEL、ActiveMQ、OpenJMS、Camel、CXF、XFire 以及 Tuscany 在中文社区的研究和推广工作。您可以通过 honnom (at) 163.com 联系到他。

感谢胡键对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2009 年 8 月 17 日 00:307686

评论

发布
暂无评论
发现更多内容

记录一次Neokylin_Server_V5系统已有分区的扩容操作

星河寒水

分区扩容

Ta想做一粒智慧的种子

白洞计划

ACM金牌选手整理的【LeetCode刷题顺序】

编程熊

Java 面试 算法 面经 笔试

我为什么要学习业务建模?

escray

极客时间 学习笔记 7月日更 如何落地业务建模

图解DDD建模六个问题与六个步骤

JAVA前线

Java 领域驱动设计 DDD

模块一作业

架构0期-Bingo

图像的模板匹配,Python OpenCV 取经之旅第 29 天

梦想橡皮擦

7月日更

架构训练营模块 1 作业 - 1班助教

Geek_97d901

京东智造云:在世界人工智能大会上,听到的工业智能生长的声音

脑极体

Linux之find命令的参数详解

入门小站

Linux

Kats-Facebook最新开源的时序分析工具

好孩子

PowerShell 正则表达式

耳东

PowerShell 7月日更

密码你真的了解吗

卢卡多多

7月日更

模块8作业

dwade

#架构实战营

长文图解:金字塔思维如何指导技术系统优化

JAVA前线

性能优化 金字塔 结构化思维

架构实战营 模块八课后作业

iProcess

架构实战营

在线脑图思维导图生成工具

入门小站

工具

从明天起开始认真更新了

IT蜗壳-Tango

7月日更

正式加入字节跳动!如何才能更容易拿到大厂Offer

欢喜学安卓

android 程序员 面试 移动开发

我赌一包辣条这是全网最详细的代码审计(没有之一)

网络安全学海

黑客 网络安全 信息安全 代码审计 漏洞分析

网络攻防学习笔记 Day70

穿过生命散发芬芳

网络攻防 7月日更

只更新代码,然后发布版本:基于 Serverless Devs 原子化操作阿里云函数计算

Serverless Devs

Javascript 的工作原理:引擎、运行时和调用堆栈概述

devpoint

JavaScript V8 7月日更

模块一作业

Always

架构实战营

TEMS模型--衡量你的人生资源

俞凡

认知

暑假期间快手将重点整治平台:短视频平台如何完善内容审核机制

石头IT视角

推荐系统的未来发展(三十三)

数据与智能

价值观 推荐系统

全面了解Java并发编程基础!超详细!

程序员的时光

Java 并发编程

【LeetCode】基于时间的键值存储Java题解

HQ数字卡

算法 LeetCode 7月日更

直接上干货!这些细节在Android面试上要注意了

欢喜学安卓

android 程序员 面试 移动开发

公司内部使用的数仓开发规范

白程序员的自习室

数据仓库 开发规范 数仓规范 7月日更

低代码的认知误区与落地实践

低代码的认知误区与落地实践

Mule的消息路由-InfoQ