写点什么

tensorflow 中 ASGD with Delay Compensation 优化器代码实现

  • 2019-11-29
  • 本文字数:8344 字

    阅读完需:约 27 分钟

tensorflow中ASGD with Delay Compensation优化器代码实现

一. DC-ASGD 算法介绍

此前,和大家也一起讨论过 DC-ASGD 算法,详细可见:https://zhuanlan.zhihu.com/p/80978479


DC-ASGD 算法主要解决的问题是:异步的随机梯度下降法(ASGD)在深度学习模型的训练中会存在 delayed gradients 的问题,就是当一个 worker 向参数 server 端提交它算出的梯度时,server 端其实已经被其它 worker 更新过好多次了。主要解决方案是利用梯度项的泰勒展开式去近似逼近 loss 函数的 Hessian 矩阵。


具体算法:


二. DC-ASGD 算法 tensorflow 实现

那么如何在 tensorflow 中实现 dc-asgd 算法呢?在上一篇文章中,我们讨论过 tensorflow 中 Optimizer 类的源码解析,其实就是为该篇文章做铺垫。接下来我们就具体分析下 Optimizer 的子类-DelayCompensatedGradientDescentOptimizer 类。


"""DelayCompensatedGradientDescentOptimizer for TensorFlow."""from __future__ import absolute_importfrom __future__ import divisionfrom __future__ import print_function
from tensorflow.python.framework import opsfrom tensorflow.python.ops import array_opsfrom tensorflow.python.ops import control_flow_opsfrom tensorflow.python.ops import math_opsfrom tensorflow.python.ops import state_opsfrom tensorflow.python.training import optimizerfrom tensorflow.python.training import training_ops
GATE_NONE = 0GATE_OP = 1GATE_GRAPH = 2

class DelayCompensatedGradientDescentOptimizer(optimizer.Optimizer): """Optimizer that implements the DelayCompensatedGradientDescent algorithm. See [](https://arxiv.org/abs/1609.08326) ([](https://arxiv.org/pdf/1609.08326.pdf)). """
def __init__(self, learning_rate, variance_parameter=2.0, num_workers=1, use_locking=False, name="DelayCompensatedGradientDescentOptimizer"):
"""Construct a gradient descent optimizer with delay compensation. It is cricial to note the `num_workers` in constructor and `worker_index` in `minimize()` and `apply_gradients()`. Contrast to AdaMaxamOptimizer, the sparse implementation of this algorithm (used when the gradient is an IndexedSlices object, typically because of `tf.gather` or an embedding lookup in the forward pass) only updates variable slices and corresponding `shadow_t` term when that part of the variable was used in the forward pass. This means that the sparse behavior is contrast to the dense behavior (similar to some momentum implementations which ignore momentum unless a variable slice was actually used). Args: learning_rate: A Tensor or a floating point value. The learning rate. variance_parameter: A Tensor or a floating point value. The variance control parameter. num_workers: A int value. The number of workers. use_locking: If True use locks for update operations. name: Optional name for the operations created when applying gradients. Defaults to "DelayCompensatedGradientDescentOptimizer". """ num_workers = self._call_if_callable(num_workers) if num_workers <= 0: raise ValueError("num_workers must be positive: %s" % num_workers) super(DelayCompensatedGradientDescentOptimizer, self).__init__(use_locking, name) self._lr = learning_rate self._lambda = variance_parameter self._num_workers = num_workers self._learning_rate_tensor = None self._lambda_tensor = None self._use_locking = use_locking
def _create_slots(self, var_list): for index in range(self._num_workers): for v in var_list: self._zeros_slot(v, "shadow_{0}".format(index), self._name)
def _prepare(self): lr = self._call_if_callable(self._lr) lambda_ = self._call_if_callable(self._lambda)
self._learning_rate_tensor = ops.convert_to_tensor(lr, name="learning_rate") self._lambda_tensor = ops.convert_to_tensor(lambda_, name="lambda")
def _apply_dense(self, grad, var):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index)) return training_ops.apply_delay_compensated_gradient_descent( var, math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype), grad, math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype), shadow, use_locking=self._use_locking).op
def _resource_apply_dense(self, grad, var):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index)) return training_ops.resource_apply_delay_compensated_gradient_descent( var.handle, math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype), grad, math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype), shadow.handle, use_locking=self._use_locking)
def _apply_sparse_shared(self, grad, var, indices):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index)) # if shadow is None: # raise ValueError("None shadow with index = " + str(self.worker_index) + " and var = " + str(var)) lambda_ = math_ops.cast(self._lambda_tensor, var.dtype.base_dtype) lr = math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype)
var_slice = array_ops.gather(var, indices) shadow_slice = array_ops.gather(shadow, indices)
var_scaled_g_values = lr * (grad + lambda_ * grad * grad * (var_slice - shadow_slice))
var_t = state_ops.scatter_add(var, indices, -var_scaled_g_values, use_locking=self._use_locking)
with ops.control_dependencies([var_t]): shadow_t = state_ops.assign(shadow, var_t)
return control_flow_ops.group(*[var_t, shadow_t])
def _apply_sparse(self, grad, var): return self._apply_sparse_shared( grad.values, var, grad.indices)
def _resource_apply_sparse(self, grad, var, indices): return self._apply_sparse_shared( grad, var, indices)
def minimize(self, loss, global_step=None, var_list=None, gate_gradients=GATE_OP, aggregation_method=None, colocate_gradients_with_ops=False, name=None, grad_loss=None, worker_index=0): self.worker_index = worker_index return super(DelayCompensatedGradientDescentOptimizer, self).minimize(loss=loss, global_step=global_step, var_list=var_list, gate_gradients=gate_gradients, aggregation_method=aggregation_method, colocate_gradients_with_ops=colocate_gradients_with_ops, name=name, grad_loss=grad_loss)
def apply_gradients(self, grads_and_vars, global_step=None, name=None, worker_index=0): self.worker_index = worker_index return super(DelayCompensatedGradientDescentOptimizer, self).apply_gradients(grads_and_vars=grads_and_vars,
复制代码


                                                                                 global_step=global_step, name=name)
