Skip to content

Commit

Permalink
feat(idempotency): leverage new DynamoDB Failed conditional writes be…
Browse files Browse the repository at this point in the history
…havior with ReturnValuesOnConditionCheckFailure (#3446)

* feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get #3327

* feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get #3327

* feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get #3327.  Changes after PR comments

* Improving code readability

* Reverting function

* Adding comments about some logic decisions

* Use DynamoDBPersistenceLayer passed in for test_idempotent_lambda_expired_during_request

Co-authored-by: Leandro Damascena <[email protected]>
Signed-off-by: Dan Straw <[email protected]>

* Adding docs

* wording

* Addressing Ruben's feedback

* Addressing Ruben's feedback

---------

Signed-off-by: Dan Straw <[email protected]>
Co-authored-by: Dan Straw <[email protected]>
Co-authored-by: Leandro Damascena <[email protected]>
Co-authored-by: Cavalcante Damascena <[email protected]>
  • Loading branch information
4 people authored Jan 19, 2024
1 parent 79e248c commit f833233
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 153 deletions.
19 changes: 13 additions & 6 deletions aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
STATUS_CONSTANTS,
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
Expand Down Expand Up @@ -118,12 +120,17 @@ def _process_idempotency(self):
data=self.data,
remaining_time_in_millis=self._get_remaining_time_in_millis(),
)
except IdempotencyKeyError:
except (IdempotencyKeyError, IdempotencyValidationError):
raise
except IdempotencyItemAlreadyExistsError:
# Now we know the item already exists, we can retrieve it
record = self._get_idempotency_record()
if record is not None:
except IdempotencyItemAlreadyExistsError as exc:
# Attempt to retrieve the existing record: first from the exception, which carries it when
# ReturnValuesOnConditionCheckFailure returned the item, otherwise by performing a GET operation.
# We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective
# way of retrieving the existing record after a failed conditional write operation.
record = exc.old_data_record or self._get_idempotency_record()

