写点什么

C# 8 中的 Async Streams

Learn about the New Async Stream Pattern

  • 2018-09-19
  • 本文字数:10677 字

    阅读完需:约 35 分钟

关键要点

  • 异步编程技术提供了一种提高程序响应能力的方法。
  • Async/Await 模式在 C# 5 中首次亮相,但只能返回单个标量值。
  • C# 8 添加了异步流(Async Streams),允许异步方法返回多个值,从而扩展了其可用性。
  • 异步流提供了一种用于表示异步数据源的绝佳方法。
  • 异步流是 Java 和 JavaScript 中使用的反应式编程模型的替代方案。

C# 5 引入了 Async/Await,用以提高用户界面响应能力和对 Web 资源的访问能力。换句话说,异步方法用于执行不阻塞线程并返回一个标量结果的异步操作。

微软多次尝试简化异步操作,因为 Async/Await 模式易于理解,所以在开发人员当中获得了良好的认可。

现有异步方法的一个重要不足是它必须提供一个标量返回结果(一个值)。比如这个方法 async Task<int> DoAnythingAsync(),DoAnythingAsync 的结果是一个整数(一个值)。

由于存在这个限制,你不能将这个功能与 yield 关键字一起使用,并且也不能将其与 async IEnumerable<int>(返回异步枚举)一起使用。

如果可以将 Async/Await 特性与 yield 操作符一起使用,我们就可以使用非常强大的编程模型(如异步数据拉取或基于拉取的枚举,在 F#中被称为异步序列)。

C# 8 中新提出的 Async Streams 去掉了标量结果的限制,并允许异步方法返回多个结果。

这个变更将使异步模式变得更加灵活,这样就可以按照延迟异步序列的方式从数据库中获取数据,或者按照异步序列的方式下载数据(这些数据在可用时以块的形式返回)。

例如:

复制代码
foreach await (var streamChunck in asyncStreams)
{
Console.WriteLine($“Received data count = {streamChunck.Count}”);
}

Reactive Extensions(Rx)是解决异步编程问题的另一种方法。Rx 越来越受到开发人员的欢迎。很多其他编程语言(如 Java 和 JavaScript)已经实现了这种技术(RxJava、RxJS)。Rx 基于推送式编程模型(Push Programming Model),也称为反应式编程。反应式编程是事件驱动编程的一种类型,它处理的是数据而不是通知。

通常,在推送式编程模型中,你不需要控制 Publisher。数据被异步推送到队列中,消费者在数据到达时消费数据。与 Rx 不同,Async Streams 可以按需被调用,并生成多个值,直到达到枚举的末尾。

在本文中,我将对拉取模型和推送模型进行比较,并演示每一种技术各自的适用场景。我将使用很多代码示例向你展示整个概念和它们的优点,最后,我将讨论 Async Streams 功能,并向你展示示例代码。

拉取式编程模型与推送式编程模型

图 -1- 拉取式编程模型与推送式编程模型

我使用的例子是著名的生产者和消费者问题,但在我们的场景中,生产者不是生成食物,而是生成数据,消费者消费的是生成的数据,如图 -1 所示。拉取模型很容易理解。消费者询问并拉取生产者的数据。另一种方法是使用推送模型。生产者将数据发布到队列中,消费者通过订阅队列来接收所需的数据。

拉取模型更合适“快生产者和慢消费者”的场景,因为消费者可以从生产者那里拉取其所需的数据,避免消费者出现溢出。推送模型更适合“慢生产者和快消费者”的场景,因为生产者可以将数据推送给消费者,避免消费者不必要的等待时间。

Rx 和 Akka Streams (流式编程模型)使用了回压技术(一种流量控制机制)。它使用拉取模型或推送模型来解决上面提到的生产者和消费者问题。

在下面的示例中,我使用了一个慢消费者从快生产者那里异步拉取数据序列。消费者在处理完一个元素后,会向生产者请求下一个元素,依此类推,直到到达序列的末尾。

动机和背景

要了解我们为什么需要 Async Streams,让我们来看下面的代码。

复制代码
// 对参数 (count) 进行循环相加操作
static int SumFromOneToCount(int count)
{
ConsoleExt.WriteLine("SumFromOneToCount called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
}

方法调用:

复制代码
const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

我们可以通过使用 yield 运算符让这个方法变成惰性的,如下所示。

复制代码
static IEnumerable<int> SumFromOneToCountYield(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountYield called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
yield return sum;
}
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

正如你在输出窗口中看到的那样,结果被分成几个部分返回,而不是作为一个值返回。以上显示的累积结果被称为惰性枚举。但是,仍然存在一个问题,即 sum 方法阻塞了代码的执行。如果你查看线程,可以看到所有东西都在主线程中运行。

现在,让我们将 async 应用于第一个方法 SumFromOneToCount 上(没有 yield 关键字)。

复制代码
static async Task<int> SumFromOneToCountAsync(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsync called!");
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
});
return result;
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// 相加操作是异步进行得!这样还不够,我们要求不仅是异步的,还必须是惰性的。
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

我们可以看到计算过程是在另一个线程中运行,但结果仍然是作为一个值返回!

想象一下,我们可以按照命令式风格将惰性枚举(yield return)与异步方法结合起来。这种组合称为 Async Streams。这是 C# 8 中新提出的功能。这个新功能为我们提供了一种很好的技术来解决拉取式编程模型问题,例如从网站下载数据或从文件或数据库中读取记录。

让我们尝试使用当前的 C# 版本。我将 async 关键字添加到 SumFromOneToCountYield 方法中,如下所示。

图 -2 组合使用 async 关键字和 yield 发生错误

我们试着将 async 添加到 SumFromOneToCountYield,但直接出现错误,如上所示!

让我们试试别的吧。我们可以将 IEnumerable 放入任务中并删除 yield 关键字,如下所示:

复制代码
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
var collection = new Collection<int>();
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
collection.Add(sum);
}
return collection;
});
return result;
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");
foreach (var sc in scs)
{
// 这不是我们想要的,结果将作为块返回!!!!
ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
}
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

