Skip to content

Commit d4b35b4

Browse files
committed
generator-based event loop impl
1 parent d6d0eae commit d4b35b4

File tree

4 files changed

+110
-83
lines changed

4 files changed

+110
-83
lines changed

README.md

+38-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
# Simple but workable event loop implementation in Python (< 100 LOC)
1+
## Simple but workable event loop in Python (< 100 LOC)
22

3-
Example:
3+
Check out the series about the event loop on my blog <a href="https://micromind.me/en/posts/explain-event-loop-in-100-lines-of-code/">Explain event loop in 100 lines of code</a> and <a href="https://micromind.me/en/posts/from-callback-hell-to-async-await-heaven/">From Callback Hell to async/await Heaven</a>.
4+
5+
### Example using callback-style:
46
```python
57
import socket as _socket
68

@@ -51,5 +53,38 @@ Give it a try:
5153
> python event_loop.py 53210
5254
```
5355

54-
Check out the article about this code on my blog <a href="https://micromind.me/en/posts/explain-event-loop-in-100-lines-of-code/">Explain event loop in 100 lines of code</a>.
56+
### Example using async-await-like style:
57+
```python
58+
import socket as _socket
59+
60+
class EventLoop: pass
5561

62+
class Context: pass
63+
64+
class socket(Context): pass
65+
66+
def http_get(sock):
67+
try:
68+
yield sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
69+
return sock.recv(1024)
70+
finally:
71+
sock.close()
72+
73+
def main(serv_addr):
74+
sock = socket(AF_INET, SOCK_STREAM)
75+
yield sock.connect(serv_addr)
76+
resp = yield http_get(sock)
77+
print(resp)
78+
79+
if __name__ == '__main__':
80+
event_loop = EventLoop()
81+
Context.set_event_loop(event_loop)
82+
83+
serv_addr = ('t.co', 80)
84+
event_loop.run(main)
85+
```
86+
87+
Give it a try:
88+
```bash
89+
> python event_loop_gen.py 53210
90+
```

event_loop.py

+32-2
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ def on_account(err, acc=None):
278278
set_timer(random.randint(0, 10e6), on_timer)
279279

280280

281-
def main(serv_addr):
281+
def main1(serv_addr):
282282
def on_balance(err, balance=None):
283283
if err:
284284
print('ERROR', err)
@@ -289,10 +289,40 @@ def on_balance(err, balance=None):
289289
get_user_balance(serv_addr, i, on_balance)
290290

291291

292+
def main2(*args):
293+
sock = socket(_socket.AF_INET, _socket.SOCK_STREAM)
294+
295+
def on_conn(err):
296+
if err:
297+
return print(err)
298+
299+
def on_sent(err):
300+
if err:
301+
sock.close()
302+
return print(err)
303+
304+
def on_resp(err, resp=None):
305+
sock.close()
306+
if err:
307+
return print(err)
308+
print(resp)
309+
310+
sock.recv(1024, on_resp)
311+
312+
sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n', on_sent)
313+
314+
sock.connect(('t.co', 80), on_conn)
315+
316+
292317
if __name__ == '__main__':
318+
print('Run main1()')
293319
event_loop = EventLoop()
294320
Context.set_event_loop(event_loop)
295321

296322
serv_addr = ('127.0.0.1', int(sys.argv[1]))
297-
event_loop.run(main, serv_addr)
323+
event_loop.run(main1, serv_addr)
298324

325+
print('\nRun main2()')
326+
event_loop = EventLoop()
327+
Context.set_event_loop(event_loop)
328+
event_loop.run(main2)

event_loop_ng.py event_loop_gen.py

+37-75
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,11 @@ def set_timer(self, duration):
4242
def _execute(self, callback, *args):
4343
self._time = hrtime()
4444
try:
45-
# unwind(callback(*args)).catch(
46-
# lambda e: print('Uncaught rejection:', e))
4745
ret = callback(*args)
4846
if is_generator(ret):
49-
unwind2(ret,
50-
ok=lambda *_: None,
51-
fail=lambda e: print('Uncaught rejection:', e))
47+
unwind(ret,
48+
ok=lambda *_: None,
49+
fail=lambda e: print('Uncaught rejection:', e))
5250

5351
except Exception as err:
5452
print('Uncaught exception:', err)
@@ -119,7 +117,7 @@ def evloop(self):
119117
return self._event_loop
120118

121119

122-
def unwind2(gen, ok, fail, ret=None, method='send'):
120+
def unwind(gen, ok, fail, ret=None, method='send'):
123121
try:
124122
ret = getattr(gen, method)(ret)
125123
except StopIteration as stop:
@@ -128,16 +126,16 @@ def unwind2(gen, ok, fail, ret=None, method='send'):
128126
return fail(e)
129127

130128
if is_generator(ret):
131-
unwind2(ret,
132-
ok=lambda x: unwind2(gen, ok, fail, x),
133-
fail=lambda e: unwind2(gen, ok, fail, e, 'throw'))
129+
unwind(ret,
130+
ok=lambda x: unwind(gen, ok, fail, x),
131+
fail=lambda e: unwind(gen, ok, fail, e, 'throw'))
134132
elif is_promise(ret):
135-
ret.then(lambda x=None: unwind2(gen, ok, fail, x)) \
136-
.catch(lambda e: unwind2(gen, ok, fail, e, 'throw'))
133+
ret.then(lambda x=None: unwind(gen, ok, fail, x)) \
134+
.catch(lambda e: unwind(gen, ok, fail, e, 'throw'))
137135
else:
138-
wait_all(ret,
139-
lambda x=None: unwind2(gen, ok, fail, x),
140-
lambda e: unwind2(gen, ok, fail, e, 'throw'))
136+
wait_all(ret,
137+
lambda x=None: unwind(gen, ok, fail, x),
138+
lambda e: unwind(gen, ok, fail, e, 'throw'))
141139

