Skip to content

Commit 06c8146

Browse files
Dominitioalpian
authored andcommitted
Add execution stats
Add get_consumed_ios and get_timing_information methods in BufferedCursor and StreamCursor class.
1 parent 284b063 commit 06c8146

18 files changed

+486
-97
lines changed

.github/workflows/pythonapp.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ jobs:
2323
with:
2424
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
2525
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
26+
aws-session-token: ${{ secrets.AWS_SESSION_TOKEN }}
2627
aws-region: us-east-1
2728
- uses: actions/checkout@v2
2829
- name: Set up Python ${{ matrix.python-version }}

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
### Release 3.1.0
2+
Add support for obtaining basic server-side statistics on individual statement executions.
3+
4+
#### :tada: Enhancements
5+
* Added `get_consumed_ios` and `get_timing_information` methods in `BufferedCursor` and `StreamCursor` classes to provide server-side execution statistics.
6+
* `get_consumed_ios` returns a dictionary containing the number of read IO requests for a statement execution.
7+
* `get_timing_information` returns a dictionary containing the server side processing time in milliseconds for a statement execution.
8+
* `get_consumed_ios` and `get_timing_information` methods in the `StreamCursor` class are stateful, meaning the statistics returned by them reflect the state at the time of method execution.
9+
110
### Release 3.0.0 (August 20, 2020)
211
This is a public and generally available(GA) release of the driver, and this version can be used in production applications.
312

pyqldb/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@
99
# CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
1010
# and limitations under the License.
1111

12-
__version__ = '3.0.0'
12+
__version__ = '3.1.0'

pyqldb/cursor/buffered_cursor.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ def __init__(self, cursor):
2323
self._buffered_values.append(item)
2424

2525
self._buffered_values_iterator = iter(self._buffered_values)
26+
self._consumed_ios = cursor.get_consumed_ios()
27+
self._timing_information = cursor.get_timing_information()
2628

2729
def __iter__(self):
2830
"""
@@ -35,3 +37,21 @@ def __next__(self):
3537
Iterator function to implement the iterator protocol. Get next value in _buffered_values_iterator.
3638
"""
3739
return next(self._buffered_values_iterator)
40+
41+
def get_consumed_ios(self):
42+
"""
43+
Return a dictionary containing the total amount of IO requests for a statement's execution.
44+
45+
:rtype: dict
46+
:return: The amount of read IO requests for a statement's execution.
47+
"""
48+
return self._consumed_ios
49+
50+
def get_timing_information(self):
51+
"""
52+
Return a dictionary containing the total amount of processing time for a statement's execution.
53+
54+
:rtype: dict
55+
:return: The amount of processing time in milliseconds for a statement's execution.
56+
"""
57+
return self._timing_information