可以看到,我们异步计算所有的内容,但仍然存在一个问题。结果(所有结果都在集合中累积)作为一个块返回,但这不是我们想要的惰性行为,我们的目标是将惰性行为与异步计算风格相结合。

为了实现所需的行为,你需要使用外部库,如 Ix(Rx 的一部分),或者你必须使用新提出的 C#特性 Async Streams。

回到我们的代码示例。我使用了一个外部库来显示异步行为。

复制代码
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");
await sequence.ForEachAsync(value =>
{
ConsoleExt.WriteLineAsync($"Consuming the value: {value}");
// 模拟延迟!
Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});
}
static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
// 模拟延迟!
Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();
yield return sum;
}
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");
// 启动一个新任务,用于生成异步数据序列!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();
ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");
// 启动另一个新任务,用于消费异步数据序列!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));
// 出于演示目的,等待任务完成!
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

输出:

最后,我们实现了我们想要的行为!我们可以在枚举上进行异步迭代。

源代码在这里

客户端 / 服务器端的异步拉取

我将使用一个更现实的例子来解释这个概念。客户端 / 服务器端架构是演示这一功能优势的绝佳方法。

客户端 / 服务器端同步调用

客户端向服务器端发送请求,客户端必须等待(客户端被阻塞),直到服务器端做出响应,如图 -3 所示。

图 -3 同步数据拉取,客户端等待请求完成

异步数据拉取

客户端发出数据请求然后继续执行其他操作。一旦有数据到达,客户端就继续处理达到的数据。

图 -4 异步数据拉取,客户端可以在请求数据时执行其他操作

异步序列数据拉取

客户端发出数据块请求,然后继续执行其他操作。一旦数据块到达,客户端就处理接收到的数据块并询问下一个数据块,依此类推,直到达到最后一个数据块为止。这正是 Async Streams 想法的来源。图 -5 显示了客户端可以在收到任何数据时执行其他操作或处理数据块。

图 -5 异步序列数据拉取(Async Streams),客户端未被阻塞!

Async Streams

与 IEnumerable<T> 和 IEnumerator<T> 类似,Async Streams 提供了两个新接口 IAsyncEnumerable<T> 和 IAsyncEnumerator<T>,定义如下:

复制代码
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
Task<bool> MoveNextAsync();
T Current { get; }
}
// Async Streams Feature 可以被异步销毁
public interface IAsyncDisposable
{
Task DiskposeAsync();
}

Jonathan Allen 已经在 InfoQ 网站上介绍过这个主题,我不想在这里再重复一遍,所以我建议你也阅读一下他的文章

关键在于 Task<bool> MoveNextAsync() 的返回值(从 bool 改为 Task<bool>,bool IEnumerator.MoveNext())。这样可以让整个计算和迭代都保持异步。大多数情况下,这仍然是拉取模型,即使它是异步的。IAsyncDisposable 接口可用于进行异步清理。有关异步的更多信息,请点击此处

语法

最终语法应如下所示:

复制代码
foreach await (var dataChunk in asyncStreams)
{
// 处理数据块或做一些其他的事情!
}

如上所示,我们现在可以按顺序计算多个值,而不只是计算单个值,同时还能够等待其他异步操作结束。

重写微软的示例

我重写了微软的演示代码,你可以从我的GitHub 下载相关代码

这个例子背后的想法是创建一个大的 MemoryStream(20000 字节的数组),并按顺序异步迭代集合中的元素或 MemoryStream。每次迭代从数组中拉取 8K 字节。

在 (1) 处,我们创建了一个大字节数组并填充了一些虚拟值。在 (2) 处,我们定义了一个叫作 checksum 的变量。我们将使用 checksum 来确保计算的总和是正确的。数组和 checksum 位于内存中,并通过一个元组返回,如 (3) 所示。

在 (4) 处,AsEnumarble(或者叫 AsAsyncEnumarble)是一种扩展方法,用于模拟由 8KB 块组成的异步流( (6) 处所示的 BufferSize = 8000)。

通常,你不必继承 IAsyncEnumerable,但在上面的示例中,微软这样做是为了简化演示,如 (5) 处所示。

