写点什么

简化异步操作(下):构建 AsyncTaskDispatcher 简化多个异步操作之间的协作调用

  • 2009-02-21
  • 本文字数:13059 字

    阅读完需:约 43 分钟

前言

在《简化异步操作(上):使用CCR 和AsyncEnumerator 简化异步操作》 一文中,我们谈到了异步编程的重要性,使用异步操作的难点,以及如何使用CCR 和AsyncEnumerator 来简化异步操作的调用。有了这些组件的辅 助,异步操作也可以真正使用传统的开发方式来编写了——这意味着各种缺陷,例如无法在“堆栈”中保存临时变量,无法try…catch…finally 和 using 等问题都不复存在了。这些组件让异步编程一下子美好了许多。

不过,现有的辅助还不足以面对一些复杂的场景。例如,要使多个有依赖的异步操作尽可能的“并行”,我们还需要构建额外的解决方案。在这片文章 里,我们将编写一个AsyncTaskDispatcher 来简化此类场景下的开发。自然,您也可以下载它的代码后加以修改,使它能更进一步满足您的需 求。

AsyncTaskDispatcher 的设计与实现

AsyncTaskDispatcher 的目标是为了简化多个异步操作之间的协调调用,使得各异步操作之间能够尽可能的“并行化”。在我的设想中,AsyncTaskDispatcher 应该这样使用:

  1. 首先创建一个 AsyncTaskDispatcher 对象。
  2. 在 AsyncTaskDispatcher 对象中注册各种异步任务,同时指定各任务之间的依赖关系。
  3. 使用 AsyncTaskDispatcher 对象的 BeginDispatch 和 EndDispath 方法组成了一个 APM 模式,把所有异步任务作为一个整体来调用。
  4. 如果任何一个异步任务抛出异常:
  • AsyncTaskDispatcher 停止分派,并取消正在执行的任务,整个异步调用立即完成。
  • AsyncTaskDispatcher 停止分派,并等待正在执行的任务完成后整个异步调用才算结束。

在经过了思考和对比之后,最终我选择基于 AsyncEnumerator 构造 AsyncTaskDispatcher,因为:

  • AsyncEnumerator 久经测试,可以让我把注意力集中在分配异步任务的逻辑上,而不需要过渡关注异步操作中的各种稀奇古怪的问题。
  • AsyncEnumerator 的使用模型保证了一点:除了异步操作本身之外,其他的逻辑都可以使用串行的方式来执行。这样进一步避免了多线程环境下的各种问题,尤其是在控制各种状态的时候这点更为重要(当然 CCR 也有这个特点)。
  • AsyncEnumerator 个头小巧,仅数十 K 大小,如果基于 CCR 开发,则需要一个几兆的再分发包(redistribution package)。
  • AsyncEnumerator 自带了 BeginExecute 和 EndExecute 方法组成的 APM 模式,可以直接使用
  • AsyncEnumerator 已经包含内置的“取消”功能。
  • AsyncEnumerator 已经内置 SynchronizationContext,能够在 WinForm 或 WPF 界面中直接使用。

那么我们就开始 AsyncTaskDispatcher 的实现。首先我们要确定一个异步任务包含哪些信息。的是“那些东西构成了一个异步任务”,或者说,“构造一个异步任务需要哪些信息”。AsyncTask 的定义如下:

复制代码
<span>public class </span><span>AsyncTask<br></br></span>{
<span>private </span><span>AsyncTask</span>[] m_dependencies;
<span>private </span><span>HashSet</span><<span>AsyncTask</span>> m_successors;
<span>public object </span>Id { <span>get</span>; <span>private set</span>; }
<span>public object </span>Context { <span>get</span>; <span>private set</span>; }
<span>public </span><span>AsyncTaskStatus </span>Status { <span>get</span>; <span>internal set</span>; }
<span>internal </span><span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> Predicate { <span>get</span>; <span>private set</span>; }
<span>internal </span><span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> Begin { <span>get</span>; <span>private set</span>; }
<span>internal </span><span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> End { <span>get</span>; <span>private set</span>; }
<span>private </span>AsyncTask() { }
<span>internal </span>AsyncTask(
<span>object </span>id,
<span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> predicate,
<span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> begin,
<span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> end,
<span>object </span>context,
<span>IEnumerable</span><<span>AsyncTask</span>> dependencies)
{
<span>this</span>.Id = id;
<span>this</span>.Predicate = predicate;
<span>this</span>.Begin = begin;
<span>this</span>.End = end;
<span>this</span>.Context = context;
<span>this</span>.Status = <span>AsyncTaskStatus</span>.Pending;
<span>this</span>.m_successors = <span>new </span><span>HashSet</span><<span>AsyncTask</span>>();
<span>this</span>.m_dependencies = dependencies.Where(d => d != <span>null</span>).Distinct().ToArray();
<span>foreach </span>(<span>var </span>task <span>in this</span>.m_dependencies)
{
task.m_successors.Add(<span>this</span>);
}
}
<span>internal </span><span>IEnumerable</span><<span>AsyncTask</span>> Successors
{
<span>get<br></br></span>{
<span>return this</span>.m_successors;
}
}
<span>internal bool </span>DependenciesSucceeded
{
<span>get<br></br></span>{
<span>return this</span>.m_dependencies.All(d =>
d.Status == <span>AsyncTaskStatus</span>.Succeeded ||
d.Status == <span>AsyncTaskStatus</span>.MarkedAsSucceeded);
}
}
<span>internal void </span>Close()
{
<span>this</span>.Predicate = <span>null</span>;
<span>this</span>.Begin = <span>null</span>;
<span>this</span>.End = <span>null</span>;
<span>this</span>.m_dependencies = <span>null</span>;
<span>this</span>.m_successors = <span>null</span>;
}
<span>internal </span><span>AsyncTask </span>MakeSnapshot()
{
<span>return new </span><span>AsyncTask<br></br></span>{
Id = <span>this</span>.Id,
Status = <span>this</span>.Status
};
}
}

以下是构造 AsyncTask 对象时需要提供的信息:

  • id:一个 object 类型对象,用于标识一个异步任务,在同一个 AsyncTaskDispatcher 中唯一。
  • context:一个 object 类型的上下文对象,它是注册异步任务时提供的任意对象,AsyncTaskDispatcher 会在其他操作中提供该对象。
  • predicate:一个 Func
  • begin:一个 Func
  • end:一个 Action
  • dependencies:当前任务所依赖的其他异步任务

此外,还有一些成员用于辅助开发:

  • Status 属性:表明该异步任务的状态,它们是:
    • MarkedAsSucceeded:表示该任务在 predicate 委托执行时被标记为“成功”
    • Succeeded:表示该任务正常运行成功
    • Failed:表示该任务执行失败
    • Pending:表示该任务还处于等待状态
    • MarkedAsCancelled:表示该任务在 predicate 委托执行时被标记为“取消”
    • Cancelling:表明该任务已经发起,但因为其他任务的失败正处于取消阶段,等待结束。
    • Cancelled:表明该任务已经发起,但因为其他任务的失败而被取消。
    • Executing:表明该任务正在执行
  • Successors 属性:该任务的所有后继异步任务
  • DependenciesSucceeded 属性:表示该任务所依赖的异步任务是否全部成功
  • Close 方法:清除该任务中所有非公开数据,使它们在 AsyncTask 被外部引用的情况下也能尽早回收
  • MakeSnapshot 方法:返回表示当前 AsyncTask 对象状态的快照对象

AsyncTask 的大部分成员非常容易理解。只有一点值得一提,就是在某个异步任务执行失败的情况下 AsyncTaskDispatcher 的行为如何。我实现的 AsyncTaskDispatcher 在构造时可以接受一个参数 cancelOnFailure,表明某个任务出错之后是立即结束 整个分派过程(cancelOnFailure 为 true),还是需要等待正在运行的任务完成(cancelOnFailure 为 false)。