142140

143141
def wait_all(col, ok, fail):
@@ -157,7 +155,7 @@ def _do_resolve(val):
157155

158156
for i, c in enumerate(col):
159157
if is_generator(c):
160-
unwind2(c, ok=_resolve_single(i), fail=fail)
158+
unwind(c, ok=_resolve_single(i), fail=fail)
161159
continue
162160

163161
if is_promise(c):
@@ -168,59 +166,6 @@ def _do_resolve(val):
168166
'can be yielded to event loop')
169167

170168

171-
def unwind(gen):
172-
p = Promise()
173-
if not is_generator(gen):
174-
p._resolve(gen)
175-
return p
176-
177-
def _on_fulfilled(res=None):
178-
ret = None
179-
try:
180-
ret = gen.send(res)
181-
except StopIteration as stop:
182-
return p._resolve(stop.value)
183-
except Exception as exc:
184-
return p._reject(exc)
185-
_next(ret)
186-
187-
def _on_rejected(err):
188-
ret = None
189-
try:
190-
ret = gen.throw(err)
191-
except StopIteration as stop:
192-
return p._resolve(stop.value)
193-
except Exception as exc:
194-
return p._reject(exc)
195-
_next(ret)
196-
197-
def _next(ret):
198-
nextp = to_promise(ret)
199-
if not nextp:
200-
return _on_rejected(Exception('Only promise or generator '
201-
'can be yielded to event loop'))
202-
nextp.then(_on_fulfilled)
203-
nextp.catch(_on_rejected)
204-
205-
_on_fulfilled()
206-
return p
207-
208-
209-
def to_promise(val):
210-
if is_promise(val):
211-
return val
212-
213-
if is_generator(val):
214-
return unwind(val)
215-
216-
if isinstance(val, list):
217-
promises = [to_promise(x) for x in val]
218-
if len(val) == len(list(filter(None, promises))):
219-
return Promise.all(promises)
220-
221-
return None
222-
223-
224169
def is_generator(val):
225170
return isinstance(val, types.GeneratorType)
226171

@@ -293,7 +238,7 @@ def _reject(self, err):
293238
self._value = err
294239
for cb in self._on_reject:
295240
cb(err)
296-
241+
297242

298243
class IOError(Exception):
299244
def __init__(self, message, errorno, errorcode):
@@ -322,7 +267,7 @@ def __init__(self, *args):
322267
# 1 - connecting
323268
# 2 - connected
324269
# 3 - closed
325-
self._state = 0
270+
self._state = 0
326271
self._callbacks = {}
327272

328273
def connect(self, addr):
@@ -365,7 +310,7 @@ def _on_write_ready(err):
365310
nonlocal data
366311
if err:
367312
return p._reject(err)
368-
313+
369314
n = self._sock.send(data)
370315
if n < len(data):
371316
data = data[n:]
@@ -408,11 +353,11 @@ def _on_event(self, mask):
408353
cb(err)
409354

410355
def _get_sock_error(self):
411-
err = self._sock.getsockopt(_socket.SOL_SOCKET,
356+
err = self._sock.getsockopt(_socket.SOL_SOCKET,
412357
_socket.SO_ERROR)
413358
if not err:
414359
return None
415-
return IOError('connection failed',
360+
return IOError('connection failed',
416361
err, errno.errorcode[err])
417362

418363
###############################################################################
@@ -457,7 +402,7 @@ def print_balance(serv_addr, user_id):
457402
print('Catched:', exc)
458403

459404

460-
def main(serv_addr):
405+
def main1(serv_addr):
461406
def on_sleep():
462407
b = yield get_user_balance(serv_addr, 1)
463408
print('side flow:', b)
@@ -469,10 +414,27 @@ def on_sleep():
469414
yield tasks
470415

471416

417+
def main2(*args):
418+
sock = socket(_socket.AF_INET, _socket.SOCK_STREAM)
419+
yield sock.connect(('t.co', 80))
420+
421+
try:
422+
yield sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
423+
val = yield sock.recv(1024)
424+
print(val)
425+
finally:
426+
sock.close()
427+
428+
472429
if __name__ == '__main__':
430+
print('Run main1()')
473431
event_loop = EventLoop()
474432
Context.set_event_loop(event_loop)
475433

476434
serv_addr = ('127.0.0.1', int(sys.argv[1]))
477-
event_loop.run(main, serv_addr)
435+
event_loop.run(main1, serv_addr)
478436

437+
print('\nRun main2()')
438+
event_loop = EventLoop()
439+
Context.set_event_loop(event_loop)
440+
event_loop.run(main2)

server.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ def handle(self):
2424
raise Exception('Max request length exceeded')
2525

2626
method, entity_kind, entity_id = req[:-1].split(' ', 3)
27-
if (method != 'GET'
28-
or entity_kind not in ('user', 'account')
27+
if (method != 'GET'
28+
or entity_kind not in ('user', 'account')
2929
or not entity_id.isdigit()):
3030
raise Exception('Bad request')
3131

@@ -38,7 +38,7 @@ def handle(self):
3838

3939
if 'account_id' not in user:
4040
account_id = str(len(self.accounts) + 1)
41-
account = {'id': account_id,
41+
account = {'id': account_id,
4242
'balance': random.randint(0, 100)}
4343
self.accounts[account_id] = account
4444
user['account_id'] = account_id

0 commit comments

Comments
 (0)