@@ -22,35 +22,35 @@ def main():

    #Server Setup
    cluster_spec = {'ps': ['localhost:2222'],
-                   'worker': ['localhost:2223', 'localhost:2224']}
+                   'worker': ['localhost:2223', 'localhost:2224']}
    n_pss = len(cluster_spec['ps'])  #the number of parameter servers
    n_workers = len(cluster_spec['worker'])  #the number of worker nodes
    cluster = tf.train.ClusterSpec(cluster_spec)  #allows this node to know about all other nodes

    if FLAGS.job_name == 'ps':  #checks if parameter server
        server = tf.train.Server(cluster,
-                                job_name="ps",
-                                task_index=FLAGS.task_index,
-                                config=config)
+                                job_name="ps",
+                                task_index=FLAGS.task_index,
+                                config=config)
        server.join()
    else:  #it must be a worker server
        is_chief = (FLAGS.task_index == 0)  #checks if this is the chief node
        server = tf.train.Server(cluster,
-                                job_name="worker",
-                                task_index=FLAGS.task_index,
-                                config=config)
+                                job_name="worker",
+                                task_index=FLAGS.task_index,
+                                config=config)

        # Graph
        # Local operations
        with tf.device("/job:worker/replica:0/task:%d" % FLAGS.task_index):
            a = tf.Variable(tf.constant(0., shape=[2]), dtype=tf.float32,
-                           collections=[tf.GraphKeys.LOCAL_VARIABLES])
+                           collections=[tf.GraphKeys.LOCAL_VARIABLES])
            b = tf.Variable(tf.constant(0., shape=[2]), dtype=tf.float32,
-                           collections=[tf.GraphKeys.LOCAL_VARIABLES])
+                           collections=[tf.GraphKeys.LOCAL_VARIABLES])
            c = a + b

            local_step = tf.Variable(0, dtype=tf.int32, trainable=False, name='local_step',
-                                    collections=['local_non_trainable'])
+                                    collections=['local_non_trainable'])
            lr = .0001

            #loptimizer = tf.train.GradientDescentOptimizer(lr*FLAGS.task_index) #local optimizer
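For context, the server setup in this hunk reads FLAGS.job_name and FLAGS.task_index, whose definitions lie outside the diff. A minimal sketch of how such flags are commonly declared in TF 1.x follows; the defaults and help strings are assumptions for illustration, not taken from this commit.

# Illustrative only: flag definitions the hunk above assumes (not part of the diff).
import tensorflow as tf

tf.app.flags.DEFINE_string('job_name', '', "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer('task_index', 0, 'Index of the task within its job')
FLAGS = tf.app.flags.FLAGS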
@@ -66,13 +66,13 @@ def main():
                if t != 0:
                    with tf.control_dependencies([opt_local]):  #compute gradients only if the local opt was run
                        grads, varss = zip(*loptimizer.compute_gradients( \
-                           loss, var_list=tf.local_variables()))
+                           loss, var_list=tf.local_variables()))
                else:
                    grads, varss = zip(*loptimizer.compute_gradients( \
-                       loss, var_list=tf.local_variables()))
+                       loss, var_list=tf.local_variables()))
                grad_list.append(grads)  #add gradients to the list
                opt_local = loptimizer.apply_gradients(zip(grads, varss),
-                                                      global_step=local_step)  #update local parameters
+                                                      global_step=local_step)  #update local parameters

            grads = tf.reduce_sum(grad_list, axis=0)  #sum updates before applying globally
            grads = tuple([grads[i] for i in range(len(varss))])
@@ -83,9 +83,8 @@ def main():
        # delete the variables from the global collection
        clear_global_collection()

-       with tf.device(tf.train.replica_device_setter(
-               ps_tasks=n_pss,
-               worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index))):
+       with tf.device(tf.train.replica_device_setter(ps_tasks=n_pss,
+               worker_device="/job:%s/task:%d" % (FLAGS.job_name, FLAGS.task_index))):
            global_step = tf.Variable(0, dtype=tf.int32, trainable=False, name='global_step')

            # all workers use the same learning rate, and it is decided by task 0
@@ -95,8 +94,8 @@ def main():
            # create global variables and/or references
            local_to_global, global_to_local = create_global_variables(lopt_vars)
            opt = optimizer.apply_gradients(
-                   zip(grads, [local_to_global[v] for v in varss])
-                   , global_step=global_step)  #apply the gradients to variables on ps
+                   zip(grads, [local_to_global[v] for v in varss])
+                   , global_step=global_step)  #apply the gradients to variables on ps

            # Pull params from global server
            with tf.control_dependencies([opt]):
@@ -111,7 +110,7 @@ def main():
        # Init ops
        init = tf.global_variables_initializer()  # for global variables
        init_local = tf.variables_initializer(tf.local_variables() \
-               + tf.get_collection('local_non_trainable'))  #for local variables
+               + tf.get_collection('local_non_trainable'))  #for local variables

        # Session
        stop_hook = tf.train.StopAtStepHook(last_step=60)
@@ -120,12 +119,12 @@ def main():

        # Monitored Training Session
        sess = tf.train.MonitoredTrainingSession(master=server.target,
-               is_chief=is_chief,
-               config=config,
-               scaffold=scaff,
-               hooks=hooks,
-               save_checkpoint_secs=1,
-               checkpoint_dir='logdir')
+               is_chief=is_chief,
+               config=config,
+               scaffold=scaff,
+               hooks=hooks,
+               save_checkpoint_secs=1,
+               checkpoint_dir='logdir')

        if is_chief:
            sess.run(assign_global)  #Assigns chief's initial values to ps
@@ -207,8 +206,8 @@ def create_global_variables(local_optimizer_vars = []):
                    shape=v.shape,
                    dtype=v.dtype,
                    trainable=True,
-                   collections=[tf.GraphKeys.GLOBAL_VARIABLES, \
-                                tf.GraphKeys.TRAINABLE_VARIABLES])
+                   collections=[tf.GraphKeys.GLOBAL_VARIABLES,
+                                tf.GraphKeys.TRAINABLE_VARIABLES])
                local_to_global[v] = v_g
                global_to_local[v_g] = v
    return local_to_global, global_to_local
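For reference, running this example takes one OS process per entry in cluster_spec (one ps and two workers, all on localhost). A minimal launch sketch follows; the filename dist_adag.py is hypothetical, and it assumes the script parses the --job_name / --task_index flags used above.

# Illustrative only: launching the ps and the two workers locally (hypothetical filename).
import subprocess, sys

procs = [subprocess.Popen([sys.executable, 'dist_adag.py', '--job_name=ps', '--task_index=0']),
         subprocess.Popen([sys.executable, 'dist_adag.py', '--job_name=worker', '--task_index=0']),  # chief
         subprocess.Popen([sys.executable, 'dist_adag.py', '--job_name=worker', '--task_index=1'])]
for p in procs:
    p.wait()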