# If a record is found, handle it for status
if record:
return self._handle_for_status(record)
except Exception as exc:
raise IdempotencyPersistenceLayerError(
Expand Down
14 changes: 14 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from typing import Optional, Union

from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord


class BaseError(Exception):
"""
Expand All @@ -30,6 +32,18 @@ class IdempotencyItemAlreadyExistsError(BaseError):
Item attempting to be inserted into persistence store already exists and is not expired
"""

def __init__(
    self,
    *args: Optional[Union[str, Exception]],
    old_data_record: Optional[DataRecord] = None,
):
    """
    Build the error, optionally attaching the record that already exists.

    Parameters
    ----------
    *args: Optional[Union[str, Exception]]
        positional arguments, forwarded unchanged to the parent initializer
    old_data_record: Optional[DataRecord]
        the pre-existing record associated with this failure, when the caller has it
    """
    # Stored before delegating to the parent, mirroring the original statement order.
    self.old_data_record = old_data_record
    super().__init__(*args)

def __str__(self):
    """
    Return the parent's message, followed by the attached record (if any) in the form " from [<record>]"
    """
    if self.old_data_record:
        record_suffix = f" from [{self.old_data_record!s}]"
    else:
        # No record attached: the parent's message is returned as-is
        record_suffix = ""

    parent_message = super().__str__()
    return f"{parent_message}{record_suffix}"


class IdempotencyItemNotFoundError(BaseError):
"""
Expand Down
112 changes: 21 additions & 91 deletions aws_lambda_powertools/utilities/idempotency/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
import os
import warnings
from abc import ABC, abstractmethod
from types import MappingProxyType
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

import jmespath

Expand All @@ -18,95 +17,18 @@
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyInvalidStatusError,
IdempotencyItemAlreadyExistsError,
IdempotencyKeyError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions

logger = logging.getLogger(__name__)

# Read-only mapping of the record statuses; every key maps to itself.
STATUS_CONSTANTS = MappingProxyType(
    {status_name: status_name for status_name in ("INPROGRESS", "COMPLETED", "EXPIRED")},
)


class DataRecord:
    """
    In-memory representation of a single idempotency record.
    """

    def __init__(
        self,
        idempotency_key: str,
        status: str = "",
        expiry_timestamp: Optional[int] = None,
        in_progress_expiry_timestamp: Optional[int] = None,
        response_data: str = "",
        payload_hash: str = "",
    ) -> None:
        """
        Parameters
        ----------
        idempotency_key: str
            hashed representation of the idempotent data
        status: str, optional
            raw status of the record; validated lazily by the `status` property
        expiry_timestamp: int, optional
            moment the record expires, compared against the current epoch time in whole seconds
        in_progress_expiry_timestamp: int, optional
            expiry to apply while the record is INPROGRESS (documented upstream as seconds;
            NOTE(review): confirm the unit against the code that writes this field)
        response_data: str, optional
            response data from previous executions using the record
        payload_hash: str, optional
            hashed representation of payload
        """
        self.idempotency_key = idempotency_key
        self.payload_hash = payload_hash
        self.expiry_timestamp = expiry_timestamp
        self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
        self._status = status
        self.response_data = response_data

    @property
    def is_expired(self) -> bool:
        """
        Tell whether the record's expiry timestamp lies in the past

        Returns
        -------
        bool
            True only when an expiry timestamp is set and the current time is beyond it
        """
        # A missing (or zero) expiry timestamp means the record never counts as expired
        if not self.expiry_timestamp:
            return False

        current_epoch_seconds = int(datetime.datetime.now().timestamp())
        return bool(current_epoch_seconds > self.expiry_timestamp)

    @property
    def status(self) -> str:
        """
        Resolve the effective status of the record

        Returns
        -------
        str
            "EXPIRED" when the record is expired, otherwise the stored status

        Raises
        ------
        IdempotencyInvalidStatusError
            when the record is not expired and the stored status is not a known one
        """
        # Expiry takes precedence over whatever status was stored
        if self.is_expired:
            return STATUS_CONSTANTS["EXPIRED"]

        if self._status not in STATUS_CONSTANTS.values():
            raise IdempotencyInvalidStatusError(self._status)

        return self._status

    def response_json_as_dict(self) -> Optional[dict]:
        """
        Deserialize the stored response data from JSON

        Returns
        -------
        Optional[dict]
            the parsed response, or None when no response data is stored
        """
        if not self.response_data:
            return None

        return json.loads(self.response_data)


class BasePersistenceLayer(ABC):
"""
Expand Down Expand Up @@ -238,16 +160,20 @@ def _generate_hash(self, data: Any) -> str:
hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
return hashed_data.hexdigest()

def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None:
def _validate_payload(
self,
data_payload: Union[Dict[str, Any], DataRecord],
stored_data_record: DataRecord,
) -> None:
"""
Validate that the hashed payload matches data provided and stored data record
Parameters
----------
data: Dict[str, Any]
data_payload: Union[Dict[str, Any], DataRecord]
Payload
data_record: DataRecord
DataRecord instance
stored_data_record: DataRecord
DataRecord fetched from Dynamo or cache
Raises
----------
Expand All @@ -256,8 +182,12 @@ def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> No
"""
if self.payload_validation_enabled:
data_hash = self._get_hashed_payload(data=data)
if data_record.payload_hash != data_hash:
if isinstance(data_payload, DataRecord):
data_hash = data_payload.payload_hash
else:
data_hash = self._get_hashed_payload(data=data_payload)

if stored_data_record.payload_hash != data_hash:
raise IdempotencyValidationError("Payload does not match stored record for this event key")

def _get_expiry_timestamp(self) -> int:
Expand Down Expand Up @@ -448,14 +378,14 @@ def get_record(self, data: Dict[str, Any]) -> Optional[DataRecord]:
cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
if cached_record:
logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
self._validate_payload(data=data, data_record=cached_record)
self._validate_payload(data_payload=data, stored_data_record=cached_record)
return cached_record

record = self._get_record(idempotency_key=idempotency_key)

self._save_to_cache(data_record=record)

self._validate_payload(data=data, data_record=record)
self._validate_payload(data_payload=data, stored_data_record=record)
return record

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Data Class for idempotency records.
"""

import datetime
import json
import logging
from types import MappingProxyType
from typing import Optional

logger = logging.getLogger(__name__)

# Read-only mapping of the record statuses; every key maps to itself.
STATUS_CONSTANTS = MappingProxyType(
    {status_name: status_name for status_name in ("INPROGRESS", "COMPLETED", "EXPIRED")},
)


class DataRecord:
    """
    In-memory representation of a single idempotency record.
    """

    def __init__(
        self,
        idempotency_key: str,
        status: str = "",
        expiry_timestamp: Optional[int] = None,
        in_progress_expiry_timestamp: Optional[int] = None,
        response_data: str = "",
        payload_hash: str = "",
    ) -> None:
        """
        Parameters
        ----------
        idempotency_key: str
            hashed representation of the idempotent data
        status: str, optional
            raw status of the record; validated lazily by the `status` property
        expiry_timestamp: int, optional
            moment the record expires, compared against the current epoch time in whole seconds
        in_progress_expiry_timestamp: int, optional
            expiry to apply while the record is INPROGRESS (documented upstream as seconds;
            NOTE(review): confirm the unit against the code that writes this field)
        response_data: str, optional
            response data from previous executions using the record
        payload_hash: str, optional
            hashed representation of payload
        """
        self.idempotency_key = idempotency_key
        self.payload_hash = payload_hash
        self.expiry_timestamp = expiry_timestamp
        self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
        self._status = status
        self.response_data = response_data

    @property
    def is_expired(self) -> bool:
        """
        Tell whether the record's expiry timestamp lies in the past

        Returns
        -------
        bool
            True only when an expiry timestamp is set and the current time is beyond it
        """
        # A missing (or zero) expiry timestamp means the record never counts as expired
        if not self.expiry_timestamp:
            return False

        current_epoch_seconds = int(datetime.datetime.now().timestamp())
        return bool(current_epoch_seconds > self.expiry_timestamp)

    @property
    def status(self) -> str:
        """
        Resolve the effective status of the record

        Returns
        -------
        str
            "EXPIRED" when the record is expired, otherwise the stored status

        Raises
        ------
        IdempotencyInvalidStatusError
            when the record is not expired and the stored status is not a known one
        """
        # Expiry takes precedence over whatever status was stored
        if self.is_expired:
            return STATUS_CONSTANTS["EXPIRED"]

        if self._status not in STATUS_CONSTANTS.values():
            # Imported only on the failure path, at call time rather than at module import time
            from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyInvalidStatusError

            raise IdempotencyInvalidStatusError(self._status)

        return self._status

    def response_json_as_dict(self) -> Optional[dict]:
        """
        Deserialize the stored response data from JSON

        Returns
        -------
        Optional[dict]
            the parsed response, or None when no response data is stored
        """
        if not self.response_data:
            return None

        return json.loads(self.response_data)
Loading

0 comments on commit f833233

Please sign in to comment.