Skip to content

Commit def6893

Browse files
committed
v2.5.0
1 parent da60aea commit def6893

File tree

10 files changed

+67
-48
lines changed

10 files changed

+67
-48
lines changed

Diff for: CHANGELOG.md

+18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
---
9+
## Version 2.5.0, 9/24/2022
10+
11+
### Added
12+
13+
N/A
14+
15+
### Removed
16+
17+
N/A
18+
19+
### Changed
20+
21+
For consistency,
22+
23+
1. Refactor post office APIs for sending single event request as "send_request".
24+
2. Update platform APIs for sending event request and parallel requests as "send_request" and "send_parallel_requests"
25+
826
---
927
## Version 2.3.6, 6/16/2022
1028

Diff for: README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ try:
147147
if isinstance(result, EventEnvelope):
148148
print('Received RPC response:')
149149
print("HEADERS =", result.get_headers(), ", BODY =", result.get_body(),
150-
", STATUS =", result.get_status(),
150+
", STATUS =", result.get_status(),
151151
", EXEC =", result.get_exec_time(), ", ROUND TRIP =", result.get_round_trip(), "ms")
152152
except TimeoutError as e:
153153
print("Exception: ", str(e))

Diff for: docs/guides/CHAPTER-1.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,19 @@ try:
8080
if isinstance(result, EventEnvelope):
8181
print('Received RPC response:')
8282
print("HEADERS =", result.get_headers(), ", BODY =", result.get_body(),
83-
", STATUS =", result.get_status(),
84-
", EXEC =", result.get_exec_time(), ", ROUND TRIP =", result.get_round_trip(), "ms")
83+
", STATUS =", result.get_status(),
84+
", EXEC =", result.get_exec_time(), ", ROUND TRIP =", result.get_round_trip(), "ms")
8585
except TimeoutError as e:
8686
print("Exception: ", str(e))
8787

8888
# for async call
8989
po.send('hello.world.1', headers={'one': 1}, body='hello world one')
9090
```
9191

92-
## Massive parallel processing
92+
## Parallel processing
9393

9494
A function is invoked when an event happens. Before the event arrives, the function is just an entry in a routing
95-
table and it does not consume any additional resources like threads.
95+
table, and it does not consume any additional resources like threads.
9696

9797
All functions are running in parallel without special coding. Behind the curtain, the system uses Python futures and
9898
asyncio event loops for very efficient function execution.

Diff for: docs/guides/CHAPTER-3.md

+26-20
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,23 @@ The Mercury framework is 100% event-driven and all communications are asynchrono
2626
it suspends the calling function and uses temporary Inbox as a callback function. The called function will send
2727
the reply to the callback function which in turns wakes up the calling function.
2828

29-
To make a RPC call, you can use the `request` method.
29+
To make a RPC call, you can use the `request` or the `send_request` method.
30+
With the latter, you can send the request as an event and set event metadata such as correlation-ID.
3031

3132
```python
32-
request(self, route: str, timeout_seconds: float,
33-
headers: dict = None, body: any = None,
34-
correlation_id: str = None) -> EventEnvelope
33+
def request(self, route: str, timeout_seconds: float,
34+
headers: dict = None, body: any = None,
35+
correlation_id: str = None) -> EventEnvelope:
3536

3637
# example
37-
result = po.request('hello.world.2', 2.0, headers={'some_key': 'some_value'}, body='hello world')
38+
result = po.send_request('hello.world.2', 2.0, headers={'some_key': 'some_value'}, body='hello world')
3839
print(result.get_body())
3940

41+
# send request using an event directly
42+
def send_request(self, event: EventEnvelope, timeout_seconds: float) -> EventEnvelope:
43+
44+
# example
45+
result = po.send_request(event, 2.0)
4046
```
4147

4248
Note that Mercury supports Python primitive or dictionary in the message body. If you put other object, it may throw
@@ -47,7 +53,7 @@ serialization exception or the object may become empty.
4753
To make an asynchronous call, use the `send` method.
4854

4955
```python
50-
send(self, route: str, headers: dict = None, body: any = None, reply_to: str = None, me=True) -> None
56+
def send(self, route: str, headers: dict = None, body: any = None, reply_to: str = None, me=True) -> None""
5157
```
5258

5359
You may put key-value pairs in the "headers" field for holding parameters. For message payload, put Python primitive
@@ -56,7 +62,7 @@ or dictionary in the "body" field.
5662
### Deferred delivery
5763

5864
```python
59-
send_later(self, route: str, headers: dict = None, body: any = None, seconds: float = 1.0) -> None
65+
def send_later(self, route: str, headers: dict = None, body: any = None, seconds: float = 1.0) -> None:
6066
```
6167

6268
### Call-back
@@ -148,7 +154,7 @@ Broadcast is the easiest way to do "pub/sub". To broadcast an event to multiple
148154
use the `broadcast` method.
149155

150156
```python
151-
broadcast(self, route: str, headers: dict = None, body: any = None) -> None
157+
def broadcast(self, route: str, headers: dict = None, body: any = None) -> None:
152158