复制代码


_create_slots 函数用来创建一些额外的参数,这里创建的是每一个 worker 上的每一个 variable 所对应的备份变量 shadow。_prepare 函数用来准备优化器的常规超参数。


我们重点关注下_apply_sparse 函数,该函数调用的是_apply_sparse_shared 函数,参数 grad 的数据类型是 IndexedSlices 类型,那么什么是 IndexedSlices 类型呢?这里 Slice 的意思是从 Tensor 里面取特定的一些下标得到原先 tensor 变量的一部分,比如说原来的 tensor 的 shape 是[10,10],取下标[0]得到一个[10]的 Tensor,这个 Tensor 就是原 Tensor 的一个 Slice。那么 IndexedSlices 其实就是一堆 Slices 和它们所对应的下标(也就是 Index)。在梯度更新过程中,如果只需要更新某几行的梯度值,就可以将梯度表示成这种数据结构,来节省计算资源。


所以_apply_sparse_shared 函数参数传入的是 grad.values 和 grad.indices,分别表示特定行的梯度值和行的下标。在计算梯度项时:var_scaled_g_values = lr *(grad + lambda_ * grad * grad *(var_slice - shadow_slice)),也需要先求出特定行的 var_slice 和 shadow_slice。然后根据求出的梯度项更新参数时:var_t = state_ops.scatter_add(var, indices,-var_scaled_g_values, use_locking=self._use_locking),也是在特定的那些行(根据 indices 确定的)做更新。


当这一轮的参数做完更新后,需要将当前时刻的变量 var_t 备份一下,以用于下一时刻的参数更新:shadow_t = state_ops.assign(shadow, var_t)。最后将 var_t, shadow_t 的更新操作放进 control_flow_ops 中。


我们举一个简单的 example 来说明一下这种 IndexedSlices 类型的梯度是怎么更新的:


import numpy as npimport tensorflow as tffrom tensorflow.python.framework import constant_opfrom tensorflow.python.framework import opsfrom tensorflow.python.ops import variablesfrom tensorflow.python.training import adam

if __name__ == '__main__': value_a = np.ones(shape=[3, 10]) indices_a = np.array([0, 3, 8]) dense_shape_a = [10, 10] grad_slices_a = ops.IndexedSlices(constant_op.constant(value_a), constant_op.constant(indices_a), constant_op.constant(dense_shape_a))
var_np = np.ones(shape=[10, 10])
var0 = variables.RefVariable(var_np) opt = adam.AdamOptimizer() update = opt.apply_gradients(zip([grad_slices_a], [var0])) # variables.global_variables_initializer().run() sess = tf.Session() sess.run(tf.global_variables_initializer()) print("initial variable is:", sess.run(var0)) sess.run(update) print("update 1 time variable is:", sess.run(var0))

