我之前为 InfoQ 撰写的文章,《超越 F#基础——工作流》,介绍了工作流这种新语言特性。这篇文章将探讨一下称之为异步工作流的工作流特性的有趣用法,它的目的就是要简化.NET 的异步编程模型。
什么是 F#?
F#是一个针对.NET 框架的静态类型化函数式编程语言。它具有 OCaml 常见的核心语言功能,以及其他流行的函数式编程语言的一些特性,并从很多其他编程语言获取了一些思想,包括 Haskell、Erlang 和 C#。简而言之,这意味着 F#是一个具有优雅语法的编程语言,当我们能交互式地执行代码的时候感觉有点像脚本编程,但是它都是类型安全且有着良好性能的编译语言。这篇文章不是 F#的介绍文章,不过网络上有很多资源可以让我们容易地学习 F#。可以参阅在我之前文章中的侧边栏所附加的一个“F#资源”列表。
异步编程模型
当使用.NET BCL 的所有 I/O 操作的时候,有两个模型可用,同步模型和异步模型。异步模型是通过一个通用的编程模式来支持的,即成对出现的 BeginXXX 和 EndXXX 方法。程序员通过调用 BeginXXX 来开始异步操作,这个方法开始执行后,就马上返回。而,程序员在得到异步操作已经结束的提醒后,必须调用 EndXXX 方法完成整个过程。
以我的经验来看,大部分程序员都喜欢用同步模型,这是由于它比较简单,以及在 BCL 中的很多类只支持同步模型;不过在很多情况下,异步编程模型能产生响应更灵敏更具伸缩性的应用程序。为了阐述一下异步模型的困难之处,让我们来看一个简单的例子:打开一个文件,从文件中读取字节,下面是一些实现同步处理的代码:
#light<br></br><span color="#0000c0">open</span> System.IO<p><span color="#0000c0">let</span> openFile() =</p><br></br><span color="#0000c0">use</span> fs = new FileStream(<span color="#7f0055">@"C:\Program Files\Internet Explorer\iexplore.exe"</span>,<br></br> FileMode.Open, FileAccess.Read, FileShare.Read)<br></br><span color="#0000c0">let</span> data = Array.create (int fs.Length) 0uy<br></br><span color="#0000c0">let</span> bytesRead = fs.Read(data, 0, data.Length)<br></br> printfn <span color="#7f0055">"Read Bytes: %i, First bytes were: %i %i %i ..."</span> <br></br> bytesRead data.(1) data.(2) data.(3)<p> openFile()</p>
BCL 提供了一个更简单的方法来完成上面的事情,即是“File.ReadAllBytes”,但是这个最简单的方式却具有一个等效的异步使用方式。从文件中读取内容的操作是非常直截了当的:我们打开一个文件流对象,创建一个数组来放置数据,最后把所有数据都读到这个数组中。注意,在创建文件流对象的时候我们是如何使用“use”赋值语法的,这种用法大致上和 C#的 using 语句相当,即意味着文件流对象将在离开作用域的时候被销毁掉。
现在,让我们来看一下使用异步编程模型的等效代码:
#light<br></br><span color="#0000c0">open</span> System.IO<p><span color="#0000c0">let</span> openFile() =</p><br></br><span color="#0000c0">let</span> fs = <span color="#0000c0">new</span> FileStream(<span color="#7f0055">@"C:\Program Files\Internet Explorer\iexplore.exe"</span>,<br></br> FileMode.Open, FileAccess.Read, FileShare.Read)<br></br><span color="#0000c0">let</span> data = Array.create (int fs.Length) 0uy<br></br><span color="#0000c0">let</span> callback ar =<br></br><span color="#0000c0">let</span> bytesRead = fs.EndRead(ar)<br></br> fs.Dispose()<br></br> printfn <span color="#7f0055">"Read Bytes: %i, First bytes were: %i %i %i ..."</span><br></br> bytesRead data.(1) data.(2) data.(3)<br></br> fs.BeginRead(data, 0, data.Length, (<span color="#0000c0">fun</span> ar <span color="#0000c0">-></span> callback ar), <span color="#0000c0">null</span>) |> ignore<p> openFile()</p>
这个简单的例子中,打开文件的代码还是容易理解的,不过接下来事情就很明显地变得愈来愈复杂了。所有步骤的第一步和之前的是非常的类似:打开文件流对象,创建一个用于放置数据的数组。不过,从这里开始事情就变得有得糟糕了,我们需要定义一个回调来处理“EndRead”方法的调用,这个回调需要伴随着一个状态对象(在这里,由于我们不需要状态对象,所以传递的是空)传递到“BeginRead”方法中。也要重点注意的一个地方就是,我们不能再使用 “use”语法了,这是因为文件流对象在调用“BeginRead”方法的时候已经离开了作用域了,这样就意味着如果使用“use”赋值语法,那么它不久就可能被销毁掉,则当“EndRead”方法被调用的时候它可能已经不可用了。这也意味着,我们需要添加对“Dispose”方法的调用,这样,我们会就丧失在最后代码块(Finally Block)中调用“Dispose”方法的安全措施。虽然对于一个打开文件的简单例子来说,这些额外的复杂操作似乎还是比较合理,但当你添加更多功能和更进一步的异步读取能力到应用程序中的时候,你不久就会麻烦不断了。
异步工作流
异步工作流就是为了应付这个特殊的问题而引入的。那么,现在让我们来看看异步工作流的版本:
#light<p><span color="#0000c0">open</span> System.IO</p><br></br><span color="#0000c0">open</span> Microsoft.FSharp.Control.CommonExtensions<p><span color="#0000c0">let</span> openFile =</p><br></br> async { <span color="#0000c0">use</span> fs = <span color="#0000c0">new</span> FileStream(<span color="#7f0055">@"C:\Program Files\Internet Explorer\iexplore.exe"</span>,<br></br> FileMode.Open, FileAccess.Read, FileShare.Read)<br></br><span color="#0000c0">let</span> data = Array.create (int fs.Length) 0uy<br></br><span color="#0000c0">let!</span> bytesRead = fs.ReadAsync(data, 0, data.Length)<br></br><span color="#0000c0">do</span> printfn <span color="#7f0055">"Read Bytes: %i, First bytes were: %i %i %i ..."</span><br></br> bytesRead data.(1) data.(2) data.(3) }<p> Async.Run openFile</p>
关于工作流版本要注意最重要的事情就是,它和同步版本的唯一区别只是一小点字符。在“async{…}”工作流声明语句的加入后,更重要的是,我们要改变从文件读取内容的代码为:
<span color="#0000c0">let!</span> bytesRead = fs.ReadAsync(data, 0, data.Length)
我们添加一个感叹号(!)到 let 关键字中,现在我们就可以调用“ReadAsync”而非“Read”了。在异步工作流中,“let!”告诉我们,接下来的赋值过程将使用异步的方式,而 ReadAsync 函数为 BeginRead 和 EndRead 方法如何被调用提供了规范。注意,如果我们不使用异步函数,那么我们就会得到一个编译类型错误。那么,“ReadAsync”来自哪里?它不是一个存在于“FileStream”类中的函数。细心的读者可能会注意到这句代码"open Microsoft.FSharp.Control.CommonExtensions",它打开了一个包含很多 F#类型控制的命名空间。这个东西和 C# 的扩展方法很类似,允许你为现存的类添加额外的函数,而命名空间 “Microsoft.FSharp.Control.CommonExtensions”为异步工作流的使用提供了很多扩展。
还需要注意的地方就是,我们依旧使用了“use”赋值语句来销毁文件对象,即使这意味着文件对象将会在另外一个线程中被销毁,我们也不用太担心,它能正常地工作的。
其他明显的改变就是,我们如何执行这些代码,“fileOpen”标识符不会马上打开文件,它只是一个工作流,一个等待执行的动作。为了执行这个动作,我们需要使用“Async.Run”函数,它会执行一个单独的工作流并等待其完成后获得执行结果。
我发现,在“ReadAsync”附近添加一些对调试函数的调用,有助于理解异步程序是如何执行的,它让我们能够看到程序的那个线程正在被执行以及相应的线程堆栈跟踪信息,但是我打算把这个联系留给读者自己完成:
let printThreadDetails() =<br></br> Console.WriteLine("Thread ID {0}", Thread.CurrentThread.ManagedThreadId)<br></br> Console.WriteLine((new StackTrace()).ToString())
回头看一下我之前的关于工作流的文章( http://www.infoq.com/articles/pickering-fsharp-workflow )也是一个学习“let!”语句的好地方,你可以看到“let!”是如何给持续函数带来一些特殊效果的。这也可以帮助你理解,异步工作流如何在“let!”之后开启另外一个线程的。
量化性能的改善
那么,当使用异步工作流的时候我们希望看到哪方面的性能改善呢?与大多数性能相关的问题一样,在没有诉诸试验之前是很难回答的。典型的程序不是在进行计算就是在进行 I/O 处理,通过利用异步工作流,你通常在这两个方面都能看到性能有所提升。不过,应该注意的是,你所使用的硬件也将对性能产生巨大的影响,如果你的任务主要进行 I/O 处理,那么你不会看到显著的性能提升,除非你的磁盘提供了很好的并发访问能力,另外,就算存在一些能提供非常好的并发访问能力的磁盘,那也是配合高规格的服务器来使用,而不是用于笔记本或台式机上的。如果你的任务是主要依靠处理器的计算,那么你在大部分现代的笔记本和台式机上通常可以看到更好的性能改善,这些机器都配备了双核处理器,并且很多读者可能已经在考虑为自己订购一台将要上市的四核机型了。这意味着,通过正确利用异步工作流,你将能够充分使用这些额外的处理器能力。
让我们来完成一个涉及到 I/O 和计算处理的任务例子,来看看我们获得了那种类型的性能改善。假如我们有一些希望进行分析的 ascii 文本,首先要计算单词总数,然后计算唯一单词(即只出现一次的单词)的数量。打开和读取文件将涉及 I/O 的操作,计算单词总数和唯一单词数量会占用 CPU 的计算开销。为了进行测试,我从 Project Gutenberg 下载了 Henry Fielding 的所有作品。
首先,我们来看一个同步方式分析这些作品的脚本:
#light<br></br><span color="#0000c0">open</span> System<br></br><span color="#0000c0">open</span> System.Diagnostics<br></br><span color="#0000c0">open</span> System.IO<br></br><span color="#0000c0">open</span> System.Text.RegularExpressions<p><span color="#0000c0">let</span> path = <span color="#7f0055">@"C:\Users\robert\Documents\Fielding"</span></p><br></br><span color="#0000c0">let</span> readFile filePath =<p><span color="#008000">// open and read file</span><span color="#0000c0">let</span> fileStream = File.OpenText(filePath)</p><br></br><span color="#0000c0">let</span> text = fileStream.ReadToEnd()<p><span color="#008000">// find all the "words" using a regex</span><span color="#0000c0">let</span> word = <span color="#0000c0">new</span> Regex("\w+")</p><br></br><span color="#0000c0">let</span> matches = word.Matches(text)<br></br><span color="#0000c0">let</span> words = { <span color="#0000c0">for</span> m <span color="#0000c0">in</span> matches -> m.Value }<p><span color="#008000">// count unique words using a set</span><span color="#0000c0">let</span> uniqueWords = Set.of_seq words</p><p><span color="#008000">// print the results</span><span color="#0000c0">let</span> name = Path.GetFileNameWithoutExtension(filePath)</p><br></br> Console.WriteLine(<span color="#7f0055">"{0} - Words: {1} Unique words: {2} "</span>,<br></br> name, matches.Count, uniqueWords.Count)<p><span color="#0000c0">let</span> main() =</p><br></br><span color="#0000c0">let</span> filePaths = Directory.GetFiles(path)<br></br><span color="#0000c0">for</span> filePath in filePaths do readFile filePath
正如你所看到的这样,我们的脚步是非常直观的,首先我们打开和读取文件,接着我们利用正在表达式计数所有的单词(在这里,我们定义单词为一个或多个连贯的字符),最后我们创建一个 Set 的实例来计数唯一单词。“Set”类型是 F#原生函数库中的一部分,其实现了数学中 Set 的模型,它是一种不可变的数据结构,可以很有效率地完成计算文档中唯一单词的工作;就算如此,CPU 对于这样的计算还是比较敏感的。
现在,让我们来看一下异步方式的版本:
#light<br></br><span color="#0000c0">open</span> System<br></br><span color="#0000c0">open</span> System.IO<br></br><span color="#0000c0">open</span> System.Text.RegularExpressions<br></br><span color="#0000c0">open</span> Microsoft.FSharp.Control.CommonExtensions<p><span color="#0000c0">let</span> path = <span color="#7f0055">@"C:\Users\robert\Documents\Fielding"</span></p><p><span color="#0000c0">let</span> readFileAsync filePath =</p><br></br> async { <span color="#008000">// open and read file</span><br></br><span color="#0000c0">let</span> fileStream = File.OpenText(filePath)<br></br><span color="#0000c0">let!</span> text = fileStream.ReadToEndAsync()<p><span color="#008000">// find all the "words" using a regex</span><span color="#0000c0">let </span>word = <span color="#0000c0">new</span> Regex("\w+")</p><br></br><span color="#0000c0">let</span> matches = word.Matches(text)<br></br><span color="#0000c0">let</span> words = { <span color="#0000c0">for</span> m <span color="#0000c0">in</span> matches <span color="#0000c0">-></span> m.Value }<p><span color="#008000">// count unique words using a set</span><span color="#0000c0">let</span> uniqueWords = Set.of_seq words</p><p><span color="#008000">// print the results</span><span color="#0000c0">let</span> name = Path.GetFileNameWithoutExtension(filePath)</p><br></br> do Console.WriteLine(<span color="#7f0055">"{0} - Words: {1} Unique words: {2} ",</span><br></br> name, matches.Count, uniqueWords.Count) }<p><span color="#0000c0">let</span> main() =</p><br></br><span color="#0000c0">let</span> filePaths = Directory.GetFiles(path)<br></br><span color="#0000c0">let</span> tasks = [ for filePath in filePaths -> readFileAsync filePath ]<br></br> Async.Run (Async.Parallel tasks)<p> main()</p>
正如我们所看到的那样,文件读取函数进行了一点改变——除了使用“async{…}”进行工作流的代码进行包装外,还用 “ReadToEndAsync”来代替“ReadToEnd”对这个工作流进行调用。在“main”函数里的改变更有意思,在这里我们首先映射我们文件的了吧到一个异步工作流的列表上,并把它赋值给一个“tasks”标识符(即变量)。记住,在此时,工作流还未执行,为了执行它们,我们使用“Async.Parallel”来把任务列表(tasks)转换为可以并行执行的一个工作流。接着,我们使用“Async.Run”来并行运行这些任务。
我在我的笔记本(双核)上通过 F#的交互界面运行这些测试,这个交互界面可以提供一些非常棒的计时工具。我的方式非常简单:我运行两个脚本一次,并获得运行结果(这样做为了消除磁盘缓存对性能的影响),接着我再运行每个脚本 3 次:
同步 异步 第一次运行 16.807 12.928 第二次运行 16.781 13.182 第三次运行 16.909 13.233 平均 16.832 13.114 因而,不需要对代码进行大量的修改,只需调整几行代码,在双核机器上异步版本大约比同步版本运行快 22%,当为什么我们没有看到 1900% 的速度提升呢?答案是非常简单的,这个任务不完全在进行计算处理,如果我们添加更多的计算工作到我们的算法里面,比如计算每个单词出现的次数,或者查找类似单词,这样我们可能会看到速度提升百分比的提高。读取和处理文件不仅是异步工作流应用的一个地方,它们也能被用于网络编程。实际上,网络数据访问比磁盘访问慢得多,而使用异步工作流可以在网络访问完成之前避免阻塞线程,这样做获得的性能提升要比访问文件要高得多。
结论
异步工作流解决了一个很具体的问题,如何正确地使用.NET 异步编程模型,来在.NET 框架中提供最优雅的解决方案。使用异步编程模型能帮助你让应用程序更具伸缩性,而异步工作流可以帮你更容易地完成这样的工作。
额外阅读
Jeffery Richter 在这里使用C#实现异步编程模型的过程中,谈到了一些问题和解决方案。
异步工作流在《高级F#》(Expert F#)一书中的第13 章有所谈及,并在第14 章提供了一些例子。
评论