1. Introduction to the DC-ASGD Algorithm
We have discussed the DC-ASGD algorithm before; for details see: https://zhuanlan.zhihu.com/p/80978479
The problem DC-ASGD tackles is the delayed-gradient issue of asynchronous stochastic gradient descent (ASGD) when training deep learning models: by the time a worker submits the gradient it has computed to the parameter server, the server has usually already been updated many times by other workers, so that gradient is stale. The proposed remedy is to Taylor-expand the gradient term and use the expansion to approximate the Hessian matrix of the loss function, thereby compensating for the delay.
The concrete algorithm:
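As a stand-in for the algorithm listing, here is a minimal NumPy sketch of the per-worker update rule, reconstructed from the CPU kernel shown later in this article (the function name and the default values are illustrative only):
import numpy as np

def dc_asgd_step(w, w_bak, grad, lr=0.001, lam=2.0):
    """One DC-ASGD server-side step for a single worker (illustrative sketch).

    w     -- current parameters held by the parameter server
    w_bak -- this worker's backup ("shadow") copy of the parameters
    grad  -- the (possibly delayed) gradient submitted by the worker
    lam   -- variance control parameter (lambda)
    """
    # grad + lam * grad * grad * (w - w_bak) is the delay-compensated gradient:
    # the element-wise grad * grad term approximates the diagonal of the Hessian.
    w_new = w - lr * (grad + lam * grad * grad * (w - w_bak))
    # in this implementation the shadow is then refreshed to the updated parameters
    return w_new, w_new.copy()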
2. A TensorFlow Implementation of DC-ASGD
So how do we implement DC-ASGD in TensorFlow? The previous article, which walked through the source code of TensorFlow's Optimizer class, was in fact groundwork for this one. Let us now analyze a concrete Optimizer subclass: DelayCompensatedGradientDescentOptimizer.
"""DelayCompensatedGradientDescentOptimizer for TensorFlow."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.training import optimizer
from tensorflow.python.training import training_ops
GATE_NONE = 0
GATE_OP = 1
GATE_GRAPH = 2
class DelayCompensatedGradientDescentOptimizer(optimizer.Optimizer):
"""Optimizer that implements the DelayCompensatedGradientDescent algorithm.
See https://arxiv.org/abs/1609.08326
(https://arxiv.org/pdf/1609.08326.pdf).
"""
def __init__(self, learning_rate, variance_parameter=2.0, num_workers=1,
use_locking=False, name="DelayCompensatedGradientDescentOptimizer"):
"""Construct a gradient descent optimizer with delay compensation.
It is crucial to note the `num_workers` argument in the constructor and the
`worker_index` argument in `minimize()` and `apply_gradients()`.
In contrast to AdamOptimizer, the sparse implementation of this algorithm
(used when the gradient is an IndexedSlices object, typically because of
`tf.gather` or an embedding lookup in the forward pass) only updates
variable slices and the corresponding `shadow_t` entries when that part of
the variable was used in the forward pass. This means that the sparse
behavior differs from the dense behavior (similar to some momentum
implementations which ignore momentum unless a variable slice was actually
used).
Args:
learning_rate: A Tensor or a floating point value. The learning rate.
variance_parameter: A Tensor or a floating point value.
The variance control parameter.
num_workers: An int value. The number of workers.
use_locking: If True use locks for update operations.
name: Optional name for the operations created when applying gradients.
Defaults to "DelayCompensatedGradientDescentOptimizer".
"""
num_workers = self._call_if_callable(num_workers)
if num_workers <= 0:
raise ValueError("num_workers must be positive: %s" % num_workers)
super(DelayCompensatedGradientDescentOptimizer, self).__init__(use_locking, name)
self._lr = learning_rate
self._lambda = variance_parameter
self._num_workers = num_workers
self._learning_rate_tensor = None
self._lambda_tensor = None
self._use_locking = use_locking
def _create_slots(self, var_list):
for index in range(self._num_workers):
for v in var_list:
self._zeros_slot(v, "shadow_{0}".format(index), self._name)
def _prepare(self):
lr = self._call_if_callable(self._lr)
lambda_ = self._call_if_callable(self._lambda)
self._learning_rate_tensor = ops.convert_to_tensor(lr, name="learning_rate")
self._lambda_tensor = ops.convert_to_tensor(lambda_, name="lambda")
def _apply_dense(self, grad, var):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index))
return training_ops.apply_delay_compensated_gradient_descent(
var,
math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype),
grad,
math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype),
shadow,
use_locking=self._use_locking).op
def _resource_apply_dense(self, grad, var):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index))
return training_ops.resource_apply_delay_compensated_gradient_descent(
var.handle,
math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype),
grad,
math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype),
shadow.handle,
use_locking=self._use_locking)
def _apply_sparse_shared(self, grad, var, indices):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index))
# if shadow is None:
# raise ValueError("None shadow with index = " + str(self.worker_index) + " and var = " + str(var))
lambda_ = math_ops.cast(self._lambda_tensor, var.dtype.base_dtype)
lr = math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype)
var_slice = array_ops.gather(var, indices)
shadow_slice = array_ops.gather(shadow, indices)
var_scaled_g_values = lr * (grad + lambda_ * grad * grad * (var_slice - shadow_slice))
var_t = state_ops.scatter_add(var, indices, -var_scaled_g_values, use_locking=self._use_locking)
with ops.control_dependencies([var_t]):
shadow_t = state_ops.assign(shadow, var_t)
return control_flow_ops.group(*[var_t, shadow_t])
def _apply_sparse(self, grad, var):
return self._apply_sparse_shared(
grad.values, var, grad.indices)
def _resource_apply_sparse(self, grad, var, indices):
return self._apply_sparse_shared(
grad, var, indices)
def minimize(self, loss, global_step=None, var_list=None,
gate_gradients=GATE_OP, aggregation_method=None,
colocate_gradients_with_ops=False, name=None,
grad_loss=None, worker_index=0):
self.worker_index = worker_index
return super(DelayCompensatedGradientDescentOptimizer, self).minimize(loss=loss, global_step=global_step,
var_list=var_list,
gate_gradients=gate_gradients,
aggregation_method=aggregation_method,
colocate_gradients_with_ops=colocate_gradients_with_ops,
name=name,
grad_loss=grad_loss)
def apply_gradients(self, grads_and_vars, global_step=None, name=None, worker_index=0):
self.worker_index = worker_index
return super(DelayCompensatedGradientDescentOptimizer, self).apply_gradients(grads_and_vars=grads_and_vars,
global_step=global_step, name=name)
The _create_slots function creates the extra variables the optimizer needs: here it creates, for every worker, a backup variable `shadow` for each trainable variable. The _prepare function converts the optimizer's ordinary hyperparameters into tensors.
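For example, with num_workers=2 each variable gets two zero-initialized backup slots, shadow_0 and shadow_1, and the worker_index passed to apply_gradients() selects which slot is read and written. A small sketch, assuming TensorFlow 1.x graph mode, that the class above is importable (the module name dc_asgd_optimizer is hypothetical), and that the custom op has been compiled into TensorFlow as described in section 3:
import tensorflow as tf
from dc_asgd_optimizer import DelayCompensatedGradientDescentOptimizer  # hypothetical module path

w = tf.Variable(tf.ones([10, 10]), name="w")
grad = tf.ones_like(w)
opt = DelayCompensatedGradientDescentOptimizer(learning_rate=0.001, num_workers=2)
# worker 1 updates w against its own backup slot "shadow_1"
update = opt.apply_gradients([(grad, w)], worker_index=1)
shadow_1 = opt.get_slot(w, "shadow_1")  # created by _create_slots

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(update)
    print(sess.run(shadow_1))  # equals the updated w, because the kernel refreshes the shadow after the step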
Let us focus on the _apply_sparse function, which delegates to _apply_sparse_shared. Its grad argument is of type IndexedSlices. What is an IndexedSlices? A slice here means taking specific indices of a tensor to obtain a part of it: for example, if the original tensor has shape [10, 10], taking index [0] yields a [10] tensor, and that tensor is a slice of the original. An IndexedSlices is therefore a bundle of slices together with their indices. During a gradient update, if only a few rows need to be updated, representing the gradient with this data structure saves computation.
This is why _apply_sparse_shared is passed grad.values and grad.indices, which hold the gradient values of the selected rows and the row indices, respectively. When computing the update term, var_scaled_g_values = lr * (grad + lambda_ * grad * grad * (var_slice - shadow_slice)), the corresponding rows var_slice and shadow_slice are gathered first. The parameters are then updated with var_t = state_ops.scatter_add(var, indices, -var_scaled_g_values, use_locking=self._use_locking), which again touches only the rows selected by indices.
After this round of updates, the new variable value var_t is backed up for the next update: shadow_t = state_ops.assign(shadow, var_t). Finally, the var_t and shadow_t update ops are grouped with control_flow_ops.group.
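To make the row-wise arithmetic concrete, here is a plain NumPy sketch of this sparse update for a single worker (the values and indices below are made up to mimic an IndexedSlices gradient):
import numpy as np

lr, lam = 0.001, 2.0
var = np.ones((10, 10))          # parameter matrix
shadow = np.zeros((10, 10))      # this worker's backup copy
indices = np.array([0, 3, 8])    # rows that appeared in the forward pass (grad.indices)
grad_values = np.ones((3, 10))   # gradient rows (grad.values)

var_slice = var[indices]         # array_ops.gather(var, indices)
shadow_slice = shadow[indices]   # array_ops.gather(shadow, indices)
delta = lr * (grad_values + lam * grad_values ** 2 * (var_slice - shadow_slice))
var[indices] -= delta            # state_ops.scatter_add(var, indices, -delta)
shadow = var.copy()              # state_ops.assign(shadow, var_t): full backup of the new value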
Here is a simple example showing how a gradient of type IndexedSlices is applied:
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import ops
from tensorflow.python.ops import variables
from tensorflow.python.training import adam
if __name__ == '__main__':
value_a = np.ones(shape=[3, 10])
indices_a = np.array([0, 3, 8])
dense_shape_a = [10, 10]
grad_slices_a = ops.IndexedSlices(constant_op.constant(value_a), constant_op.constant(indices_a),
constant_op.constant(dense_shape_a))
var_np = np.ones(shape=[10, 10])
var0 = variables.RefVariable(var_np)
opt = adam.AdamOptimizer()
update = opt.apply_gradients(zip([grad_slices_a], [var0]))
# variables.global_variables_initializer().run()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
print("initial variable is:", sess.run(var0))
sess.run(update)
print("update 1 time variable is:", sess.run(var0))
Output:
initial variable is: [[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
update 1 time variable is: [[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]]
As you can clearly see, after a single gradient update only rows 0, 3 and 8 of the variable have changed. That is the advantage of the IndexedSlices representation.
In addition, training_ops.apply_delay_compensated_gradient_descent is implemented in tensorflow/core/kernels/training_ops.cc; the core code is:
template <typename T>
struct ApplyDelayCompensatedGradientDescent<CPUDevice, T> {
void operator()(const CPUDevice& d, typename TTypes<T>::Flat var,
typename TTypes<T>::ConstScalar lr,
typename TTypes<T>::ConstFlat grad,
typename TTypes<T>::ConstScalar variance,
typename TTypes<T>::Flat shadow) {
var.device(d) -= lr() * (grad + variance() * grad * grad * (var - shadow));
shadow.device(d) = var;
}
};
Besides these two files, we also need to register the OP interface for ApplyDelayCompensatedGradientDescent; that part is not covered in detail here.
3. How to Use the DC-ASGD Algorithm
After modifying or adding the DC-ASGD related files in the TensorFlow source tree, TensorFlow needs to be rebuilt. Once the build succeeds, the DC-ASGD interface is ready to use.
Below is a distributed-training demo that uses the DelayCompensatedGradientDescentOptimizer:
from __future__ import print_function, absolute_import, division
import tensorflow as tf
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps hosts")
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker hosts")
tf.app.flags.DEFINE_string("job_name", "worker", "'ps' or'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_workers", 2, "Number of workers")
tf.app.flags.DEFINE_boolean("is_sync", False, "using synchronous training or not")
FLAGS = tf.app.flags.FLAGS
def model(images):
"""Define a simple mnist classifier"""
net = tf.layers.dense(images, 500, activation=tf.nn.relu)
net = tf.layers.dense(net, 500, activation=tf.nn.relu)
net = tf.layers.dense(net, 10, activation=None)
return net
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(-1, 784).astype('float32')
x_test = x_test.reshape(-1, 784).astype('float32')
x_train /= 255
x_test /= 255
def get_batch(image, label, batch_size=32, training=True):
df = tf.data.Dataset.from_tensor_slices((image, label))
if training:
df = df.repeat(10).shuffle(buffer_size=1000)
df = df.batch(batch_size).prefetch(batch_size)
iterator = df.make_one_shot_iterator()
batch_x, batch_y = iterator.get_next()
return batch_x, batch_y
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# create the cluster configured by 'ps_hosts' and 'worker_hosts'
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# create a server for local task
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
train_batch_x, train_batch_y = get_batch(x_train, y_train)
test_batch_x, test_batch_y = get_batch(x_test, y_test, training=False)
if FLAGS.job_name == "ps":
server.join() # ps hosts only join
elif FLAGS.job_name == "worker":
# workers perform the operation
# ps_strategy = tf.contrib.training.GreedyLoadBalancingStrategy(FLAGS.num_ps)
# Note: tf.train.replica_device_setter automatically places the parameters (Variables)
# on the ps hosts (default placement strategy: round-robin over all ps hosts), and places
# the compute operations on the local worker host
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
logits = model(train_batch_x)
loss = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=tf.one_hot(train_batch_y, 10)))
# The StopAtStepHook handles stopping after running given steps.
hooks = [tf.train.StopAtStepHook(last_step=10000)]
global_step = tf.train.get_or_create_global_step()
#optimizer = tf.train.AdamOptimizer(learning_rate=1e-04)
optimizer = tf.contrib.opt.DelayCompensatedGradientDescentOptimizer(learning_rate=0.001)
if FLAGS.is_sync:
# synchronous training
# wrap the optimizer with tf.train.SyncReplicasOptimizer
# ref: https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer
optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=FLAGS.num_workers,
total_num_replicas=FLAGS.num_workers)
# create the hook which handles initialization and queues
hooks.append(optimizer.make_session_run_hook((FLAGS.task_index == 0)))
train_op = optimizer.minimize(loss, global_step=global_step)
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="./checkpoint_dir",
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# mon_sess.run handles AbortedError in case of preempted PS.
_, ls, step = mon_sess.run([train_op, loss, global_step])
if step % 100 == 0:
print("Train step %d, loss: %f" % (step, ls))
if __name__ == "__main__":
tf.app.run()
The launch commands are:
python dc_asgd_exp.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224 --job_name=ps --task_index=0
python dc_asgd_exp.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224 --job_name=worker --task_index=0
References:
https://zhuanlan.zhihu.com/p/80978479
https://zhuanlan.zhihu.com/p/87348147
https://www.zhihu.com/question/277403551
https://zhuanlan.zhihu.com/p/35083779
This article is reposted from the Zhihu account Alex-zhai.
Original link: https://www.zhihu.com/people/alex-zhai-19/posts