写点什么

C# 8 中的 Async Streams

Learn about the New Async Stream Pattern

  • 2018-09-19
  • 本文字数:10677 字

    阅读完需:约 35 分钟

关键要点

  • 异步编程技术提供了一种提高程序响应能力的方法。
  • Async/Await 模式在 C# 5 中首次亮相,但只能返回单个标量值。
  • C# 8 添加了异步流(Async Streams),允许异步方法返回多个值,从而扩展了其可用性。
  • 异步流提供了一种用于表示异步数据源的绝佳方法。
  • 异步流是 Java 和 JavaScript 中使用的反应式编程模型的替代方案。

C# 5 引入了 Async/Await,用以提高用户界面响应能力和对 Web 资源的访问能力。换句话说,异步方法用于执行不阻塞线程并返回一个标量结果的异步操作。

微软多次尝试简化异步操作,因为 Async/Await 模式易于理解,所以在开发人员当中获得了良好的认可。

现有异步方法的一个重要不足是它必须提供一个标量返回结果(一个值)。比如这个方法 async Task<int> DoAnythingAsync(),DoAnythingAsync 的结果是一个整数(一个值)。

由于存在这个限制,你不能将这个功能与 yield 关键字一起使用,并且也不能将其与 async IEnumerable<int>(返回异步枚举)一起使用。

如果可以将 Async/Await 特性与 yield 操作符一起使用,我们就可以使用非常强大的编程模型(如异步数据拉取或基于拉取的枚举,在 F#中被称为异步序列)。

C# 8 中新提出的 Async Streams 去掉了标量结果的限制,并允许异步方法返回多个结果。

这个变更将使异步模式变得更加灵活,这样就可以按照延迟异步序列的方式从数据库中获取数据,或者按照异步序列的方式下载数据(这些数据在可用时以块的形式返回)。

例如:

复制代码
foreach await (var streamChunck in asyncStreams)
{
Console.WriteLine($“Received data count = {streamChunck.Count}”);
}

Reactive Extensions(Rx)是解决异步编程问题的另一种方法。Rx 越来越受到开发人员的欢迎。很多其他编程语言(如 Java 和 JavaScript)已经实现了这种技术(RxJava、RxJS)。Rx 基于推送式编程模型(Push Programming Model),也称为反应式编程。反应式编程是事件驱动编程的一种类型,它处理的是数据而不是通知。

通常,在推送式编程模型中,你不需要控制 Publisher。数据被异步推送到队列中,消费者在数据到达时消费数据。与 Rx 不同,Async Streams 可以按需被调用,并生成多个值,直到达到枚举的末尾。

在本文中,我将对拉取模型和推送模型进行比较,并演示每一种技术各自的适用场景。我将使用很多代码示例向你展示整个概念和它们的优点,最后,我将讨论 Async Streams 功能,并向你展示示例代码。

拉取式编程模型与推送式编程模型

图 -1- 拉取式编程模型与推送式编程模型

我使用的例子是著名的生产者和消费者问题,但在我们的场景中,生产者不是生成食物,而是生成数据,消费者消费的是生成的数据,如图 -1 所示。拉取模型很容易理解。消费者询问并拉取生产者的数据。另一种方法是使用推送模型。生产者将数据发布到队列中,消费者通过订阅队列来接收所需的数据。

拉取模型更合适“快生产者和慢消费者”的场景,因为消费者可以从生产者那里拉取其所需的数据,避免消费者出现溢出。推送模型更适合“慢生产者和快消费者”的场景,因为生产者可以将数据推送给消费者,避免消费者不必要的等待时间。

Rx 和 Akka Streams (流式编程模型)使用了回压技术(一种流量控制机制)。它使用拉取模型或推送模型来解决上面提到的生产者和消费者问题。

在下面的示例中,我使用了一个慢消费者从快生产者那里异步拉取数据序列。消费者在处理完一个元素后,会向生产者请求下一个元素,依此类推,直到到达序列的末尾。

动机和背景

要了解我们为什么需要 Async Streams,让我们来看下面的代码。

复制代码
// 对参数 (count) 进行循环相加操作
static int SumFromOneToCount(int count)
{
ConsoleExt.WriteLine("SumFromOneToCount called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
}

方法调用:

复制代码
const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

我们可以通过使用 yield 运算符让这个方法变成惰性的,如下所示。

复制代码
static IEnumerable<int> SumFromOneToCountYield(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountYield called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
yield return sum;
}
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

正如你在输出窗口中看到的那样,结果被分成几个部分返回,而不是作为一个值返回。以上显示的累积结果被称为惰性枚举。但是,仍然存在一个问题,即 sum 方法阻塞了代码的执行。如果你查看线程,可以看到所有东西都在主线程中运行。

现在,让我们将 async 应用于第一个方法 SumFromOneToCount 上(没有 yield 关键字)。

复制代码
static async Task<int> SumFromOneToCountAsync(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsync called!");
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
});
return result;
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// 相加操作是异步进行得!这样还不够,我们要求不仅是异步的,还必须是惰性的。
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

我们可以看到计算过程是在另一个线程中运行,但结果仍然是作为一个值返回!

想象一下,我们可以按照命令式风格将惰性枚举(yield return)与异步方法结合起来。这种组合称为 Async Streams。这是 C# 8 中新提出的功能。这个新功能为我们提供了一种很好的技术来解决拉取式编程模型问题,例如从网站下载数据或从文件或数据库中读取记录。

让我们尝试使用当前的 C# 版本。我将 async 关键字添加到 SumFromOneToCountYield 方法中,如下所示。

图 -2 组合使用 async 关键字和 yield 发生错误

我们试着将 async 添加到 SumFromOneToCountYield,但直接出现错误,如上所示!

让我们试试别的吧。我们可以将 IEnumerable 放入任务中并删除 yield 关键字,如下所示:

复制代码
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
var collection = new Collection<int>();
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
collection.Add(sum);
}
return collection;
});
return result;
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");
foreach (var sc in scs)
{
// 这不是我们想要的,结果将作为块返回!!!!
ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
}
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