复制代码
<span>public class </span><span>AsyncTaskDispatcher<br></br></span>{
<span>private </span><span>Dictionary</span><<span>object</span>, <span>AsyncTask</span>> m_tasks;
<span>public bool </span>CancelOnFailure { <span>get</span>; <span>private set</span>; }
<span>public </span><span>DispatchStatus </span>Status { <span>get</span>; <span>private set</span>; }
...
<span>public </span>AsyncTaskDispatcher()
: <span>this</span>(<span>false</span>)
{ }
<span>public </span>AsyncTaskDispatcher(<span>bool </span>cancelOnFailure)
{
<span>this</span>.CancelOnFailure = cancelOnFailure;
<span>this</span>.m_tasks = <span>new </span><span>Dictionary</span><<span>object</span>, <span>AsyncTask</span>>();
<span>this</span>.Status = <span>DispatchStatus</span>.NotStarted;
}
...
}

如果 cancelOnFailure 为 true,作为一个整体的异步分派操作在遇到错误时会立即完成,外部逻辑便可继续执行下去。但是对于那些 已经开始的异步任务,它们的 End 委托在接下来的某一时刻依旧会被调用,否则就可能会造成资源泄露。一个异步任务的 End 委托可以通过它的 cancelling 参数来判断当前调用是一个正常的结束,还是一个取消过程。如果 cancelling 为 true,则说明整个分派过程已经结束,而其中 某个异步任务出现错误;如果 cancelling 为 false,则表明整个分派过程还在进行,当前异步任务正常结束——但是这并不表示此时整体分派过程中 没有出现问题,有可能某个异步任务已经出错,但是由于 cancelOnFailure 为 false,AsyncTaskDispatcher 正在等待其他 异步任务(也包括当前任务)的“正常结束”。

在 AsyncTaskDispatcher 中,我们将维护一个字典,以便通过 id 来查找一个 AsyncTask 对象。开发人员可以通过 RegisterTask 方法来创建一个 AsyncTask 对象:

复制代码
<span>public </span><span>AsyncTask </span>RegisterTask(
<span>object </span>id,
<span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> predicate,
<span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> begin,
<span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> end,
<span>object </span>context,
<span>params object</span>[] dependencies)
{
<span>lock </span>(<span>this</span>.m_tasks)
{
<span>if </span>(<span>this</span>.Status != <span>DispatchStatus</span>.NotStarted)
{
<span>throw new </span><span>InvalidOperationException</span>(<span>"Task can only be registered before dispatching."</span>);
}
<span>this</span>.CheckRegisterTaskArgs(id, begin, end, dependencies);
<span>AsyncTask </span>task = <span>new </span><span>AsyncTask</span>(
id,
predicate,
begin,
end,
context,
dependencies.Select(d => <span>this</span>.m_tasks[d]));
<span>this</span>.m_tasks.Add(id, task);
<span>return </span>task;
}
}

AsyncTaskDispatcher 在各方法内部使用了 lock,这是种非常粗略的锁行为——虽然确保了它在多线程环境下也能保持状态的一 致,但是其性能却比精打细算的锁控制要差一些。不过这毕竟不是目前的关键,更何况需要在多线程环境下使用 AsyncTaskDispatcher 的场景少 之又少。RegisterTask 方法有多个重载,它们都将调用直接发送给上面的 RegisterTask 实现。这个方法会根据传入的依赖 id 找出新任务 所依赖的异步任务,构造一个 AsyncTask 对象,并将其放入字典中。

为了形成一个标准的 APM 模式,AsyncTaskDispatcher 提供了 BeginDispatch 和 EndDispatch 方法:

复制代码
<span>private </span><span>Dictionay</span><<span>object</span>, <span>Exception</span>> m_taskExceptions;
<span>public </span><span>IAsyncResult </span>BeginDispatch(<span>AsyncCallback </span>asyncCallback, <span>object </span>asyncState)
{
<span>lock </span>(<span>this</span>.m_tasks)
{
<span>if </span>(<span>this</span>.Status != <span>DispatchStatus</span>.NotStarted)
{
<span>throw new </span><span>InvalidOperationException</span>(<span>"An AsyncTaskDispatcher can be started only once."</span>);
}
<span>this</span>.Status = <span>DispatchStatus</span>.Dispatching;
}
<span>this</span>.m_taskExceptions = <span>new </span><span>Dictionay</span><<span>object</span>, <span>Exception</span>>();
<span>var </span>taskToStart = <span>this</span>.m_tasks.Values.Where(t => t.DependenciesSucceeded).ToList();
<span>IEnumerator</span><<span>int</span>> enumerator = <span>this</span>.GetWorkerEnumerator(taskToStart);
<span>this</span>.m_asyncEnumerator = <span>new </span><span>AsyncEnumerator</span>();
<span>return this</span>.m_asyncEnumerator.BeginExecute(enumerator, asyncCallback, asyncState);
}
<span>public void </span>EndDispatch(<span>IAsyncResult </span>asyncResult)
{
<span>this</span>.m_asyncEnumerator.EndExecute(asyncResult);
<span>if </span>(<span>this</span>.m_taskExceptions.Count > 0)
{
<span>throw new </span><span>DispatchException</span>(<span>this</span>.GetTaskSnapshots(), <span>this</span>.m_taskExceptions);
}
}

在 BeginDispatch 方法中,将会选择出所有不存在依赖的任务,它们会被作为首批发起的异步任务。它们会被传入 GetWorkerEnumerator 方法用于构造一个 IEnumerator 对象,并将其交给 AsyncEnumerator 执行。整个执行过程中遇到的异常都会被收集起来,并且在 EndDispatch 执行中抛出一个包含这些异常信息的 DispatchException。异 步任务抛出的每个异常都会使用 HandleTaskFailure 方法进行处理:

复制代码
<span>private bool </span>HandleTaskFailure(<span>AsyncTask </span>task, <span>Exception </span>ex)
{
<span>this</span>.m_taskExceptions.Add(task.Id, ex);
task.Status = <span>AsyncTaskStatus</span>.Failed;
task.Close();
<span>if </span>(<span>this</span>.CancelOnFailure)
{
<span>lock </span>(<span>this</span>.m_tasks)
{
<span>var </span>runningTasks = <span>this</span>.m_tasks.Values.Where(t => t.Status == <span>AsyncTaskStatus</span>.Executing);
<span>foreach </span>(<span>AsyncTask </span>t <span>in </span>runningTasks)
{
t.Status = <span>AsyncTaskStatus</span>.Cancelling;
}
}
<span>this</span>.Status = <span>DispatchStatus</span>.Cancelling;
<span>this</span>.m_asyncEnumerator.Cancel(<span>null</span>);
<span>return true</span>;
}
<span>else<br></br></span>{
<span>this</span>.Status = <span>DispatchStatus</span>.Waiting;
<span>return false</span>;
}
}

HandleTaskFailure 接受两个参数,一是出错的 AsyncTask 对象,二便是被抛出的异常,并返回一个值表示是否应该立即中止 任务分派。在收集了异常信息,设定了任务状态并将其关闭之后。则会根据构造 AsyncTaskDispatcher 对象时所指定的 CancelOnFailure 值来确定接下来的行为。如果 CancelOnFailure 为 true(表示取消正在运行的对象),便将正在执行的异步任 务全部标记为 Cancelling 状态,并调用 AsyncEnumerator 的 Cancel 方法取消正在执行的异步任务。根据 CancelOnFailure 的不同,AsyncTaskDispatcher 自身状态将会变成 Cancelling(表示正在取消已经分派的任务,这 个状态不会维持长久)或 Waiting(表示正在等待已经分派的任务完成),而 HandleTaskFailure 方法也会返回对应的结果。

之前提到,如果一个异步任务的 End 委托对象被执行时,其 cancelling 参数为 false,并不能说明其他任务没有遇到任何错误。不过从 这段实现中便可得知,开发人员只要配合 AsyncTaskDispatcher 的状态便可确认更多细节:如果此时 AsyncTaskDispatcher 的状态为 Waiting,则表示之前已经有任务失败了。

复制代码
<span>public </span><span>IEnumerator</span><<span>int</span>> GetWorkerEnumerator(<span>IEnumerable</span><<span>AsyncTask</span>> tasksToStart)
{
<span>this</span>.m_runningTaskCount = 0;
<span>foreach </span>(<span>AsyncTask </span>task <span>in </span>tasksToStart)
{
<span>try<br></br></span>{
<span>this</span>.Start(task);
}
<span>catch </span>(<span>AsyncTaskException </span>ex)
{
<span>if </span>(<span>this</span>.HandleTaskFailure(ex.Task, ex.InnerException))
{
<span>this</span>.Status = <span>DispatchStatus</span>.Finished;
<span>yield break</span>;
}
<span>break</span>;
}
}
...
}

