写点什么

简化异步操作(下):构建 AsyncTaskDispatcher 简化多个异步操作之间的协作调用

  • 2009-02-21
  • 本文字数:13059 字

    阅读完需:约 43 分钟

前言

在《简化异步操作(上):使用CCR 和AsyncEnumerator 简化异步操作》 一文中,我们谈到了异步编程的重要性,使用异步操作的难点,以及如何使用CCR 和AsyncEnumerator 来简化异步操作的调用。有了这些组件的辅 助,异步操作也可以真正使用传统的开发方式来编写了——这意味着各种缺陷,例如无法在“堆栈”中保存临时变量,无法try…catch…finally 和 using 等问题都不复存在了。这些组件让异步编程一下子美好了许多。

不过,现有的辅助还不足以面对一些复杂的场景。例如,要使多个有依赖的异步操作尽可能的“并行”,我们还需要构建额外的解决方案。在这片文章 里,我们将编写一个AsyncTaskDispatcher 来简化此类场景下的开发。自然,您也可以下载它的代码后加以修改,使它能更进一步满足您的需 求。

AsyncTaskDispatcher 的设计与实现

AsyncTaskDispatcher 的目标是为了简化多个异步操作之间的协调调用,使得各异步操作之间能够尽可能的“并行化”。在我的设想中,AsyncTaskDispatcher 应该这样使用:

  1. 首先创建一个 AsyncTaskDispatcher 对象。
  2. 在 AsyncTaskDispatcher 对象中注册各种异步任务,同时指定各任务之间的依赖关系。
  3. 使用 AsyncTaskDispatcher 对象的 BeginDispatch 和 EndDispath 方法组成了一个 APM 模式,把所有异步任务作为一个整体来调用。
  4. 如果任何一个异步任务抛出异常:
  • AsyncTaskDispatcher 停止分派,并取消正在执行的任务,整个异步调用立即完成。
  • AsyncTaskDispatcher 停止分派,并等待正在执行的任务完成后整个异步调用才算结束。

在经过了思考和对比之后,最终我选择基于 AsyncEnumerator 构造 AsyncTaskDispatcher,因为:

  • AsyncEnumerator 久经测试,可以让我把注意力集中在分配异步任务的逻辑上,而不需要过渡关注异步操作中的各种稀奇古怪的问题。
  • AsyncEnumerator 的使用模型保证了一点:除了异步操作本身之外,其他的逻辑都可以使用串行的方式来执行。这样进一步避免了多线程环境下的各种问题,尤其是在控制各种状态的时候这点更为重要(当然 CCR 也有这个特点)。
  • AsyncEnumerator 个头小巧,仅数十 K 大小,如果基于 CCR 开发,则需要一个几兆的再分发包(redistribution package)。
  • AsyncEnumerator 自带了 BeginExecute 和 EndExecute 方法组成的 APM 模式,可以直接使用
  • AsyncEnumerator 已经包含内置的“取消”功能。
  • AsyncEnumerator 已经内置 SynchronizationContext,能够在 WinForm 或 WPF 界面中直接使用。

那么我们就开始 AsyncTaskDispatcher 的实现。首先我们要确定一个异步任务包含哪些信息。的是“那些东西构成了一个异步任务”,或者说,“构造一个异步任务需要哪些信息”。AsyncTask 的定义如下:

复制代码
<span>public class </span><span>AsyncTask<br></br></span>{
<span>private </span><span>AsyncTask</span>[] m_dependencies;
<span>private </span><span>HashSet</span><<span>AsyncTask</span>> m_successors;
<span>public object </span>Id { <span>get</span>; <span>private set</span>; }
<span>public object </span>Context { <span>get</span>; <span>private set</span>; }
<span>public </span><span>AsyncTaskStatus </span>Status { <span>get</span>; <span>internal set</span>; }
<span>internal </span><span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> Predicate { <span>get</span>; <span>private set</span>; }
<span>internal </span><span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> Begin { <span>get</span>; <span>private set</span>; }
<span>internal </span><span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> End { <span>get</span>; <span>private set</span>; }
<span>private </span>AsyncTask() { }
<span>internal </span>AsyncTask(
<span>object </span>id,
<span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> predicate,
<span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> begin,
<span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> end,
<span>object </span>context,
<span>IEnumerable</span><<span>AsyncTask</span>> dependencies)
{
<span>this</span>.Id = id;
<span>this</span>.Predicate = predicate;
<span>this</span>.Begin = begin;
<span>this</span>.End = end;
<span>this</span>.Context = context;
<span>this</span>.Status = <span>AsyncTaskStatus</span>.Pending;
<span>this</span>.m_successors = <span>new </span><span>HashSet</span><<span>AsyncTask</span>>();
<span>this</span>.m_dependencies = dependencies.Where(d => d != <span>null</span>).Distinct().ToArray();
<span>foreach </span>(<span>var </span>task <span>in this</span>.m_dependencies)
{
task.m_successors.Add(<span>this</span>);
}
}
<span>internal </span><span>IEnumerable</span><<span>AsyncTask</span>> Successors
{
<span>get<br></br></span>{
<span>return this</span>.m_successors;
}
}
<span>internal bool </span>DependenciesSucceeded
{
<span>get<br></br></span>{
<span>return this</span>.m_dependencies.All(d =>
d.Status == <span>AsyncTaskStatus</span>.Succeeded ||
d.Status == <span>AsyncTaskStatus</span>.MarkedAsSucceeded);
}
}
<span>internal void </span>Close()
{
<span>this</span>.Predicate = <span>null</span>;
<span>this</span>.Begin = <span>null</span>;
<span>this</span>.End = <span>null</span>;
<span>this</span>.m_dependencies = <span>null</span>;
<span>this</span>.m_successors = <span>null</span>;
}
<span>internal </span><span>AsyncTask </span>MakeSnapshot()
{
<span>return new </span><span>AsyncTask<br></br></span>{
Id = <span>this</span>.Id,
Status = <span>this</span>.Status
};
}
}

以下是构造 AsyncTask 对象时需要提供的信息:

  • id:一个 object 类型对象,用于标识一个异步任务,在同一个 AsyncTaskDispatcher 中唯一。
  • context:一个 object 类型的上下文对象,它是注册异步任务时提供的任意对象,AsyncTaskDispatcher 会在其他操作中提供该对象。
  • predicate:一个 Func
  • begin:一个 Func
  • end:一个 Action
  • dependencies:当前任务所依赖的其他异步任务

此外,还有一些成员用于辅助开发:

  • Status 属性:表明该异步任务的状态,它们是:
    • MarkedAsSucceeded:表示该任务在 predicate 委托执行时被标记为“成功”
    • Succeeded:表示该任务正常运行成功
    • Failed:表示该任务执行失败
    • Pending:表示该任务还处于等待状态
    • MarkedAsCancelled:表示该任务在 predicate 委托执行时被标记为“取消”
    • Cancelling:表明该任务已经发起,但因为其他任务的失败正处于取消阶段,等待结束。
    • Cancelled:表明该任务已经发起,但因为其他任务的失败而被取消。
    • Executing:表明该任务正在执行
  • Successors 属性:该任务的所有后继异步任务
  • DependenciesSucceeded 属性:表示该任务所依赖的异步任务是否全部成功
  • Close 方法:清除该任务中所有非公开数据,使它们在 AsyncTask 被外部引用的情况下也能尽早回收
  • MakeSnapshot 方法:返回表示当前 AsyncTask 对象状态的快照对象

AsyncTask 的大部分成员非常容易理解。只有一点值得一提,就是在某个异步任务执行失败的情况下 AsyncTaskDispatcher 的行为如何。我实现的 AsyncTaskDispatcher 在构造时可以接受一个参数 cancelOnFailure,表明某个任务出错之后是立即结束 整个分派过程(cancelOnFailure 为 true),还是需要等待正在运行的任务完成(cancelOnFailure 为 false)。

复制代码
<span>public class </span><span>AsyncTaskDispatcher<br></br></span>{
<span>private </span><span>Dictionary</span><<span>object</span>, <span>AsyncTask</span>> m_tasks;
<span>public bool </span>CancelOnFailure { <span>get</span>; <span>private set</span>; }
<span>public </span><span>DispatchStatus </span>Status { <span>get</span>; <span>private set</span>; }
...
<span>public </span>AsyncTaskDispatcher()
: <span>this</span>(<span>false</span>)
{ }
<span>public </span>AsyncTaskDispatcher(<span>bool </span>cancelOnFailure)
{
<span>this</span>.CancelOnFailure = cancelOnFailure;
<span>this</span>.m_tasks = <span>new </span><span>Dictionary</span><<span>object</span>, <span>AsyncTask</span>>();
<span>this</span>.Status = <span>DispatchStatus</span>.NotStarted;
}
...
}

