11 月 19 - 20 日 Apache Pulsar 社区年度盛会来啦,立即报名! 了解详情
写点什么

如何使用 Apache Beam 对 TensorFlow 管道进行预处理?

  • 2019-10-04
  • 本文字数:5190 字

    阅读完需:约 17 分钟

如何使用Apache Beam对TensorFlow管道进行预处理?


在生产中使用机器学习时,确保在模型的离线培训期间应用的特征工程步骤与使用模型用于预测时应用的特征工程步骤相同通常是一个挑战。此外,在当今世界,机器学习模型在非常大的数据集上进行训练,因此在训练期间应用的预处理步骤在大规模分布式计算框架(例如 Google Cloud Dataflow 或 Apache Spark)上实现。因此,训练环境通常与服务环境非常不同,可能在训练和服务期间执行的特征工程之间产生不一致。幸运的是,我们现在有了tf.Transform,一个 TensorFlow 库,它提供了一个优雅的解决方案。


机器学习模型需要数据来训练,但是通常需要对这些数据进行预处理,以便在训练模型时有用。这种预处理(通常称为“特征工程”)采用多种形式,例如:标准化和缩放数据,将分类值编码为数值,形成词汇表,以及连续数值的分级。


在生产中使用机器学习时,确保在模型的离线培训期间应用的特征工程步骤与使用模型用于预测时应用的特征工程步骤相同通常是一个挑战。此外,在当今世界,机器学习模型在非常大的数据集上进行训练,因此在训练期间应用的预处理步骤在大规模分布式计算框架(例如 Google Cloud Dataflow 或 Apache Spark)上实现。因此,训练环境通常与服务环境非常不同,可能在训练和服务期间执行的特征工程之间产生不一致。


幸运的是,我们现在有了tf.Transform,一个 TensorFlow 库,它提供了一个优雅的解决方案,以确保在培训和服务期间功能工程步骤的一致性。在这篇博文中,我们将提供在 Google Cloud Dataflow 上使用 tf.Transform 的具体示例,以及在 Cloud ML Engine 上进行模型培训和服务。



应用于机器模拟用例的变换

ecc.ai是一个有助于优化机器配置的平台。我们模拟物理机器(例如瓶子灌装机或饼干机器)以找到更优化的参数设置。由于每个模拟物理机器的目标是具有与实际机器相同的输入/输出特性,我们称之为“数字双胞胎”。


这篇博客文章将描述这个“数字双胞胎”的设计和实现过程。在最后一段中,您可以找到有关我们随后如何使用这些数字双胞胎来优化机器配置的更多信息。


tf.Transform 解释

tf.Transform是 TensorFlow 的一个库,它允许用户定义预处理管道并使用大规模数据处理框架运行这些管道,同时还以可以作为 TensorFlow 图的一部分运行的方式导出管道。用户通过组合模块化 Python 函数来定义管道,然后 tf.Transform 随Apache Beam执行。tf.Transform 导出的 TensorFlow 图可以在使用训练模型进行预测时复制预处理步骤,例如在使用TensorFlow Serving服务模型时。



tf.Transform 允许用户定义预处理管道。用户可以实现预处理数据以用于 TensorFlow 训练,还可以导出将转换编码为 tensorFlow 图的 tf.Transform 图。然后可以将该变换图结合到用于推断的模型图中。


tf.Transform 建立数字双胞胎

数字双模型的目标是能够根据其输入预测机器的所有输出参数。为了训练这个模型,我们分析了包含这种关系的观察记录历史的日志数据。由于日志数据量可能相当广泛,理想情况下应该以分布式方式运行此步骤。此外,必须在训练和服务时间之间使用相同的概念和代码,而对预处理代码的改动最小。


当我们开始开发时,我们在任何现有的开源项目中都找不到此功能。因此,我们开始构建用于Apache Beam预处理的自定义工具,这使我们能够分配我们的工作负载并轻松地在多台机器之间切换。不幸的是,这种方法不允许我们在服务时重复使用相同的代码作为 TensorFlow 图的一部分运行(即在生产环境中使用训练模型时)。