以上代码片断为 GetWorkerEnumerator 方法的一部分,用于启动首批被分派的异步任务。我们将遍历每个异步任务,并调用 Start 方法将其启动。Start 方法可能会抛出一个类型为 AsyncTaskException 的内部异常,这个异常不对外释放,它的作用仅仅是为了 方法间的“通信”——“异常”的确是用来表示错误的一种优秀方式,它能够轻易地从逻辑中跳出,并且在合适的地方进行处理。在捕获 AsyncTaskException 异常之后,就会调用之前的 HandleTaskFailure 方法进行处理,如果它返回 true(表示需要立即结束 分派)则使用 yiled break 来跳出 GetWorkerEnumerator 方法,否则便使用 break 来跳出循环——仅仅是跳出循环而并非整个 GetWorkerEnumerator 方法,因为此时并不要求整个分派立即结束,接下来的代码将会等待某个已经发起的任务完成,并进行处理:

复制代码
<span>public </span><span>IEnumerator</span><<span>int</span>> GetWorkerEnumerator(<span>IEnumerable</span><<span>AsyncTask</span>> tasksToStart)
{
...
<span>while </span>(<span>this</span>.m_runningTaskCount > 0)
{
<span>yield return </span>1;
<span>this</span>.m_runningTaskCount--;
<span>IAsyncResult </span>asyncResult = <span>this</span>.m_asyncEnumerator.DequeueAsyncResult();
<span>AsyncTask </span>finishedTask = (<span>AsyncTask</span>)asyncResult.AsyncState;
<span>try<br></br></span>{
finishedTask.End(asyncResult, <span>false</span>, finishedTask.Context);
finishedTask.Status = <span>AsyncTaskStatus</span>.Succeeded;
<span>if </span>(<span>this</span>.Status == <span>DispatchStatus</span>.Dispatching)
{
<span>this</span>.StartSuccessors(finishedTask);
}
finishedTask.Close();
}
<span>catch </span>(<span>AsyncTaskException </span>ex)
{
<span>if </span>(<span>this</span>.HandleTaskFailure(ex.Task, ex.InnerException)) <span>break</span>;
}
<span>catch </span>(<span>Exception </span>ex)
{
<span>if </span>(<span>this</span>.HandleTaskFailure(finishedTask, ex)) <span>break</span>;
}
}
<span>this</span>.Status = <span>DispatchStatus</span>.Finished;
}

在 Start 方法中,如果成功发起了这个异步请求,则会将 m_runningTaskCount 的数值加一,以此表示正在执行(确切地说,是已 经“开始”但还没有进行“结束”处理)的异步任务数目。while 循环体每执行一遍,则表示处理完一个异步请求。因此一般情况下,在没有处理完所有异步请 求的时候 while 循环体会不断执行。与之前 WebRequest 的示例相似,AsyncTask 对象作为 AsyncState 保存在 IAsyncResult 对象中——由于增加了 AsyncTask 这一抽象层,因此我们成功地将所有异步操作的处理方式统一了起来。

接着便依次调用 End 委托(请注意 cancelling 参数为 false)、设置属性、并且在 AsyncTaskDispatcher 为 Dispatching 状态时使用 StartSuccessors 方法来分派可执行的后继任务——AsyncTaskDispatcher 的状态也有可能 是 Waiting,这表示已经有任务出错,此时正在等待已经开始的任务完成,这种情况下就不应该发起新的异步任务了。值得注意的是,用户实现的 End 委托 可能会抛出异常,而 StartSuccessors 方法也可能抛出 AsyncTaskException 以表示某个异步任务启动失败。这时就要使用 HandleTaskFailure 方法来处理异常,并且在合适的时候立即终止整个分派操作(跳出循环)。

跳出 while 循环则表示整个派送操作已经完成,此时自然要将 AsyncTaskDispatcher 的状态设为 Finished。

