写点什么

tensorflow 中 ASGD with Delay Compensation 优化器代码实现

  • 2019-11-29
  • 本文字数:8344 字

    阅读完需:约 27 分钟

tensorflow中ASGD with Delay Compensation优化器代码实现

一. DC-ASGD 算法介绍

此前,和大家也一起讨论过 DC-ASGD 算法,详细可见:https://zhuanlan.zhihu.com/p/80978479


DC-ASGD 算法主要解决的问题是:异步的随机梯度下降法(ASGD)在深度学习模型的训练中会存在 delayed gradients 的问题,就是当一个 worker 向参数 server 端提交它算出的梯度时,server 端其实已经被其它 worker 更新过好多次了。主要解决方案是利用梯度项的泰勒展开式去近似逼近 loss 函数的 Hessian 矩阵。


具体算法:


二. DC-ASGD 算法 tensorflow 实现

那么如何在 tensorflow 中实现 dc-asgd 算法呢?在上一篇文章中,我们讨论过 tensorflow 中 Optimizer 类的源码解析,其实就是为该篇文章做铺垫。接下来我们就具体分析下 Optimizer 的子类-DelayCompensatedGradientDescentOptimizer 类。


"""DelayCompensatedGradientDescentOptimizer for TensorFlow."""from __future__ import absolute_importfrom __future__ import divisionfrom __future__ import print_function
from tensorflow.python.framework import opsfrom tensorflow.python.ops import array_opsfrom tensorflow.python.ops import control_flow_opsfrom tensorflow.python.ops import math_opsfrom tensorflow.python.ops import state_opsfrom tensorflow.python.training import optimizerfrom tensorflow.python.training import training_ops
GATE_NONE = 0GATE_OP = 1GATE_GRAPH = 2

class DelayCompensatedGradientDescentOptimizer(optimizer.Optimizer): """Optimizer that implements the DelayCompensatedGradientDescent algorithm. See [](https://arxiv.org/abs/1609.08326) ([](https://arxiv.org/pdf/1609.08326.pdf)). """
def __init__(self, learning_rate, variance_parameter=2.0, num_workers=1, use_locking=False, name="DelayCompensatedGradientDescentOptimizer"):
"""Construct a gradient descent optimizer with delay compensation. It is cricial to note the `num_workers` in constructor and `worker_index` in `minimize()` and `apply_gradients()`. Contrast to AdaMaxamOptimizer, the sparse implementation of this algorithm (used when the gradient is an IndexedSlices object, typically because of `tf.gather` or an embedding lookup in the forward pass) only updates variable slices and corresponding `shadow_t` term when that part of the variable was used in the forward pass. This means that the sparse behavior is contrast to the dense behavior (similar to some momentum implementations which ignore momentum unless a variable slice was actually used). Args: learning_rate: A Tensor or a floating point value. The learning rate. variance_parameter: A Tensor or a floating point value. The variance control parameter. num_workers: A int value. The number of workers. use_locking: If True use locks for update operations. name: Optional name for the operations created when applying gradients. Defaults to "DelayCompensatedGradientDescentOptimizer". """ num_workers = self._call_if_callable(num_workers) if num_workers <= 0: raise ValueError("num_workers must be positive: %s" % num_workers) super(DelayCompensatedGradientDescentOptimizer, self).__init__(use_locking, name) self._lr = learning_rate self._lambda = variance_parameter self._num_workers = num_workers self._learning_rate_tensor = None self._lambda_tensor = None self._use_locking = use_locking
def _create_slots(self, var_list): for index in range(self._num_workers): for v in var_list: self._zeros_slot(v, "shadow_{0}".format(index), self._name)
def _prepare(self): lr = self._call_if_callable(self._lr) lambda_ = self._call_if_callable(self._lambda)
self._learning_rate_tensor = ops.convert_to_tensor(lr, name="learning_rate") self._lambda_tensor = ops.convert_to_tensor(lambda_, name="lambda")
def _apply_dense(self, grad, var):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index)) return training_ops.apply_delay_compensated_gradient_descent( var, math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype), grad, math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype), shadow, use_locking=self._use_locking).op
def _resource_apply_dense(self, grad, var):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index)) return training_ops.resource_apply_delay_compensated_gradient_descent( var.handle, math_ops.cast(self._learning_rate_tensor, grad.dtype.base_dtype), grad, math_ops.cast(self._lambda_tensor, grad.dtype.base_dtype), shadow.handle, use_locking=self._use_locking)
def _apply_sparse_shared(self, grad, var, indices):
shadow = self.get_slot(var, "shadow_{0}".format(self.worker_index)) # if shadow is None: # raise ValueError("None shadow with index = " + str(self.worker_index) + " and var = " + str(var)) lambda_ = math_ops.cast(self._lambda_tensor, var.dtype.base_dtype) lr = math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype)
var_slice = array_ops.gather(var, indices) shadow_slice = array_ops.gather(shadow, indices)
var_scaled_g_values = lr * (grad + lambda_ * grad * grad * (var_slice - shadow_slice))
var_t = state_ops.scatter_add(var, indices, -var_scaled_g_values, use_locking=self._use_locking)
with ops.control_dependencies([var_t]): shadow_t = state_ops.assign(shadow, var_t)
return control_flow_ops.group(*[var_t, shadow_t])
def _apply_sparse(self, grad, var): return self._apply_sparse_shared( grad.values, var, grad.indices)
def _resource_apply_sparse(self, grad, var, indices): return self._apply_sparse_shared( grad, var, indices)
def minimize(self, loss, global_step=None, var_list=None, gate_gradients=GATE_OP, aggregation_method=None, colocate_gradients_with_ops=False, name=None, grad_loss=None, worker_index=0): self.worker_index = worker_index return super(DelayCompensatedGradientDescentOptimizer, self).minimize(loss=loss, global_step=global_step, var_list=var_list, gate_gradients=gate_gradients, aggregation_method=aggregation_method, colocate_gradients_with_ops=colocate_gradients_with_ops, name=name, grad_loss=grad_loss)
def apply_gradients(self, grads_and_vars, global_step=None, name=None, worker_index=0): self.worker_index = worker_index return super(DelayCompensatedGradientDescentOptimizer, self).apply_gradients(grads_and_vars=grads_and_vars,
复制代码


                                                                                 global_step=global_step, name=name)