如果 cancelOnFailure 为 true,作为一个整体的异步分派操作在遇到错误时会立即完成,外部逻辑便可继续执行下去。但是对于那些 已经开始的异步任务,它们的 End 委托在接下来的某一时刻依旧会被调用,否则就可能会造成资源泄露。一个异步任务的 End 委托可以通过它的 cancelling 参数来判断当前调用是一个正常的结束,还是一个取消过程。如果 cancelling 为 true,则说明整个分派过程已经结束,而其中 某个异步任务出现错误;如果 cancelling 为 false,则表明整个分派过程还在进行,当前异步任务正常结束——但是这并不表示此时整体分派过程中 没有出现问题,有可能某个异步任务已经出错,但是由于 cancelOnFailure 为 false,AsyncTaskDispatcher 正在等待其他 异步任务(也包括当前任务)的“正常结束”。

在 AsyncTaskDispatcher 中,我们将维护一个字典,以便通过 id 来查找一个 AsyncTask 对象。开发人员可以通过 RegisterTask 方法来创建一个 AsyncTask 对象:

复制代码
<span>public </span><span>AsyncTask </span>RegisterTask(
<span>object </span>id,
<span>Func</span><<span>object</span>, <span>DispatchPolicy</span>> predicate,
<span>Func</span><<span>AsyncCallback</span>, <span>object</span>, <span>object</span>, <span>IAsyncResult</span>> begin,
<span>Action</span><<span>IAsyncResult</span>, <span>bool</span>, <span>object</span>> end,
<span>object </span>context,
<span>params object</span>[] dependencies)
{
<span>lock </span>(<span>this</span>.m_tasks)
{
<span>if </span>(<span>this</span>.Status != <span>DispatchStatus</span>.NotStarted)
{
<span>throw new </span><span>InvalidOperationException</span>(<span>"Task can only be registered before dispatching."</span>);
}
<span>this</span>.CheckRegisterTaskArgs(id, begin, end, dependencies);
<span>AsyncTask </span>task = <span>new </span><span>AsyncTask</span>(
id,
predicate,
begin,
end,
context,
dependencies.Select(d => <span>this</span>.m_tasks[d]));
<span>this</span>.m_tasks.Add(id, task);
<span>return </span>task;
}
}

AsyncTaskDispatcher 在各方法内部使用了 lock,这是种非常粗略的锁行为——虽然确保了它在多线程环境下也能保持状态的一 致,但是其性能却比精打细算的锁控制要差一些。不过这毕竟不是目前的关键,更何况需要在多线程环境下使用 AsyncTaskDispatcher 的场景少 之又少。RegisterTask 方法有多个重载,它们都将调用直接发送给上面的 RegisterTask 实现。这个方法会根据传入的依赖 id 找出新任务 所依赖的异步任务,构造一个 AsyncTask 对象,并将其放入字典中。

为了形成一个标准的 APM 模式,AsyncTaskDispatcher 提供了 BeginDispatch 和 EndDispatch 方法:

复制代码
<span>private </span><span>Dictionay</span><<span>object</span>, <span>Exception</span>> m_taskExceptions;
<span>public </span><span>IAsyncResult </span>BeginDispatch(<span>AsyncCallback </span>asyncCallback, <span>object </span>asyncState)
{
<span>lock </span>(<span>this</span>.m_tasks)
{
<span>if </span>(<span>this</span>.Status != <span>DispatchStatus</span>.NotStarted)
{
<span>throw new </span><span>InvalidOperationException</span>(<span>"An AsyncTaskDispatcher can be started only once."</span>);
}
<span>this</span>.Status = <span>DispatchStatus</span>.Dispatching;
}
<span>this</span>.m_taskExceptions = <span>new </span><span>Dictionay</span><<span>object</span>, <span>Exception</span>>();
<span>var </span>taskToStart = <span>this</span>.m_tasks.Values.Where(t => t.DependenciesSucceeded).ToList();
<span>IEnumerator</span><<span>int</span>> enumerator = <span>this</span>.GetWorkerEnumerator(taskToStart);
<span>this</span>.m_asyncEnumerator = <span>new </span><span>AsyncEnumerator</span>();
<span>return this</span>.m_asyncEnumerator.BeginExecute(enumerator, asyncCallback, asyncState);
}
<span>public void </span>EndDispatch(<span>IAsyncResult </span>asyncResult)
{
<span>this</span>.m_asyncEnumerator.EndExecute(asyncResult);
<span>if </span>(<span>this</span>.m_taskExceptions.Count > 0)
{
<span>throw new </span><span>DispatchException</span>(<span>this</span>.GetTaskSnapshots(), <span>this</span>.m_taskExceptions);
}
}

