介绍
并发与协调运行时(Concurrency and Coordination Runtime,CCR)是一个.NET 平台上的异步信息传递类库,提供了一套细小而强大的基础功能,能够使用不同的方式来组织应用程序。应用程序通过有效使用 CCR 可以获得更好的响应能力,以及更好的伸缩性及容错性。而它最神奇的地方则在于,开发人员获得这些便利的同时,还减少(甚至完全消除)了对线程、锁、互斥体(mutex)或其他同步元素的直接操作(或捕获错误)。
如果您的应用程序是单线程的,CCR 可以使您的程序提高响应能力,并且更充分利用 CPU 核心——同时让您的代码库从概念上保持整洁。如果您的程序已经启用了多线程,那么 CCR 能够在简化您代码库的同时,保持(甚至改进)程序的吞吐量。
简单说来,CCR 提供了如下功能:
- 一个简单而高性能的消息传递实现。使用非常轻量级且类型安全的通道,以面向 Action 的视角把各种对象连接了起来。
- 一套基础的调度约束。调度几乎就是 CCR 的全部。您可以创建各种任务,把消息传递给进程中的其他组件,并且通过一些叫做仲裁器(Arbiter)的对象来声明一些约束,以此对请求进行处理。在执行您的代码之前,CCR 能够保证这些约束已经得到满足。
- 为失败处理提供了一种更好的模型。CCR 提供了“Causality”,意味着为一系列相关异步子任务提供一个上下文,这样某个任务失败时(例如抛出了异常),那么这样的失败能在单独的地方进行隔离处理,这与发起任务的线程并没有关系。
- 更好的使用已有的(或将来会有的)计算能力。CCR 能够使用现有的线程池进行调度,如果您需要的话,也可以使用它自带的实现,这在某些情况下能够带来更好的性能。自然,这个机制只会对您的代码产生细微的影响。
- 使异步 IO 操作得以简化。提高独立进程的伸缩性与性能的关键,最终往往归结于高效的 I/O 密集型操作。I/O 密集型操作往往会比计算密集型操作要慢许多倍,而一个阻塞的 I/O 操作会浪费有效的资源(在这里就是线程),使它们无法处理其他等待的任务。使用异步的 I/O 操作能够释放这些资源,让它们去处理其他任务,直到 I/O 操作完成。然而,一系列的异步操作需要将其“开始”及“完成”分开,这使编码难度变得很大。CCR 使用特别的 C#迭代器实现,使开发人员能够轻松控制此类操作。
通过“异步消息传递”机制,我们的组件通过发送数据的方式与另一个组件进行通信,更值得一提的是,数据与后续回复没有确定的临时关系。一个已发送的消息可能需要在未来某个时候才会得到处理,也只有到那个时候消息才会得到回复。
这种异步消息传递模型是大部分情况下跨进程(inter-process)计算的基础,不过与之相比,在现实应用中使用 CCR 来进行纯粹的进程内部(intra-process)通信往往可以得到更好的保证,不像前者在很多情况下都可能失败。因此,CCR 不仅可以用于低级 I/O 操作,对于构建伸缩性强的分布式系统也可以提供很好的辅助。
CCR 的基础类型
CCR 由几种基础类型组成:
- 任务(Task):任意一段用于执行的代码。
- 任务队列(TaskQueue):确切地说,是“分发队列(DispatcherQueue)”。一个待执行任务的列表。一个 CLR 线程池或 CCR 线程池(即 Dispatcher)的线程会从队列中取出任务并加以执行。
- 端口(Port):进程内部连接各组件的消息队列。简单来说这只是个链表(linked-list),生成者在端口内放置消息。CCR 提供了泛型的重载,可以提供类型安全的操作。
- 仲裁器(Arbiters):CCR 的基础元素,它们将端口和任务连接起来。它们定义了一些约束,用于在消息到达端口后确定需要创建的任务,以及使用哪个任务队列进行接受。CCR 中内置了多种仲裁器,其中大部分可以进行组合,以此获得更灵活的功能。
了解了这些基本概念之后,我们来看一些简单的 CCR 代码。首先,我们来定义一个简单的 C#控制台应用程序,它会用来运行所有的示例。注意在这个程序中,我们使用了 CCR 自定义的线程池(Dispatcher)并与我们的任务队列绑定。这意味着队列中任务会被自定义线程池中的线程执行。
<span color="#0000ff">static void</span> Main(<span color="#0000ff">string</span>[] args) { <span color="#0000ff">using (var</span> dr = <span color="#0000ff">new</span> <span color="#008080">Dispatcher())</span> { <span color="#0000ff">using (var</span> taskQueue = <span color="#0000ff">new</span> <span color="#008080">DispatcherQueue</span>(<span color="#ff0000">"samples"</span>, dr)) { <span color="#008000">// Examples will go here...<p> // Need a blocking call to prevent the application</p><br></br> // exiting.<br></br></span> <span color="#008080">Console</span>.ReadLine(); } } }
尽管示例中只使用了一个任务队列,但是在实际应用中还是建议使用多个队列。CCR 在获取任务时会使用轮询策略来访问多个任务队列,避免任何一个队列处于饥饿状态。
首先,我们直接把一个任务放入队列。这是 CCR 中执行一个任务最简单的方法,我们这里连端口都没有用到。Arbiter 类包含了一系列简化开发的方法,例如 FromHandler 直接从一个委托对象来创建任务——在这里我们使用匿名方法来构建该对象。这样任务就被放入了任务队列,可以由分发器来执行了。
<span color="#008000">// Enqueue a task directly</span> taskQueue.Enqueue(<span color="#008080">Arbiter</span>.FromHandler(() => <span color="#008080">Console</span>.WriteLine(<span color="#ff0000">"Hello, world."</span>)));
大多数情况下我们不太会如此直接地向队列中放入任务,一般来说总有一个端口在工作。在下一段代码中,我们会定义一个端口,一个仲裁器,然后向端口里发送消息。这个示例中我们使用 String 强类型的端口,这样委托的签名也需要接受一个字符串。
<span color="#008000">// Post a message to a port to schedule a task.</span> <span color="#0000ff">var</span> port = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#0000ff">string</span>>(); <span color="#008080">Arbiter</span>.Activate(taskQueue, port.Receive(<span color="#008080">Console</span>.WriteLine)); port.Post(<span color="#ff0000">"Hello (again), world</span>");
这里发生了一些不那么简单的东西,需要花一点时间才能了解整个过程。port.Receive() 调用创建了一个可能是最简的仲裁器,或者说是一个“接受者”,一个消息达到端口时它即会生效。消息到达之后,便会创建一个任务,这个任务的功能是调用委托对象,并使用刚才的消息作为参数。 Arbiter.Activate() 调用将创建的任务和特定的任务队列绑定在一起。
要理解 CCR 仲裁器,最关键的一点是它们永远不会阻塞线程。一旦接收器无法获得数据时,线程就会被释放,可用于处理其他正在等待的任务。
仲裁器可以在任何时间创建,这是 CCR 中一个重要的概念。因此,如下所示,即使我们把上面示例中最后两行代码的次序交换,其效果也是一样的。
<span color="#008000">// Post a message to a port to schedule a task.</span> <span color="#0000ff">var</span> port = <span color="#0000ff">new</span> <span color="#008080">Port</span><span color="#0000ff"><string></string></span>(); port.Post(<span color="#ff0000">"Hello (again), world</span>"); <span color="#008080">Arbiter</span>.Activate(taskQueue, port.Receive(<span color="#008080">Console</span>.WriteLine));
现在我们来少许修改一下示例——我们向端口中放入两条消息再使接收器生效,来看看会发生什么……
<span color="#008000">// Post a message to a port to schedule a task</span>. <span color="#0000ff">var</span> port = <span color="#0000ff">new</span> <span color="#008080">Port</span><span color="#0000ff"><string></string></span>(); port.Post(<span color="#ff0000">"Hello (again), world</span>"); port.Post(<span color="#ff0000">"Hello (thrice), world</span>"); <span color="#008080">Arbiter</span>.Activate(taskQueue, port.Receive(<span color="#008080">Console</span>.WriteLine));
现在如果您运行程序,就会发现只打印了一条消息。这是因为 port.Receive() 调用是一个扩展方法,它简化了以下语法,但是并不完全相等:
<span color="#008080">Arbiter</span>.Activate( taskQueue, <span color="#008080">Arbiter</span>.Receive(<span color="#0000ff">false</span>, port, <span color="#008080">Console</span>.WriteLine));
这里最为关键的是传递给 Arbiter.Receive() 调用的第一个(Boolean)参数。它表明这个接收器是临时的,处理完一条消息后就会抛弃。如果我们希望处理所有达到该端口的消息,我们可以将这个参数设为 true。
<span color="#008000">// Post a message to a port to schedule a task</span>. <span color="#0000ff">var</span> port = <span color="#0000ff">new</span> <span color="#008080">Port</span><span color="#0000ff"><string></string></span>(); port.Post(<span color="#ff0000">"Hello (again), world</span>"); port.Post(<span color="#ff0000">"Hello (thrice), world</span>"); <span color="#008080">Arbiter</span>.Activate( taskQueue, <span color="#008080">Arbiter</span>.Receive(true, port, <span color="#008080">Console</span>.WriteLine));
上面的代码有时候会打印出奇怪的结果——两行内容的顺序不一致了。这究竟是怎么回事呢?
在 CCR 中,一旦某个仲裁器(这里是个“接受”操作)被满足之后,它会创建一个任务来处理相关消息。除非这个仲裁器被嵌套在另一个更大的组合内,这些任务都会被放入任务队列中等待执行。在我们上面的示例中,这个持久地接受器会立即分别为两条消息产生一个任务。当存在可用线程的时候,这两个任务将并发被处理,因此并不保证两者的顺序。
CCR 线程池的实现与 CLR 线程池有几点不同。最重要的一点是它包含固定数量的线程,这在创建时便确定下来。如果线程执行的操作不会阻塞,那么就不会有什么问题。但是如果您必须发起阻塞的请求,那还是使用 CLR 线程池对任务队列进行调度为好,因为它能够动态的增长和收缩。这样的任务队列可以使用 DispatcherQueue 默认构造函数来创建。
有几种办法可以保证消息的顺序。可能最简单的方法就是在循环中使用一个临时接收器,这样就能一次只处理一条消息。幸运的是,CCR 包含了一个叫做迭代式任务(Iterative Task)的强大的机制,可以让我们较为自然的实现这个要求。这需要使用 C#迭代器功能,我们来看一个示例:
首先,我们将目前的 Arbiter.Activate 替换成以下调用:
<span color="#008080">Arbiter</span>.Activate( taskQueue, <span color="#0000ff">new</span> <span color="#008080">Arbiter</span><<span color="#008080">Arbiter</span><<span color="#0000ff">string</span>>>(port, ProcessMessages));
这段代码建立了一个名为 ProcessMessages 的迭代式任务,定义如下:
<span color="#0000ff">static</span> <span color="#008080">IEnumerator</span><<span color="#008080">ITask</span>> ProcessMessages(<span color="#008080">Port</span><<span color="#0000ff">string</span>> port) { <span color="#0000ff">while</span> (<span color="#0000ff">true</span>) <span color="#0000ff">yield return</span> port.Receive(<span color="#008080">Console</span>.WriteLine); }
这个方法为一个无限循环,等待(但不阻塞)接受操作以获得满足。接受到消息时委托将被调用,并继续循环。如果我们希望在端口接受到一个空字符串时跳出循环,我们可以编写如下代码(请注意我们使用了 Lambda 表达式构建了一个匿名委托来处理消息):
<span color="#0000ff">static</span> <span color="#008080">IEnumerator</span><span color="#008080"><itask></itask></span>> ProcessMessages(<span color="#008080">Port</span><string></string> port)<br></br> {<br></br><span color="#0000ff">bool</span> fDone = <span color="#0000ff">false</span>;<br></br><span color="#0000ff">while</span> (!fDone)<br></br> {<br></br> yield return port.Receive(message =><br></br> {<br></br><span color="#0000ff">if</span> (<span color="#008080">String</span>.IsNullOrEmpty(message))<br></br> fDone = <span color="#0000ff">true</span>;<p><span color="#0000ff">else</span><span color="#008080">Console</span>.WriteLine(message);</p><br></br> });<br></br> }<br></br><span color="#008080">Console</span>.WriteLine("<span color="#ff0000">Finished</span>");<br></br> }
迭代器是 CCR 工具箱中非常强大的工具——它大大简化了顺序调用的异步操作的编码工作,使非阻塞操作的使用非常接近于同步调用方式。例如,一个级别高的任务可以返回如下的 ProcessMessages():
<span color="#0000ff">static</span> <span color="#008080">IEnumerator</span><<span color="#008080">ITask</span>> TopLevelTask(<span color="#008080">Port</span><<span color="#0000ff">static</span>> port) { <span color="#008000">// Yield to the nested task</span> <span color="#0000ff">yield return</span> new <span color="#008080">IterativeTask<port></port></span><<span color="#0000ff">string</span>>>(port, ProcessMessages); <span color="#008080">Console</span>.WriteLine(<span color="#ff0000">"Finished nested task.</span>"); }
到目前为止,我们只看到了简单接受器的使用——当单个消息到达单个端口时仲裁器便会安排一个任务。那么现在就可以来看一下更高级的仲裁方式了——其提供了一种黏着剂,可以将接受器进行嵌套以建立更强大的功能。
“选择器(Choice)”是其中最常用的功能之一,它会从多个接受器选择一个,并且只选择其中一个接受器来处理。例如,以下迭代任务会在处理前等待一个字符串或一个信号。
<span color="#0000ff">static</span> <span color="#008080">IEnumerator</span><<span color="#008080">ITask</span>> ProcessChoice(PortSet<<span color="#0000ff">string</span>, EmptyValue> portSet) { <span color="#0000ff">bool</span> fDone = <span color="#0000ff">false</span>; <span color="#0000ff">while</span> (!fDone) { <span color="#0000ff">yield return</span> portSet.Choice( message => <span color="#008080">Console</span>.WriteLine(message), signal => fDone = <span color="#0000ff">true</span>); } <span color="#008080">Console</span>.WriteLine("<span color="#ff0000">Finished</span>"); }
选择器一般用于确定一个异步操作的成功或者失败,不过您也可以用它来选择一个或任意数量的选择器。
端口集(PortSet)是对一个或多个独立端口的包装,使它们能作为一个整体来接受消息。一个典型的示例便是 CCR 中的 SuccessFailurePort,它继承了 PortSet。
另一个常用的仲裁器是级联(Join)。它会在两个内嵌的接受器都得到满足的情况下被激活。下面的示例便演示了这种方式:
<span color="#0000ff">var</span> port1 = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#0000ff">int</span>>(); <span color="#0000ff">var</span> port2 = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#0000ff">int</span>>(); port1.Post(1); port2.Post(3); <span color="#008080">Arbiter</span>.Activate( taskQueue, <span color="#008080">Arbiter</span>.JoinedReceive(<span color="#0000ff">false</span>, port1, port2, (x, y) => { <span color="#008080">Console</span>.WriteLine(x + y); }));
在资源有限的情况下,级联可以非常有效地控制访问。第一个端口包含了对资源的请求,另一个则包含了有效的资源。使用级联之后,我们可以限制请求只在有空闲资源的时候才进行处理。另一个高级别的仲裁方式是“交织(Interleave)”。这在概念上与读写锁(read-write lock)较为接近,只能在非阻塞的异步语法中使用。读取任务能够与其他读取任务同时运行,但是写入任务(它比读取的优先级高)只能在没有其他任务执行的情况下进行。以下是这种仲裁器的声明,它用于保护某种概念上的“缓存”:
<span color="#0000ff">var</span> updatePort = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#008080">UpdateCache</span>>(); <span color="#0000ff">var</span> queryPort = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#008080">QueryCache</span>>(); <span color="#0000ff">var</span> stopPort = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#008080">Shutdown</span>>(); <span color="#0000ff">var</span> interleave = <span color="#0000ff">new</span> <span color="#008080">Interleave</span>( <span color="#0000ff">new</span> <span color="#008080">TeardownReceiverGroup(</span> <span color="#008080">Arbiter</span>.Receive(<span color="#0000ff">false</span>, stopPort, ClearDownCache)), <span color="#0000ff">new</span> <span color="#008080">ExclusiveReceiverGroup(</span> <span color="#008080">Arbiter</span>.Receive(<span color="#0000ff">true</span>, updatePort, OnUpdateCache)), <span color="#0000ff">new</span> <span color="#008080">ConcurrentReceiverGroup(</span> <span color="#008080">Arbiter</span>.Receive(<span color="#0000ff">true</span>, queryPort, OnQueryCache))); <span color="#008080">Arbiter</span>.Activate(taskQueue, interleave);
在这里,持久接受器被放入合适的组中,这便开始了“交织”仲裁。任何属于 ConcurrentReceiverGroup() 的接受器能够让关联的任务之间并发执行。相反,ExclusiveReceiverGroup 中的接受器只能独立于其他接受器执行。此外对于放入该组的接受器,我们可以限制它们完全按照消息传递的顺序来执行任务。任何属于 TeardownReceiverGroup 组中的接受器会在关闭“交织”仲裁时,也就是在最后被调用——因此这样的接受器不能是持久化的。
“交织”仲裁使用轮询的方式对各接受器进行较为公平的调度。此外关于执行的顺序,即使在 ExclusiveReceiverGroup 内部也是各端口独立的。对于发送至相互无关的端口的两条消息,它们的执行顺序并不确保与它们的到达顺序相同。
之前提到过,CCR 迭代任务能让我们用接近于普通阻塞式同步操作的方式,来编写逻辑上是顺序执行的非阻塞异步操作。这样的异步操作一般为 I/O 密集型操作,可能是一个 Web 请求,一个数据库操作,或基础文件 I/O 等。由于现在我们可以更好地控制这些操作,现在编写这种异步 I/O 操作变得愈发简单,并且能有效地提高应用程序的吞吐量和伸缩性。
在 APM 世界中连接 BeginXXX 和 EndXXX 的重要模式为 AsyncCallback 委托,它的形式是:
<span color="#0000ff">public delegate void</span> <span color="#008080">AsyncCallback(IAsyncResult</span> ar);
CCR 便基于此,Port 的 Post 操作与之正好吻合。使用这种这种方式,我们可以用如下的迭代任务进行异步文件复制:
<span color="#0000ff">static</span> <span color="#008080">IEnumerator</span><<span color="#008080">ITask</span>> Copy(<span color="#008080">FileStream</span> source, <span color="#008080">FileStream</span> target) { <span color="#0000ff">var</span> buffer = <span color="#0000ff">new byte</span> byte[128 * 1024]; <span color="#0000ff">var</span> port = <span color="#0000ff">new</span> <span color="#008080">Port<iasyncresult></iasyncresult></span>>(); <span color="#0000ff">var</span> bytesRead = 0; <span color="#0000ff">do</span> { source.BeginRead(buffer, 0, buffer.Length, port.Post, <span color="#0000ff">null</span>); <span color="#0000ff">yield return</span> <span color="#008080">Arbiter</span>.Receive( <span color="#0000ff">false</span>, port, iar => bytesRead = source.EndRead(iar)); target.BeginWrite(buffer, 0, bytesRead, port.Post, null); <span color="#0000ff">yield return</span> <span color="#008080">Arbiter</span>.Receive( <span color="#0000ff">false</span>, port, iar => target.EndWrite(iar)); } <span color="#0000ff">while</span> (bytesRead > 0); }
从根本上说,这些异步操作在完成时都会向我们的端口中传递一个 IAsyncResult 对象。在这里,我们会等待端口的接受操作得到满足,这意味着异步操作已经完成,可以接着下一步继续开始。这完全是一个异步实现(我们完全可能使用少量的线程来执行数千个这样的操作),但是我们这些代码的意图可谓一目了然 ——我们读取一块数据,再写入一块数据,反复不断直到完成。
为了使文件复制的示例尽可能保持简单,上面的代码省略了处理异常的代码,不过强壮的代码必须能够处理读写操作可能引发的异常。在这里无法运用普通的 try/catch,因为这种异常处理方式具有线程线程相关性,而 CCR 任务可能被运行于任何可用的线程之上——事实上对于这种迭代任务,任务的每个“步骤”都可能与之前在不同的线程上执行。
在 CCR 中处理异常有两种基本的方式。第一种是显式地在每个操作中将错误进行捕获,并由端口传播出去。然而这会让调用者和被调用者双方造成可观的代码膨胀。如果要在前例中进行显式地错误处理,文件访问方式就会发生根本性的改变。下面的代码展示了文件读取方面需要进行的改变:
<span color="#0000ff">static</span> <span color="#008080">IEnumerator</span><<span color="#008080">ITask</span>> Copy( <span color="#008080">FileStream</span> source, <span color="#008080">FileStream</span> target, SuccessFailurePort resultPort) { <span color="#0000ff">var</span> buffer = <span color="#0000ff">new byte</span> [128 * 1024]; <span color="#0000ff">var</span> port = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#008080">IAsyncResult</span>>(); <span color="#0000ff">var</span> bytesRead = 0; <span color="#0000ff">do</span> { <span color="#008000">// Deal with a failure on the BeginRead</span> <span color="#0000ff">try</span> { source.BeginRead(buffer, 0, buffer.Length, port.Post, <span color="#0000ff">null</span>); } <span color="#0000ff">catch</span> (<span color="#008080">Exception</span> e) { resultPort.Post(e); <span color="#0000ff">yield break</span>; } <span color="#008000">// Deal with a failure on the EndRead</span> <span color="#0000ff">yield return</span> <span color="#008080">Arbiter</span> .Receive( <span color="#0000ff">false</span>, port, iar => { <span color="#0000ff">try</span> { bytesRead = source.EndRead(iar); } <span color="#0000ff">catch</span> (<span color="#008080">Exception</span> e) { resultPort.Post(e); } }); <span color="#008000">// Stop processing if there is a failure.</span> <span color="#0000ff">if</span> (bytesRead == 0) <span color="#0000ff">yield break</span>; <span color="#008000">// And so on for the write...</span> } <span color="#0000ff">while</span> (bytesRead > 0); resultPort.Post(<span color="#0000ff">new</span> <span color="#008080">SuccessResult</span> ()); }
很显然,这个方式十分笨重,而且非常容易出错——这不是代码中应该使用的做法,我们要把错误处理路径独立出来。
值得庆幸的是,CCR 为错误处理提供了一种干净而强大的支持,这种机制被称作为“因果关系(Causality)”。它不仅让我们的代码从显式处理异常的繁杂中释放出来,还在声明后继续保持原有任务的执行路径。这使我们能够建立统一的错误处理代码来应对任意复杂的异步操作。
建立 Causality 的典型方式是在初始化分发器之后为它的异常端口附加一个委托对象,以此来通知 CCR 处理异常的方式。
<span color="#0000ff">var</span> exceptionPort = <span color="#0000ff">new</span> <span color="#008080">Port</span><<span color="#008080">Exception</span>>(); <span color="#008080">Arbiter</span>.Activate(taskQueue, exceptionPort.Receive(e => { <span color="#008080">Console</span>.ForegroundColor = ConsoleColor.Red; <span color="#008080">Console</span>.WriteLine("Caught " + e.ToString()); <span color="#008080">Console</span>.ResetColor(); })); <span color="#008080">Dispatcher</span>.AddCausality(<span color="#0000ff">new</span> <span color="#008080">Causality</span>("Example", exceptionPort));
这样,我们可以用普通方式创建任务或传递消息了,Causality 会捕获任何任务在执行过程中产生的异常,并由 Causality 交给异常端口,并在相关的委托对象上执行。
结论
CCR 使您的应用程序能够用它提供的这些方式来表现,这包括一些基础的数据依赖关系,在运行时调度数据以使用有效的 CPU 核。这些作法直接将您从显式的线程和锁控制中释放开来,同时让您的应用程序能够完全利用日益强大的多核运算资源。
注意:文章中所有观点均为作者所有,与其雇主无关。
文章中的所有示例能够在运行在免费提供的“Microsoft Robotics Developer Studio 2008 Express Edition”中。请仔细阅读许可协议以获取更多信息。
给 InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。
评论