写点什么

Step Functions 支持动态并行性

  • 2019 年 9 月 26 日
  • 本文字数:3782 字

    阅读完需:约 12 分钟

Step Functions 支持动态并行性

微服务可使应用程序更容易扩展,并加快其开发速度,但协调分布式应用程序的组件可能是一项艰巨的任务。 AWS Step Functions 是一种完全托管的服务,使用这种服务,您可以设计和运行包含多个步骤的工作流,让每个步骤以上一步骤的输出为输入,从而简化任务协调。例如,诺华生物医学研究所正在使用 Step Functions,让科学家能够在不依赖集群专家的前提下运行图像分析。


Step Functions 近来添加了一些非常有趣的功能,例如用于简化人工活动与第三方服务的集成的回调模式,以及将模块化、可重用的工作流组合在一起的嵌套工作流。如今,我们又在工作流中添加了对于动态并行性的支持!


动态并行性的运作方式


状态机使用 Amazon States Language 定义,Amazon States Language 是一种基于 JSON 的结构化语言。Parallel 状态可用于并行执行在状态机中定义的固定数量的分支。 现在,Step Functions 支持面向动态并行性的新 Map 状态类型。


要配置 Map 状态,您需要定义一个 Iterator,这是一个完整的子工作流。当 Step Functions 执行进入 Map 状态时,它会遍历状态输入中的 JSON 数组。对于每个项目,Map 状态都会执行一个子工作流,而且可能会并行执行。当所有子工作流执行完毕后,Map 状态将返回一个数组,其中包含 Iterator 处理的每一项的输出。


您可以通过添加 MaxConcurrency 字段来配置 Map 执行的并发子工作流数的上限。 默认值为 0,即在并行性方面没有限制,并且会尽可能地并发调用迭代。 如果 MaxConcurrency 值为 1,则效果是一次调用 Iterator 的一个元素,调用顺序依循元素在输入状态中的出现顺序,并且在上一次迭代执行完毕后才会启动迭代。


使用新 Map 状态的一种方法是在工作流中利用扇出或“分散-集中”消息传递模式:


  • 扇出用于向多个目标传递消息的情况,在订单处理或批量数据处理等工作流中可能非常有用。例如,您可以从 Amazon SQS 检索消息数组,Map 会将每条消息发送到单独的 AWS Lambda 函数。

  • 分散-集中可将一条消息广播到多个目标(分散),然后再聚合响应,以用于后续步骤(集中)。 这在文件处理和测试自动化中非常有用。例如,您可以并行转码 10 个 500MB 的媒体文件,然后再连接这些文件,以创建一个 5GB 的文件。

  • 与 Parallel 和 Task 状态相似,Map 支持 Retry 和 Catch 字段,以处理服务和自定义异常。您还可以向 Iterator 内的状态应用 Retry 和 Catch,以处理异常。如果因未处理某个错误或者已转换为 Fail 状态而导致任何 Iterator 执行失败,则整个 Map 状态均会被视作失败,并且其所有迭代都会停止。如果 Map 状态本身未处理错误,则 Step Functions 会停止执行工作流,并显示错误。


使用 Map 状态


下面我们来构建一个订单处理工作流,并使用 Map 状态并行处理订单中的商品。此工作流中执行的所有任务都以 Lambda 函数的形式标识,但借助 Step Functions,您可以使用其他 AWS 服务集成,并在 EC2 实例、容器或本地基础设施上运行代码。


下面是我们的示例订单(采用 JSON 文档形式),其中订购了几本图书,以及一些在读书时品啜的咖啡。该订单包含一个 detail 部分,其中包含订单内的 items 列表。


{  "orderId": "12345678",  "orderDate": "20190820101213",  "detail": {    "customerId": "1234",    "deliveryAddress": "123, Seattle, WA",    "deliverySpeed": "1-day",    "paymentMethod": "aCreditCard",    "items": [      {        "productName": "Agile Software Development",        "category": "book",        "price": 60.0,        "quantity": 1      },      {        "productName": "Domain-Driven Design",        "category": "book",        "price": 32.0,        "quantity": 1      },      {        "productName": "The Mythical Man Month",        "category": "book",        "price": 18.0,        "quantity": 1      },      {        "productName": "The Art of Computer Programming",        "category": "book",        "price": 180.0,        "quantity": 1      },      {        "productName": "Ground Coffee, Dark Roast",        "category": "grocery",        "price": 8.0,        "quantity": 6      }    ]  }}
复制代码