在 BeginDispatch 方法中,将会选择出所有不存在依赖的任务,它们会被作为首批发起的异步任务。它们会被传入 GetWorkerEnumerator 方法用于构造一个 IEnumerator 对象,并将其交给 AsyncEnumerator 执行。整个执行过程中遇到的异常都会被收集起来,并且在 EndDispatch 执行中抛出一个包含这些异常信息的 DispatchException。异 步任务抛出的每个异常都会使用 HandleTaskFailure 方法进行处理:

复制代码
<span>private bool </span>HandleTaskFailure(<span>AsyncTask </span>task, <span>Exception </span>ex)
{
<span>this</span>.m_taskExceptions.Add(task.Id, ex);
task.Status = <span>AsyncTaskStatus</span>.Failed;
task.Close();
<span>if </span>(<span>this</span>.CancelOnFailure)
{
<span>lock </span>(<span>this</span>.m_tasks)
{
<span>var </span>runningTasks = <span>this</span>.m_tasks.Values.Where(t => t.Status == <span>AsyncTaskStatus</span>.Executing);
<span>foreach </span>(<span>AsyncTask </span>t <span>in </span>runningTasks)
{
t.Status = <span>AsyncTaskStatus</span>.Cancelling;
}
}
<span>this</span>.Status = <span>DispatchStatus</span>.Cancelling;
<span>this</span>.m_asyncEnumerator.Cancel(<span>null</span>);
<span>return true</span>;
}
<span>else<br></br></span>{
<span>this</span>.Status = <span>DispatchStatus</span>.Waiting;
<span>return false</span>;
}
}

HandleTaskFailure 接受两个参数,一是出错的 AsyncTask 对象,二便是被抛出的异常,并返回一个值表示是否应该立即中止 任务分派。在收集了异常信息,设定了任务状态并将其关闭之后。则会根据构造 AsyncTaskDispatcher 对象时所指定的 CancelOnFailure 值来确定接下来的行为。如果 CancelOnFailure 为 true(表示取消正在运行的对象),便将正在执行的异步任 务全部标记为 Cancelling 状态,并调用 AsyncEnumerator 的 Cancel 方法取消正在执行的异步任务。根据 CancelOnFailure 的不同,AsyncTaskDispatcher 自身状态将会变成 Cancelling(表示正在取消已经分派的任务,这 个状态不会维持长久)或 Waiting(表示正在等待已经分派的任务完成),而 HandleTaskFailure 方法也会返回对应的结果。

之前提到,如果一个异步任务的 End 委托对象被执行时,其 cancelling 参数为 false,并不能说明其他任务没有遇到任何错误。不过从 这段实现中便可得知,开发人员只要配合 AsyncTaskDispatcher 的状态便可确认更多细节:如果此时 AsyncTaskDispatcher 的状态为 Waiting,则表示之前已经有任务失败了。

复制代码
<span>public </span><span>IEnumerator</span><<span>int</span>> GetWorkerEnumerator(<span>IEnumerable</span><<span>AsyncTask</span>> tasksToStart)
{
<span>this</span>.m_runningTaskCount = 0;
<span>foreach </span>(<span>AsyncTask </span>task <span>in </span>tasksToStart)
{
<span>try<br></br></span>{
<span>this</span>.Start(task);
}
<span>catch </span>(<span>AsyncTaskException </span>ex)
{
<span>if </span>(<span>this</span>.HandleTaskFailure(ex.Task, ex.InnerException))
{
<span>this</span>.Status = <span>DispatchStatus</span>.Finished;
<span>yield break</span>;
}
<span>break</span>;
}
}
...
}

以上代码片断为 GetWorkerEnumerator 方法的一部分,用于启动首批被分派的异步任务。我们将遍历每个异步任务,并调用 Start 方法将其启动。Start 方法可能会抛出一个类型为 AsyncTaskException 的内部异常,这个异常不对外释放,它的作用仅仅是为了 方法间的“通信”——“异常”的确是用来表示错误的一种优秀方式,它能够轻易地从逻辑中跳出,并且在合适的地方进行处理。在捕获 AsyncTaskException 异常之后,就会调用之前的 HandleTaskFailure 方法进行处理,如果它返回 true(表示需要立即结束 分派)则使用 yiled break 来跳出 GetWorkerEnumerator 方法,否则便使用 break 来跳出循环——仅仅是跳出循环而并非整个 GetWorkerEnumerator 方法,因为此时并不要求整个分派立即结束,接下来的代码将会等待某个已经发起的任务完成,并进行处理:

复制代码
<span>public </span><span>IEnumerator</span><<span>int</span>> GetWorkerEnumerator(<span>IEnumerable</span><<span>AsyncTask</span>> tasksToStart)
{
...
<span>while </span>(<span>this</span>.m_runningTaskCount > 0)
{
<span>yield return </span>1;
<span>this</span>.m_runningTaskCount--;
<span>IAsyncResult </span>asyncResult = <span>this</span>.m_asyncEnumerator.DequeueAsyncResult();
<span>AsyncTask </span>finishedTask = (<span>AsyncTask</span>)asyncResult.AsyncState;
<span>try<br></br></span>{
finishedTask.End(asyncResult, <span>false</span>, finishedTask.Context);
finishedTask.Status = <span>AsyncTaskStatus</span>.Succeeded;
<span>if </span>(<span>this</span>.Status == <span>DispatchStatus</span>.Dispatching)
{
<span>this</span>.StartSuccessors(finishedTask);
}
finishedTask.Close();
}
<span>catch </span>(<span>AsyncTaskException </span>ex)
{
<span>if </span>(<span>this</span>.HandleTaskFailure(ex.Task, ex.InnerException)) <span>break</span>;
}
<span>catch </span>(<span>Exception </span>ex)
{
<span>if </span>(<span>this</span>.HandleTaskFailure(finishedTask, ex)) <span>break</span>;
}
}
<span>this</span>.Status = <span>DispatchStatus</span>.Finished;
}

在 Start 方法中,如果成功发起了这个异步请求,则会将 m_runningTaskCount 的数值加一,以此表示正在执行(确切地说,是已 经“开始”但还没有进行“结束”处理)的异步任务数目。while 循环体每执行一遍,则表示处理完一个异步请求。因此一般情况下,在没有处理完所有异步请 求的时候 while 循环体会不断执行。与之前 WebRequest 的示例相似,AsyncTask 对象作为 AsyncState 保存在 IAsyncResult 对象中——由于增加了 AsyncTask 这一抽象层,因此我们成功地将所有异步操作的处理方式统一了起来。

接着便依次调用 End 委托(请注意 cancelling 参数为 false)、设置属性、并且在 AsyncTaskDispatcher 为 Dispatching 状态时使用 StartSuccessors 方法来分派可执行的后继任务——AsyncTaskDispatcher 的状态也有可能 是 Waiting,这表示已经有任务出错,此时正在等待已经开始的任务完成,这种情况下就不应该发起新的异步任务了。值得注意的是,用户实现的 End 委托 可能会抛出异常,而 StartSuccessors 方法也可能抛出 AsyncTaskException 以表示某个异步任务启动失败。这时就要使用 HandleTaskFailure 方法来处理异常,并且在合适的时候立即终止整个分派操作(跳出循环)。

跳出 while 循环则表示整个派送操作已经完成,此时自然要将 AsyncTaskDispatcher 的状态设为 Finished。

而最后剩下的,只是用于启动任务的 Start 方法,用于取消任务的 CancelTask 方法,以及发起后继任务的 StartSuccessors 方法了:

复制代码
<span>private void </span>Start(<span>AsyncTask </span>task)
{
<span>DispatchPolicy </span>policy;
<span>try<br></br></span>{
policy = task.Predicate == <span>null </span>? <span>DispatchPolicy</span>.Normal : task.Predicate(task.Context);
}
<span>catch </span>(<span>Exception </span>ex)
{
<span>throw new </span><span>AsyncTaskException</span>(task, ex);
}
<span>if </span>(policy == <span>DispatchPolicy</span>.Normal)
{
<span>try<br></br></span>{
task.Begin(<span>this</span>.m_asyncEnumerator.EndVoid(0, <span>this</span>.CancelTask), task, task.Context);
<span>this</span>.m_runningTaskCount++;
task.Status = <span>AsyncTaskStatus</span>.Executing;
}
<span>catch </span>(<span>Exception </span>ex)
{
<span>throw new </span><span>AsyncTaskException</span>(task, ex);
}
}
<span>else if </span>(policy == <span>DispatchPolicy</span>.MarkAsCancelled)
{
task.Status = <span>AsyncTaskStatus</span>.MarkedAsCancelled;
task.Close();
}
<span>else </span><span>// policy == DispatchPolicy.Succeeded<br></br></span>{
task.Status = <span>AsyncTaskStatus</span>.MarkedAsSucceeded;
<span>this</span>.StartSuccessors(task);
task.Close();
}
}
<span>private void </span>StartSuccessors(<span>AsyncTask </span>task)
{
<span>Func</span><<span>AsyncTask</span>, <span>bool</span>> predicate = t =>
t.Status == <span>AsyncTaskStatus</span>.Pending &&
t.DependenciesSucceeded;
<span>foreach </span>(<span>AsyncTask </span>successor <span>in </span>task.Successors.Where(predicate))
{
<span>this</span>.Start(successor);
}
}
<span>private void </span>CancelTask(<span>IAsyncResult </span>asyncResult)
{
<span>AsyncTask </span>task = (<span>AsyncTask</span>)asyncResult.AsyncState;
<span>try<br></br></span>{
task.End(asyncResult, <span>true</span>, task.Context);
}
<span>catch </span>{ }
<span>finally<br></br></span>{
<span>this</span>.m_runningTaskCount--;
task.Status = <span>AsyncTaskStatus</span>.Cancelled;
task.Close();
}
}