而最后剩下的,只是用于启动任务的 Start 方法,用于取消任务的 CancelTask 方法,以及发起后继任务的 StartSuccessors 方法了:

复制代码
<span>private void </span>Start(<span>AsyncTask </span>task)
{
<span>DispatchPolicy </span>policy;
<span>try<br></br></span>{
policy = task.Predicate == <span>null </span>? <span>DispatchPolicy</span>.Normal : task.Predicate(task.Context);
}
<span>catch </span>(<span>Exception </span>ex)
{
<span>throw new </span><span>AsyncTaskException</span>(task, ex);
}
<span>if </span>(policy == <span>DispatchPolicy</span>.Normal)
{
<span>try<br></br></span>{
task.Begin(<span>this</span>.m_asyncEnumerator.EndVoid(0, <span>this</span>.CancelTask), task, task.Context);
<span>this</span>.m_runningTaskCount++;
task.Status = <span>AsyncTaskStatus</span>.Executing;
}
<span>catch </span>(<span>Exception </span>ex)
{
<span>throw new </span><span>AsyncTaskException</span>(task, ex);
}
}
<span>else if </span>(policy == <span>DispatchPolicy</span>.MarkAsCancelled)
{
task.Status = <span>AsyncTaskStatus</span>.MarkedAsCancelled;
task.Close();
}
<span>else </span><span>// policy == DispatchPolicy.Succeeded<br></br></span>{
task.Status = <span>AsyncTaskStatus</span>.MarkedAsSucceeded;
<span>this</span>.StartSuccessors(task);
task.Close();
}
}
<span>private void </span>StartSuccessors(<span>AsyncTask </span>task)
{
<span>Func</span><<span>AsyncTask</span>, <span>bool</span>> predicate = t =>
t.Status == <span>AsyncTaskStatus</span>.Pending &&
t.DependenciesSucceeded;
<span>foreach </span>(<span>AsyncTask </span>successor <span>in </span>task.Successors.Where(predicate))
{
<span>this</span>.Start(successor);
}
}
<span>private void </span>CancelTask(<span>IAsyncResult </span>asyncResult)
{
<span>AsyncTask </span>task = (<span>AsyncTask</span>)asyncResult.AsyncState;
<span>try<br></br></span>{
task.End(asyncResult, <span>true</span>, task.Context);
}
<span>catch </span>{ }
<span>finally<br></br></span>{
<span>this</span>.m_runningTaskCount--;
task.Status = <span>AsyncTaskStatus</span>.Cancelled;
task.Close();
}
}

在 Start 方法中,首先获取异步任务的 Predicate 委托的执行结果,并根据这个结果选择是将当前异步任务进行正常处理,还是标记为“取 消”或“成功”。如果应该正常执行,则调用 Begin 委托以发起异步任务,把任务状态标记为 Executing,并将 m_runningTaskCount 计数加一。如果选择标记为“成功”,那么还需要调用 StartSuccessors 来发起后继任务,自然在 StartSuccessors 方法中也是使用 Start 方法来发起单个任务——这是一种间接递归,使用“深度优先”的方式发起所有依赖已经全部完成的异 步请求。由于 Start 方法需要抛出 AsyncTaskException 来表示任务出错,因此在每段由用户实现的逻辑(即 Predicate 和 Begin 委托)执行时都要小心地捕获异常。

值得一提的是,如果需要使用 AsyncEnumerator 的 Cancel 方法来取消已经发起的异步任务,那么在从 AsyncEnumerator 获取异步回调委托(AsyncCallback 对象)的时候还必须提供一个委托作为取消这个任务时的回调函数。这便是 CancelTask 方法的作用,在 CancelTask 方法中也会调用 End 委托(请注意 cancelling 参数为 true),以确保资源能够被正确 释放。执行 End 委托时抛出的任何异常都会被默默“吞下”。一个任务被取消之后,它的状态会被修改为 Cancelled,最后则被关闭。

