前言
在《简化异步操作(上):使用CCR 和AsyncEnumerator 简化异步操作》 一文中,我们谈到了异步编程的重要性,使用异步操作的难点,以及如何使用CCR 和AsyncEnumerator 来简化异步操作的调用。有了这些组件的辅 助,异步操作也可以真正使用传统的开发方式来编写了——这意味着各种缺陷,例如无法在“堆栈”中保存临时变量,无法try…catch…finally 和 using 等问题都不复存在了。这些组件让异步编程一下子美好了许多。
不过,现有的辅助还不足以面对一些复杂的场景。例如,要使多个有依赖的异步操作尽可能的“并行”,我们还需要构建额外的解决方案。在这片文章 里,我们将编写一个AsyncTaskDispatcher 来简化此类场景下的开发。自然,您也可以下载它的代码后加以修改,使它能更进一步满足您的需 求。
AsyncTaskDispatcher 的设计与实现
AsyncTaskDispatcher 的目标是为了简化多个异步操作之间的协调调用,使得各异步操作之间能够尽可能的“并行化”。在我的设想中,AsyncTaskDispatcher 应该这样使用:
- 首先创建一个 AsyncTaskDispatcher 对象。
- 在 AsyncTaskDispatcher 对象中注册各种异步任务,同时指定各任务之间的依赖关系。
- 使用 AsyncTaskDispatcher 对象的 BeginDispatch 和 EndDispath 方法组成了一个 APM 模式,把所有异步任务作为一个整体来调用。
- 如果任何一个异步任务抛出异常:
- AsyncTaskDispatcher 停止分派,并取消正在执行的任务,整个异步调用立即完成。
- AsyncTaskDispatcher 停止分派,并等待正在执行的任务完成后整个异步调用才算结束。
在经过了思考和对比之后,最终我选择基于 AsyncEnumerator 构造 AsyncTaskDispatcher,因为:
- AsyncEnumerator 久经测试,可以让我把注意力集中在分配异步任务的逻辑上,而不需要过渡关注异步操作中的各种稀奇古怪的问题。
- AsyncEnumerator 的使用模型保证了一点:除了异步操作本身之外,其他的逻辑都可以使用串行的方式来执行。这样进一步避免了多线程环境下的各种问题,尤其是在控制各种状态的时候这点更为重要(当然 CCR 也有这个特点)。
- AsyncEnumerator 个头小巧,仅数十 K 大小,如果基于 CCR 开发,则需要一个几兆的再分发包(redistribution package)。
- AsyncEnumerator 自带了 BeginExecute 和 EndExecute 方法组成的 APM 模式,可以直接使用
- AsyncEnumerator 已经包含内置的“取消”功能。
- AsyncEnumerator 已经内置 SynchronizationContext,能够在 WinForm 或 WPF 界面中直接使用。
那么我们就开始 AsyncTaskDispatcher 的实现。首先我们要确定一个异步任务包含哪些信息。的是“那些东西构成了一个异步任务”,或者说,“构造一个异步任务需要哪些信息”。AsyncTask 的定义如下:
<span>public class </span><span>AsyncTask<br></br></span>{ <span>private </span><span>AsyncTask</span>[] m_dependencies; <span>private </span><span>HashSet</span><<span>AsyncTask</span>> m_successors; <span>public object </span>Id { <span>get</span>; <span>private set</span>; } <span>public object </span>Context { <span>get</span>; <span>private set</span>; } <span>public </span><span>AsyncTaskStatus </span>Status { <span>get</span>; <span>internal set</span>; } <span>internal </span><span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> Predicate { <span>get</span>; <span>private set</span>; } <span>internal </span><span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> Begin { <span>get</span>; <span>private set</span>; } <span>internal </span><span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> End { <span>get</span>; <span>private set</span>; } <span>private </span>AsyncTask() { } <span>internal </span>AsyncTask( <span>object </span>id, <span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> predicate, <span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> begin, <span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> end, <span>object </span>context, <span>IEnumerable</span><<span>AsyncTask</span>> dependencies) { <span>this</span>.Id = id; <span>this</span>.Predicate = predicate; <span>this</span>.Begin = begin; <span>this</span>.End = end; <span>this</span>.Context = context; <span>this</span>.Status = <span>AsyncTaskStatus</span>.Pending; <span>this</span>.m_successors = <span>new </span><span>HashSet</span><<span>AsyncTask</span>>(); <span>this</span>.m_dependencies = dependencies.Where(d => d != <span>null</span>).Distinct().ToArray(); <span>foreach </span>(<span>var </span>task <span>in this</span>.m_dependencies) { task.m_successors.Add(<span>this</span>); } } <span>internal </span><span>IEnumerable</span><<span>AsyncTask</span>> Successors { <span>get<br></br></span>{ <span>return this</span>.m_successors; } } <span>internal bool </span>DependenciesSucceeded { <span>get<br></br></span>{ <span>return this</span>.m_dependencies.All(d => d.Status == <span>AsyncTaskStatus</span>.Succeeded || d.Status == <span>AsyncTaskStatus</span>.MarkedAsSucceeded); } } <span>internal void </span>Close() { <span>this</span>.Predicate = <span>null</span>; <span>this</span>.Begin = <span>null</span>; <span>this</span>.End = <span>null</span>; <span>this</span>.m_dependencies = <span>null</span>; <span>this</span>.m_successors = <span>null</span>; } <span>internal </span><span>AsyncTask </span>MakeSnapshot() { <span>return new </span><span>AsyncTask<br></br></span>{ Id = <span>this</span>.Id, Status = <span>this</span>.Status }; } }
以下是构造 AsyncTask 对象时需要提供的信息:
- id:一个 object 类型对象,用于标识一个异步任务,在同一个 AsyncTaskDispatcher 中唯一。
- context:一个 object 类型的上下文对象,它是注册异步任务时提供的任意对象,AsyncTaskDispatcher 会在其他操作中提供该对象。
- predicate:一个 Func
- begin:一个 Func
- end:一个 Action
- dependencies:当前任务所依赖的其他异步任务
此外,还有一些成员用于辅助开发:
- Status 属性:表明该异步任务的状态,它们是:
- MarkedAsSucceeded:表示该任务在 predicate 委托执行时被标记为“成功”
- Succeeded:表示该任务正常运行成功
- Failed:表示该任务执行失败
- Pending:表示该任务还处于等待状态
- MarkedAsCancelled:表示该任务在 predicate 委托执行时被标记为“取消”
- Cancelling:表明该任务已经发起,但因为其他任务的失败正处于取消阶段,等待结束。
- Cancelled:表明该任务已经发起,但因为其他任务的失败而被取消。
- Executing:表明该任务正在执行
- Successors 属性:该任务的所有后继异步任务
- DependenciesSucceeded 属性:表示该任务所依赖的异步任务是否全部成功
- Close 方法:清除该任务中所有非公开数据,使它们在 AsyncTask 被外部引用的情况下也能尽早回收
- MakeSnapshot 方法:返回表示当前 AsyncTask 对象状态的快照对象
AsyncTask 的大部分成员非常容易理解。只有一点值得一提,就是在某个异步任务执行失败的情况下 AsyncTaskDispatcher 的行为如何。我实现的 AsyncTaskDispatcher 在构造时可以接受一个参数 cancelOnFailure,表明某个任务出错之后是立即结束 整个分派过程(cancelOnFailure 为 true),还是需要等待正在运行的任务完成(cancelOnFailure 为 false)。
<span>public class </span><span>AsyncTaskDispatcher<br></br></span>{ <span>private </span><span>Dictionary</span><<span>object</span>, <span>AsyncTask</span>> m_tasks; <span>public bool </span>CancelOnFailure { <span>get</span>; <span>private set</span>; } <span>public </span><span>DispatchStatus </span>Status { <span>get</span>; <span>private set</span>; } ... <span>public </span>AsyncTaskDispatcher() : <span>this</span>(<span>false</span>) { } <span>public </span>AsyncTaskDispatcher(<span>bool </span>cancelOnFailure) { <span>this</span>.CancelOnFailure = cancelOnFailure; <span>this</span>.m_tasks = <span>new </span><span>Dictionary</span><<span>object</span>, <span>AsyncTask</span>>(); <span>this</span>.Status = <span>DispatchStatus</span>.NotStarted; } ... }
如果 cancelOnFailure 为 true,作为一个整体的异步分派操作在遇到错误时会立即完成,外部逻辑便可继续执行下去。但是对于那些 已经开始的异步任务,它们的 End 委托在接下来的某一时刻依旧会被调用,否则就可能会造成资源泄露。一个异步任务的 End 委托可以通过它的 cancelling 参数来判断当前调用是一个正常的结束,还是一个取消过程。如果 cancelling 为 true,则说明整个分派过程已经结束,而其中 某个异步任务出现错误;如果 cancelling 为 false,则表明整个分派过程还在进行,当前异步任务正常结束——但是这并不表示此时整体分派过程中 没有出现问题,有可能某个异步任务已经出错,但是由于 cancelOnFailure 为 false,AsyncTaskDispatcher 正在等待其他 异步任务(也包括当前任务)的“正常结束”。
在 AsyncTaskDispatcher 中,我们将维护一个字典,以便通过 id 来查找一个 AsyncTask 对象。开发人员可以通过 RegisterTask 方法来创建一个 AsyncTask 对象:
<span>public </span><span>AsyncTask </span>RegisterTask( <span>object </span>id, <span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> predicate, <span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> begin, <span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> end, <span>object </span>context, <span>params object</span>[] dependencies) { <span>lock </span>(<span>this</span>.m_tasks) { <span>if </span>(<span>this</span>.Status != <span>DispatchStatus</span>.NotStarted) { <span>throw new </span><span>InvalidOperationException</span>(<span>"Task can only be registered before dispatching."</span>); } <span>this</span>.CheckRegisterTaskArgs(id, begin, end, dependencies); <span>AsyncTask </span>task = <span>new </span><span>AsyncTask</span>( id, predicate, begin, end, context, dependencies.Select(d => <span>this</span>.m_tasks[d])); <span>this</span>.m_tasks.Add(id, task); <span>return </span>task; } }
AsyncTaskDispatcher 在各方法内部使用了 lock,这是种非常粗略的锁行为——虽然确保了它在多线程环境下也能保持状态的一 致,但是其性能却比精打细算的锁控制要差一些。不过这毕竟不是目前的关键,更何况需要在多线程环境下使用 AsyncTaskDispatcher 的场景少 之又少。RegisterTask 方法有多个重载,它们都将调用直接发送给上面的 RegisterTask 实现。这个方法会根据传入的依赖 id 找出新任务 所依赖的异步任务,构造一个 AsyncTask 对象,并将其放入字典中。
为了形成一个标准的 APM 模式,AsyncTaskDispatcher 提供了 BeginDispatch 和 EndDispatch 方法:
<span>private </span><span>Dictionay</span><<span>object</span>, <span>Exception</span>> m_taskExceptions; <span>public </span><span>IAsyncResult </span>BeginDispatch(<span>AsyncCallback </span>asyncCallback, <span>object </span>asyncState) { <span>lock </span>(<span>this</span>.m_tasks) { <span>if </span>(<span>this</span>.Status != <span>DispatchStatus</span>.NotStarted) { <span>throw new </span><span>InvalidOperationException</span>(<span>"An AsyncTaskDispatcher can be started only once."</span>); } <span>this</span>.Status = <span>DispatchStatus</span>.Dispatching; } <span>this</span>.m_taskExceptions = <span>new </span><span>Dictionay</span><<span>object</span>, <span>Exception</span>>(); <span>var </span>taskToStart = <span>this</span>.m_tasks.Values.Where(t => t.DependenciesSucceeded).ToList(); <span>IEnumerator</span><<span>int</span>> enumerator = <span>this</span>.GetWorkerEnumerator(taskToStart); <span>this</span>.m_asyncEnumerator = <span>new </span><span>AsyncEnumerator</span>(); <span>return this</span>.m_asyncEnumerator.BeginExecute(enumerator, asyncCallback, asyncState); } <span>public void </span>EndDispatch(<span>IAsyncResult </span>asyncResult) { <span>this</span>.m_asyncEnumerator.EndExecute(asyncResult); <span>if </span>(<span>this</span>.m_taskExceptions.Count > 0) { <span>throw new </span><span>DispatchException</span>(<span>this</span>.GetTaskSnapshots(), <span>this</span>.m_taskExceptions); } }
在 BeginDispatch 方法中,将会选择出所有不存在依赖的任务,它们会被作为首批发起的异步任务。它们会被传入 GetWorkerEnumerator 方法用于构造一个 IEnumerator 对象,并将其交给 AsyncEnumerator 执行。整个执行过程中遇到的异常都会被收集起来,并且在 EndDispatch 执行中抛出一个包含这些异常信息的 DispatchException。异 步任务抛出的每个异常都会使用 HandleTaskFailure 方法进行处理:
<span>private bool </span>HandleTaskFailure(<span>AsyncTask </span>task, <span>Exception </span>ex) { <span>this</span>.m_taskExceptions.Add(task.Id, ex); task.Status = <span>AsyncTaskStatus</span>.Failed; task.Close(); <span>if </span>(<span>this</span>.CancelOnFailure) { <span>lock </span>(<span>this</span>.m_tasks) { <span>var </span>runningTasks = <span>this</span>.m_tasks.Values.Where(t => t.Status == <span>AsyncTaskStatus</span>.Executing); <span>foreach </span>(<span>AsyncTask </span>t <span>in </span>runningTasks) { t.Status = <span>AsyncTaskStatus</span>.Cancelling; } } <span>this</span>.Status = <span>DispatchStatus</span>.Cancelling; <span>this</span>.m_asyncEnumerator.Cancel(<span>null</span>); <span>return true</span>; } <span>else<br></br></span>{ <span>this</span>.Status = <span>DispatchStatus</span>.Waiting; <span>return false</span>; } }
HandleTaskFailure 接受两个参数,一是出错的 AsyncTask 对象,二便是被抛出的异常,并返回一个值表示是否应该立即中止 任务分派。在收集了异常信息,设定了任务状态并将其关闭之后。则会根据构造 AsyncTaskDispatcher 对象时所指定的 CancelOnFailure 值来确定接下来的行为。如果 CancelOnFailure 为 true(表示取消正在运行的对象),便将正在执行的异步任 务全部标记为 Cancelling 状态,并调用 AsyncEnumerator 的 Cancel 方法取消正在执行的异步任务。根据 CancelOnFailure 的不同,AsyncTaskDispatcher 自身状态将会变成 Cancelling(表示正在取消已经分派的任务,这 个状态不会维持长久)或 Waiting(表示正在等待已经分派的任务完成),而 HandleTaskFailure 方法也会返回对应的结果。
之前提到,如果一个异步任务的 End 委托对象被执行时,其 cancelling 参数为 false,并不能说明其他任务没有遇到任何错误。不过从 这段实现中便可得知,开发人员只要配合 AsyncTaskDispatcher 的状态便可确认更多细节:如果此时 AsyncTaskDispatcher 的状态为 Waiting,则表示之前已经有任务失败了。
<span>public </span><span>IEnumerator</span><<span>int</span>> GetWorkerEnumerator(<span>IEnumerable</span><<span>AsyncTask</span>> tasksToStart) { <span>this</span>.m_runningTaskCount = 0; <span>foreach </span>(<span>AsyncTask </span>task <span>in </span>tasksToStart) { <span>try<br></br></span>{ <span>this</span>.Start(task); } <span>catch </span>(<span>AsyncTaskException </span>ex) { <span>if </span>(<span>this</span>.HandleTaskFailure(ex.Task, ex.InnerException)) { <span>this</span>.Status = <span>DispatchStatus</span>.Finished; <span>yield break</span>; } <span>break</span>; } } ... }
以上代码片断为 GetWorkerEnumerator 方法的一部分,用于启动首批被分派的异步任务。我们将遍历每个异步任务,并调用 Start 方法将其启动。Start 方法可能会抛出一个类型为 AsyncTaskException 的内部异常,这个异常不对外释放,它的作用仅仅是为了 方法间的“通信”——“异常”的确是用来表示错误的一种优秀方式,它能够轻易地从逻辑中跳出,并且在合适的地方进行处理。在捕获 AsyncTaskException 异常之后,就会调用之前的 HandleTaskFailure 方法进行处理,如果它返回 true(表示需要立即结束 分派)则使用 yiled break 来跳出 GetWorkerEnumerator 方法,否则便使用 break 来跳出循环——仅仅是跳出循环而并非整个 GetWorkerEnumerator 方法,因为此时并不要求整个分派立即结束,接下来的代码将会等待某个已经发起的任务完成,并进行处理:
<span>public </span><span>IEnumerator</span><<span>int</span>> GetWorkerEnumerator(<span>IEnumerable</span><<span>AsyncTask</span>> tasksToStart) { ... <span>while </span>(<span>this</span>.m_runningTaskCount > 0) { <span>yield return </span>1; <span>this</span>.m_runningTaskCount--; <span>IAsyncResult </span>asyncResult = <span>this</span>.m_asyncEnumerator.DequeueAsyncResult(); <span>AsyncTask </span>finishedTask = (<span>AsyncTask</span>)asyncResult.AsyncState; <span>try<br></br></span>{ finishedTask.End(asyncResult, <span>false</span>, finishedTask.Context); finishedTask.Status = <span>AsyncTaskStatus</span>.Succeeded; <span>if </span>(<span>this</span>.Status == <span>DispatchStatus</span>.Dispatching) { <span>this</span>.StartSuccessors(finishedTask); } finishedTask.Close(); } <span>catch </span>(<span>AsyncTaskException </span>ex) { <span>if </span>(<span>this</span>.HandleTaskFailure(ex.Task, ex.InnerException)) <span>break</span>; } <span>catch </span>(<span>Exception </span>ex) { <span>if </span>(<span>this</span>.HandleTaskFailure(finishedTask, ex)) <span>break</span>; } } <span>this</span>.Status = <span>DispatchStatus</span>.Finished; }
在 Start 方法中,如果成功发起了这个异步请求,则会将 m_runningTaskCount 的数值加一,以此表示正在执行(确切地说,是已 经“开始”但还没有进行“结束”处理)的异步任务数目。while 循环体每执行一遍,则表示处理完一个异步请求。因此一般情况下,在没有处理完所有异步请 求的时候 while 循环体会不断执行。与之前 WebRequest 的示例相似,AsyncTask 对象作为 AsyncState 保存在 IAsyncResult 对象中——由于增加了 AsyncTask 这一抽象层,因此我们成功地将所有异步操作的处理方式统一了起来。
接着便依次调用 End 委托(请注意 cancelling 参数为 false)、设置属性、并且在 AsyncTaskDispatcher 为 Dispatching 状态时使用 StartSuccessors 方法来分派可执行的后继任务——AsyncTaskDispatcher 的状态也有可能 是 Waiting,这表示已经有任务出错,此时正在等待已经开始的任务完成,这种情况下就不应该发起新的异步任务了。值得注意的是,用户实现的 End 委托 可能会抛出异常,而 StartSuccessors 方法也可能抛出 AsyncTaskException 以表示某个异步任务启动失败。这时就要使用 HandleTaskFailure 方法来处理异常,并且在合适的时候立即终止整个分派操作(跳出循环)。
跳出 while 循环则表示整个派送操作已经完成,此时自然要将 AsyncTaskDispatcher 的状态设为 Finished。
而最后剩下的,只是用于启动任务的 Start 方法,用于取消任务的 CancelTask 方法,以及发起后继任务的 StartSuccessors 方法了:
<span>private void </span>Start(<span>AsyncTask </span>task) { <span>DispatchPolicy </span>policy; <span>try<br></br></span>{ policy = task.Predicate == <span>null </span>? <span>DispatchPolicy</span>.Normal : task.Predicate(task.Context); } <span>catch </span>(<span>Exception </span>ex) { <span>throw new </span><span>AsyncTaskException</span>(task, ex); } <span>if </span>(policy == <span>DispatchPolicy</span>.Normal) { <span>try<br></br></span>{ task.Begin(<span>this</span>.m_asyncEnumerator.EndVoid(0, <span>this</span>.CancelTask), task, task.Context); <span>this</span>.m_runningTaskCount++; task.Status = <span>AsyncTaskStatus</span>.Executing; } <span>catch </span>(<span>Exception </span>ex) { <span>throw new </span><span>AsyncTaskException</span>(task, ex); } } <span>else if </span>(policy == <span>DispatchPolicy</span>.MarkAsCancelled) { task.Status = <span>AsyncTaskStatus</span>.MarkedAsCancelled; task.Close(); } <span>else </span><span>// policy == DispatchPolicy.Succeeded<br></br></span>{ task.Status = <span>AsyncTaskStatus</span>.MarkedAsSucceeded; <span>this</span>.StartSuccessors(task); task.Close(); } } <span>private void </span>StartSuccessors(<span>AsyncTask </span>task) { <span>Func</span><<span>AsyncTask</span>, <span>bool</span>> predicate = t => t.Status == <span>AsyncTaskStatus</span>.Pending && t.DependenciesSucceeded; <span>foreach </span>(<span>AsyncTask </span>successor <span>in </span>task.Successors.Where(predicate)) { <span>this</span>.Start(successor); } } <span>private void </span>CancelTask(<span>IAsyncResult </span>asyncResult) { <span>AsyncTask </span>task = (<span>AsyncTask</span>)asyncResult.AsyncState; <span>try<br></br></span>{ task.End(asyncResult, <span>true</span>, task.Context); } <span>catch </span>{ } <span>finally<br></br></span>{ <span>this</span>.m_runningTaskCount--; task.Status = <span>AsyncTaskStatus</span>.Cancelled; task.Close(); } }
在 Start 方法中,首先获取异步任务的 Predicate 委托的执行结果,并根据这个结果选择是将当前异步任务进行正常处理,还是标记为“取 消”或“成功”。如果应该正常执行,则调用 Begin 委托以发起异步任务,把任务状态标记为 Executing,并将 m_runningTaskCount 计数加一。如果选择标记为“成功”,那么还需要调用 StartSuccessors 来发起后继任务,自然在 StartSuccessors 方法中也是使用 Start 方法来发起单个任务——这是一种间接递归,使用“深度优先”的方式发起所有依赖已经全部完成的异 步请求。由于 Start 方法需要抛出 AsyncTaskException 来表示任务出错,因此在每段由用户实现的逻辑(即 Predicate 和 Begin 委托)执行时都要小心地捕获异常。
值得一提的是,如果需要使用 AsyncEnumerator 的 Cancel 方法来取消已经发起的异步任务,那么在从 AsyncEnumerator 获取异步回调委托(AsyncCallback 对象)的时候还必须提供一个委托作为取消这个任务时的回调函数。这便是 CancelTask 方法的作用,在 CancelTask 方法中也会调用 End 委托(请注意 cancelling 参数为 true),以确保资源能够被正确 释放。执行 End 委托时抛出的任何异常都会被默默“吞下”。一个任务被取消之后,它的状态会被修改为 Cancelled,最后则被关闭。
至此,AsyncTaskDispatcher 已经全部实现到这里就告一段落了。可见,有了 AsyncEnumerator 的协助,实现这样一 个组件并不那么困难,从头至尾总共只有 200 多行代码。事实上,写目前这篇文章所消耗的时间和精力已经数倍于实现一个完整的 AsyncTaskDispatcher。当然,目前的实现远不完美,它虽然较好地实现了有依赖关系的多个异步操作之间的协作调用,但是还缺少一些有用的 功能。例如,您可能需要在分派过程中动态添加新的任务,或是改变任务之间的依赖关系;而且,对于异步操作往往都会指定一个超时时间,可惜目前的 AsyncTaskDispatcher 并不包含超时功能。
您可以在这里获得AsyncTaskDispatcher 的完整代码,并且根据需要添加任何功能——尤其是刚才提到的缺陷,实现起来实际并不困难。
AsyncTaskDispatcher 的使用
现在我们基于 AsyncTaskDispatcher 来重新实现上一篇文章中提到的场景。假设您在开发一个 ASP.NET 页面用于展示一篇文章,其中需要显示各种信息:
- 文章内容
- 评论信息
- 对评论内容进行打分的用户
- 打分者的收藏
由于程序架构的原因,数据需要从各个不同服务或数据源中获取(这是个很常见的情况)。因此,程序中已经准备了如下的数据读取接口:
- Begin/EndGetContent:根据文章 ID(Int32),获取文章内容(String)
- Begin/EndGetComments:根据文章 ID(Int32),获取所有评论(IEnumerable)
- Begin/EndGetUsers:根据多个用户 ID(IEnumerable),获取一批用户(Dictionary
- Begin/EndGetCommentRaters:根据多个评论 ID(IEnumerable),获取所有打分者(IEnumerable)
- Begin/EndGetFavorites:根据多个用户 ID(IEnumerable),获取所有收藏(IEnumerable)
我们可以轻易得出五个异步操作之间的依赖状况:
因此,我们可以编写如下的代码。请注意,我们使用异步任务的 id 来表示它们之间的依赖关系:
<span>private void </span>RegisterAsyncTasks(<span>AsyncTaskDispatcher </span>dispatcher, <span>int </span>articleId) { <span>// 获取文章内容 <br></br></span><span>string </span>taskGetContent = <span>"get content"</span>; dispatcher.RegisterTask( taskGetContent, <span>// 任务 ID </span> (cb, state, context) => <span>// Begin 委托对象 </span> { <span>return </span><span>Service</span>.BeginGetContent(articleId, cb, state); }, (ar, cancelling, context) => <span>// End 委托对象 </span> { <span>this</span>.Content = <span>Service</span>.EndGetContent(ar); }); <span>// 获取评论 <br></br></span><span>string </span>taskGetComments = <span>"get comments"</span>; <span>IEnumerable</span><<span>Comment</span>> comments = <span>null</span>; dispatcher.RegisterTask( taskGetComments, (cb, state, context) => { <span>return </span><span>Service</span>.BeginGetComments(articleId, cb, state); }, (ar, cancelling, context) => { comments = <span>Service</span>.EndGetComments(ar); }); <span>// 获取评论者信息,并结合评论绑定至控件 <br></br></span><span>string </span>taskGetCommentUsers = <span>"get comment users"</span>; dispatcher.RegisterTask( taskGetCommentUsers, (cb, state, context) => { <span>return </span><span>Service</span>.BeginGetUsers(comments.Select(c => c.UserID), cb, state); }, (ar, cancelling, context) => { <span>var </span>users = <span>Service</span>.EndGetUsers(ar); <span>this</span>.rptComments.DataSource = <span>from </span>c <span>in </span>comments <span>select new<br></br></span>{ Comment = c, User = users[c.UserID] }; <span>this</span>.rptComments.DataBind(); }, taskGetComments); <span>// 指定任务之间的依赖关系 </span> <span>// 获取评论的打分者,并绑定至控件 <br></br></span><span>string </span>taskGetCommentRaters = <span>"get comment raters"</span>; <span>IEnumerable</span><<span>User</span>> raters = <span>null</span>; dispatcher.RegisterTask( taskGetCommentRaters, (cb, state, context) => { <span>return </span><span>Service</span>.BeginGetCommentRaters(comments.Select(c => c.CommentID), cb, state); }, (ar, cancelling, context) => { raters = <span>Service</span>.EndGetCommentRaters(ar); <span>this</span>.rptRaters.DataSource = raters; <span>this</span>.rptRaters.DataBind(); }, taskGetComments); <span>// 获取打分者的收藏内容,并绑定至控件 <br></br></span><span>string </span>taskGetFavorites = <span>"get favorites"</span>; dispatcher.RegisterTask( taskGetFavorites, (cb, state, context) => { <span>return </span><span>Service</span>.BeginGetFavorites(raters.Select(u => u.UserID), cb, state); }, (ar, cancelling, context) => { <span>this</span>.rptFavorites.DataSource = <span>Service</span>.EndGetFavorites(ar); <span>this</span>.rptFavorites.DataBind(); }, taskGetCommentRaters); }
与之前的作法相比,似乎代码量提高了,但是观察后可以发现,多出来的代码其实都是在创建匿名的委托对象,而一个个匿名的委托对象将代码进行了有 条理的分割,并充分利用“匿名方法”形成的闭包,使各委托对象能够共享“调用堆栈”上的数据。现在的实现使用了一种直观的方式表现了各异步操作之间的依赖 关系,代码一下子变得条理清晰,易于维护了。此外还有一点非常重要:虽然异步任务为“并行”执行,但是其中所有的委托对象只会依次调用,因此开发人员可以 放心地编写代码,而不用担心线程安全方面的问题。
我们可以把上面的代码结合 ASP.NET WebForm 页面的异步特性一起使用:
<span>protected void </span>Page_Load(<span>object </span>sender, <span>EventArgs </span>e) { <span>this</span>.AddOnPreRenderCompleteAsync( <span>new </span><span>BeginEventHandler</span>(BeginAsyncOperation), <span>new </span><span>EndEventHandler</span>(EndAsyncOperation)); } <span>private </span><span>IAsyncResult </span>BeginAsyncOperation( <span>object </span>sender, <span>EventArgs </span>e, <span>AsyncCallback </span>callback, <span>object </span>extraData) { <span>this</span>.m_dispatcher = <span>new </span><span>AsyncTaskDispatcher</span>(); <span>this</span>.RegisterAsyncTasks(<span>this</span>.m_dispatcher, 1); <span>return this</span>.m_dispatcher.BeginDispatch(callback, extraData); } <span>private void </span>EndAsyncOperation(<span>IAsyncResult </span>ar) { <span>this</span>.m_dispatcher.EndDispatch(ar); }
这个页面与 AsyncTaskDispatcher 的源代码同时发布,并且增加了一些简单的时间统计代码。您会发现,原本使用“串行”方式需要 10 秒钟才能完成的任务,使用如今的“并行”方式只需 6 秒钟即可完成:
- Get Content: 00:00:00 - 00:00:02.0010000
- Get Comments: 00:00:00.0010000 - 00:00:02.0010000
- Get Comment Users: 00:00:02.0010000 - 00:00:04.0010000
- Get Comment Raters: 00:00:02.0010000 - 00:00:04.0010000
- Get Favorites: 00:00:04.0010000 - 00:00:06.0010000
这与上一篇文章中提到的“理想情况” 完全一致:
由于 CCR 和 AsyncEnumerator 难以“并行”地执行异步代码,因此我们需要提出新的解决方案来满足这方面的需求。在 AsyncEnumerator 的基础上开发一个 AsyncTaskDispatcher 并不困难,但是这个组件能够有效地简化多个异步操作之间的协作调 用。一般来说,这样的做法能够使应用程序的性能与伸缩性得到比较明显的提高。AsyncTaskDispatcher 的代码在 MSDN Code Gallery 上完全公开,您可以自由修改,使它更好地满足您的需求。
相关文章:简化异步操作(上)──使用CCR 和AsyncEnumerator 简化异步操作。
给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。
评论