
The previous article introduced the ring all-reduce algorithm, its procedure, and its advantages. How, then, do you implement ring all-reduce in TensorFlow code? There are currently two main approaches: 1. the TensorFlow Estimator API combined with the MultiWorkerMirroredStrategy API; 2. TensorFlow combined with Horovod.
TensorFlow Estimator with MultiWorkerMirroredStrategy
tf.distribute.experimental.MultiWorkerMirroredStrategy implements synchronous multi-machine, multi-GPU training: it creates a replica of every model variable on each GPU and uses CollectiveOps for communication between machines. TensorFlow also applies some performance optimizations on top of this, such as a static optimization that converts multiple all-reductions on small tensors into fewer all-reductions on larger tensors.
import tensorflow as tf
import os
import json

NUM_WORKERS = 2
IP_ADDRS = ["xx.xxx.xx.xxxx", "xx.xxx.xx.xxxx"]
PORTS = [2222, 2222]

def model_fn(...):
    ...

def input_fn(...):
    ...

# The TF_CONFIG environment variable must be set on every machine;
# 'index' identifies this worker within the cluster.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ['%s:%d' % (IP_ADDRS[w], PORTS[w]) for w in range(NUM_WORKERS)]
    },
    'task': {'type': 'worker', 'index': 0}
})

# Method for using MultiWorkerMirroredStrategy
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))
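Note that only the 'task' entry of TF_CONFIG differs from machine to machine: worker 0 sets index 0, worker 1 sets index 1, and so on. As a minimal sketch for the second worker (assuming the same two-worker cluster defined above):

# On the second machine, only 'index' changes; the 'cluster' spec is identical.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ['%s:%d' % (IP_ADDRS[w], PORTS[w]) for w in range(NUM_WORKERS)]
    },
    'task': {'type': 'worker', 'index': 1}
})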
Drawback:
It only works with TensorFlow's Estimator framework. When the training code is written against the low-level API, there is no way to use this interface.
TensorFlow with Horovod
Horovod is an efficient distributed-training communication framework open-sourced by Uber. Horovod itself is responsible only for inter-node network communication and gradient fusion; at runtime it binds to TensorFlow, which performs the single-machine computation.
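As a minimal sketch of what "only responsible for communication" means (assuming a TF 1.x session environment, matching the examples below), Horovod exposes the collective directly as hvd.allreduce:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Each process contributes a local tensor; hvd.allreduce performs a ring
# all-reduce across all processes and returns the average by default.
local_value = tf.constant([1.0, 2.0]) * float(hvd.rank())
averaged = hvd.allreduce(local_value)

with tf.Session() as sess:
    # Launch with e.g.: horovodrun -np 2 python allreduce_demo.py
    print(sess.run(averaged))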
Horovod can be combined with any style of TensorFlow code, whether it uses the low-level API, the Estimator framework, or Keras, so it is more convenient to use and imposes essentially no such restriction; a minimal Keras sketch is shown below.
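For instance, a Keras sketch following the pattern of the official Horovod Keras examples (build_model, x_train, and y_train are hypothetical placeholders) looks like this:

import tensorflow as tf
import horovod.tensorflow.keras as hvd

hvd.init()

# Build an ordinary tf.keras model (definition elided; hypothetical helper).
model = build_model()

# Horovod: scale the learning rate by the number of workers and wrap the optimizer.
opt = tf.keras.optimizers.SGD(0.01 * hvd.size())
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=hvd.DistributedOptimizer(opt),
              metrics=['accuracy'])

callbacks = [
    # Broadcast initial variable states from rank 0 to all other processes.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# x_train / y_train: hypothetical training arrays.
model.fit(x_train, y_train, batch_size=128, epochs=5, callbacks=callbacks)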
1. Horovod with the low-level API
Reference code: https://github.com/horovod/horovod/blob/master/examples/tensorflow_mnist.py
import os
import errno
import tensorflow as tf
import horovod.tensorflow as hvd
import numpy as np
from tensorflow import keras

tf.logging.set_verbosity(tf.logging.INFO)

def conv_model(feature, target, mode):
    ...

def train_input_generator(x_train, y_train, batch_size=64):
    ...
def main(_):
    # Horovod: initialize Horovod.
    hvd.init()

    # Keras stores downloaded datasets under ~/.keras/datasets. Create the
    # directory up front so that workers sharing a filesystem do not race
    # each other when the dataset is first downloaded.
    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (x_train, y_train), (x_test, y_test) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

    # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it
    # into (-1, 784) to feed into our network. Also, need to normalize the
    # features between 0 and 1.
    x_train = np.reshape(x_train, (-1, 784)) / 255.0
    x_test = np.reshape(x_test, (-1, 784)) / 255.0

    # Build model...
    with tf.name_scope('input'):
        image = tf.placeholder(tf.float32, [None, 784], name='image')
        label = tf.placeholder(tf.float32, [None], name='label')
    predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)

    # Horovod: adjust learning rate based on number of GPUs.
    opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt)

    global_step = tf.train.get_or_create_global_step()
    train_op = opt.minimize(loss, global_step=global_step)

    hooks = [
        # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable
        # states from rank 0 to all other processes. This is necessary to
        # ensure consistent initialization of all workers when training is
        # started with random weights or restored from a checkpoint.
        hvd.BroadcastGlobalVariablesHook(0),

        # Horovod: adjust number of steps based on number of GPUs.
        tf.train.StopAtStepHook(last_step=20000 // hvd.size()),

        tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
                                   every_n_iter=10),
    ]

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
    training_batch_generator = train_input_generator(x_train, y_train,
                                                     batch_size=100)
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                           hooks=hooks,
                                           config=config) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step synchronously.
            image_, label_ = next(training_batch_generator)
            mon_sess.run(train_op, feed_dict={image: image_, label: label_})
if __name__ == "__main__":
    tf.app.run()

2. Horovod with Estimator
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import errno
import numpy as np
import tensorflow as tf
import horovod.tensorflow as hvd
from tensorflow import keras

tf.logging.set_verbosity(tf.logging.INFO)

def create_model(input_shape, num_classes):
    ...
def cnn_model_fn(features, labels, mode):
    # MNIST inputs are flattened to 784 features with 10 classes; numpy_input_fn
    # below feeds features as a dict keyed by "x".
    model = create_model(input_shape=(784,), num_classes=10)
    logits = model(features["x"])
    predictions = {
        # Generate predictions (for PREDICT and EVAL mode)
        "classes": tf.argmax(input=logits, axis=1),
        # Add `softmax_tensor` to the graph. It is used for PREDICT and by the
        # `logging_hook`.
        "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
    }
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # Calculate Loss (for both TRAIN and EVAL modes)
    onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10)
    loss = tf.losses.softmax_cross_entropy(
        onehot_labels=onehot_labels, logits=logits)

    # Configure the Training Op (for TRAIN mode)
    if mode == tf.estimator.ModeKeys.TRAIN:
        # Horovod: scale learning rate by the number of workers.
        optimizer = tf.train.MomentumOptimizer(
            learning_rate=0.001 * hvd.size(), momentum=0.9)

        # Horovod: add Horovod Distributed Optimizer.
        optimizer = hvd.DistributedOptimizer(optimizer)

        train_op = optimizer.minimize(
            loss=loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss,
                                          train_op=train_op)

    # Add evaluation metrics (for EVAL mode)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(
            labels=labels, predictions=predictions["classes"])}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)
def main(unused_argv):
    # Horovod: initialize Horovod.
    hvd.init()

    # Keras stores downloaded datasets under ~/.keras/datasets. Create the
    # directory up front so that workers sharing a filesystem do not race
    # each other when the dataset is first downloaded.
    cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets')
    if not os.path.exists(cache_dir):
        try:
            os.mkdir(cache_dir)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(cache_dir):
                pass
            else:
                raise

    # Download and load MNIST dataset.
    (train_data, train_labels), (eval_data, eval_labels) = \
        keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

    # Reshape to (-1, 784) and normalize the features to [0, 1].
    train_data = np.reshape(train_data, (-1, 784)) / 255.0
    eval_data = np.reshape(eval_data, (-1, 784)) / 255.0

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    model_dir = './mnist_convnet_model' if hvd.rank() == 0 else None

    # Create the Estimator
    mnist_classifier = tf.estimator.Estimator(
        model_fn=cnn_model_fn, model_dir=model_dir,
        config=tf.estimator.RunConfig(session_config=config))

    # Set up logging for predictions
    # Log the values in the "Softmax" tensor with label "probabilities"
    tensors_to_log = {"probabilities": "softmax_tensor"}
    logging_hook = tf.train.LoggingTensorHook(
        tensors=tensors_to_log, every_n_iter=500)

    # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
    # from rank 0 to all other processes. This is necessary to ensure consistent
    # initialization of all workers when training is started with random weights
    # or restored from a checkpoint.
    bcast_hook = hvd.BroadcastGlobalVariablesHook(0)

    # Train the model
    train_input_fn = tf.estimator.inputs.numpy_input_fn(
        x={"x": train_data},
        y=train_labels,
        batch_size=100,
        num_epochs=None,
        shuffle=True)

    # Horovod: adjust number of steps based on number of GPUs.
    mnist_classifier.train(
        input_fn=train_input_fn,
        steps=20000 // hvd.size(),
        hooks=[logging_hook, bcast_hook])

    # Evaluate the model and print results
    eval_input_fn = tf.estimator.inputs.numpy_input_fn(
        x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False)
    eval_results = mnist_classifier.evaluate(input_fn=eval_input_fn)
    print(eval_results)

if __name__ == "__main__":
    tf.app.run()
Advantages of Horovod
It is simple to run. Single-machine multi-GPU launch command:
horovodrun -np 4 -H localhost:4 python train.py
Multi-machine multi-GPU launch command. It does not need to be run on every machine; launching it on the first machine alone is enough:
horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
A single copy of the code covers single-machine single-GPU, single-machine multi-GPU, and multi-machine multi-GPU training.
It works with every style of code, whether low-level TensorFlow, Estimator, or Keras, and it also supports other deep-learning frameworks such as PyTorch and MXNet (see the sketch below).
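The PyTorch integration follows the same pattern as the TensorFlow examples above; a minimal sketch (build_model is a hypothetical model constructor):

import torch
import horovod.torch as hvd

hvd.init()
# Pin each process to one GPU, keyed by local rank.
torch.cuda.set_device(hvd.local_rank())

model = build_model().cuda()  # hypothetical model constructor
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Wrap the optimizer so gradients are averaged with ring all-reduce.
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters())

# Broadcast initial state from rank 0 for consistent initialization.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)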
References:
https://www.tensorflow.org/guide/distribute_strategy
https://zhuanlan.zhihu.com/p/34172340
https://github.com/horovod/horovod
This article is reproduced from the Zhihu account of Alex-zhai.
Original link: https://zhuanlan.zhihu.com/p/69806200