至此,AsyncTaskDispatcher 已经全部实现到这里就告一段落了。可见,有了 AsyncEnumerator 的协助,实现这样一 个组件并不那么困难,从头至尾总共只有 200 多行代码。事实上,写目前这篇文章所消耗的时间和精力已经数倍于实现一个完整的 AsyncTaskDispatcher。当然,目前的实现远不完美,它虽然较好地实现了有依赖关系的多个异步操作之间的协作调用,但是还缺少一些有用的 功能。例如,您可能需要在分派过程中动态添加新的任务,或是改变任务之间的依赖关系;而且,对于异步操作往往都会指定一个超时时间,可惜目前的 AsyncTaskDispatcher 并不包含超时功能。

您可以在这里获得AsyncTaskDispatcher 的完整代码,并且根据需要添加任何功能——尤其是刚才提到的缺陷,实现起来实际并不困难。

AsyncTaskDispatcher 的使用

现在我们基于 AsyncTaskDispatcher 来重新实现上一篇文章中提到的场景。假设您在开发一个 ASP.NET 页面用于展示一篇文章,其中需要显示各种信息:

  1. 文章内容
  2. 评论信息
  3. 对评论内容进行打分的用户
  4. 打分者的收藏

由于程序架构的原因,数据需要从各个不同服务或数据源中获取(这是个很常见的情况)。因此,程序中已经准备了如下的数据读取接口:

  1. Begin/EndGetContent:根据文章 ID(Int32),获取文章内容(String)
  2. Begin/EndGetComments:根据文章 ID(Int32),获取所有评论(IEnumerable)
  3. Begin/EndGetUsers:根据多个用户 ID(IEnumerable),获取一批用户(Dictionary
  4. Begin/EndGetCommentRaters:根据多个评论 ID(IEnumerable),获取所有打分者(IEnumerable)
  5. Begin/EndGetFavorites:根据多个用户 ID(IEnumerable),获取所有收藏(IEnumerable)

我们可以轻易得出五个异步操作之间的依赖状况:

因此,我们可以编写如下的代码。请注意,我们使用异步任务的 id 来表示它们之间的依赖关系:

复制代码
<span>private void </span>RegisterAsyncTasks(<span>AsyncTaskDispatcher </span>dispatcher, <span>int </span>articleId)
{
<span>// 获取文章内容 <br></br></span><span>string </span>taskGetContent = <span>"get content"</span>;
dispatcher.RegisterTask(
taskGetContent, <span>// 任务 ID </span>
(cb, state, context) => <span>// Begin 委托对象 </span>
{
<span>return </span><span>Service</span>.BeginGetContent(articleId, cb, state);
},
(ar, cancelling, context) => <span>// End 委托对象 </span>
{
<span>this</span>.Content = <span>Service</span>.EndGetContent(ar);
});
<span>// 获取评论 <br></br></span><span>string </span>taskGetComments = <span>"get comments"</span>;
<span>IEnumerable</span><<span>Comment</span>> comments = <span>null</span>;
dispatcher.RegisterTask(
taskGetComments,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetComments(articleId, cb, state);
},
(ar, cancelling, context) =>
{
comments = <span>Service</span>.EndGetComments(ar);
});
<span>// 获取评论者信息,并结合评论绑定至控件 <br></br></span><span>string </span>taskGetCommentUsers = <span>"get comment users"</span>;
dispatcher.RegisterTask(
taskGetCommentUsers,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetUsers(comments.Select(c => c.UserID), cb, state);
},
(ar, cancelling, context) =>
{
<span>var </span>users = <span>Service</span>.EndGetUsers(ar);
<span>this</span>.rptComments.DataSource =
<span>from </span>c <span>in </span>comments
<span>select new<br></br></span>{
Comment = c,
User = users[c.UserID]
};
<span>this</span>.rptComments.DataBind();
},
taskGetComments); <span>// 指定任务之间的依赖关系 </span>
<span>// 获取评论的打分者,并绑定至控件 <br></br></span><span>string </span>taskGetCommentRaters = <span>"get comment raters"</span>;
<span>IEnumerable</span><<span>User</span>> raters = <span>null</span>;
dispatcher.RegisterTask(
taskGetCommentRaters,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetCommentRaters(comments.Select(c => c.CommentID), cb, state);
},
(ar, cancelling, context) =>
{
raters = <span>Service</span>.EndGetCommentRaters(ar);
<span>this</span>.rptRaters.DataSource = raters;
<span>this</span>.rptRaters.DataBind();
},
taskGetComments);
<span>// 获取打分者的收藏内容,并绑定至控件 <br></br></span><span>string </span>taskGetFavorites = <span>"get favorites"</span>;
dispatcher.RegisterTask(
taskGetFavorites,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetFavorites(raters.Select(u => u.UserID), cb, state);
},
(ar, cancelling, context) =>
{
<span>this</span>.rptFavorites.DataSource = <span>Service</span>.EndGetFavorites(ar);
<span>this</span>.rptFavorites.DataBind();
},
taskGetCommentRaters);
}