复制代码


_create_slots 函数用来创建一些额外的参数,这里创建的是每一个 worker 上的每一个 variable 所对应的备份变量 shadow。_prepare 函数用来准备优化器的常规超参数。


我们重点关注下_apply_sparse 函数,该函数调用的是_apply_sparse_shared 函数,参数 grad 的数据类型是 IndexedSlices 类型,那么什么是 IndexedSlices 类型呢?这里 Slice 的意思是从 Tensor 里面取特定的一些下标得到原先 tensor 变量的一部分,比如说原来的 tensor 的 shape 是[10,10],取下标[0]得到一个[10]的 Tensor,这个 Tensor 就是原 Tensor 的一个 Slice。那么 IndexedSlices 其实就是一堆 Slices 和它们所对应的下标(也就是 Index)。在梯度更新过程中,如果只需要更新某几行的梯度值,就可以将梯度表示成这种数据结构,来节省计算资源。


所以_apply_sparse_shared 函数参数传入的是 grad.values 和 grad.indices,分别表示特定行的梯度值和行的下标。在计算梯度项时:var_scaled_g_values = lr *(grad + lambda_ * grad * grad *(var_slice - shadow_slice)),也需要先求出特定行的 var_slice 和 shadow_slice。然后根据求出的梯度项更新参数时:var_t = state_ops.scatter_add(var, indices,-var_scaled_g_values, use_locking=self._use_locking),也是在特定的那些行(根据 indices 确定的)做更新。


当这一轮的参数做完更新后,需要将当前时刻的变量 var_t 备份一下,以用于下一时刻的参数更新:shadow_t = state_ops.assign(shadow, var_t)。最后将 var_t, shadow_t 的更新操作放进 control_flow_ops 中。


我们举一个简单的 example 来说明一下这种 IndexedSlices 类型的梯度是怎么更新的:


import numpy as npimport tensorflow as tffrom tensorflow.python.framework import constant_opfrom tensorflow.python.framework import opsfrom tensorflow.python.ops import variablesfrom tensorflow.python.training import adam

if __name__ == '__main__': value_a = np.ones(shape=[3, 10]) indices_a = np.array([0, 3, 8]) dense_shape_a = [10, 10] grad_slices_a = ops.IndexedSlices(constant_op.constant(value_a), constant_op.constant(indices_a), constant_op.constant(dense_shape_a))
var_np = np.ones(shape=[10, 10])
var0 = variables.RefVariable(var_np) opt = adam.AdamOptimizer() update = opt.apply_gradients(zip([grad_slices_a], [var0])) # variables.global_variables_initializer().run() sess = tf.Session() sess.run(tf.global_variables_initializer()) print("initial variable is:", sess.run(var0)) sess.run(update) print("update 1 time variable is:", sess.run(var0))