在实践中,我们必须在 Apache Beam 中编写自定义分析步骤,计算并保存每个变量所需的元数据,以便在后续步骤中进行实际的预处理。我们在培训期间使用 Apache Beam 执行后续预处理步骤,并在服务期间作为 API 的一部分执行。不幸的是,由于它不是 TensorFlow 图的一部分,我们不能简单地使用 ML Engine 将我们的模型部署为 API,而我们的 API 总是由预处理部分和模型部分组成,这使得统一升级变得更加困难。此外,我们需要为我们想要使用的每个现有和新的转换实施和维护分析并转换步骤。


TensorFlow Transform 解决了这些问题。自宣布以来,我们将其直接整合为我们完整管道的主要构建块。


简化数字双胞胎示例流程

我们现在将专注于构建和使用特定机器的数字双胞胎。举个例子,我们将转向一个假想的布朗尼面团机器。这台机器采用不同的原始组件,然后加热并混合它们,直到产生完美的质地。我们将从批处理问题开始,这意味着数据在完整的生产批次中汇总,而不是在连续流中汇总。


数据


我们有两种类型的数据:


  • 输入数据:原料描述(绿色)和布朗尼面团机(蓝色)的设置。您可以在下面找到列名称和 3 个示例行。



  • 输出数据:具有这些原材料的机器设置结果:消耗的能量,输出的质量度量和输出量。您可以在下面找到列名称和 3 个示例行。



制作数字双胞胎



在这里,我们根据存储为云存储中两种不同类型文件的历史日志数据来训练系统的数字双胞胎。该数字双胞胎将能够基于输入数据预测输出数据。上图说明了我们在此流程中使用的 Google 服务。


预处理


预处理(制作训练示例)将使用 Apache Beam 使用 tf.Transform 函数完成。


预处理阶段包括 4 个步骤,代码如下:


1.组合输入/输出数据并使原始数据 PCollection。


raw_data_input = (


p


| ‘ReadInputData’ >> textio.ReadFromText(train_data_file)


| ‘ParseInputCSV’>> beam.Map(converter_input.decode)


| ‘ExtractBatchKeyIn’>> beam.Map(extract_batchkey))


raw_data_output = (


p


| ‘ReadOutputData’ >> textio.ReadFromText(train_data_file)


| ‘ParseOutputCSV’>> beam.Map(converter_output.decode)


| ‘ExtractBatchKeyOut’>> beam.Map(extract_batchkey))


raw_data = (


(raw_data_input, raw_data_output)


| ‘JoinData’ >> CoGroupByKey()


| ‘RemoveKeys’>> beam.Map(remove_keys))


2.定义将预处理原始数据的预处理功能。此函数将组合多个 TF-Transform 函数,以生成 TensorFlow Estimators 的示例。


语言:Python


def preprocessing_fn(inputs):


“”“Preprocess input columns into transformed columns.”""


outputs = {}


# Encode categorical column:


outputs[‘Mixing Speed’] = tft.string_to_int(inputs[‘Mixing Speed’])


# Calculate Derived Features:


outputs[‘Total Mass’] = inputs[‘Butter Mass’] + inputs[‘Sugar Mass’] + inputs[‘Flour Mass’]


for ingredient in [‘Butter’, ‘Sugar’, ‘Flour’]:


ingredient_percentage = inputs[’{} Mass’.format(ingredient)] / outputs[‘Total Mass’]


outputs[‘Norm {} perc’.format(ingredient)] = tft.scale_to_z_score(ingredient_percentage)


# Keep absolute numeric columns


for key in [‘Total Volume’, ‘Energy’]:


outputs[key]=inputs[key]


# Normalize other numeric columns


for key in [


‘Butter Temperature’,


‘Sugar Humidity’,


‘Flour Humidity’


‘Heating Time’,


‘Mixing Time’,


‘Density’,


‘Temperature’,


‘Humidity’,


]:


outputs[key] = tft.scale_to_z_score(inputs[key])


# Extract Specific Problems


chunks_detected_str = tf.regex_replace(


inputs[‘Problems’],


‘.chunk.


‘chunk’,


name=‘Detect Chunk’)


outputs[‘Chunks’]=tf.equal(chunks_detected_str,‘chunk’) return outputs


3.使用预处理功能分析和转换整个数据集。这部分代码将采用预处理功能,首先分析数据集,即完整传递数据集以计算分类列的词汇表,然后计算平均值和标准化列的标准偏差。接下来,Analyze 步骤的输出用于转换整个数据集。


transform_fn = raw_data | AnalyzeDataset(preprocessing_fn)