为了处理这笔订单,我要使用一个状态机来定义如何执行不同的任务。Step Functions 控制台会为我构建的工作流创建可视化表示形式:


  • 首先,我要验证并检查付款。

  • 然后,我可能会并行处理订单中的商品,以检查库存情况,确定是否已经准备好交货,并启动交货流程。

  • 最后,我要将订单汇总发送给客户。

  • 如果付款检查失败,我会拦截工作流,以便采取向客户发送通知等措施。



这是以 JSON 文档形式表示的相同状态机定义。ProcessAllItems 状态使用 Map 并行处理订单中的商品。在本例中,我使用 MaxConcurrency 将并发数量限制为 3。在 Iterator 内,我可以设置任意复杂度的子工作流。在本例中,我有 3 个步骤,分别用于处理商品的 CheckAvailability、PrepareForDelivery 和 StartDelivery。每个步骤都可 Retry 和 Catch 错误,以使子工作流的执行更可靠,例如在与外部服务集成的情况下。


{  "StartAt": "ValidatePayment",  "States": {    "ValidatePayment": {      "Type": "Task",      "Resource": "arn:aws:lambda:us-west-2:123456789012:function:validatePayment",      "Next": "CheckPayment"    },    "CheckPayment": {      "Type": "Choice",      "Choices": [        {          "Not": {            "Variable": "$.payment",            "StringEquals": "Ok"          },          "Next": "PaymentFailed"        }      ],      "Default": "ProcessAllItems"    },    "PaymentFailed": {      "Type": "Task",      "Resource": "arn:aws:lambda:us-west-2:123456789012:function:paymentFailed",      "End": true    },    "ProcessAllItems": {      "Type": "Map",      "InputPath": "$.detail",      "ItemsPath": "$.items",      "MaxConcurrency": 3,      "Iterator": {        "StartAt": "CheckAvailability",        "States": {          "CheckAvailability": {            "Type": "Task",            "Resource": "arn:aws:lambda:us-west-2:123456789012:function:checkAvailability",            "Retry": [              {                "ErrorEquals": [                  "TimeOut"                ],                "IntervalSeconds": 1,                "BackoffRate": 2,                "MaxAttempts": 3              }            ],            "Next": "PrepareForDelivery"          },          "PrepareForDelivery": {            "Type": "Task",            "Resource": "arn:aws:lambda:us-west-2:123456789012:function:prepareForDelivery",            "Next": "StartDelivery"          },          "StartDelivery": {            "Type": "Task",            "Resource": "arn:aws:lambda:us-west-2:123456789012:function:startDelivery",            "End": true          }        }      },      "ResultPath": "$.detail.processedItems",      "Next": "SendOrderSummary"    },    "SendOrderSummary": {      "Type": "Task",      "InputPath": "$.detail.processedItems",      "Resource": "arn:aws:lambda:us-west-2:123456789012:function:sendOrderSummary",      "ResultPath": "$.detail.summary",      "End": true    }  }}
复制代码


此工作流使用的 Lambda 函数并不了解订单 JSON 文档的整体结构。它们只需知道自身要处理的输入状态部分即可。这是在多个工作流中轻松重用这些函数的最佳实践。状态机定义使用 JsonPath 语法通过 InputPath、ItemsPath、ResultPath 和 OutputPath 字段操控用于函数输入和输出的路径:


  • InputPath 用于过滤处于输入状态的数据,例如仅将订单的 detail 传递给 Iterator。

  • ItemsPath 是 Map 状态特有的,用于标示在输入中的何处可以找到要处理的数组字段,例如处理订单 detail 中的 items。

  • ResultPath 让您可以将一项任务的输出添加到输入状态,而不是完全覆盖输入状态,例如将 summary 添加到订单的 detail。

  • 我这次没有使用 OutputPath,但它对于过滤掉不必要的信息,仅将您关注的 JSON 部分传递给下一个状态可能是一种很有用。例如,仅将订单的 detail 作为输出发送。

  • 您可以选择使用 Parameters 字段来自定义每次迭代使用的原始输入。例如,deliveryAddress 包含在订单的 detail 中,但不包含在每个 item 中。为了保证 Iterator 具有商品的 index,并可访问 deliveryAddress,我可以将下面这段代码添加到 Map 状态中:


