First, let's review the concepts of in-graph and between-graph replication in TensorFlow:
In-graph replication: only one client is built. That client builds a single Graph containing one set of model parameters, placed on the ps, plus multiple replicas of the compute part of the model, each replica placed on a different worker, so that several workers can train the replicated model at the same time. The drawback is obvious: once the single worker that creates the client dies, the whole system goes down. Because of this poor fault tolerance, in-graph replication is rarely used in practice.
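To make the contrast concrete, here is a minimal in-graph sketch (my own illustration, not from the original article): one client pins the shared parameters to a ps task and builds one compute replica per worker. The number of workers, variable shapes, and loss are assumptions.
# Hypothetical in-graph replication sketch (rarely used in practice).
# A single client builds ONE graph: shared parameters on ps, one compute
# replica per worker. Shapes and loss are placeholders for illustration.
import tensorflow as tf

num_workers = 3  # assumed cluster size

with tf.device("/job:ps/task:0"):
  w = tf.get_variable("w", shape=[10, 1])  # the single shared parameter set

replica_losses = []
for i in range(num_workers):
  with tf.device("/job:worker/task:%d" % i):
    x = tf.placeholder(tf.float32, shape=[None, 10], name="x_%d" % i)
    replica_losses.append(tf.reduce_sum(tf.matmul(x, w)))  # one replica per worker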
Between-graph replication: every worker creates its own client, usually in the same process as the task's main program. Each client builds an identical Graph, but the parameters are still placed on the ps. Fault tolerance is much better: if one worker's client dies, the other workers keep running.
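A minimal between-graph skeleton, sketched here as an assumption-laden illustration, looks like this: every task starts a tf.train.Server, ps tasks simply join, and each worker process builds its own client graph. The cluster addresses are the same assumed ones used later in this article, and job_name/task_index would normally come from command-line flags.
# Hypothetical between-graph skeleton: every process runs this script with its
# own job_name/task_index (hard-coded here instead of flags, as an assumption).
import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]})

job_name, task_index = "worker", 0  # would normally come from FLAGS

server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

if job_name == "ps":
  server.join()  # ps tasks only host and serve variables
else:
  # Every worker builds the same graph; variables still end up on the ps tasks
  # via tf.train.replica_device_setter, which is discussed next.
  with tf.device(tf.train.replica_device_setter(
      worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
    global_step = tf.train.get_or_create_global_step()
    # ... build the model and train_op here ...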
Let's focus on the between-graph approach. With a single ps there is nothing to decide about parameter placement, but with multiple ps nodes, how should variable storage and updates be distributed? The tf.train.replica_device_setter API answers this: it returns a device function, and different device functions implement different parameter placement policies. By default it assigns the graph's parameter variables to the ps tasks in round-robin order and places the compute ops on the current worker node. Here is an example of how to use tf.train.replica_device_setter:
# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker
# jobs on hosts worker0, worker1 and worker2.
cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
  # Build your graph
  v1 = tf.Variable(...)  # assigned to /job:ps/task:0
  v2 = tf.Variable(...)  # assigned to /job:ps/task:1
  v3 = tf.Variable(...)  # assigned to /job:ps/task:0
So how is replica_device_setter implemented internally? Let's look at its source code:
@tf_export(v1=["train.replica_device_setter"])
def replica_device_setter(ps_tasks=0,
                          ps_device="/job:ps",
                          worker_device="/job:worker",
                          merge_devices=True,
                          cluster=None,
                          ps_ops=None,
                          ps_strategy=None):
  """Return a `device function` to use when building a Graph for replicas.

  By default, only Variable ops are placed on ps tasks, and the placement
  strategy is round-robin over all ps tasks. A custom `ps_strategy` may be used
  to do more intelligent placement, such as
  `tf.contrib.training.GreedyLoadBalancingStrategy`.

  Args:
    ps_tasks: Number of tasks in the `ps` job. Ignored if `cluster` is
      provided.
    ps_device: String. Device of the `ps` job. If empty no `ps` job is used.
      Defaults to `ps`.
    worker_device: String. Device of the `worker` job. If empty no `worker`
      job is used.
    merge_devices: `Boolean`. If `True`, merges or only sets a device if the
      device constraint is completely unset. merges device specification rather
      than overriding them.
    cluster: `ClusterDef` proto or `ClusterSpec`.
    ps_ops: List of strings representing `Operation` types that need to be
      placed on `ps` devices. If `None`, defaults to `STANDARD_PS_OPS`.
    ps_strategy: A callable invoked for every ps `Operation` (i.e. matched by
      `ps_ops`), that takes the `Operation` and returns the ps task index to
      use. If `None`, defaults to a round-robin strategy across all `ps`
      devices.

  Returns:
    A function to pass to `tf.device()`.

  Raises:
    TypeError if `cluster` is not a dictionary or `ClusterDef` protocol buffer,
    or if `ps_strategy` is provided but not a callable.
  """
  if cluster is not None:
    if isinstance(cluster, server_lib.ClusterSpec):
      cluster_spec = cluster.as_dict()
    else:
      cluster_spec = server_lib.ClusterSpec(cluster).as_dict()
    # Get ps_job_name from ps_device by stripping "/job:".
    ps_job_name = pydev.DeviceSpec.from_string(ps_device).job
    if ps_job_name not in cluster_spec or cluster_spec[ps_job_name] is None:
      return None
    ps_tasks = len(cluster_spec[ps_job_name])

  if ps_tasks == 0:
    return None

  if ps_ops is None:
    # TODO(sherrym): Variables in the LOCAL_VARIABLES collection should not be
    # placed in the parameter server.
    ps_ops = list(STANDARD_PS_OPS)

  if not merge_devices:
    logging.warning(
        "DEPRECATION: It is recommended to set merge_devices=true in "
        "replica_device_setter")
  if ps_strategy is None:
    ps_strategy = _RoundRobinStrategy(ps_tasks)
  if not six.callable(ps_strategy):
    raise TypeError("ps_strategy must be callable")
  chooser = _ReplicaDeviceChooser(ps_tasks, ps_device, worker_device,
                                  merge_devices, ps_ops, ps_strategy)
  return chooser.device_function
class _RoundRobinStrategy(object):
  """Returns the next ps task index for placement in round-robin order.

  This class is not to be used directly by users. See instead
  `replica_device_setter()` below.
  """

  def __init__(self, num_tasks):
    """Create a new `_RoundRobinStrategy`.

    Args:
      num_tasks: Number of ps tasks to cycle among.
    """
    self._num_tasks = num_tasks
    self._next_task = 0

  def __call__(self, unused_op):
    """Choose a ps task index for the given `Operation`.

    Args:
      unused_op: An `Operation` to be placed on ps.

    Returns:
      The next ps task index to use for the `Operation`. Returns the next
      index, in the range `[offset, offset + num_tasks)`.
    """
    task = self._next_task
    self._next_task = (self._next_task + 1) % self._num_tasks
    return task
As we can see, the default assignment of parameters across the ps tasks is _RoundRobinStrategy, which returns ps task indices in round-robin order. What replica_device_setter actually returns is a device function, and this device function is what determines the device each op is placed on.
Let's walk through a small test of replica_device_setter to see how the device function works internally:
"""Tests for device function for replicated training."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.core.framework import node_def_pb2
from tensorflow.python.framework import device as pydev
from tensorflow.python.framework import ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.training import device_setter
from tensorflow.python.training import server_lib
class _RoundRobinStrategy(object):
"""Returns the next ps task index for placement in round-robin order.
This class is not to be used directly by users. See instead
`replica_device_setter()` below.
"""
def __init__(self, num_tasks):
"""Create a new `_RoundRobinStrategy`.
Args:
num_tasks: Number of ps tasks to cycle among.
"""
self._num_tasks = num_tasks
self._next_task = 0
def __call__(self, unused_op):
"""Choose a ps task index for the given `Operation`.
Args:
unused_op: An `Operation` to be placed on ps.
Returns:
The next ps task index to use for the `Operation`. Returns the next
index, in the range `[offset, offset + num_tasks)`.
"""
task = self._next_task
self._next_task = (self._next_task + 1) % self._num_tasks
return task
class DeviceSetterTest(test.TestCase):
_cluster_spec = server_lib.ClusterSpec({
"ps": ["ps0:2222", "ps1:2222"],
"worker": ["worker0:2222", "worker1:2222", "worker2:2222"]
})
_ps_tasks = 2
_ps_device = "/job:ps"
_worker_device = "/job:worker"
_merge_devices = True
_ps_ops = ("Variable", "VariableV2", "AutoReloadVariable",
"MutableHashTable", "MutableHashTableV2",
"MutableHashTableOfTensors", "MutableHashTableOfTensorsV2",
"MutableDenseHashTable", "MutableDenseHashTableV2",
"VarHandleOp", "BoostedTreesEnsembleResourceHandleOp")
_ps_strategy = _RoundRobinStrategy(_ps_tasks)
def testCPUOverride(self):
with ops.device(
device_setter.replica_device_setter(cluster=self._cluster_spec)):
# with ops.device("/cpu:0"):
v = variables.Variable([1, 2])
with ops.device("/cpu:0"):
w = variables.Variable([2, 1])
print(self.device_function(w.op))
k = variables.Variable([1, 2])
print(self.device_function(k.op))
with ops.device("/cpu:0"):
a = v + w
print(self.device_function(a.op))
# self.assertDeviceEqual("/job:ps/task:0/cpu:0", v.device)
# self.assertDeviceEqual("/job:ps/task:0/cpu:0", v.initializer.device)
# self.assertDeviceEqual("/job:ps/task:1", w.device)
# self.assertDeviceEqual("/job:ps/task:1", w.initializer.device)
# self.assertDeviceEqual("/job:worker/cpu:0", a.device)
def device_function(self, op):
"""Choose a device for `op`.
Args:
op: an `Operation`.
Returns:
The device to use for the `Operation`.
"""
# If we don't return early here, either merge_devices is True, or op.device
# is empty (in which case merging is a no-op). So we can always merge below.
if not self._merge_devices and op.device:
return op.device
current_device = pydev.DeviceSpec.from_string(op.device or "")
# The ps_device will be used for specified ops (ps_ops) whenever it is
# present and ps_tasks is non-zero. However, its task number will only be
# set (using ps_strategy) if there is a job field in ps_device that won't be
# changed by the job field (if present) in current_device.
node_def = op if isinstance(op, node_def_pb2.NodeDef) else op.node_def
if self._ps_tasks and self._ps_device and node_def.op in self._ps_ops:
ps_device = pydev.DeviceSpec.from_string(self._ps_device)
print("ps_device:", ps_device.to_string())
current_job, ps_job = current_device.job, ps_device.job
print("cur_device:", current_device.to_string())
if ps_job and (not current_job or current_job == ps_job):
ps_device = ps_device.replace(task=self._ps_strategy(op))
ps_device = ps_device.make_merged_spec(current_device)
print("merge_ps_device:", ps_device.to_string())
return ps_device.to_string()
worker_device = pydev.DeviceSpec.from_string(self._worker_device or "")
worker_device = worker_device.make_merged_spec(current_device)
return worker_device.to_string()
if __name__ == "__main__":
setter_test = DeviceSetterTest()
setter_test.testCPUOverride()
# test.main()
Output:
ps_device: /job:ps
cur_device: /job:ps/task:0
merge_ps_device: /job:ps/task:0
/job:ps/task:0
_____________
ps_device: /job:ps
cur_device: /job:ps/task:1/device:CPU:0
merge_ps_device: /job:ps/task:1/device:CPU:0
/job:ps/task:1/device:CPU:0
_____________
ps_device: /job:ps
cur_device: /job:ps/task:0
merge_ps_device: /job:ps/task:0
/job:ps/task:0
_________________
/job:worker/device:CPU:0
From this it is clear what device_function does: given an op, it returns the device that op should run on, based on the op's type. If the op type is one of _ps_ops, the op is placed on a ps node; otherwise it is placed on the worker node.
It is also clear that with multiple parameters, the default placement policy is round-robin: variables are handed out to the ps nodes one by one in creation order. This does not necessarily balance the load across the ps nodes. With 2 ps tasks, for example, all of the weight variables may land on ps0 and all of the bias variables on ps1, which obviously puts far more pressure on ps0. For a more reasonable assignment, use tf.contrib.training.GreedyLoadBalancingStrategy: a simple greedy strategy that places each variable on a suitable ps node according to the variable's size in bytes, giving better load balance. Its source is shown below:
"""Strategies for placing variables on parameter servers.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import hashlib
import numpy as np
from tensorflow.python.framework import tensor_shape
class GreedyLoadBalancingStrategy(object):
"""Returns the least-loaded ps task for op placement.
The load is calculated by a user-specified load function passed in at
construction. There are no units for load, and the load function is
responsible for providing an internally consistent measure.
Note that this strategy is very sensitive to the exact order in which
ps ops (typically variables) are created, as it greedily places ops
on the least-loaded ps at the point each op is processed.
One reasonable heuristic is the `byte_size_load_fn`, which
estimates load as the number of bytes that would be used to store and
transmit the entire variable. More advanced load functions
could consider the difference in access patterns across ops, or trade
off CPU-intensive ops with RAM-intensive ops with network bandwidth.
This class is intended to be used as a `ps_strategy` in
`tf.train.replica_device_setter`.
"""
def __init__(self, num_tasks, load_fn):
"""Create a new `LoadBalancingStrategy`.
Args:
num_tasks: Number of ps tasks to cycle among.
load_fn: A callable that takes an `Operation` and returns a
numeric load value for that op.
"""
self._num_tasks = num_tasks
self._load_fn = load_fn
self._ps_loads = np.zeros(num_tasks)
def __call__(self, op):
"""Choose a ps task index for the given `Operation`.
Args:
op: A `Operation` to be placed on ps.
Returns:
The next ps task index to use for the `Operation`. Greedily
places the op on the least-loaded ps task so far, as determined
by the load function.
"""
task = np.argmin(self._ps_loads)
self._ps_loads[task] += self._load_fn(op)
return task
def byte_size_load_fn(op):
"""Load function that computes the byte size of a single-output `Operation`.
This is intended to be used with `"Variable"` ops, which have a single
`Tensor` output with the contents of the variable. However, it can also be
used for calculating the size of any op that has a single output.
Intended to be used with `GreedyLoadBalancingStrategy`.
Args:
op: An `Operation` with a single output, typically a "Variable" op.
Returns:
The number of bytes in the output `Tensor`.
Raises:
ValueError: if `op` does not have a single output, or if the shape of the
single output is not fully-defined.
"""
if len(op.outputs) != 1:
raise ValueError("Op %s must have a single output" % op)
output = op.outputs[0]
elem_size = output.dtype.size
shape = output.get_shape()
if not shape.is_fully_defined():
# Due to legacy behavior, scalar "Variable" ops have output Tensors that
# have unknown shape when the op is created (and hence passed to this
# load function for placement), even though the scalar shape is set
# explicitly immediately afterward.
shape = tensor_shape.TensorShape(op.get_attr("shape"))
shape.assert_is_fully_defined()
return shape.num_elements() * elem_size
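Before going through the details, here is a quick usage sketch (my addition, with the same assumed two-ps cluster and made-up variable shapes): the strategy is passed to replica_device_setter through its ps_strategy argument, typically together with byte_size_load_fn.
# Hypothetical usage sketch: place each variable on the ps task with the
# smallest accumulated byte load, instead of pure round-robin.
import tensorflow as tf

cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}

greedy = tf.contrib.training.GreedyLoadBalancingStrategy(
    num_tasks=2, load_fn=tf.contrib.training.byte_size_load_fn)

with tf.device(tf.train.replica_device_setter(
    cluster=cluster_spec, ps_strategy=greedy)):
  w1 = tf.get_variable("w1", shape=[1000, 256])  # large weight matrix
  b1 = tf.get_variable("b1", shape=[256])        # small bias vector
  w2 = tf.get_variable("w2", shape=[256, 128])
  b2 = tf.get_variable("b2", shape=[128])
  # Round-robin would alternate ps0/ps1 regardless of size; the greedy
  # strategy instead follows the accumulated byte load on each ps task.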
The GreedyLoadBalancingStrategy implementation is fairly straightforward: it measures each parameter by its size in bytes, shape.num_elements() * elem_size, and always places the next variable on the ps node whose accumulated load is currently the smallest. This also means you can define your own placement strategy, for example sorting the variables by size first and then placing each one on the currently least-loaded ps; any callable that maps an Operation to a ps task index will do, as in the sketch below.
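Here is a minimal custom-strategy sketch (my own, not from TensorFlow) that balances on element count instead of bytes; the cluster and shapes are assumptions.
# Hypothetical custom ps_strategy: balance on element count rather than bytes.
import numpy as np
import tensorflow as tf


class LeastElementsStrategy(object):
  """Places each variable on the ps task with the fewest elements so far."""

  def __init__(self, num_tasks):
    self._loads = np.zeros(num_tasks)

  def __call__(self, op):
    shape = op.outputs[0].get_shape()
    num_elements = shape.num_elements() or 0  # treat unknown shapes as 0
    task = int(np.argmin(self._loads))
    self._loads[task] += num_elements
    return task


cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}

with tf.device(tf.train.replica_device_setter(
    cluster=cluster_spec, ps_strategy=LeastElementsStrategy(2))):
  w = tf.get_variable("w", shape=[1024, 128])
  b = tf.get_variable("b", shape=[128])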
The discussion so far assumes parameters small enough that each ps node can hold a variable by itself. What about a truly huge variable, say an embedding table with tens of millions or even hundreds of millions of entries? TensorFlow provides variable partitioning for this case: with a partitioner, such a variable is split into multiple pieces that are distributed across different ps nodes.
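A minimal sketch of a partitioned embedding variable, assuming a 4-way fixed-size split and made-up sizes (tf.variable_axis_size_partitioner and tf.min_max_variable_partitioner are alternatives that split by byte size):
# Hypothetical sketch: split one huge embedding table into 4 shards with a
# fixed-size partitioner; each shard can be placed on a different ps task.
import tensorflow as tf

cluster_spec = {
    "ps": ["ps0:2222", "ps1:2222"],
    "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}

with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):
  embedding = tf.get_variable(
      "embedding",
      shape=[10000000, 64],  # tens of millions of rows (assumed size)
      partitioner=tf.fixed_size_partitioner(num_shards=4))
  # `embedding` is a PartitionedVariable; the lookup below transparently
  # gathers rows from whichever ps task holds each shard.
  ids = tf.placeholder(tf.int64, shape=[None])
  vectors = tf.nn.embedding_lookup(embedding, ids)
Since every shard is itself a variable op, the device function places the shards individually; with the default round-robin strategy the four shards above would alternate between the two ps tasks.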
This article is reproduced from the Zhihu account Alex-zhai.
Original link: https://zhuanlan.zhihu.com/p/90234576