153159
# example
154160
po.broadcast("hello.world.1", body="this is a broadcast message from "+platform.get_origin())
@@ -160,7 +166,7 @@ po.broadcast("hello.world.1", body="this is a broadcast message from "+platform.
160166
You can perform join-n-fork RPC calls using a parallel version of the request, `parallel_request` method.
161167

162168
```python
163-
parallel_request(self, events: list, timeout_seconds: float) -> list
169+
def parallel_request(self, events: list, timeout_seconds: float) -> list:
164170

165171
# illustrate parallel RPC requests
166172
event_list = list()
@@ -172,8 +178,8 @@ try:
172178
print('Received', len(result), 'RPC responses:')
173179
for res in result:
174180
print("HEADERS =", res.get_headers(), ", BODY =", res.get_body(),
175-
", STATUS =", res.get_status(),
176-
", EXEC =", res.get_exec_time(), ", ROUND TRIP =", res.get_round_trip(), "ms")
181+
", STATUS =", res.get_status(),
182+
", EXEC =", res.get_exec_time(), ", ROUND TRIP =", res.get_round_trip(), "ms")
177183
except TimeoutError as e:
178184
print("Exception: ", str(e))
179185
```
@@ -188,14 +194,14 @@ However, if you want to do store-n-forward pub/sub for certain use cases, you ma
188194
Following are some useful pub/sub API:
189195

190196
```python
191-
def feature_enabled()
192-
def create_topic(topic: str)
193-
def delete_topic(topic: str)
194-
def publish(topic: str, headers: dict = None, body: any = None)
195-
def subscribe(self, topic: str, route: str, parameters: list = None)
196-
def unsubscribe(self, topic: str, route: str)
197-
def exists(topic: str)
198-
def list_topics()
197+
def feature_enabled():
198+
def create_topic(topic: str):
199+
def delete_topic(topic: str):
200+
def publish(topic: str, headers: dict = None, body: any = None):
201+
def subscribe(self, topic: str, route: str, parameters: list = None):
202+
def unsubscribe(self, topic: str, route: str):
203+
def exists(topic: str):
204+
def list_topics():
199205

200206
```
201207
Some pub/sub engine would require additional parameters when subscribing a topic. For Kafka, you must provide the
@@ -222,7 +228,7 @@ serialization yourself. The payload can be a dict, bool, str, int or float.
222228
To check if a target service is available, you can use the `exists` method.
223229

224230
```python
225-
exists(self, routes: any)
231+
def exists(self, routes: any) -> bool:
226232

227233
# input can be a route name or a list of routes
228234
# it will return true only when all routes are available

Diff for: examples/standalone-tracing.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
def tracing(headers: dict, body: any):
3030
# no instance parameter because this is a singleton
3131
log.info(f'TRACE {headers}')
32-
return body
3332

3433

3534
def hello(headers: dict, body: any, instance: int):
@@ -54,7 +53,7 @@ def main():
5453
trace_path = 'GET /api/hello/world'
5554
event = EventEnvelope().set_to("hello.world").set_header('some_key', 'some_value').set_body('hello world')
5655
event.set_trace(trace_id, trace_path).set_from('this.demo')
57-
result = po.single_request(event, 2.0)
56+
result = po.send_request(event, 2.0)
5857
if isinstance(result, EventEnvelope):
5958
log.info('Received RPC response:')
6059
log.info(f'HEADERS = {result.get_headers()}, BODY = {result.get_body()}, STATUS = {result.get_status()}, '

Diff for: examples/stream-demo.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ def main():
6565

6666
input_stream.close()
6767
#
68-
# This will keep the main thread running in the background.
69-
# We can use Control-C or KILL signal to stop the application.
70-
platform.run_forever()
68+
# Stop platform after demo
69+
#
70+
platform.stop()
7171

7272

7373
if __name__ == '__main__':

Diff for: mercury/platform.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ def route_instances(self, route: str) -> int:
600600
else:
601601
return 0
602602

603-
def parallel_request(self, events: list, timeout_seconds: float):
603+
def send_parallel_requests(self, events: list, timeout_seconds: float):
604604
timeout_value = self.util.get_float(timeout_seconds)
605605
if timeout_value <= 0:
606606
raise ValueError('timeout value in seconds must be positive number')
@@ -610,7 +610,7 @@ def parallel_request(self, events: list, timeout_seconds: float):
610610
raise ValueError('event list is empty')
611611
if len(events) == 1:
612612
result = list()
613-
result.append(self.request(events[0], timeout_value))
613+
result.append(self.send_request(events[0], timeout_value))
614614
return result
615615
for evt in events:
616616
if not isinstance(evt, EventEnvelope):
@@ -650,12 +650,12 @@ def parallel_request(self, events: list, timeout_seconds: float):
650650
if len(result_list) == len(events):
651651
return result_list
652652
except Empty:
653-
raise TimeoutError(f'Requests timeout for {round(timeout_value, 3)} seconds. '
653+
raise TimeoutError(f'Request timeout for {round(timeout_value, 3)} seconds. '
654654
f'Expect: {total_requests} responses, actual: {len(result_list)}')
655655
finally:
656656
inbox.close()
657657

658-
def request(self, event: EventEnvelope, timeout_seconds: float):
658+
def send_request(self, event: EventEnvelope, timeout_seconds: float):
659659
timeout_value = self.util.get_float(timeout_seconds)
660660
if timeout_value <= 0:
661661
raise ValueError('timeout value in seconds must be positive number')
@@ -724,15 +724,15 @@ def send_event(self, event: EventEnvelope, broadcast=False) -> None:
724724
def send_event_later(self, event: EventEnvelope, delay_in_seconds: float) -> None:
725725
self._loop.call_later(delay_in_seconds, self.send_event, event)
726726

727-
def exists(self, routes: any):
727+
def exists(self, routes: any) -> bool:
728728
if isinstance(routes, str):
729729
single_route = routes
730730
if self.has_route(single_route):
731731
return True
732732
if self.cloud_ready():
733733
event = EventEnvelope()
734734
event.set_to(self.SERVICE_QUERY).set_header('type', 'find').set_header('route', single_route)
735-
result = self.request(event, 8.0)
735+
result = self.send_request(event, 8.0)
736736
if isinstance(result, EventEnvelope):
737737
if result.get_body() is not None:
738738
return result.get_body()
@@ -749,7 +749,7 @@ def exists(self, routes: any):
749749
event = EventEnvelope()
750750
event.set_to(self.SERVICE_QUERY).set_header('type', 'find')
751751
event.set_header('route', '*').set_body(routes)
752-
result = self.request(event, 8.0)
752+
result = self.send_request(event, 8.0)
753753
if isinstance(result, EventEnvelope) and result.get_body() is not None:
754754
return result.get_body()
755755
return False

Diff for: mercury/system/object_stream.py

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222

2323
class ObjectStreamIO:
24-
2524
STREAM_IO_MANAGER = 'object.streams.io'
2625

2726
def __init__(self, expiry_seconds: int = 1800):

Diff for: mercury/system/po.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,16 @@ def request(self, route: str, timeout_seconds: float,
141141
event.set_body(body)
142142
if correlation_id is not None:
143143
event.set_correlation_id(str(correlation_id))
144-
response = self.platform.request(event, timeout_seconds)
144+
response = self.platform.send_request(event, timeout_seconds)
145145
if isinstance(response, EventEnvelope):
146146
if response.get_tag('exception') is None:
147147
return response
148148
else:
149149
raise AppException(response.get_status(), response.get_body())
150150
raise ValueError(f'Expect response is EventEnvelope, actual: ({response})')
151151

152-
def single_request(self, event: EventEnvelope, timeout_seconds: float):
153-
response = self.platform.request(event, timeout_seconds)
152+
def send_request(self, event: EventEnvelope, timeout_seconds: float) -> EventEnvelope:
153+
response = self.platform.send_request(event, timeout_seconds)
154154
if isinstance(response, EventEnvelope):
155155
if response.get_tag('exception') is None:
156156
return response
@@ -159,7 +159,7 @@ def single_request(self, event: EventEnvelope, timeout_seconds: float):
159159
raise ValueError(f'Expect response is EventEnvelope, actual: ({response})')
160160

161161
def parallel_request(self, events: list, timeout_seconds: float) -> list:
162-
return self.platform.parallel_request(events, timeout_seconds)
162+
return self.platform.send_parallel_requests(events, timeout_seconds)
163163

164-
def exists(self, routes: any):
164+
def exists(self, routes: any) -> bool:
165165
return self.platform.exists(routes)

Diff for: setup.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from setuptools import setup
55

66
setup(name='mercury',
7-
version='2.3.6',
7+
version='2.5.0',
88
description='Python Language pack for Mercury',
99
author='Eric Law',
1010
author_email='[email protected]',
@@ -16,13 +16,10 @@
1616
package_dir={'mercury': 'mercury'},
1717
package_data={'mercury': ['resources/application.yml']},
1818
license='Apache 2.0',
19-
python_requires='>=3.6.7',
19+
python_requires='>=3.8.0',
2020
install_requires=['aiohttp', 'msgpack-python', 'PyYAML', 'pytest'],
2121
classifiers=[
2222
'Programming Language :: Python :: 3',
23-
'Programming Language :: Python :: 3.5',
24-
'Programming Language :: Python :: 3.6',
25-
'Programming Language :: Python :: 3.7',
2623
'Programming Language :: Python :: 3.8',
2724
'Programming Language :: Python :: 3.9',
2825
'Programming Language :: Python :: 3.10',

0 commit comments

Comments
 (0)