
Commit f824dff

Author: Thomas Mulc
1 parent c9f9675

first draft of ADAG. Need to check local variables and local steps to make sure they are not being shared.


ADAG/dist_cpu_sing_mach_sync.py

Lines changed: 69 additions & 22 deletions
@@ -17,7 +17,7 @@ def main():
     # Configure
     config=tf.ConfigProto(log_device_placement=False)
 
-    #Distributed Baggage
+    #Server Setup
     cluster_spec = {'ps':['localhost:2222'],
                     'worker':['localhost:2223','localhost:2224']}
     n_pss = len(cluster_spec['ps']) #the number of parameter servers
@@ -32,65 +32,85 @@ def main():
     server = tf.train.Server(cluster,job_name="worker",task_index=FLAGS.task_index,config=config)
 
     # Graph
+    # We must not use train.replicate_device_setter for normal operations
+    # with tf.device(tf.train.replica_device_setter(ps_tasks=n_pss\
+    #         ,worker_device="/job:%s/task:%d/cpu:0" % (FLAGS.job_name,FLAGS.task_index))):
     with tf.device(tf.train.replica_device_setter(ps_tasks=n_pss\
-            ,worker_device="/job:%s/task:%d/cpu:0" % (FLAGS.job_name,FLAGS.task_index))):
+            ,worker_device="/job:%s/task:%d" % (FLAGS.job_name,FLAGS.task_index))):
 
-        a = tf.Variable(tf.constant(0.,shape=[2]),dtype=tf.float32)
-        b = tf.Variable(tf.constant(0.,shape=[2]),dtype=tf.float32)
+        a = tf.Variable(tf.constant(0.,shape=[2]),dtype=tf.float32,
+                collections=[tf.GraphKeys.LOCAL_VARIABLES])
+        b = tf.Variable(tf.constant(0.,shape=[2]),dtype=tf.float32,
+                collections=[tf.GraphKeys.LOCAL_VARIABLES])
         c=a+b
 
+        local_step = tf.Variable(0,dtype=tf.int32,trainable=False,name='local_step',
+                collections=['local_non_trainable'])
+        #global step is tricky
         global_step = tf.Variable(0,dtype=tf.int32,trainable=False,name='global_step')
         target = tf.constant(100.,shape=[2],dtype=tf.float32)
         loss = tf.reduce_mean(tf.square(c-target))
 
         # all workers use the same learning rate and it is decided on by the task 0
         # or maybe the from the graph of the chief worker
         base_lr = .0001
+        loptimizer = tf.train.GradientDescentOptimizer(base_lr)
         optimizer = tf.train.GradientDescentOptimizer(base_lr) #the learning rate set here is global
 
+        #create global variables and/or references
+        local_to_global, global_to_local = create_global_variables()
+
         #local optimizers and steps
-        optimizers=[]
-        local_steps = []
-        for w in range(n_workers):
-            local_steps.append(tf.Variable(0,dtype=tf.int32,trainable=False,name='local_step_%d'%w))
-            optimizers.append(tf.train.GradientDescentOptimizer(base_lr))
+        # actually only need one optimizer
+        # optimizers=[]
+        # local_steps = []
+        # for w in range(n_workers):
+        #     local_steps.append(tf.Variable(0,dtype=tf.int32,trainable=False,name='local_step_%d'%w))
+        #     optimizers.append(tf.train.GradientDescentOptimizer(base_lr))
 
         # ADAG (simplest case since all batches are the same)
-        update_window = 5 # T: update window, a.k.a number of gradients to use before sending to ps
+        update_window = 3 # T: update/communication window, a.k.a number of gradients to use before sending to ps
         grad_list = [] # the array to store the gradients through the communication window
         for t in range(update_window):
            if t != 0:
                 with tf.control_dependencies([opt_local]):
-                    grads, varss = zip(*optimizers[FLAGS.task_index].compute_gradients(loss)) #compute gradients using local optimizer
+                    grads, varss = zip(*loptimizer.compute_gradients(loss,var_list=tf.local_variables())) #compute gradients
             else:
-                grads, varss = zip(*optimizers[FLAGS.task_index].compute_gradients(loss)) #compute gradients using local optimizer
+                grads, varss = zip(*loptimizer.compute_gradients(loss,var_list=tf.local_variables())) #compute gradients only if the local opt was run
             grad_list.append(grads) #add gradients to the list
-            opt_local = optimizers[FLAGS.task_index].apply_gradients(zip(grads,varss),
-                    global_step=local_steps[FLAGS.task_index]) #update local parameters
+            opt_local = loptimizer.apply_gradients(zip(grads,varss),
+                    global_step=local_step) #update local parameters
         grads = tf.reduce_mean(grad_list,axis=0) #taking the mean is the same as dividing the sum of gradients by T
         grads = tuple([grads[i]for i in range(len(varss))])
-        opt = optimizer.apply_gradients(zip(grads,varss),global_step=global_step) #apply the gradients globally
+        opt = optimizer.apply_gradients(
+                zip(grads,[ local_to_global[v] for v in varss])
+                ,global_step=global_step) #apply the gradients to variables on ps
+
+        with tf.control_dependencies([opt]):
+            assign_locals = assign_global_to_local(global_to_local)
 
-    # Init op
-    init = tf.global_variables_initializer() # must come after other init ops
+    # Init ops
+    init_local = tf.variables_initializer(tf.local_variables()+tf.get_collection('local_non_trainable'))#tf.local_variables_initializer() #for local variables
+    init = tf.global_variables_initializer() # for global variables
+
+    # TODO: Add op the assigns local values to global ones for chief to execute
 
 
     # Session
-    stop_hook = tf.train.StopAtStepHook(last_step=20)
+    stop_hook = tf.train.StopAtStepHook(last_step=40)
     hooks = [stop_hook]
-    scaff = tf.train.Scaffold(init_op=init)
+    scaff = tf.train.Scaffold(init_op=init,local_init_op=init_local)
 
     #Monitored Training Session
     sess = tf.train.MonitoredTrainingSession(master = server.target,is_chief=is_chief,config=config,
                                              scaffold=scaff,hooks=hooks,save_checkpoint_secs=1,checkpoint_dir='logdir')
-
     if is_chief:
-        time.sleep(5) #grace period to wait on other workers before starting training
+        time.sleep(10) #grace period to wait on other workers before starting training
 
     # Train until hook stops session
     print('Starting training on worker %d'%FLAGS.task_index)
     while not sess.should_stop():
-        _,r,gs,ls = sess.run([opt,c,global_step,local_steps[FLAGS.task_index]])
+        _,_,r,gs,ls = sess.run([opt,assign_locals,c,global_step,local_step])
 
         print(r,"global step: "+str(gs),"worker: "+str(FLAGS.task_index),"local step: "+str(ls))
 
@@ -106,6 +126,33 @@ def main():
     sess.close()
     print('Session from worker %d closed cleanly'%FLAGS.task_index)
 
+def assign_global_to_local(global_to_local):
+    for v in global_to_local.keys():
+        tf.assign(global_to_local[v],v)
+    return tf.no_op()
+
+def get_global_variable_by_name(name):
+    return [v for v in tf.global_variables() if v.name == name][0]
+
+def create_global_variables():
+    # TODO: swap static string with tf.train.replica_device_setter(ps_tasks=n_pss)
+    local_to_global = {}
+    global_to_local = {}
+    with tf.device('/job:ps/task:0'):
+        for v in tf.local_variables():
+            v_g = tf.get_variable('g/'+v.op.name,
+                shape = v.shape,
+                dtype = v.dtype,
+                trainable=True,
+                collections=[tf.GraphKeys.GLOBAL_VARIABLES,tf.GraphKeys.TRAINABLE_VARIABLES])
+            local_to_global[v] = v_g
+            global_to_local[v_g] = v
+    return local_to_global,global_to_local
+
+# TODO: initialize global ps variables
+# according to the chiefs initial values
+def assign_global_values():
+    return None
 
 
 if __name__ == '__main__':
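
Note on the update rule the main hunk builds: each worker chains update_window local gradient steps, averages the window's gradients, applies that average to the global copies living on the ps, and then (via assign_locals) pulls the new global values back into its local variables. A plain-Python sketch of one such round, using a hypothetical 1-D toy problem in place of loss = (c - target)^2 (illustrative only, not part of the commit):

def adag_round(global_w, target, lr=0.0001, T=3):
    # pull the global value down to a local copy (assign_global_to_local)
    local_w = global_w
    grads = []
    for _ in range(T):
        g = 2.0 * (local_w - target)   # gradient of (w - target)^2
        grads.append(g)
        local_w -= lr * g              # local optimizer step (loptimizer)
    avg_g = sum(grads) / T             # mean over the communication window
    global_w -= lr * avg_g             # global optimizer applies the mean on the ps
    return global_w

w = 0.0
for step in range(5):
    w = adag_round(w, target=100.0)
    print('round', step, 'w =', w)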

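The commit message leaves open whether the local variables and the local step really stay per-worker or end up shared through the ps. One way to check, once the graph is built, is to print each variable's device and collection. A minimal diagnostic sketch, assuming tensorflow is imported as tf as in the script; the helper name report_variable_placement is hypothetical and not part of this commit:

def report_variable_placement():
    # variables created with collections=[tf.GraphKeys.LOCAL_VARIABLES]
    for v in tf.local_variables():
        print('LOCAL ', v.op.name, '->', v.device or '<unplaced>')
    # the local step lives in the custom 'local_non_trainable' collection
    for v in tf.get_collection('local_non_trainable'):
        print('LOCAL ', v.op.name, '->', v.device or '<unplaced>')
    # global copies made by create_global_variables() should report /job:ps
    for v in tf.global_variables():
        print('GLOBAL', v.op.name, '->', v.device or '<unplaced>')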