输出:initial variable is: [[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]update 1 time variable is: [[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]]
复制代码


可以很清楚地看到,执行一次梯度更新之后,只有 0,3,8 这三行的变量值发生了改变。这就是使用 IndexedSlices 类型的优势。


另外,training_ops.apply_delay_compensated_gradient_descent 这个函数是在 tensorflow/core/kernels/training_ops.cc 中实现的,核心代码如下:


template <typename T>struct ApplyDelayCompensatedGradientDescent<CPUDevice, T> {  void operator()(const CPUDevice& d, typename TTypes<T>::Flat var,                   typename TTypes<T>::ConstScalar lr,                   typename TTypes<T>::ConstFlat grad,                   typename TTypes<T>::ConstScalar variance,                   typename TTypes<T>::Flat shadow) {    var.device(d) -= lr() * (grad + variance() * grad * grad * (var - shadow));    shadow.device(d) = var;  }};
复制代码


其实除了这两个文件之外,还需要写一下注册 ApplyDelayCompensatedGradientDescent 的 OP 接口,这里就不详细讲解了。

三.如何使用 DC-ASGD 算法

在 tensorflow 源码目录中修改或添加完 dc-asgd 算法的几个相关文件后,需要重新编译一下 tensorflow。编译成功后,就可以愉快地使用 dc-asgd 算法的接口啦。


下面给大家举一个使用 DelayCompensatedGradientDescentOptimizer 优化器的分布式训练 demo:


from __future__ import print_function, absolute_import, division
import tensorflow as tf
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps hosts")tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker hosts")tf.app.flags.DEFINE_string("job_name", "worker", "'ps' or'worker'")tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")tf.app.flags.DEFINE_integer("num_workers", 2, "Number of workers")tf.app.flags.DEFINE_boolean("is_sync", False, "using synchronous training or not")
FLAGS = tf.app.flags.FLAGS

def model(images): """Define a simple mnist classifier""" net = tf.layers.dense(images, 500, activation=tf.nn.relu) net = tf.layers.dense(net, 500, activation=tf.nn.relu) net = tf.layers.dense(net, 10, activation=None) return net

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()x_train = x_train.reshape(-1, 784).astype('float32')x_test = x_test.reshape(-1, 784).astype('float32')x_train /= 255x_test /= 255

def get_batch(image, label, batch_size=32, training=True): df = tf.data.Dataset.from_tensor_slices((image, label)) if training: df = df.repeat(10).shuffle(buffer_size=1000) df = df.batch(batch_size).prefetch(batch_size) iterator = df.make_one_shot_iterator() batch_x, batch_y = iterator.get_next() return batch_x, batch_y

def main(_): ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",")
# create the cluster configured by `ps_hosts' and 'worker_hosts' cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# create a server for local task server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
train_batch_x, train_batch_y = get_batch(x_train, y_train) test_batch_x, test_batch_y = get_batch(x_test, y_test, training=False)
if FLAGS.job_name == "ps": server.join() # ps hosts only join elif FLAGS.job_name == "worker": # workers perform the operation # ps_strategy = tf.contrib.training.GreedyLoadBalancingStrategy(FLAGS.num_ps)
# Note: tf.train.replica_device_setter automatically place the paramters (Variables) # on the ps hosts (default placement strategy: round-robin over all ps hosts, and also # place multi copies of operations to each worker host with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
logits = model(train_batch_x) loss = tf.reduce_mean( tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=tf.one_hot(train_batch_y, 10)))
# The StopAtStepHook handles stopping after running given steps. hooks = [tf.train.StopAtStepHook(last_step=10000)]
global_step = tf.train.get_or_create_global_step() #optimizer = tf.train.AdamOptimizer(learning_rate=1e-04) optimizer = tf.contrib.opt.DelayCompensatedGradientDescentOptimizer(learning_rate=0.001)
if FLAGS.is_sync: # asynchronous training # use tf.train.SyncReplicasOptimizer wrap optimizer # ref: https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=FLAGS.num_workers, total_num_replicas=FLAGS.num_workers) # create the hook which handles initialization and queues hooks.append(optimizer.make_session_run_hook((FLAGS.task_index == 0)))
train_op = optimizer.minimize(loss, global_step=global_step)
# The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(FLAGS.task_index == 0), checkpoint_dir="./checkpoint_dir", hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # mon_sess.run handles AbortedError in case of preempted PS. _, ls, step = mon_sess.run([train_op, loss, global_step]) if step % 100 == 0: print("Train step %d, loss: %f" % (step, ls))