输出:

可以看到,我们异步计算所有的内容,但仍然存在一个问题。结果(所有结果都在集合中累积)作为一个块返回,但这不是我们想要的惰性行为,我们的目标是将惰性行为与异步计算风格相结合。

为了实现所需的行为,你需要使用外部库,如 Ix(Rx 的一部分),或者你必须使用新提出的 C#特性 Async Streams。

回到我们的代码示例。我使用了一个外部库来显示异步行为。

复制代码
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");
await sequence.ForEachAsync(value =>
{
ConsoleExt.WriteLineAsync($"Consuming the value: {value}");
// 模拟延迟!
Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});
}
static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
// 模拟延迟!
Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();
yield return sum;
}
}

调用方法:

复制代码
const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");
// 启动一个新任务,用于生成异步数据序列!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();
ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");
// 启动另一个新任务,用于消费异步数据序列!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));
// 出于演示目的,等待任务完成!
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

输出:

最后,我们实现了我们想要的行为!我们可以在枚举上进行异步迭代。

源代码在这里

客户端 / 服务器端的异步拉取

我将使用一个更现实的例子来解释这个概念。客户端 / 服务器端架构是演示这一功能优势的绝佳方法。

客户端 / 服务器端同步调用

客户端向服务器端发送请求,客户端必须等待(客户端被阻塞),直到服务器端做出响应,如图 -3 所示。

图 -3 同步数据拉取,客户端等待请求完成

异步数据拉取

客户端发出数据请求然后继续执行其他操作。一旦有数据到达,客户端就继续处理达到的数据。

图 -4 异步数据拉取,客户端可以在请求数据时执行其他操作

异步序列数据拉取

