Skip to content

Commit c9f9675

Browse files
author
Thomas Mulc
committed
multiple workers can see each other's updates
1 parent 5596293 commit c9f9675

6 files changed

+1426
-0
lines changed
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 1,
6+
"metadata": {
7+
"collapsed": true
8+
},
9+
"outputs": [],
10+
"source": [
11+
"import tensorflow as tf"
12+
]
13+
},
14+
{
15+
"cell_type": "code",
16+
"execution_count": 2,
17+
"metadata": {},
18+
"outputs": [
19+
{
20+
"data": {
21+
"text/plain": [
22+
"'1.3.0'"
23+
]
24+
},
25+
"execution_count": 2,
26+
"metadata": {},
27+
"output_type": "execute_result"
28+
}
29+
],
30+
"source": [
31+
"tf.__version__"
32+
]
33+
},
34+
{
35+
"cell_type": "code",
36+
"execution_count": 3,
37+
"metadata": {},
38+
"outputs": [
39+
{
40+
"name": "stdout",
41+
"output_type": "stream",
42+
"text": [
43+
"Author: Tommy Mulc\n"
44+
]
45+
}
46+
],
47+
"source": [
48+
"print \"Author: Tommy Mulc\""
49+
]
50+
},
51+
{
52+
"cell_type": "markdown",
53+
"metadata": {},
54+
"source": [
55+
"Create a TensorFlow cluster with two worker nodes and one ps node."
56+
]
57+
},
58+
{
59+
"cell_type": "code",
60+
"execution_count": 4,
61+
"metadata": {
62+
"collapsed": true
63+
},
64+
"outputs": [],
65+
"source": [
66+
"task_index=0"
67+
]
68+
},
69+
{
70+
"cell_type": "code",
71+
"execution_count": 5,
72+
"metadata": {
73+
"collapsed": true
74+
},
75+
"outputs": [],
76+
"source": [
77+
"cluster_spec = tf.train.ClusterSpec({'ps' : ['localhost:2222'],'worker' : ['localhost:2223','localhost:2224']})\n",
78+
"server = tf.train.Server(cluster_spec,job_name='worker',task_index=task_index)"
79+
]
80+
},
81+
{
82+
"cell_type": "markdown",
83+
"metadata": {},
84+
"source": [
85+
"**Now run all the cells in the parameter server notebook.**"
86+
]
87+
},
88+
{
89+
"cell_type": "markdown",
90+
"metadata": {},
91+
"source": [
92+
"Create variables locally, then make a global copy. One-worker scenario."
93+
]
94+
},
95+
{
96+
"cell_type": "code",
97+
"execution_count": 6,
98+
"metadata": {
99+
"collapsed": true
100+
},
101+
"outputs": [],
102+
"source": [
103+
"tf.reset_default_graph()"
104+
]
105+
},
106+
{
107+
"cell_type": "code",
108+
"execution_count": 7,
109+
"metadata": {},
110+
"outputs": [],
111+
"source": [
112+
"#create local graph like normal specifying the local device\n",
113+
"with tf.device('/job:worker/task:0'):\n",
114+
" a = tf.Variable([0.],name='a',collections=[tf.GraphKeys.LOCAL_VARIABLES])\n",
115+
" b = tf.constant([100.])\n",
116+
" loss = tf.abs(a-b)\n",
117+
" \n",
118+
" optimizer = tf.train.GradientDescentOptimizer(.1)\n",
119+
" grads,local_vars = zip(*optimizer.compute_gradients(loss,var_list=tf.local_variables()))\n",
120+
" local_update = optimizer.apply_gradients(zip(grads,local_vars))\n",
121+
" \n",
122+
" \n",
123+
" init_local = tf.local_variables_initializer()\n",
124+
"\n",
125+
"#create the global copies on the ps\n",
126+
"with tf.device('/job:ps/task:0'):\n",
127+
" for v in tf.local_variables():\n",
128+
" v_g = tf.get_variable('g/'+v.op.name,\n",
129+
" shape = v.shape,\n",
130+
" dtype = v.dtype,\n",
131+
" trainable=True,\n",
132+
" collections=[tf.GraphKeys.GLOBAL_VARIABLES,tf.GraphKeys.TRAINABLE_VARIABLES])\n",
133+
"\n",
134+
"\n",
135+
"#global updates\n",
136+
"with tf.device('/job:worker/task:%d'%task_index):\n",
137+
"    #this needs to be updated. Clearly not robust for any graph more complex\n",
138+
" global_vars = tf.global_variables()\n",
139+
" global_update = optimizer.apply_gradients(zip(grads,global_vars))\n",
140+
"\n",
141+
"#create init op on the chief node\n",
142+
"with tf.device('/job:worker/task:%d'%task_index):\n",
143+
" init_global = tf.global_variables_initializer()"
144+
]
145+
},
146+
{
147+
"cell_type": "code",
148+
"execution_count": 8,
149+
"metadata": {
150+
"collapsed": true
151+
},
152+
"outputs": [],
153+
"source": [
154+
"a_global = tf.global_variables()[0]"
155+
]
156+
},
157+
{
158+
"cell_type": "code",
159+
"execution_count": 9,
160+
"metadata": {},
161+
"outputs": [
162+
{
163+
"name": "stdout",
164+
"output_type": "stream",
165+
"text": [
166+
"/job:worker/task:0\n",
167+
"/job:worker/task:0\n",
168+
"/job:worker/task:0\n",
169+
"/job:worker/task:0\n",
170+
"/job:ps/task:0\n",
171+
"/job:ps/task:0\n",
172+
"/job:worker/task:0\n",
173+
"/job:ps/task:0\n"
174+
]
175+
}
176+
],
177+
"source": [
178+
"print(a.device)\n",
179+
"print(b.device)\n",
180+
"print(loss.device)\n",
181+
"#print(optimizer.device)\n",
182+
"print(local_update.device)\n",
183+
"print(global_update.device)\n",
184+
"print(init_global.device)\n",
185+
"print(init_local.device)\n",
186+
"print(a_global.device)"
187+
]
188+
},
189+
{
190+
"cell_type": "code",
191+
"execution_count": 10,
192+
"metadata": {},
193+
"outputs": [
194+
{
195+
"data": {
196+
"text/plain": [
197+
"[None, None]"
198+
]
199+
},
200+
"execution_count": 10,
201+
"metadata": {},
202+
"output_type": "execute_result"
203+
}
204+
],
205+
"source": [
206+
"sess = tf.Session(target=server.target)\n",
207+
"sess.run([init_local,init_global])"
208+
]
209+
},
210+
{
211+
"cell_type": "code",
212+
"execution_count": 11,
213+
"metadata": {},
214+
"outputs": [
215+
{
216+
"data": {
217+
"text/plain": [
218+
"[array([ 0.], dtype=float32), array([ 0.55522525], dtype=float32)]"
219+
]
220+
},
221+
"execution_count": 11,
222+
"metadata": {},
223+
"output_type": "execute_result"
224+
}
225+
],
226+
"source": [
227+
"sess.run([a,a_global])"
228+
]
229+
},
230+
{
231+
"cell_type": "code",
232+
"execution_count": 12,
233+
"metadata": {
234+
"collapsed": true
235+
},
236+
"outputs": [],
237+
"source": [
238+
"sess.run(local_update)"
239+
]
240+
},
241+
{
242+
"cell_type": "code",
243+
"execution_count": 14,
244+
"metadata": {},
245+
"outputs": [
246+
{
247+
"data": {
248+
"text/plain": [
249+
"[array([ 0.1], dtype=float32), array([ 0.55522525], dtype=float32)]"
250+
]
251+
},
252+
"execution_count": 14,
253+
"metadata": {},
254+
"output_type": "execute_result"
255+
}
256+
],
257+
"source": [
258+
"sess.run([a,a_global])"
259+
]
260+
},
261+
{
262+
"cell_type": "code",
263+
"execution_count": 15,
264+
"metadata": {
265+
"collapsed": true
266+
},
267+
"outputs": [],
268+
"source": [
269+
"sess.run(global_update)"
270+
]
271+
},
272+
{
273+
"cell_type": "code",
274+
"execution_count": 17,
275+
"metadata": {},
276+
"outputs": [
277+
{
278+
"data": {
279+
"text/plain": [
280+
"[array([ 0.1], dtype=float32), array([ 0.7552253], dtype=float32)]"
281+
]
282+
},
283+
"execution_count": 17,
284+
"metadata": {},
285+
"output_type": "execute_result"
286+
}
287+
],
288+
"source": [
289+
"sess.run([a,a_global])"
290+
]
291+
},
292+
{
293+
"cell_type": "code",
294+
"execution_count": null,
295+
"metadata": {
296+
"collapsed": true
297+
},
298+
"outputs": [],
299+
"source": []
300+
}
301+
],
302+
"metadata": {
303+
"kernelspec": {
304+
"display_name": "Python [conda env:tensorflow13]",
305+
"language": "python",
306+
"name": "conda-env-tensorflow13-py"
307+
},
308+
"language_info": {
309+
"codemirror_mode": {
310+
"name": "ipython",
311+
"version": 2
312+
},
313+
"file_extension": ".py",
314+
"mimetype": "text/x-python",
315+
"name": "python",
316+
"nbconvert_exporter": "python",
317+
"pygments_lexer": "ipython2",
318+
"version": "2.7.13"
319+
}
320+
},
321+
"nbformat": 4,
322+
"nbformat_minor": 2
323+
}

0 commit comments

Comments
 (0)