"Parameters": {  "index.$": "$$.Map.Item.Index",  "item.$": "$$.Map.Item.Value",  "deliveryAddress.$": "$.deliveryAddress"}
复制代码


现已推出


从今天开始,所有可以使用 Step Functions 的区域均可开始使用这项新功能。动态并行性或许是 Step Functions 用户呼声最高的一项功能。它可让用户无障碍地实施新使用案例,并有助于优化现有使用案例。欢迎与我们分享您打算用这项功能来做些什么!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/new-step-functions-support-for-dynamic-parallelism/


2019 年 9 月 26 日 16:43385
用户头像

发布了 1515 篇内容, 共 57.7 次阅读, 收获喜欢 61 次。

关注

评论

发布
暂无评论
发现更多内容

浅析LSM-Tree存储模型

正向成长

LSM树 KV存储引擎

架构师训练营 4 期 大作业

引花眠

架构师训练营 4 期

Android中的图像格式

如浴春风

android 音视频 安卓 签约计划

Spark原理与实战之部署模式与运行机制

小舰

spark Spark调优 4月日更

Seldon 使用 (二):打包模型

托内多

tensorflow kubeflow Kubernetes PyTorch seldon

Cloudreve 自建云盘实践,我说了没人能限得了我的容量和速度!

小傅哥

Java 小傅哥 Cloudreve 自建云盘

从石器时代到田园牧歌:如何对 API 统一建模

李宇飞

API

猫鼠游戏,一个刷票老千看在线投票项目的防范与取舍

ucsheep

安全 在线投票 防作弊 刷票

树莓派人表情识别

IT蜗壳-Tango

IT蜗壳教学 4月日更

「 最佳内容公布」—— InfoQ 写作平台【 1 周年盛典 】

InfoQ写作社区官方

1 周年盛典 热门活动

AI数学基础之:确定图灵机和非确定图灵机

程序那些事

人工智能 AI 程序那些事 图灵机

重读《重构2》

顿晓

重构 4月日更

嘉云公司研发效能平台实践

小江

研发效能 CI/CD

初入江湖,IT从业者应该如何选择?

云峰

企业上云一张网,华为将在分析师大会上亮出哪些大招?

脑极体

Python变量作用域与LEGB规则

大奎

语法 Python Monad 作用域

TIOBE榜单四月已出:上古语言Fortran重回前20?

Bob

编程语言、 4月日更 Tiobe

微服务网关方案:Kong & Nacos

程序员架构进阶

架构 微服务 API网关 28天写作 4月日更

对前端趋势的一些理解

葱小白

大前端

更简的并发代码,更强的并发控制

万俊峰Kevin

并发 go-zero Go 语言

「免费开源」基于Vue和Quasar的前端SPA项目crudapi后台管理系统实战之业务数据增删改查(七)

crudapi

Vue API crud crudapi quasar

MySQL多表查询详解

若尘

MySQL 查询

css

赫鲁小夫

4月日更

最详细的基于 Prometheus 的 Azure 指标监控

耳东@Erdong

azure Prometheus 4月日更

WebRTC基础知识详解

IT酷盖

签约计划

模块二作业

c

架构实战营

浪潮签约“数字基建”合作伙伴共促工业互联网创新发展

浪潮云

工业互联网

Java 并发基础(五):面试实战之多线程顺序打印

看山

Java并发

陪伴

小天同学

陪伴 育儿 个人感悟 4月日更

AI数据科学认证-2021年的最佳选择 John 易筋 ARTS 打卡 Week 44

John(易筋)

ARTS 打卡计划

「 优秀主题征文名单公布 」—— InfoQ 写作平台【 1 周年盛典 】

InfoQ写作社区官方

1 周年盛典 热门活动

Step Functions 支持动态并行性_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章