与之前的作法相比,似乎代码量提高了,但是观察后可以发现,多出来的代码其实都是在创建匿名的委托对象,而一个个匿名的委托对象将代码进行了有 条理的分割,并充分利用“匿名方法”形成的闭包,使各委托对象能够共享“调用堆栈”上的数据。现在的实现使用了一种直观的方式表现了各异步操作之间的依赖 关系,代码一下子变得条理清晰,易于维护了。此外还有一点非常重要:虽然异步任务为“并行”执行,但是其中所有的委托对象只会依次调用,因此开发人员可以 放心地编写代码,而不用担心线程安全方面的问题。

我们可以把上面的代码结合 ASP.NET WebForm 页面的异步特性一起使用:

复制代码
<span>protected void </span>Page_Load(<span>object </span>sender, <span>EventArgs </span>e)
{
<span>this</span>.AddOnPreRenderCompleteAsync(
<span>new </span><span>BeginEventHandler</span>(BeginAsyncOperation),
<span>new </span><span>EndEventHandler</span>(EndAsyncOperation));
}
<span>private </span><span>IAsyncResult </span>BeginAsyncOperation(
<span>object </span>sender,
<span>EventArgs </span>e,
<span>AsyncCallback </span>callback,
<span>object </span>extraData)
{
<span>this</span>.m_dispatcher = <span>new </span><span>AsyncTaskDispatcher</span>();
<span>this</span>.RegisterAsyncTasks(<span>this</span>.m_dispatcher, 1);
<span>return this</span>.m_dispatcher.BeginDispatch(callback, extraData);
}
<span>private void </span>EndAsyncOperation(<span>IAsyncResult </span>ar)
{
<span>this</span>.m_dispatcher.EndDispatch(ar);
}

这个页面与 AsyncTaskDispatcher 的源代码同时发布,并且增加了一些简单的时间统计代码。您会发现,原本使用“串行”方式需要 10 秒钟才能完成的任务,使用如今的“并行”方式只需 6 秒钟即可完成:

  1. Get Content: 00:00:00 - 00:00:02.0010000
  2. Get Comments: 00:00:00.0010000 - 00:00:02.0010000
  3. Get Comment Users: 00:00:02.0010000 - 00:00:04.0010000
  4. Get Comment Raters: 00:00:02.0010000 - 00:00:04.0010000
  5. Get Favorites: 00:00:04.0010000 - 00:00:06.0010000

这与上一篇文章中提到的“理想情况” 完全一致:

由于 CCR 和 AsyncEnumerator 难以“并行”地执行异步代码,因此我们需要提出新的解决方案来满足这方面的需求。在 AsyncEnumerator 的基础上开发一个 AsyncTaskDispatcher 并不困难,但是这个组件能够有效地简化多个异步操作之间的协作调 用。一般来说,这样的做法能够使应用程序的性能与伸缩性得到比较明显的提高。AsyncTaskDispatcher 的代码在 MSDN Code Gallery 上完全公开,您可以自由修改,使它更好地满足您的需求。

相关文章简化异步操作(上)──使用CCR 和AsyncEnumerator 简化异步操作


给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2009-02-21 12:065522
用户头像

发布了 157 篇内容, 共 54.4 次阅读, 收获喜欢 6 次。

关注

评论

发布
暂无评论
发现更多内容
简化异步操作(下):构建AsyncTaskDispatcher简化多个异步操作之间的协作调用_.NET_赵劼_InfoQ精选文章