前言
Rails 已在数据库驱动型 Web 应用领域站稳脚跟。而集成了消息处理的插件式框架——ActiveMessaging,更扩大了 Rails 的应用范围。利用 ActiveMessaging 和 Rails,我们可以轻松集成全异系统(如 MQ 消息发送模块、J2EE Web 应用和后台常驻任务等),或创建事件和消息驱动型架构。有人这样总结过:ActiveMessaging 就像 ActiveRecord 为数据库所做的一切那样,正为消息处理而努力。本文将介绍 ActiveMessaging、与其相关的技术,以及在 Rails 中的使用方法。
消息处理标准与代理
实现消息处理,首先需要三个方面的支持:通讯协议、协议服务端代理、协议客户端库。从技术角度而言,剩下的工作就是开发一个实现消息发送和接收的系统了。在本文的例子中,我们将把服务端和客户端合二为一,放在同一个程序中。ActiveMessaging 支持 Stomp 协议。这是一个通过无线连接,使用 TCP/IP,实现文本型消息流式传输的协议标准。其设计理念要求尽可能将协议简化,以求无论利用哪种语言开发客户端或服务端,都能相当轻松(比如 Ruby Stomp 客户端代码,连带注释和空行,总行数仅约 400)。因此,它几乎可以集成任何系统。
Apache ActiveMQ 消息代理缺省支持Stomp ,当然也可以使用其他代理,而且选择是越来越多。比如最近公布的S tompConnect 项目,计划向绝大多数开源消息处理类产品开放 Stomp 和 ActiveMessaging,并实现任何 JMS 代理到 Stomp 代理的转换。在客户端,ActiveMessaging 支持基于适配器模式(Adapter Pattern)的协议,就 Stomp 而言,它使用在 Stomp RubyGem 基础上扩展而成的适配器。未来可能支持的适配器更多,但目前只有 Stomp。
本文中,我们使用还未正式发布的 ActiveMQ4.2 代理。ActiveMQ 在所有平台上都有安装说明;如果你有Java1.4 或更高版本,那么可以简化安装过程:
cd /usr/local/src<br></br> #unix and cygwin<br></br> wget http://people.apache.org/repo/m2-snapshot-repository/org/apache/activemq/apache-activemq/4.2-SNAPSHOT/apache-activemq-4.2-20070221.081507-10-src.tar.gz<br></br>#os x<br></br>curl -O http://people.apache.org/repo/m2-snapshot-repository/org/apache/activemq/apache-activemq/4.2-SNAPSHOT/apache-activemq-4.2-20070221.081507-10-src.tar.gz<p>cd ..</p><br></br>tar xvfz apache-activemq-4.2-20070221.081507-10-src.tar.gz<br></br>cd apache-activemq-4.2-incubator-SNAPSHOT<br></br> ./bin/activemq
这就 OK 了!缺省情况下,ActiveMQ 对 Stomp 的支持会自动配置完成;若自动配置不成功,你可以在 conf/activemq.xml 中添加如下设置:
<transportConnectors> <br></br> ...<br></br> <!-- Add this line --><br></br> <transportConnector name="stomp" uri="stomp://localhost:61613"/><br></br></transportConnectors>
以上是 ActiveMQ 所需的最小化配置。更详细的情况可以参考 ActiveMessaging 的 ActiveMQ 维客,以及 ActiveMQ 用户手册。
Rails 的配置
Rails 1.1 和 MySql 准备就绪后,还需要安装两个工具: daemons 和 Stomp :
sudo gem install daemons<br></br>sudo gem install stomp
现在让我们新建一个 Rails 工程“MessageMonster”以及对应数据库,最后安装 ActiveMessaging 插件。
cd /usr/local/dev<br></br>rails MessageMonster<br></br>mysqladmin create messagemonster_development -u root<br></br>cd MessageMonster<br></br>script/plugin install http://activemessaging.googlecode.com/svn/trunk/plugins/activemessaging
所有准备工作就绪,现在可以写程序了。
消息处理
首先需要创建“处理器”,用 ActiveMessaging 的术语来说,就是一个支持消息目标者有序接收和处理消息的类。它在设计模型中是必不可少,相当于消息驱动的控制器,代码量也占最大比例。
如你所愿,处理器可以自动生成:
script/generate processor PersistMessage
第一次运行“generate”,会输出如下结果:
create app/processors<br></br>create app/processors/persist_message_processor.rb<br></br>create config/messaging.rb<br></br>create config/broker.yml<br></br>create app/processors/application.rb<br></br>create script/poller
下面,我们具体讨论上述各行输出结果的含义。
- app/processors:全部处理器所在目录。
- app/processors/persist_message_processor.rb:按缺省配置生成的新处理器。
- app/processors/application.rb:供新处理器继承的通用超类。它实现了一些通用功能,如错误处理。
- config/broker.yml:代理配置,指明了连接消息代理的方法。类似 database.yml,但它是用于消息代理的。
- config/messaging.rb:消息传输用到的其他配置,如目的地、消息头。
- script/poller:ActiveMessaging 用以监听消息的后台程序。后文对此还有说明。
首先考察 broker.yml,注意各项参数,确保代理配置正确:
<span>development</span><span>:</span><br></br><span>adapter</span><span>:</span> stomp<br></br><span>login</span><span>:</span> <span>""</span><br></br><span>passcode</span><span>:</span> <span>""</span><br></br><span>host</span><span>:</span> localhost<br></br><span>port</span><span>:</span> 61613<br></br><span>reliable</span><span>:</span> true<br></br><span>reconnectDelay</span><span>:</span> 5<br></br> ...
以上只包括了 ActiveMQ 的缺省配置项。如果是正式发布的产品,远程运行代理,则还需要增加访问用户和密码项。
接下来再看看 persist_message_processor.rb:
<span>class </span><span>PersistMessageProcessor</span> <span><</span> <span>ApplicationProcessor</span><br></br><span>subscribes_to</span> <span>:persist_message</span><p><span>def </span><span>on_message</span><span>(</span><span>message</span><span>)</span><span>logger</span><span>.</span><span>debug</span> <span>"</span><span>PersistMessageProcessor received: </span><span>"</span> <span>+</span> <span>message</span></p><p><span>end</span><span>end</span></p>
上述代码实现了:persist_message 消息的订阅,并指定了方法 on_message。当:persist_message 消息到来时,ActiveMessaging 将回调 on_message。参数 message 负责按字符串格式传递消息内容;依靠 @message,还可以直接访问 Stomp 消息对象。通过再次调用 subscribes_to,处理器可以订阅更多消息。
:persist_message 是处理器订阅的目标消息的逻辑名。在 config/messaging.rb 中定义目标消息:
<span>ActiveMessaging</span><span>::</span><span>Gateway</span><span>.</span><span>define</span> <span>do</span> <span>|</span><span>s</span><span>|</span><br></br><span>s</span><span>.</span><span>queue</span> <span>:persist_message</span><span>,</span> <span>'</span><span>/queue/PersistMessage</span><span>'</span><br></br><span>end</span>
messaging.rb 类似于 Rails 中的 routes.rb。它主要实现逻辑名(:persist_message)到代理(/queue/PersistMessage)中目标消息的映射。当然还有很多其他配置项,但就本例而言,自动生成的缺省配置已经足够了。
接下来,我们要告知处理器预先存储消息以供处理,如下创建一个消息模型:
script/generate model message
现在,用如下两行代码更新 db/migrate/001_create_messages.rb:
<span>create_table</span> <span>:messages</span> <span>do</span> <span>|</span><span>t</span><span>|</span><br></br><span>t</span><span>.</span><span>column</span> <span>:body</span><span>,</span> <span>:text</span><br></br><span>t</span><span>.</span><span>column</span> <span>:received_date</span><span>,</span> <span>:datetime</span><br></br><span>end</span>
调用 migrate 创建该数据表:
rake db:migrate
接下来修改处理器的 on_message 方法,这将使用到新的消息模型:
<span>def </span><span>on_message</span><span>(</span><span>message</span><span>)</span><br></br><span>logger</span><span>.</span><span>debug</span> <span>"</span><span>PersistMessageProcessor received: </span><span>"</span> <span>+</span> <span>message</span><br></br><span>my_message</span> <span>=</span> <span>Message</span><span>.</span><span>create</span><span>(</span><span>:body=</span><span>></span><span>message</span><span>,</span> <span>:received_date=</span><span>></span><span>DateTime</span><span>.</span><span>new</span><span>)</span><br></br><span>end</span>
看到这里,我们仔细想想会发现,其实没有什么新东西,和在 Rails 控制器中编码完成相同功能的过程差不多。因此使用 ActiveMessaging 时,Rails 应用环境的所有便利性都可供利用。
处理器一旦捕获消息,就可将它抛出(像瓶装后逐个抛售那样)等待处理。
发送消息
发送消息时,我们会在同一个应用程序中用到 Rails 视图和控制器:
script/generate controller SendMessage index
编辑视图,提交消息文本,显示预先持久化的消息的可更新列表:
<span> <span>style=</span><span>"color: green"</span><span>></span><span></span><span></span> <span></span>Send Message<span></span> <span></span> <span><span>></span>Message<span></span> <span></span> <span></span> <span></span> <span></span> <span></span><tbody></tbody><tr></tr></span></span><td> </td> <span></span><span></span><td> </td> <span></span><span></span>
编辑控制器,发送消息。ActiveMessaging 包含一个提供消息简单发布方法的小型类 MessageSender,利用它可从任何类轻松实现消息发送。publishes_to 定义了消息接收者,但除了验证 config/messaging.rb 中所配置目标者的有效性,不干别的事。如下是控制器使用这些方法的过程:
<span>require</span> <span>'</span><span>activemessaging/processor</span><span>'</span><p><span>class </span><span>SendMessageController</span> <span><</span> <span>ApplicationController</span></p><p><span>include</span> <span>ActiveMessaging</span><span>::</span><span>MessageSender</span></p><br></br><span>publishes_to</span> <span>:persist_message</span><p><span>def </span><span>index</span><span>@messages</span> <span>=</span> <span>Message</span><span>.</span><span>find</span> <span>:all</span></p><p><span>end</span><span>def </span><span>new</span><span>@message</span> <span>=</span> <span>params</span><span>[</span><span>:message</span><span>]</span></p><br></br><span>publish</span> <span>:persist_message</span><span>,</span> <span>@message</span><br></br><span>flash</span><span>[</span><span>:notice</span><span>]</span> <span>=</span> <span>"</span><span>'<span>#{@message}</span>' sent</span><span>"</span><br></br><span>@messages</span> <span>=</span> <span>Message</span><span>.</span><span>find</span> <span>:all</span><br></br><span>render</span> <span>:action</span> <span>=></span> <span>'</span><span>index</span><span>'</span><p><span>end</span><span>end</span></p>
至此,大功告成。控制器将在用户提交时发出消息,当你下次浏览或者刷新此页面时,将看到被处理器保存的消息。
例子运行
ActiveMessaging 在独立进程中运行,由 script/poller 控制。因此仅仅启动 Rails 服务器,是无法接收到消息的。运行这个例子,我们总共需要启动三个进程:ActiveMQ 代理、Rails 服务器(发送消息)和 script/poller(接收消息)——每个进程都是系统的独立节点。
启动 ActiveMQ:
cd /usr/local/apache-activemq-4.2-incubator-SNAPSHOT<br></br> ./bin/activemq
启动 Rails:
cd /usr/local/dev/MessageMonster<br></br> script/server
启动 ActiveMessaging:
cd /usr/local/dev/MessageMonster<br></br>script/poller run``<span face="Arial">script/poller 遵循后台程序的运行规则。处理让进程保持运转的“run”参数外,还可以传递“start”、“stop”、“restart”和“status”。另外,为提高消息处理效率,也能同时运行多个进程实例。</span>
现在,打开测试页面(如 http://localhost:3000/send_message ),输入消息吧,比如经典的“Hello World”。消息送达时,你应该在 poller 窗口看到如下信息:
Loading /usr/local/dev/MessageMonster/app/processors/application.rb<br></br>Loading /usr/local/dev/MessageMonster/app/processors/persist_message_processor.rb<br></br>=> Subscribing to /queue/PersistMessage (processed by PersistMessageProcessor)<br></br>=> All subscribed, now polling<br></br>PersistMessageProcessor received: Hello World ``<span face="Arial">poller 总会显示启动时加载的处理器,以及代理上的订阅请求。如果代理不可用或不存在,系统会根据 broker.yml 的配置尝试别的代理。另外,你还能在 poller 中看到 ActiveMessaging 处理器接收的、来自于 Rails 的消息。而且,如果你使用“Refresh List”功能,将会在 Rails 视图中看到被 ActiveMessaging 处理器持久化的消息。</span>
结束语
希望你能喜欢这篇介绍 ActiveMessaging 的文章,相信你已经感觉到了它的潜力和易用性。其实,ActiveMessaging 还有很多本文没有提及的高级功能,如优化的异常处理、多 poller 运行群组处理器,以及异步消息中 Stomp 和 ActiveMQ 消息头的处理方法、JMS 集成和可选订阅等。是不是相当惊讶呢,现在不会认为 ActiveMessaging 无法满足你的需要了吧!
- 若要了解更多信息,我推荐你细读 ActiveMessaging 站点,尤其是其中的维客和源代码。
- 我们还在继续改进 ActiveMessaging,欢迎你参加我们的讨论,多提意见和建议。查看英文原文: Introduction to ActiveMessaging for Rails
作者简介:Andrew Kuklewicz,具有 10 年软件开发经验,PRX(Public Radio Exchange,提供电台内容分发、同行评审和许可的免费服务)的资深 Web 开发师。业余时间,他是 Ruby Stomp 项目成员和 ActiveMessaging 的当前维护者,同时还是 Ruby On Rails、Plone 和 Java 等开源项目的活跃分子。 译者简介:罗小平,上海某大型公司互联网中心技术总监, CSDN 大版主,网络 ID 为 lxpbuaa(桂枝香在故国晚秋),曾著有《Delphi 精要》一书。个人博客为 http://blog.csdn.net/lxpbuaa ,现在 CSDN 主持翻译国外专家 Herb Sutter 的中文博客。他的 Email 和 MSN 为 lxpbuaa AT 263.net 。
评论