输出:initial variable is: [[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]update 1 time variable is: [[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ] [0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999] [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]]
复制代码


可以很清楚地看到,执行一次梯度更新之后,只有 0,3,8 这三行的变量值发生了改变。这就是使用 IndexedSlices 类型的优势。


另外,training_ops.apply_delay_compensated_gradient_descent 这个函数是在 tensorflow/core/kernels/training_ops.cc 中实现的,核心代码如下:


template <typename T>struct ApplyDelayCompensatedGradientDescent<CPUDevice, T> {  void operator()(const CPUDevice& d, typename TTypes<T>::Flat var,                   typename TTypes<T>::ConstScalar lr,                   typename TTypes<T>::ConstFlat grad,                   typename TTypes<T>::ConstScalar variance,                   typename TTypes<T>::Flat shadow) {    var.device(d) -= lr() * (grad + variance() * grad * grad * (var - shadow));    shadow.device(d) = var;  }};
复制代码


其实除了这两个文件之外,还需要写一下注册 ApplyDelayCompensatedGradientDescent 的 OP 接口,这里就不详细讲解了。

三.如何使用 DC-ASGD 算法

在 tensorflow 源码目录中修改或添加完 dc-asgd 算法的几个相关文件后,需要重新编译一下 tensorflow。编译成功后,就可以愉快地使用 dc-asgd 算法的接口啦。


下面给大家举一个使用 DelayCompensatedGradientDescentOptimizer 优化器的分布式训练 demo:


from __future__ import print_function, absolute_import, division
import tensorflow as tf
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps hosts")tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker hosts")tf.app.flags.DEFINE_string("job_name", "worker", "'ps' or'worker'")tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")tf.app.flags.DEFINE_integer("num_workers", 2, "Number of workers")tf.app.flags.DEFINE_boolean("is_sync", False, "using synchronous training or not")
FLAGS = tf.app.flags.FLAGS

def model(images): """Define a simple mnist classifier""" net = tf.layers.dense(images, 500, activation=tf.nn.relu) net = tf.layers.dense(net, 500, activation=tf.nn.relu) net = tf.layers.dense(net, 10, activation=None) return net

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()x_train = x_train.reshape(-1, 784).astype('float32')x_test = x_test.reshape(-1, 784).astype('float32')x_train /= 255x_test /= 255

def get_batch(image, label, batch_size=32, training=True): df = tf.data.Dataset.from_tensor_slices((image, label)) if training: df = df.repeat(10).shuffle(buffer_size=1000) df = df.batch(batch_size).prefetch(batch_size) iterator = df.make_one_shot_iterator() batch_x, batch_y = iterator.get_next() return batch_x, batch_y

def main(_): ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",")
# create the cluster configured by `ps_hosts' and 'worker_hosts' cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# create a server for local task server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
train_batch_x, train_batch_y = get_batch(x_train, y_train) test_batch_x, test_batch_y = get_batch(x_test, y_test, training=False)
if FLAGS.job_name == "ps": server.join() # ps hosts only join elif FLAGS.job_name == "worker": # workers perform the operation # ps_strategy = tf.contrib.training.GreedyLoadBalancingStrategy(FLAGS.num_ps)
# Note: tf.train.replica_device_setter automatically place the paramters (Variables) # on the ps hosts (default placement strategy: round-robin over all ps hosts, and also # place multi copies of operations to each worker host with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
logits = model(train_batch_x) loss = tf.reduce_mean( tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=tf.one_hot(train_batch_y, 10)))
# The StopAtStepHook handles stopping after running given steps. hooks = [tf.train.StopAtStepHook(last_step=10000)]
global_step = tf.train.get_or_create_global_step() #optimizer = tf.train.AdamOptimizer(learning_rate=1e-04) optimizer = tf.contrib.opt.DelayCompensatedGradientDescentOptimizer(learning_rate=0.001)
if FLAGS.is_sync: # asynchronous training # use tf.train.SyncReplicasOptimizer wrap optimizer # ref: https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=FLAGS.num_workers, total_num_replicas=FLAGS.num_workers) # create the hook which handles initialization and queues hooks.append(optimizer.make_session_run_hook((FLAGS.task_index == 0)))
train_op = optimizer.minimize(loss, global_step=global_step)
# The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(FLAGS.task_index == 0), checkpoint_dir="./checkpoint_dir", hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # mon_sess.run handles AbortedError in case of preempted PS. _, ls, step = mon_sess.run([train_op, loss, global_step]) if step % 100 == 0: print("Train step %d, loss: %f" % (step, ls))