客户端发出数据块请求,然后继续执行其他操作。一旦数据块到达,客户端就处理接收到的数据块并询问下一个数据块,依此类推,直到达到最后一个数据块为止。这正是 Async Streams 想法的来源。图 -5 显示了客户端可以在收到任何数据时执行其他操作或处理数据块。

图 -5 异步序列数据拉取(Async Streams),客户端未被阻塞!

Async Streams

与 IEnumerable<T> 和 IEnumerator<T> 类似,Async Streams 提供了两个新接口 IAsyncEnumerable<T> 和 IAsyncEnumerator<T>,定义如下:

复制代码
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
Task<bool> MoveNextAsync();
T Current { get; }
}
// Async Streams Feature 可以被异步销毁
public interface IAsyncDisposable
{
Task DiskposeAsync();
}

Jonathan Allen 已经在 InfoQ 网站上介绍过这个主题,我不想在这里再重复一遍,所以我建议你也阅读一下他的文章

关键在于 Task<bool> MoveNextAsync() 的返回值(从 bool 改为 Task<bool>,bool IEnumerator.MoveNext())。这样可以让整个计算和迭代都保持异步。大多数情况下,这仍然是拉取模型,即使它是异步的。IAsyncDisposable 接口可用于进行异步清理。有关异步的更多信息,请点击此处

语法

最终语法应如下所示:

复制代码
foreach await (var dataChunk in asyncStreams)
{
// 处理数据块或做一些其他的事情!
}

如上所示,我们现在可以按顺序计算多个值,而不只是计算单个值,同时还能够等待其他异步操作结束。

重写微软的示例

我重写了微软的演示代码,你可以从我的GitHub 下载相关代码

这个例子背后的想法是创建一个大的 MemoryStream(20000 字节的数组),并按顺序异步迭代集合中的元素或 MemoryStream。每次迭代从数组中拉取 8K 字节。

在 (1) 处,我们创建了一个大字节数组并填充了一些虚拟值。在 (2) 处,我们定义了一个叫作 checksum 的变量。我们将使用 checksum 来确保计算的总和是正确的。数组和 checksum 位于内存中,并通过一个元组返回,如 (3) 所示。

在 (4) 处,AsEnumarble(或者叫 AsAsyncEnumarble)是一种扩展方法,用于模拟由 8KB 块组成的异步流( (6) 处所示的 BufferSize = 8000)。

通常,你不必继承 IAsyncEnumerable,但在上面的示例中,微软这样做是为了简化演示,如 (5) 处所示。

(7) 处是“foreach”,它从异步内存流中拉取 8KB 的块数据。当消费者(foreach 代码块)准备好接收更多数据时,拉取过程是顺序进行的,然后它从生产者(内存流数组)中拉取更多的数据。最后,当迭代完成后,应用程序将’c’的校验和与 checksum 进行比较,如果它们匹配,就打印出“Checksums match!”,如 (8) 所示!

微软演示的输出窗口:

概要

我们已经讨论过 Async Streams,它是一种出色的异步拉取技术,可用于进行生成多个值的异步计算。

Async Streams 背后的编程概念是异步拉取模型。我们请求获取序列的下一个元素,并最终得到答复。这与 IObservable<T> 的推送模型不同,后者生成与消费者状态无关的值。Async Streams 提供了一种表示异步数据源的绝佳方法,例如,当消费者尚未准备好处理更多数据时。示例包含了 Web 应用程序或从数据库中读取记录。

我已经演示了如何生成异步枚举数据,并使用外部异步序列库来消费枚举数据。我也演示了如何将这个功能用于从 Web 站点下载内容。最后,我们看到了新的 Async Streams 语法和一个完整的示例,该示例是基于微软的 Build Demo Code( 2018 年 5 月 7 日至 9 日,西雅图,华盛顿州)。

关于作者

Bassam Alugili 是 STRATEC AG 的高级软件专家和数据库专家。STRATEC 是全球领先的全自动分析仪系统、实验室数据管理软件和智能耗材的合作伙伴。

