1
+ import asyncio , collections , curses , enum , random
2
+
3
+ P = collections .namedtuple ('P' , 'x y' ) # Position
4
+ D = enum .Enum ('D' , 'n e s w' ) # Direction
5
+
6
+ def main (screen ):
7
+ curses .curs_set (0 ) # Makes cursor invisible.
8
+ screen .nodelay (True ) # Makes getch() non-blocking.
9
+ asyncio .run (main_coroutine (screen )) # Starts running asyncio code.
10
+
11
+ async def main_coroutine (screen ):
12
+ state = {'*' : P (0 , 0 ), ** {id_ : P (30 , 10 ) for id_ in range (10 )}}
13
+ moves = asyncio .Queue ()
14
+ coros = (* (random_controller (id_ , moves ) for id_ in range (10 )),
15
+ human_controller (screen , moves ),
16
+ model (moves , state , * screen .getmaxyx ()),
17
+ view (state , screen ))
18
+ await asyncio .wait (coros , return_when = asyncio .FIRST_COMPLETED )
19
+
20
+ async def random_controller (id_ , moves ):
21
+ while True :
22
+ moves .put_nowait ((id_ , random .choice (list (D ))))
23
+ await asyncio .sleep (random .random () / 2 )
24
+
25
+ async def human_controller (screen , moves ):
26
+ while True :
27
+ ch = screen .getch ()
28
+ key_mappings = {259 : D .n , 261 : D .e , 258 : D .s , 260 : D .w }
29
+ if ch in key_mappings :
30
+ moves .put_nowait (('*' , key_mappings [ch ]))
31
+ await asyncio .sleep (0.01 )
32
+
33
+ async def model (moves , state , height , width ):
34
+ while state ['*' ] not in {p for id_ , p in state .items () if id_ != '*' }:
35
+ id_ , d = await moves .get ()
36
+ p = state [id_ ]
37
+ deltas = {D .n : P (0 , - 1 ), D .e : P (1 , 0 ), D .s : P (0 , 1 ), D .w : P (- 1 , 0 )}
38
+ new_p = P (* [sum (a ) for a in zip (p , deltas [d ])])
39
+ if 0 <= new_p .x < width - 1 and 0 <= new_p .y < height :
40
+ state [id_ ] = new_p
41
+
42
+ async def view (state , screen ):
43
+ while True :
44
+ screen .clear ()
45
+ for id_ , p in state .items ():
46
+ screen .addstr (p .y , p .x , str (id_ ))
47
+ await asyncio .sleep (0.01 )
48
+
49
+ curses .wrapper (main )
0 commit comments