综述
本文介绍建立一个在 Azure 上使用 Azure 服务总线, 高吞吐量短信平台的必要步骤。在这篇文章中提出的解决方案是在响应由客户的具体要求,建立一个基于 Windows Azure 技术的复杂远程信息处理应用。
在 Windows Azure 中的通讯服务
Windows Azure 平台通过不同的技术支持信息通信:
- 存储队列
- 服务总线
以下各段将给予每个类型的信息传递技术的概览。
存储队列
您可以使用 Windows Azure 中的队列,支持组件之间的异步通信。应用程序中的组件可以发布信息到一个队列。其他组件可以接收信息,并提供处理。队列中提供组件之间耐用的持久性存储,以及负载调平,负载均衡,和缩放的好处。
服务总线
Windows Azure 服务总线为广泛的交流,大型活动分布,命名和服务发布提供了一个托管,安全和广泛可用的基础设施。服务总线为 Windows Communication Foundation(WCF)和其他服务端点提供连接选项 - 包括 REST 端点 - 否则将很难或根本不可能达到。
针对 Windows Server 的服务总线是一组可安装的组件,为 Windows Sever 上提供 Windows Azure 服务总线的信息传递功能。适用于 Windows Server 的服务总线,使您能够在自我管理的环境,并在开发计算机上构建,测试和松耦合运行,信息驱动的应用程序。
通讯设计模式
使用存储队列构建松散耦合的信息传递方案
Windows Azure 的队列存储是一个用于存储大量的信息,可以在世界任何地方通过身份验证的调用使用 HTTP 或 HTTPS 访问服务。一个单一的队列的信息可高达 64KB 的大小,队列可以包含数百万条信息,高达 100TB 的总极限容量的存储帐户。常见队列存储用途包括:
- 建异步处理积压的工作
- Windows Azure Web 角色中传递信息到 Windows Azure Worker 角色
队列服务包含以下组件:
- URL 格式:队列可以通过以下 URL 格式访问:
http://.queue.core.windows.net/
下面的 URL 地址针对图中的队列:
http://myaccount.queue.core.windows.net/imagesToDownload - 存储帐户: 所有 Windows Azure 存储是通过一个存储帐户来访问的。存储帐户是访问队列的最高级别的命名空间。帐户内表格和队列内容存储的总大小不能超过 100TB。
- 队列: 队列中包含一系列信息。所有信息必须在队列中。
- Message: 一个信息, 无论任何格式的最大上限是 64KB
一个涉及多个组件之间的存储队列典型的架构设计模式,可以类似于下列:
在这个模式中,多个前端 Web Role 客户端发送信息到一个或多个存储队列,然后多个 Worker Role 实例从一个或多个队列读取信息及进行处理。
上述架构可以通过以下方式缩放:
- 存储队列:当信息并发数量增加,可以增加更多的存储队列
- Worker Role 实例:当信息的数量增加,Worker Role 实例的数量可以增加,从而使更多的计算资源可用于处理从队列中的信息。
不过,也有数个点,我们需要考虑到,以确保在现实中的架构可以灵活扩展:
- 存储帐户:一个单一的存储帐户的可扩展性目标是每秒 5000 条信息
- 存储队列:一个单一队列的可扩展性目标是能够处理最多每秒 500 条信息
由于上述两个存储帐户和存储队列的限制,如果我们希望能够同时处理大量信息,我们需要分割我们的存储基础设施成多个队列和 / 或存储帐户。
例如,一个客户端以每秒 1000 条信息的速度传送到 Azure 存储队列中,我们希望我们的解决方案以无积压及最快的速度来处理信息,那么我们就需要使用 4 个队列及分割我们我们的信息传递到 4 个队列中,看下图。
每个队列每秒可以处理 500 个信息,包括入口和出口,如果我们想每个队列尽快处理信息,那么每个队列入口和出口每秒应该处理不超过 250 个信息。既然我们的入口和出口每秒要处理 1000 条信息,因此,我们需要共 4 个队列。Web Role 客户端将需要分割信息及均匀地分配到可用的队列以分担工作负载。
在另一情况下,客户端以每秒 10,000 条的速度发送信息到存储队列,那么我们就需要创建多个存储帐户分担工作量。为了处理每秒 10,000 个信息,我们需要 2 个存储帐户,每个存储账户下, 需要 20 个队列。
利用服务总线主题发布订阅信息
服务总线主题和订阅支持信息发布 / 订阅传递通信模式。当使用主题和订阅,分布式应用程序个组件不直接彼此沟通,而是通过一个主题,充当中介的信息交换。
相比在服务总线队列,每个信息是由单一消费者处理的,主题及订阅使用发布 / 订阅模式提供了一个对多个形式的通信。一个主题注册多个订阅是可能的。当一个信息被发送到一个话题后,它会被每个订阅独立处理。您可以选择为基于每一个订阅基础上的主题, 注册过滤规则,它可以让你过滤 / 限制主题订阅中接收的信息。
服务总线主题和订阅,让你在横跨非常大量的用户和应用程序下, 可以扩展到处理一个非常大的信息数目。
自动转发链接服务总线实体来缩放单个主题
自动转发功能能让您使一个订阅或队列链到相同的服务命名下另一个队列或主题。当自动转发启用,服务总线自动移除被放置在第一个队列或订阅(源)的信息,并把它们放入第二队列或主题(目标)。
您可以使用自动转发向外扩充一个单独主题。服务总线限制一个主题订阅的数量。可以通过创建第二层主题容纳更多的订阅。需要注意的是,即使你并不受服务总线订阅数目的限制,增加了第二层的主题,可以提高你的主题的整体吞吐量。
当把别主题串联起来而得到一个拥有许多订阅的复合主题时,建议你有一个中等数量的第一级主题订阅及许多二级的主题订阅。例如,第一级以 20 个主题订阅,彼等各自链接到第二层 200 个主题订阅,允许比第一层 200 个主题订阅每个链接有 20 个第二层主题订阅更高的吞吐量。
参考案例
下面的示例使用中国的 Azure 公共云计算平台,及解释利用服务总线在中国 Azure 平台专门所需的编码。使用的 Azure SDK 2.0 及此示例介绍了 Windows Azure 服务总线在 Azure SDK 2.0 提供的,新的事件驱动编程模式。
Scenario 场景
上面的例子是基于建立连接汽车远程信息处理方案,其中远程信息处理单位不断发送车数据到云端。各种车辆数据还有多个接收器。为了支持非常高吞吐量的汽车数据到云中,例子采用了 2 层自动转发的向外扩展做法。在第一层,每款车数据主题有少量订阅, 例如 10 个,每个订阅自动转发到第二层主题。每个第二层主题然后有多个订阅 - 接收器 Worker Roles. 每个接收器 Worker Role 包含多个实例。
取决于实际吞吐量要求,第一层和第二层的主题可以独立扩展, 增加的主题及订阅的数量。以及每个接收器 Worker Role 可以不断增加对 Worker Role 实例的向外扩展。
连接到服务总线
中国 Azure 比全球 Azure 平台有不同端点 URL,全球 Azure 所提供的一些文档和示例代码并不适合使用在 Azure 中国环境。下面是需要创建连接到 Azure 中国服务总线的示例代码:
NamespaceManager namespaceManager; MessagingFactory messagingFactory; string namespaceAddress = "<your-namespace>"; string issuerName = "<your-issuer-name>"; string issuerKey = "<your-issuer-key>"; string endpointuriAddress = string.Format("sb://{0}.servicebus.chinacloudapi.cn", namespaceAddress); string stsendpointAddress = string.Format("https://{0}-sb.accesscontrol.chinacloudapi. cn/", NamespaceAddress); TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey, new Uri(stsendpointAddress)); namespaceManager = new NamespaceManager(new Uri(endpointuriAddress), tp ); messagingFactory = MessagingFactory.Create(endpointuriAddress, tp);
创建主题及自动转发订阅
public bool CreateTopic(string topicNamePrefix, int topicSuffixIndex, int NumOfTopics) { bool success; try { var topic = this.namespaceManager.CreateTopic(string.Format("{0}_{1}", topicNamePrefix, topicSuffixIndex.ToString())); for (int i = 0; i < NumOfTopics; i++) { CreateSubscription(topicSuffixIndex, i, topic.Path); } success = true; } catch (Exception) { success = false; } return success; } public bool CreateSubscription( int topicSuffixIndex, int subscriptionIndex, string srcTopic) { bool success = false; string destTopic = srcTopic + subscriptionIndex.ToString(); TopicDescription dTopic = null; try { dTopic = this.namespaceManager.GetTopic(destTopic); } catch (Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException) { dTopic = null; } if (dTopic == null) { dTopic = this.namespaceManager.CreateTopic(destTopic); var subscription = this.namespaceManager.CreateSubscription(dTopic.Path, " Subscription_0" ); } SubscriptionDescription srcSubscription = new SubscriptionDescription(srcTopic, string.Format("Subscription{0}_{1}", topicSuffixIndex.ToString(), subscriptionIndex.ToString())); srcSubscription.ForwardTo = dTopic.Path; var ForwardSubscription = this.namespaceManager.CreateSubscription(srcSubscription, new SqlFilter(string.Format("PartitionID='{0}'", subscriptionIndex.ToString()))); return success; }
执行下面的代码片段创建的主题和订阅。下面的代码执行的结果将创建 5 个第一层主题及 10 个第二个层主题。每个第一层主题将有 10 个自动转发订阅到 10 个第二层主题。
for (int i = 0; i < 5 i++) { CreateTopic(“SBMessageTopic”, i, 10); }
当从 Azure 管理门户网站查看时,可以查看以下主题:
第一层有 10 个主题订阅:
第二层主题包含 2 个订阅为 2 个接收器的 Work Role。
发送信息个到服务总线主题
下面的示例代码并行发送一批信息到其中一个第一层主题:
public bool SendMessage(string msgid, string message, string label, int nIterations, int NumOfPartitions ) { bool success = false; string topicName = “sbmessagetopic_0”; TopicClient topicClient = this.messagingFactory.CreateTopicClient(topicName); List<CustomMessage> MessagesList = new List<CustomMessage>(); for (int i = 0; i < nIterations; i++) { CustomMessage customMessage = new CustomMessage() { Body = message, Date = DateTime.Now, Label = label , MSGID=msgid}; MessagesList.Add(customMessage); } Parallel.ForEach<CustomMessage>(MessagesList, new Action<CustomMessage>( (CarDataMessage) => { BrokeredMessage bm = null; try { bm = new BrokeredMessage(CarDataMessage); bm.Properties["PartitionID"] = RandomDigit(1, NumOfPartitions); topicClient.Send(bm); success = true; } catch (Exception) { // TODO: do something } finally { if (bm != null) { bm.Dispose(); } } } )); return success; } public static string RandomDigit(int size, int length) { Random random = new Random(); const string digitString = "0123456789"; var chars = Enumerable.Range(0, size) .Select(x => digitString[random.Next(0, length)]); return new string(chars.ToArray()); } public class CustomMessage { private string msgid; private DateTime date; private string body; private string label; public DateTime Date { get { return this.date; } set { this.date = value; } } public string Body { get { return this.body; } set { this.body = value; } } public string Label { get { return this.label; } set { this.label = value; } } public string MSGID { get { return this.msgid; } set { this.msgid = value; } } }
下面的代码片段执行发送信息指令,它发送 1000 个信息到第一层主题并从第一层主题订阅随机转发每个信息到第二层主题其中一个:
SendMessage(“Your-Msg-ID”, “Message Text” , “Message Label”, 1000, 10);
同时发送大量的信息时,建议利用大型 Worker Role 并且有多个实例,以提高的并行度。
接收信息
下面的示例代码在 Work Role 启动由 Azure SDK 2.0 提供的事件驱动信息泵:
public void StartMessagePump(string topicName, string subscriptionName) { SubscriptionClient subscriptionClient = this.messagingFactory. CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock); var eventDrivenMessagingOptions = new OnMessageOptions(); eventDrivenMessagingOptions.AutoComplete = true; eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived; eventDrivenMessagingOptions.MaxConcurrentCalls = 256; subscriptionClient.OnMessage(OnMessageArrived, eventDrivenMessagingOptions); }
当信息到达时, OnMessage()被调用:
/// <summary> /// This event will be called each time a message arrives. /// </summary> /// <param name="message"></param> private static void OnMessageArrived(BrokeredMessage message) { var custmsg = message.GetBody<CustomMessage>(); System.Diagnostics.PerformanceCounter perfCounterReceived = new System.Diagnostics.PerformanceCounter("ServiceBusDemo", "Message Receive/ Sec", RoleEnvironment.CurrentRoleInstance.Id, false); if (perfCounterReceived != null) { perfCounterReceived.ReadOnly = false; perfCounterReceived.IncrementBy(1); } Trace.WriteLine(string.Format(" > {0} - Received message for Vehicle: {1} (Thread: {2})", DateTime.Now, custmsg.MSGID, Thread.CurrentThread.ManagedThreadId)); if (custmsg.MSGID == string.Empty) throw new InvalidOperationException("Invalid Message Id: " + custmsg.MSGID); }
和下面的功能在发生错误时被调用时:
/// <summary> /// Event handler for each time an error occurs. /// </summary> /// <param name="sender"></param> /// <param name="e"></param> static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e) { if (e != null && e.Exception != null) { Trace.WriteLine(string.Format(" > Exception received: {0}", e.Exception.Message)); } }
下面的代码片段是 Worker Role 执行 run()方法进行订阅到第二层主题。这个例子进行订阅 10 个第二层主题:
for (int i = 0; i < 10; i++) { string topic = string.Format("sbmessagetopic_0{0}", i.ToString()); StartMessagePump(topic, "Subscription_1"); } while (true) { Thread.Sleep(10000); Trace.TraceInformation("SBMessageConsumer1 Working", "Information"); }
测试服务总线吞吐量
当信息到达时,OnMessage()功能被调用并且它会增加自定义性能计数器来记录并发接收的信息数量。查看时,通过在实时使用 Perfmon 工具进行监控。
上面的测试是通过使用一个 2 芯信息产生者 Worker Role 实例和一个 2 芯信息的接收者实例,在 1 个第一层主题和 10 个第二层主题。吞吐量保持在每秒 1000 - 1200 信息之间。
该解决方案可以通过加入多个第一层主题,以及更多的第二层主题进一步向外扩展。当适当数量的信息接收器的情况下,服务总线解决方案可以轻松地向外扩展支持每秒数千或数万的信息吞吐量。
小结
本文描述了在 Azure 上处理应用程序和设备间进行信息传递的不同信息传递组件。此外,这篇文章亦介绍了不同的方式在 Azure 上来扩展信息传递的基础设施,以向外扩展的方法来处理大量信息传递并发。
感谢马国耀对本文的审校。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ )或者腾讯微博( @InfoQ )关注我们,并与我们的编辑和其他读者朋友交流。
评论