if __name__ == "__main__": tf.app.run()
复制代码


启动命令是:


python dc_asgd_exp.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224 --job_name=ps --task_index=0python dc_asgd_exp.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224 --job_name=worker --task_index=0
复制代码


参考文献:


https://zhuanlan.zhihu.com/p/80978479


https://zhuanlan.zhihu.com/p/87348147


https://www.zhihu.com/question/277403551


https://zhuanlan.zhihu.com/p/35083779


本文转载自 Alex-zhai 知乎账号。


原文链接:https://www.zhihu.com/people/alex-zhai-19/posts


2019-11-29 08:00878

评论

发布
暂无评论
发现更多内容

强强联合,天翼云安全能力再升级!

天翼云开发者社区

云计算 基础设施 云服务 云安全

模块五

ASCE

一些销售术语

刘旭东

5月月更 销售术语

百度智能云特色城市业务指挥平台,助力城市管理更智能

百度开发者中心

Maven 简介及安装

Emperor_LawD

maven 5月月更

数据中心进入“液冷时代”,曙光引领绿色变革

BeeWorks

首次全面定义,《2022企业应用运维管理指标体系白皮书》重磅发布

博睿数据

白皮书 博睿数据

「可视化案例Vol.4」智慧社区,引领社区管理新风向

ThingJS数字孪生引擎

观测云社区版发布,免费体验全功能私有化实例!

观测云

运维 可观测性 可观测

Hoo网格策略活动仍在进行中 震荡市场持续狂欢

区块链前沿News

量化策略 Hoo 网格

全国唯一!这家企业的工业互联网平台上云啦!

天翼云开发者社区

云计算 解决方案 云服务 工业互联网 云平台

企评家,专注企业评价,为企事业单位提供信息数据支撑

企评家

工业金属零部件质检解决方案详解,让AI质检一步到位!

百度开发者中心

新升级!网易数帆轻舟中间件推出运维稳定性管控服务

网易数帆

Kubernetes 运维 云原生 中间件 稳定性

GaussDB(for Redis)新特性发布:前缀搜索千倍提升与集群版多租隔离

华为云开发者联盟

redis GaussDB(for Redis) 缓存数据库 多租隔离 前缀搜索

Gartner发布CIO最新调研结果,可组装EBC备受瞩目

金蝶云·苍穹

架构实战营作业五

库尔斯

#架构实战营

[ts]后台管理数据权限控制实现(无业务修改)

林逸民

typescript 工厂模式 权限控制 依赖注入

关于数据保护官DPO(34/100)

hackstoic

企业安全 DPO 数据保护官

【C语言】指针One之[概念、前言、内存、地址与指针、变量与地址]

謓泽

C语言 5月月更

为什么你的maven打包老是出现问题

ZuccRoger

5月月更

元宇宙与数字经济的互相融合,PlatoFarm的通证经济模型是根本

西柚子

下个牛市,Web3世界的龙头项目PlatoFarm能否踏足山巅

BlockChain先知

设计微博系统中”微博评论“的高性能高可用计算架构

流火

如何做好FAQ页面的设计

小炮

FAQ

【刷题第六天】35. 搜索插入位置

白日梦

5月月更

ElasticSearch查询流程详解

IT巅峰技术

守护数据安全,天翼云是认真的!

天翼云开发者社区

云计算 云服务 数据安全

再谈JavaScript 中的对象解构

devpoint

JavaScript ES6 5月月更 赋值解构 对象操作

企评家|海信视像科技股份有限公司成长性报告简述

企评家

时序数据库在智慧用电领域的应用

CnosDB

IoT 时序数据库 开源社区 CnosDB infra

tensorflow中ASGD with Delay Compensation优化器代码实现_语言 & 开发_Alex-zhai_InfoQ精选文章