在 Start 方法中,首先获取异步任务的 Predicate 委托的执行结果,并根据这个结果选择是将当前异步任务进行正常处理,还是标记为“取 消”或“成功”。如果应该正常执行,则调用 Begin 委托以发起异步任务,把任务状态标记为 Executing,并将 m_runningTaskCount 计数加一。如果选择标记为“成功”,那么还需要调用 StartSuccessors 来发起后继任务,自然在 StartSuccessors 方法中也是使用 Start 方法来发起单个任务——这是一种间接递归,使用“深度优先”的方式发起所有依赖已经全部完成的异 步请求。由于 Start 方法需要抛出 AsyncTaskException 来表示任务出错,因此在每段由用户实现的逻辑(即 Predicate 和 Begin 委托)执行时都要小心地捕获异常。

值得一提的是,如果需要使用 AsyncEnumerator 的 Cancel 方法来取消已经发起的异步任务,那么在从 AsyncEnumerator 获取异步回调委托(AsyncCallback 对象)的时候还必须提供一个委托作为取消这个任务时的回调函数。这便是 CancelTask 方法的作用,在 CancelTask 方法中也会调用 End 委托(请注意 cancelling 参数为 true),以确保资源能够被正确 释放。执行 End 委托时抛出的任何异常都会被默默“吞下”。一个任务被取消之后,它的状态会被修改为 Cancelled,最后则被关闭。

至此,AsyncTaskDispatcher 已经全部实现到这里就告一段落了。可见,有了 AsyncEnumerator 的协助,实现这样一 个组件并不那么困难,从头至尾总共只有 200 多行代码。事实上,写目前这篇文章所消耗的时间和精力已经数倍于实现一个完整的 AsyncTaskDispatcher。当然,目前的实现远不完美,它虽然较好地实现了有依赖关系的多个异步操作之间的协作调用,但是还缺少一些有用的 功能。例如,您可能需要在分派过程中动态添加新的任务,或是改变任务之间的依赖关系;而且,对于异步操作往往都会指定一个超时时间,可惜目前的 AsyncTaskDispatcher 并不包含超时功能。

您可以在这里获得AsyncTaskDispatcher 的完整代码,并且根据需要添加任何功能——尤其是刚才提到的缺陷,实现起来实际并不困难。

AsyncTaskDispatcher 的使用

现在我们基于 AsyncTaskDispatcher 来重新实现上一篇文章中提到的场景。假设您在开发一个 ASP.NET 页面用于展示一篇文章,其中需要显示各种信息:

  1. 文章内容
  2. 评论信息
  3. 对评论内容进行打分的用户
  4. 打分者的收藏