(7) 处是“foreach”,它从异步内存流中拉取 8KB 的块数据。当消费者(foreach 代码块)准备好接收更多数据时,拉取过程是顺序进行的,然后它从生产者(内存流数组)中拉取更多的数据。最后,当迭代完成后,应用程序将’c’的校验和与 checksum 进行比较,如果它们匹配,就打印出“Checksums match!”,如 (8) 所示!

微软演示的输出窗口:

概要

我们已经讨论过 Async Streams,它是一种出色的异步拉取技术,可用于进行生成多个值的异步计算。

Async Streams 背后的编程概念是异步拉取模型。我们请求获取序列的下一个元素,并最终得到答复。这与 IObservable<T> 的推送模型不同,后者生成与消费者状态无关的值。Async Streams 提供了一种表示异步数据源的绝佳方法,例如,当消费者尚未准备好处理更多数据时。示例包含了 Web 应用程序或从数据库中读取记录。

我已经演示了如何生成异步枚举数据,并使用外部异步序列库来消费枚举数据。我也演示了如何将这个功能用于从 Web 站点下载内容。最后,我们看到了新的 Async Streams 语法和一个完整的示例,该示例是基于微软的 Build Demo Code( 2018 年 5 月 7 日至 9 日,西雅图,华盛顿州)。

关于作者

Bassam Alugili 是 STRATEC AG 的高级软件专家和数据库专家。STRATEC 是全球领先的全自动分析仪系统、实验室数据管理软件和智能耗材的合作伙伴。

查看英文原文 Async Streams in C# 8

2018-09-19 18:323216
用户头像

发布了 731 篇内容, 共 461.4 次阅读, 收获喜欢 2004 次。

关注

评论

发布
暂无评论
发现更多内容

芯片设计工程师必看:借助Perforce Helix Core和Helix IPLM提高IP重用率,简化设计流程并确保产品质量

龙智—DevSecOps解决方案

Perforce Helix Core 版本控制工具 IP管理

Animoca Brands 投资了Penpad, Scroll 生态再迎壮大

股市老人

支付系统概述(十二):支付成功率

agnostic

支付系统设计与实现

新兴游戏引擎Godot vs. 主流游戏引擎Unity和虚幻引擎,以及版本控制工具Perforce Helix Core如何与其高效集成

龙智—DevSecOps解决方案

Unity 虚幻引擎 游戏开发引擎 Godot

为“风”转“液”加速,一台宁畅服务器的“全液冷”突围

脑极体

AI

嵌入式软件的自动化测试工具TESSY:产品概述、使用场景及功能价值介绍

龙智—DevSecOps解决方案

集成测试 测试 单元测试 嵌入式软件测试 Tessy

使用ReadyAPI自动化测试工具,模拟高负载场景,准确测试API性能,确保你的App不宕机

龙智—DevSecOps解决方案

UI自动化测试

京东JD商品sku信息API返回值详解:轻松获取商品规格

技术冰糖葫芦

API boy api 货币化 API 接口 pinduoduo API

京东JD商品详情API返回值解析:商品数据快速提取

技术冰糖葫芦

API boy API 接口 pinduoduo API

LED显示屏与LCD显示屏的9个区别

Dylan

LED显示屏 全彩LED显示屏 led显示屏厂家 lcd

算子开发到推理加速,一位00后开发者的“升级打怪”之旅

Alter

开发者 昇腾AI大赛 #大模型

2024-04-27:用go语言,在一个下标从 1 开始的 8 x 8 棋盘上,有三个棋子,分别是白色车、白色象和黑色皇后。 给定这三个棋子的位置,请计算出要捕获黑色皇后所需的最少移动次数。 需要注意

福大大架构师每日一题

福大大架构师每日一题

Animoca Brands 投资了Penpad, Scroll 生态再迎壮大

BlockChain先知

支付系统概述(十一):运营能力

agnostic

支付系统设计与实现

Penpad 再获 Animoca Brands 投资,全新生态历程

加密眼界

适用于芯片行业的开发及管理工具:版本控制、持续集成、代码分析及项目管理工具介绍

龙智—DevSecOps解决方案

项目管理 Jira Atlassian Helix Core 版本控制工具 芯片研发

【“AI”协同 创未来】线下研讨会预告:Jira、Confluence及Jira Service Management等Atlassian产品及其AI功能深度解读

龙智—DevSecOps解决方案

可替代IBM DOORS的现代化需求管理解决方案Jama Connect,支持数据迁移及重构、实时可追溯性、简化合规流程

龙智—DevSecOps解决方案

需求管理工具 jama IBM DOORS

01 RCLI

独钓寒江

​Rust

物联网智能手表架构实践

智慧源点

Penpad 再获 Animoca Brands 投资,全新生态历程

石头财经

和鲸科技出席第五届空间数据智能学术会议,执行总裁殷自强受邀发表主题报告

ModelWhale

人工智能 数据模型 空间数据

Lucene环境下基于NGram分词的中文高级检索

alexgaoyh

中文分词 lucene 高级检索 ngram 知网高级检索

C# 8中的Async Streams_.NET_Bassam Alugili_InfoQ精选文章