pyqldb/cursor/read_ahead_cursor.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ class ReadAheadCursor(StreamCursor):
2525
An iterable class representing a read ahead cursor on a statement's result set. This class will create a queue of
2626
size `read_ahead` and fetch results asynchronously to fill the queue.
2727
28-
:type page: dict
29-
:param page: The page containing the initial result set data dictionary of the statement's execution.
28+
:type statement_result: dict
29+
:param statement_result: The initial result set data dictionary of the statement's execution.
3030
3131
:type session: :py:class:`pyqldb.communication.session_client.SessionClient`
3232
:param session: The parent session that represents the communication channel to QLDB.
@@ -40,8 +40,8 @@ class ReadAheadCursor(StreamCursor):
4040
:type executor: :py:class:`concurrent.futures.thread.ThreadPoolExecutor`
4141
:param executor: The optional executor for asynchronous retrieval. If none specified, a new thread is created.
4242
"""
43-
def __init__(self, page, session, transaction_id, read_ahead, executor):
44-
super().__init__(page, session, transaction_id)
43+
def __init__(self, statement_result, session, transaction_id, read_ahead, executor):
44+
super().__init__(statement_result, session, transaction_id)
4545
self._queue = Queue(read_ahead - 1)
4646
if executor is None:
4747
thread = Thread(target=self._populate_queue)
@@ -63,23 +63,24 @@ def _next_page(self):
6363
queue_result = self._queue.get()
6464
if isinstance(queue_result, Exception):
6565
raise queue_result
66-
self._page = queue_result
66+
super()._accumulate_query_stats(queue_result)
67+
self._page = queue_result.get('Page')
6768
self._index = 0
6869

6970
def _populate_queue(self):
7071
"""
71-
Fill the buffer queue with pages. If ClientError is received, it is put in the queue and execution stops.
72-
If the parent transaction is closed, stop fetching results.
72+
Fill the buffer queue with the statement_result fetched. If ClientError is received, it is put in the queue and
73+
execution stops. If the parent transaction is closed, stop fetching results.
7374
"""
7475
try:
7576
next_page_token = self._page.get('NextPageToken')
7677
while next_page_token is not None:
7778
statement_result = self._session._fetch_page(self._transaction_id, next_page_token)
78-
page = statement_result.get('Page')
7979
while True:
8080
try:
8181
# Timeout of 50ms.
82-
self._queue.put(page, timeout=0.05)
82+
self._queue.put(statement_result, timeout=0.05)
83+
page = statement_result.get('Page')
8384
next_page_token = page.get('NextPageToken')
8485
break
8586
except Full:

pyqldb/cursor/stream_cursor.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,26 @@ class StreamCursor:
1717
"""
1818
An iterable class representing a stream cursor on a statement's result set.
1919
20-
:type page: dict
21-
:param page: The page containing the initial result set data dictionary of the statement's execution.
20+
:type statement_result: dict
21+
:param statement_result: The initial result set data dictionary of the statement's execution.
2222
2323
:type session: :py:class:`pyqldb.communication.session_client.SessionClient`
2424
:param session: The parent session that represents the communication channel to QLDB.
2525
2626
:type transaction_id: str
2727
:param transaction_id: The ID of this cursor's parent transaction, required to fetch pages.
2828
"""
29-
def __init__(self, page, session, transaction_id):
30-
self._page = page
29+
def __init__(self, statement_result, session, transaction_id):
30+
self._page = statement_result.get('FirstPage')
3131
self._session = session
3232
self._index = 0
3333
self._is_open = True
3434
self._transaction_id = transaction_id
35+
self._read_ios = None
36+
self._write_ios = None
37+
self._processing_time_milliseconds = None
38+
39+
self._accumulate_query_stats(statement_result)
3540

3641
def __iter__(self):
3742
"""
@@ -71,6 +76,24 @@ def close(self):
7176
"""
7277
self._is_open = False
7378

79+
def get_consumed_ios(self):
80+
"""
81+
Return a dictionary containing the accumulated amount of IO requests for a statement's execution.
82+
83+
:rtype: dict
84+
:return: The amount of read IO requests for a statement's execution.
85+
"""
86+
return {'ReadIOs': self._read_ios}
87+
88+
def get_timing_information(self):
89+
"""
90+
Return a dictionary containing the accumulated amount of processing time for a statement's execution.
91+
92+
:rtype: dict
93+
:return: The amount of processing time in milliseconds for a statement's execution.
94+
"""
95+
return {'ProcessingTimeMilliseconds': self._processing_time_milliseconds}
96+
7497
def _are_there_more_results(self):
7598
"""
7699
Check if there are more results.
@@ -82,10 +105,33 @@ def _next_page(self):
82105
Get the next page using this cursor's session.
83106
"""
84107
statement_result = self._session._fetch_page(self._transaction_id, self._page.get('NextPageToken'))
108+
self._accumulate_query_stats(statement_result)
109+
85110
page = statement_result.get('Page')
86111
self._page = page
87112
self._index = 0
88113

114+
def _accumulate_query_stats(self, statement_result):
115+
"""
116+
From the statement_result, get the query stats and accumulate them.
117+
"""
118+
self._processing_time_milliseconds = self._accumulate(statement_result, 'TimingInformation',
119+
'ProcessingTimeMilliseconds',
120+
self._processing_time_milliseconds)
121+
self._read_ios = self._accumulate(statement_result, 'ConsumedIOs', 'ReadIOs', self._read_ios)
122+
self._write_ios = self._accumulate(statement_result, 'ConsumedIOs', 'WriteIOs', self._write_ios)
123+
124+
@staticmethod
125+
def _accumulate(statement_result, query_statistics_key, metric_key, metric_to_accumulate):
126+
query_statistics = statement_result.get(query_statistics_key)
127+
if query_statistics:
128+
metric = query_statistics.get(metric_key)
129+
if metric:
130+
if metric_to_accumulate is None:
131+
metric_to_accumulate = 0
132+
metric_to_accumulate += metric
133+
return metric_to_accumulate
134+
89135
@staticmethod
90136
def _value_holder_to_ion_value(value):
91137
"""