由于程序架构的原因,数据需要从各个不同服务或数据源中获取(这是个很常见的情况)。因此,程序中已经准备了如下的数据读取接口:

  1. Begin/EndGetContent:根据文章 ID(Int32),获取文章内容(String)
  2. Begin/EndGetComments:根据文章 ID(Int32),获取所有评论(IEnumerable)
  3. Begin/EndGetUsers:根据多个用户 ID(IEnumerable),获取一批用户(Dictionary
  4. Begin/EndGetCommentRaters:根据多个评论 ID(IEnumerable),获取所有打分者(IEnumerable)
  5. Begin/EndGetFavorites:根据多个用户 ID(IEnumerable),获取所有收藏(IEnumerable)

我们可以轻易得出五个异步操作之间的依赖状况:

因此,我们可以编写如下的代码。请注意,我们使用异步任务的 id 来表示它们之间的依赖关系:

复制代码
<span>private void </span>RegisterAsyncTasks(<span>AsyncTaskDispatcher </span>dispatcher, <span>int </span>articleId)
{
<span>// 获取文章内容 <br></br></span><span>string </span>taskGetContent = <span>"get content"</span>;
dispatcher.RegisterTask(
taskGetContent, <span>// 任务 ID </span>
(cb, state, context) => <span>// Begin 委托对象 </span>
{
<span>return </span><span>Service</span>.BeginGetContent(articleId, cb, state);
},
(ar, cancelling, context) => <span>// End 委托对象 </span>
{
<span>this</span>.Content = <span>Service</span>.EndGetContent(ar);
});
<span>// 获取评论 <br></br></span><span>string </span>taskGetComments = <span>"get comments"</span>;
<span>IEnumerable</span><<span>Comment</span>> comments = <span>null</span>;
dispatcher.RegisterTask(
taskGetComments,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetComments(articleId, cb, state);
},
(ar, cancelling, context) =>
{
comments = <span>Service</span>.EndGetComments(ar);
});
<span>// 获取评论者信息,并结合评论绑定至控件 <br></br></span><span>string </span>taskGetCommentUsers = <span>"get comment users"</span>;
dispatcher.RegisterTask(
taskGetCommentUsers,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetUsers(comments.Select(c => c.UserID), cb, state);
},
(ar, cancelling, context) =>
{
<span>var </span>users = <span>Service</span>.EndGetUsers(ar);
<span>this</span>.rptComments.DataSource =
<span>from </span>c <span>in </span>comments
<span>select new<br></br></span>{
Comment = c,
User = users[c.UserID]
};
<span>this</span>.rptComments.DataBind();
},
taskGetComments); <span>// 指定任务之间的依赖关系 </span>
<span>// 获取评论的打分者,并绑定至控件 <br></br></span><span>string </span>taskGetCommentRaters = <span>"get comment raters"</span>;
<span>IEnumerable</span><<span>User</span>> raters = <span>null</span>;
dispatcher.RegisterTask(
taskGetCommentRaters,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetCommentRaters(comments.Select(c => c.CommentID), cb, state);
},
(ar, cancelling, context) =>
{
raters = <span>Service</span>.EndGetCommentRaters(ar);
<span>this</span>.rptRaters.DataSource = raters;
<span>this</span>.rptRaters.DataBind();
},
taskGetComments);
<span>// 获取打分者的收藏内容,并绑定至控件 <br></br></span><span>string </span>taskGetFavorites = <span>"get favorites"</span>;
dispatcher.RegisterTask(
taskGetFavorites,
(cb, state, context) =>
{
<span>return </span><span>Service</span>.BeginGetFavorites(raters.Select(u => u.UserID), cb, state);
},
(ar, cancelling, context) =>
{
<span>this</span>.rptFavorites.DataSource = <span>Service</span>.EndGetFavorites(ar);
<span>this</span>.rptFavorites.DataBind();
},
taskGetCommentRaters);
}

与之前的作法相比,似乎代码量提高了,但是观察后可以发现,多出来的代码其实都是在创建匿名的委托对象,而一个个匿名的委托对象将代码进行了有 条理的分割,并充分利用“匿名方法”形成的闭包,使各委托对象能够共享“调用堆栈”上的数据。现在的实现使用了一种直观的方式表现了各异步操作之间的依赖 关系,代码一下子变得条理清晰,易于维护了。此外还有一点非常重要:虽然异步任务为“并行”执行,但是其中所有的委托对象只会依次调用,因此开发人员可以 放心地编写代码,而不用担心线程安全方面的问题。

我们可以把上面的代码结合 ASP.NET WebForm 页面的异步特性一起使用:

复制代码
<span>protected void </span>Page_Load(<span>object </span>sender, <span>EventArgs </span>e)
{
<span>this</span>.AddOnPreRenderCompleteAsync(
<span>new </span><span>BeginEventHandler</span>(BeginAsyncOperation),
<span>new </span><span>EndEventHandler</span>(EndAsyncOperation));
}
<span>private </span><span>IAsyncResult </span>BeginAsyncOperation(
<span>object </span>sender,
<span>EventArgs </span>e,
<span>AsyncCallback </span>callback,
<span>object </span>extraData)
{
<span>this</span>.m_dispatcher = <span>new </span><span>AsyncTaskDispatcher</span>();
<span>this</span>.RegisterAsyncTasks(<span>this</span>.m_dispatcher, 1);
<span>return this</span>.m_dispatcher.BeginDispatch(callback, extraData);
}
<span>private void </span>EndAsyncOperation(<span>IAsyncResult </span>ar)
{
<span>this</span>.m_dispatcher.EndDispatch(ar);
}

这个页面与 AsyncTaskDispatcher 的源代码同时发布,并且增加了一些简单的时间统计代码。您会发现,原本使用“串行”方式需要 10 秒钟才能完成的任务,使用如今的“并行”方式只需 6 秒钟即可完成:

  1. Get Content: 00:00:00 - 00:00:02.0010000
  2. Get Comments: 00:00:00.0010000 - 00:00:02.0010000
  3. Get Comment Users: 00:00:02.0010000 - 00:00:04.0010000
  4. Get Comment Raters: 00:00:02.0010000 - 00:00:04.0010000
  5. Get Favorites: 00:00:04.0010000 - 00:00:06.0010000