transformed_data = (raw_data, transform_fn) | TransformDataset()


4.保存数据并序列化 TransformFn 和元数据文件。


transformed_data | “WriteTrainData” >> tfrecordio.WriteToTFRecord(


transformed_eval_data_base,


coder=example_proto_coder.ExampleProtoCoder(transformed_metadata))


_ = (


transform_fn


| “WriteTransformFn” >>


transform_fn_io.WriteTransformFn(working_dir))


transformed_metadata | ‘WriteMetadata’ >> beam_metadata_io.WriteMetadata(


transformed_metadata_file, pipeline=p)


训练


使用预处理数据作为TFRecords,我们现在可以使用 Estimators 轻松训练带有标准 TensorFlow 代码的 TensorFlow 模型。


导出训练的模型


在分析数据集的结构化方法旁边,tf.Transform 的实际功能在于可以导出预处理图。这允许您导出 TensorFlow 模型,该模型包含与训练数据完全相同的预处理步骤。


为此,我们只需要使用 tf.Transform 输入函数导出训练模型:


tf_transform_output = tft.TFTransformOutput(working_dir)


serving_input_fn = _make_serving_input_fn(tf_transform_output)


exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR)


estimator.export_savedmodel(exported_model_dir, serving_input_fn)


当_make_serving_input_fn 功能是,你可以简单地不同的项目之间重用的项目逻辑,无论一个非常普遍的功能:


语言:Python


def _make_serving_input_fn(tf_transform_output):


raw_feature_spec = RAW_DATA_METADATA.schema.as_feature_spec()


raw_feature_spec.pop(LABEL_KEY)


def serving_input_fn():


raw_input_fn = input_fn_utils.build_parsing_serving_input_fn(


raw_feature_spec)


raw_features, _, default_inputs = raw_input_fn()


transformed_features = tf_transform_output.transform_raw_features(


raw_features)


return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)


return serving_input_fn


使用数字双胞胎


数字双胞胎示例流程的最后一部分使用保存的模型根据输入预测系统的输出。这是我们可以充分利用 tf.Transform 的地方,因为这使得在 Cloud ML Engine 上部署“TrainedModel”(包括预处理)变得非常容易。 


要部署训练模型,您只需运行 2 个命令:


gcloud ml-engine models create MODEL_NAME


gcloud ml-engine versions create VERSION --model=MODEL_NAME --origin=ORIGIN


现在,我们可以使用以下代码轻松与我们的数字双胞胎进行交互


def get_predictions(project, model, instances, version=None):


service = discovery.build(‘ml’, ‘v1’)


name = ‘projects/{}/models/{}’.format(project, model)


if version is not None:    name += '/versions/{}'.format(version)
response = service.projects().predict( name=name, body={'instances': instances}).execute()
if 'error' in response: raise RuntimeError(response['error'])
return response['predictions']
复制代码


if name == “main”:


predictions = get_predictions(


project="<project_id>",


model="<model_name>",


instances=[


{


“Butter Mass”: 121,


“Butter Temperature”: 20,


“Sugar Mass”: 200,


“Sugar Humidity”: 0.22,


"Flour Mass ": 50,


“Flour Humidity”: 0.23,


“Heating Time”: 50,


“Mixing Speed”: “Max Speed”,


“Mixing Time”: 200


}]


)


ecc.ai,我们使用数字双胞胎来优化物理机器的参数。


简而言之,我们的方法包括 3 个步骤(如下图 1 所示):


  1. 使用历史机器数据创建模拟环境。机器的这种“数字双胞胎”将作为允许增强剂学习最佳控制策略的环境。

  2. 使用数字双胞胎使用我们的强化学习(RL)代理查找(新)最佳参数设置。

  3. 使用 RL 代理配置真实机器的参数。



总结

通过 tf.Transform,我们现在已将我们的模型部署在 ML Engine 上作为 API,作为特定布朗尼面团机的数字双胞胎:它采用原始输入功能(成分描述和机器设置)并将返回预测输出机。


好处是我们不需要维护 API 并且包含所有内容 - 因为预处理是服务图的一部分。如果我们需要更新 API,那么所有需要做的就是使用新版本刷新模型,并自动为您更新所有相关的预处理步骤。


此外,如果我们需要为另一个布朗尼面团机器(使用相同数据格式的机器)制作数字双模型,但是在不同的工厂或设置中运行,我们可以轻松地重新运行相同的代码而无需手动调整预处理代码或执行自定义分析步骤。