pyqldb/driver/qldb_driver.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def list_tables(self):
186186
Get the list of table names in the ledger.
187187
188188
:rtype: :py:class:`pyqldb.cursor.buffered_cursor.BufferedCursor`
189-
:return: Iterable of table names in :py:class:`amazon.ion.simple_types.IonPyText`.
189+
:return: Iterable of table names in :py:class:`amazon.ion.simple_types.IonPyText` format found in the ledger.
190190
191191
:raises DriverClosedError: When this driver is closed.
192192
"""
@@ -202,9 +202,12 @@ def execute_lambda(self, query_lambda, retry_config=None):
202202
This is the primary method to execute a transaction against Amazon QLDB ledger.
203203
204204
:type query_lambda: function
205-
:param query_lambda: The lambda function to execute. A lambda function cannot have any side effects as
206-
it may be invoked multiple times, and the result cannot be trusted until the transaction is
207-
committed.
205+
:param query_lambda: The lambda function to execute. The function receives an instance of
206+
:py:class:`pyqldb.execution.executor.Executor` which can be used to execute statements.
207+
The instance of :py:class:`pyqldb.execution.executor.Executor` wraps an implicitly created
208+
transaction. The transaction will be implicitly committed when the passed function returns.
209+
The lambda function cannot have any side effects as it may be invoked multiple
210+
times, and the result cannot be trusted until the transaction is committed.
208211
209212
:type retry_config: :py:class:`pyqldb.config.retry_config.RetryConfig`
210213
:param retry_config: Config to specify max number of retries, base and custom backoff strategy for retries.

pyqldb/transaction/transaction.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ def _execute_statement(self, statement, *parameters):
164164
parameters = tuple(map(self._to_ion, parameters))
165165
self._update_hash(statement, parameters)
166166
statement_result = self._session._execute_statement(self._id, statement, parameters)
167-
first_page = statement_result.get('FirstPage')
167+
168168
if self._read_ahead > 0:
169-
cursor = ReadAheadCursor(first_page, self._session, self._id, self._read_ahead, self._executor)
169+
cursor = ReadAheadCursor(statement_result, self._session, self._id, self._read_ahead, self._executor)
170170
else:
171-
cursor = StreamCursor(first_page, self._session, self._id)
171+
cursor = StreamCursor(statement_result, self._session, self._id)
172172

173173
self._cursors.append(cursor)
174174
return cursor

requirements.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
amazon.ion~=0.5.0
2-
boto3~=1.9.237
3-
botocore~=1.12.237
1+
amazon.ion~=0.7.0
2+
boto3~=1.16.56
3+
botocore~=1.19.56
44
ionhash~=1.1.0
55
pytest~=4.6.3
66
pytest-cov~=2.7.1

setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
ROOT = os.path.join(os.path.dirname(__file__), 'pyqldb')
1616
VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.a-z\-]+)['"]''')
17-
requires = ['amazon.ion>=0.5.0,<0.6',
18-
'boto3>=1.9.237,<2',
19-
'botocore>=1.12.237,<2',
17+
requires = ['amazon.ion>=0.7.0,<1',
18+
'boto3>=1.16.56,<2',
19+
'botocore>=1.19.56,<2',
2020
'ionhash>=1.1.0,<2']
2121

2222

0 commit comments

Comments
 (0)