这与上一篇文章中提到的“理想情况” 完全一致:

由于 CCR 和 AsyncEnumerator 难以“并行”地执行异步代码,因此我们需要提出新的解决方案来满足这方面的需求。在 AsyncEnumerator 的基础上开发一个 AsyncTaskDispatcher 并不困难,但是这个组件能够有效地简化多个异步操作之间的协作调 用。一般来说,这样的做法能够使应用程序的性能与伸缩性得到比较明显的提高。AsyncTaskDispatcher 的代码在 MSDN Code Gallery 上完全公开,您可以自由修改,使它更好地满足您的需求。

相关文章简化异步操作(上)──使用CCR 和AsyncEnumerator 简化异步操作


给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家加入到 InfoQ 中文站用户讨论组中与我们的编辑和其他读者朋友交流。

2009-02-21 12:065588
用户头像

发布了 157 篇内容, 共 55.0 次阅读, 收获喜欢 6 次。

关注

评论

发布
暂无评论
发现更多内容

IPQ8072|XGS-PON|Dual Band 10GbE Wifi6 Industrial SBC DR8072V01

wallyslilly

云端利器!香港云主机带你畅享强大的云计算能力!

一只扑棱蛾子

香港云主机

企业转型必修课,用友BIP成为企业数智化首选

用友BIP

国产替代

MQTT 订阅标识符详解

EMQ映云科技

mqtt 订阅标识符

对线面试官 Redis | 十 Redis集群模式

派大星

Java 面试题

完成等保测评后有合格证书吗?是什么样的?

行云管家

等保测评 等保2.0 等级测评

CST电磁仿真软件要怎么学?

思茂信息

操作 仿真软件 cst cst使用教程 cst仿真软件

Github实时数据分析与可视化训练营火热开启!免费领取5000元云上资源

阿里云大数据AI技术

MySQL 开发者 分布式计算 数据可视化 大数据、

2023年中国(深圳)国际耐火材料产业展会

秋硕展览

Spring 能解决所有循环依赖吗?

江南一点雨

Java spring

点云标注的未来发展与技术革新

来自四九城儿

升级数智底座是数智化2.0时代的核心诉求

用友BIP

数智底座

当你成为一个Tech Lead

码猿外

技术管理 Tech Lead

引领AI变革,九章云极DataCanvas公司重磅发布AIFS+DataPilot

九章云极DataCanvas

北京汽车:传统车厂向“用户服务”转型的新范本

字节跳动数据平台

大数据 用户

智能分析云 | 穿透式数据分析赋能数智国资

用友BIP

数据分析

Win11 搭建SD WebUI环境 | 社区征文

IT蜗壳-Tango

AIGC Stable Diffusion 年中技术盘点 SD

浅谈一下自动化运维优点和缺点,哪款工具好?

行云管家

自动化 IT运维 自动化运维

尝试7分钟内上线一个网站,这个工具太赞了!

互联网工科生

低代码 搭建平台 搭建网站

航空机场行业如何绘就全面预算降本增效新画卷?

用友BIP

全面预算

交付和发布的区别,你真的懂吗?

老张

持续集成 线上发布 版本火车

融云「北极星」数据监控平台:数据可视通晓全局,精准分析定位问题

融云 RongCloud

监控 数据 IM RTC 融云

@Import :Spring Bean模块装配的艺术

华为云开发者联盟

spring 开发 华为云 华为云开发者联盟 企业号 7 月 PK 榜

Brotli-压缩算法的潮流 | 社区征文

不叫猫先生

Brotli 压缩算法 年中技术盘点

“多巴胺设计” 来袭,TDesign 主题中心上线

TDesign

设计 主题色 开源系统

基于PaddleOCR与OpenVINO™的结构化输出Pipeline

飞桨PaddlePaddle

人工智能 百度 paddle 飞桨 百度飞桨

转型过程“千变万化”,怎样的数智平台才能够帮助企业顺利转型?

用友BIP

数智底座

Eplan是什么软件?学习Eplan软件的几个关键要点

智造软件

汽车电气架构 CAE CAE软件 EPLAN 电气辅助设计

软件测试/测试开发丨Linux进程与线程学习笔记

测试人

Python Linux 程序员 软件测试

简化异步操作(下):构建AsyncTaskDispatcher简化多个异步操作之间的协作调用_.NET_赵劼_InfoQ精选文章