作者介绍

张海涛,目前就职于海康威视云基础平台,负责海康威视在全国金融行业 AI 大数据落地的基础架构设计和中间件的开发,专注 AI 大数据方向。Apache Beam 中文社区发起人之一,如果想进一步了解最新 Apache Beam 和 ClickHouse 动态和技术研究成果,请加微信 cyrjkj 入群共同研究和运用。


系列文章传送门:


系列文章第一篇 《Apache Beam 实战指南 | 基础入门》


系列文章第二篇 《Apache Beam 实战指南 | 手把手教你玩转 KafkaIO 与 Flink》


系列文章第三篇 《Apache Beam 实战指南 | 手把手教你玩转大数据存储 HdfsIO》


系列文章第四篇 《Apache Beam 实战指南 | 如何结合 ClickHouse 打造“AI 微服务”?》


系列文章第五篇 《Apache Beam实战指南 | 大数据管道(pipeline)设计及实践》


2019-10-04 08:00544

评论

发布
暂无评论
发现更多内容

技术盘点:消息中间件的过去、现在和未来

阿里巴巴云原生

阿里云 云原生 中间件 消息队列 EventBridge

基于CC2530(ZigBee)设计的自动照明系统

DS小龙哥

2月月更 自动照明系统设计

福昕鲲鹏加入,龙蜥社区迎来版式文档技术服务新伙伴

OpenAnolis小助手

Linux 开源 社区 福昕

一文带你了解 Java 的内存区域

宇宙之一粟

Java 内存 2月月更

“元认知”相关学习总结

panda

思维模型 阅读笔记 元认知

了解一下ProtoBuf

蜜糖的代码注释

protobuf 2月月更

vivo 服务端监控架构设计与实践

vivo互联网技术

服务端 系统监控 构架

技术盘点:2022 年容器、Serverless、可观测、服务网格有哪些值得关注的趋势?

阿里巴巴云原生

阿里云 Serverless 云原生 趋势 可观测

Lyft微服务研发效能提升实践 | 2. 优化快速本地开发

俞凡

研发效能 大厂实践 2月月更 lyft

DDD实战(1):从需求到代码实现生鲜电商系统

深清秋

DDD 软件架构 生鲜电商系统

2022年每个开发者必知的云原生趋势 | 社区征文

翊君

容器 微服务 云原生 新春征文

AI赋能安全技术总结与展望| 社区征文

herosunly

人工智能 新春征文 2月月更

DOM 精通了?请问 Node 和 Element 有何区别?

编程三昧

JavaScript 前端 DOM 2月月更

阿里无影云桌面深度测评

乌龟哥哥

无影云电脑 2月月更

技术盘点:云原生中间件的技术演进与未来趋势展望

阿里巴巴云原生

阿里云 云原生 中间件 趋势

AIGC的“含科量”与“含资量”

脑极体

技术盘点:容器技术的演进路线是什么?未来有哪些想象空间?

阿里巴巴云原生

阿里云 容器 云原生

多图|一文详解Nacos参数!

王磊

nacos

外屏和宽屏浪费了?HarmonyOS折叠屏设计规范教你用起来

HarmonyOS开发者社区

HarmonyOS

Go 并发模式:管道和取消(译)

en

Go

战略规划和战略解码BLM+BEM

wood

bem 战略制定 300天创作 BLM

学生管理系统的架构设计

Fingal

#架构实战营

注册中心

邱学喆

Eureka 注册中心 原理图

大画 Spark :: 网络(4)-Endpoint注册使用与网络环境的构建

dclar

大数据 spark 源代码 框架原理

KubeVela v1.2 发布:你要的图形化操作控制台 VelaUX 终于来了!

阿里巴巴云原生

阿里云 开源 云原生 KubeVela

怎么说服领导,能让我用DDD架构肝项目?

小傅哥

DDD 小傅哥 技术架构 架构实践

改革开放启示录(14/100)

hackstoic

创新管理

大数据培训:构建Flink SQL流式计算平台

@零度

flink sql 大数据开发

架构训练营 毕业设计

ren

Linux系统问题排查

小翁牌坦克

Linux 负载 系统问题

如何使用Apache Beam对TensorFlow管道进行预处理?_AICon_张海涛_InfoQ精选文章