if __name__ == "__main__": tf.app.run()
复制代码


启动命令是:


python dc_asgd_exp.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224 --job_name=ps --task_index=0python dc_asgd_exp.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2224 --job_name=worker --task_index=0
复制代码


参考文献:


https://zhuanlan.zhihu.com/p/80978479


https://zhuanlan.zhihu.com/p/87348147


https://www.zhihu.com/question/277403551


https://zhuanlan.zhihu.com/p/35083779


本文转载自 Alex-zhai 知乎账号。


原文链接:https://www.zhihu.com/people/alex-zhai-19/posts


2019-11-29 08:00949

评论

发布
暂无评论
发现更多内容

浅析openLooKeng安全认证机制

LooK

大数据 ldap openLooKeng 安全认证

JPA + EclipseLink + 云平台 = 运行在云端的数据库应用

汪子熙

数据库 Cloud Cloud Native 11月日更

使用JPA + Eclipselink操作PostgreSQL数据库

汪子熙

eclipse 数据库 11月日更

飞鹤乳业数智化转型之路

大咖说

云计算 数字化转型 数字化 企业上云

恒源云(GPUSHARE)_U1S1,1年1度GPU云种草大会

恒源云

深度学习

助力邯钢工业4.0!TDengine在深度(平潭)节水减排项目中的应用

TDengine

数据库 tdengine 后端

基于Hive Connector的openLooKeng Connector 创建复用机制剖析

LooK

大数据 hive 多数据源配置 计算引擎 openLooKeng

超简单教程!自动部署openLooKeng

LooK

大数据 计算引擎 openLooKeng 安装部署

硬核干货!TDSQL全局一致性读技术详解|

腾讯云数据库

tdsql 国产数据库

TDSQL | 深度解读HTAP系统的问题与主义之争

腾讯云数据库

tdsql 国产数据库

2020 更新 - 腾讯 Android 面试 (已拿到月薪22K offer)

android 程序员 移动开发

Hazelcast在openLooKeng中的应用(Cache篇)

LooK

大数据 cache 计算引擎 openLooKeng

2020-Android-面试重难点(万字篇),android屏幕适配的五种方式

android 程序员 移动开发

极复杂编码,下载《原神》角色高清图、中日无损配音,爬虫 16 / 120 例

梦想橡皮擦

11月日更

超详细攻略!手把手教你如何在windows下搭建openLooKeng开发环境

LooK

大数据 计算引擎

基于软件分析的智能化开发新型服务与技术

华为云开发者联盟

程序员 开发 漏洞 软件分析 智能开发

云上远程运维的最后那点担心,“云梯”帮你解决

华为云开发者联盟

运维 华为云Stack 远程运维 安全可信 云梯

教你如何在Spark Scala/Java应用中调用Python脚本

华为云开发者联盟

Java Python spark JVM Spark Scala

2020 国内互联网公司的Android工程师薪酬排名!看看你是什么水平

android 程序员 移动开发

2020Android 开发年度总结:“这一年里我到底做了些啥,面试阿里的时候一定会问到的

android 程序员 移动开发

性能优化反思:不要在for循环中操作DB

CRMEB

双11在即,分享一些稳定性保障技术干货

老张

系统稳定性 大促 生产环境全链路压测

【Java原理剖析系列】深度synchronized工作原理分析

码界西柚

java 11月日更

TDSQL已助力20余家金融机构完成核心替换

腾讯云数据库

tdsql 国产数据库

用一套代码实现APP和小程序

Speedoooo

容器 移动开发 ios开发 APP开发 Andriod开发

2020 年需要关注的 5 大 Android 开发技术(1),Android知识总结

android 程序员 移动开发

推荐!DevOps工具正越来越自动化

飞算JavaAI开发助手

新版本发布!openLooKeng v1.4.0上线

LooK

大数据 计算引擎 openLooKeng

实用推荐系统:寻找有用的用户行为

博文视点Broadview

TDSQL将发布免费版本,助力国产数据库生态完善

腾讯云数据库

数据库 tdsql

模块三作业

doublechun

「架构实战营」

tensorflow中ASGD with Delay Compensation优化器代码实现_语言 & 开发_Alex-zhai_InfoQ精选文章