查看英文原文 Async Streams in C# 8

2018-09-19 18:323093
用户头像

发布了 731 篇内容, 共 455.7 次阅读, 收获喜欢 2003 次。

关注

评论

发布
暂无评论
发现更多内容

C++编译器和链接器的完全指南

小万哥

c++ 程序员 面试 后端 开发

23年最新Java岗常见面试题及答案(1000道),90% 的公司都会问到

Java你猿哥

Java MySQL zookeeper JVM java面试

2023最新后端中大厂面经&在面试过程中如何反问?

Java你猿哥

Java ssm java面试 面试官 Java面经

由Elasticsearch7.8评分脚本引起的一个索引迁移解决方法

北桥苏

elasticsearch Logstash ELK Stack

独一份,15年经验汇聚而成的《SpringBoot“踩坑”手册》首次开源

做梦都在改BUG

Java spring Spring Boot 框架

深入解析Java适配器模式:将接口转换为你所需要的形式

做梦都在改BUG

Java 适配器

Windows下hadoop环境搭建之NameNode启动报错

北桥苏

大数据 hadoop

使用Spring Boot接入ChatGPT

Java你猿哥

Java spring Spring Boot ssm ChatGPT

创意世界在 Photoshop 上运行~

真大的脸盆

Mac ps Mac 软件 Photoshop 2022下载

程序员晋升指南!13年顶级架构设计经验的锦囊妙计与实践分享

Java你猿哥

Java 架构 ssm 架构设计 架构师

毫不夸张的说,这份SpringBoot学习指南能解决你遇到的98%的问题

做梦都在改BUG

Java spring Spring Boot 框架

【RabbitMQ】| 带你 (超详细) 从0到1使用SpringBoot操作RabbitMQ

Java你猿哥

Java spring Spring Boot ssm RabbitMQ

如何通过Logstash将MySQL数据同步到ElasticSearch

北桥苏

php MySQL elasticsearch Logstash

PoseiSwap:合规、隐私与支持更广泛的资产

鳄鱼视界

Logstash同步MySQL一对多关联表到Elasticsearch父子文档

北桥苏

elasticsearch Logstash ELK Stack

Logstash同步MySQL关联表到Elasticsearch的嵌套文档中

北桥苏

elasticsearch Logstash ELK Stack

逆袭!阿里专家手码23版Java面试三件套,Github星标直线狂飙

Java你猿哥

Java 微服务 面经 算法题 java核心知识点

干货力荐!京东首席架构师:亿级流量架构的核心技术文档

做梦都在改BUG

Java 架构 亿级流量

一篇文章教你在业务开发中高效玩转TDD(测试驱动开发)

Java你猿哥

Java ssm TDD

必知必会的JavaScript前端面试题篇(二),不看后悔!

Immerse

Kubernetes CNI之Flannel网络模型分析

王玉川

Kubernetes 云原生 flannel VXLAN cni

单调栈模板总结及应用

timerring

算法

简单聊聊Java中线程安全有哪些实现思路?

做梦都在改BUG

Java 多线程

Github标星90K!不愧是阿里大牛珍藏的LeetCode题解全彩小册

Java你猿哥

面试 算法 LeetCode ssm 算法题

Logstash如何批量同步MySQL多表到ElasticSearch

北桥苏

elasticsearch Logstash ELK Stack

windows下Hive搭建踩坑汇总

北桥苏

hadoop hive hql

最简单的canal 1.1.6服务搭建方法

北桥苏

elasticsearch canal

Go语言中如何通过接口来实现单一职责原则

Jack

如何优化Golang中重复的错误处理

乌龟哥哥

三周年连更

系列课程:从零开始接触人工智能大模型(介绍)

茶桁

深入探秘OpenTelemetry Agent奇特的muzzle机制

骑牛上青山

Java Java Agent OpenTelemetry

C# 8中的Async Streams_.NET_Bassam Alugili_InfoQ精选文章