Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(idempotency): leverage new DynamoDB Failed conditional writes behavior with ReturnValuesOnConditionCheckFailure #3446

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
92d21ec
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency…
Dec 4, 2023
7a8267f
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
Dec 4, 2023
bd92f33
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency…
Dec 4, 2023
0e01865
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 4, 2024
3a7e60a
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 10, 2024
5727c48
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 12, 2024
dd3707d
feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency…
Jan 14, 2024
6a67bde
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 15, 2024
887e2e9
Improving code readability
leandrodamascena Jan 15, 2024
88ed83c
Reverting function
leandrodamascena Jan 15, 2024
d688322
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 15, 2024
7cb1b2c
Adding comments about some logic decisions
leandrodamascena Jan 15, 2024
efe99a2
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 15, 2024
d2798ad
Use DynamoDBPersistenceLayer passed in for test_idempotent_lambda_exp…
dastra Jan 16, 2024
a9a598c
Adding docs
leandrodamascena Jan 17, 2024
296e9ee
wording
Jan 17, 2024
cb94a14
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 17, 2024
7c66456
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 17, 2024
deca935
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 18, 2024
488fc01
Adressing Ruben's feedback
leandrodamascena Jan 18, 2024
1d6d944
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 18, 2024
7ec8343
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 19, 2024
85b829c
Adressing Ruben's feedback
leandrodamascena Jan 19, 2024
19e042e
Merge branch 'develop' into feat/dynamodb-failed-conditional-writes
leandrodamascena Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
STATUS_CONSTANTS,
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
Expand Down Expand Up @@ -118,12 +120,17 @@ def _process_idempotency(self):
data=self.data,
remaining_time_in_millis=self._get_remaining_time_in_millis(),
)
except IdempotencyKeyError:
except (IdempotencyKeyError, IdempotencyValidationError):
raise
except IdempotencyItemAlreadyExistsError:
# Now we know the item already exists, we can retrieve it
record = self._get_idempotency_record()
if record is not None:
except IdempotencyItemAlreadyExistsError as exc:
# Attempt to retrieve the existing record, either from the exception ReturnValuesOnConditionCheckFailure
# or perform a GET operation if the information is not available.
# We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective
# way of retrieving the existing record after a failed conditional write operation.
record = exc.old_data_record or self._get_idempotency_record()

# If a record is found, handle it for status
if record:
return self._handle_for_status(record)
except Exception as exc:
raise IdempotencyPersistenceLayerError(
Expand Down
14 changes: 14 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from typing import Optional, Union

from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord


class BaseError(Exception):
"""
Expand All @@ -30,6 +32,18 @@ class IdempotencyItemAlreadyExistsError(BaseError):
Item attempting to be inserted into persistence store already exists and is not expired
"""

def __init__(self, *args: Optional[Union[str, Exception]], old_data_record: Optional[DataRecord] = None):
self.old_data_record = old_data_record
super().__init__(*args)

def __str__(self):
"""
Return all arguments formatted or original message
"""
old_data_record = f" from [{(str(self.old_data_record))}]" if self.old_data_record else ""
message = super().__str__()
return f"{message}{old_data_record}"


class IdempotencyItemNotFoundError(BaseError):
"""
Expand Down
112 changes: 21 additions & 91 deletions aws_lambda_powertools/utilities/idempotency/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
import os
import warnings
from abc import ABC, abstractmethod
from types import MappingProxyType
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

import jmespath

Expand All @@ -18,95 +17,18 @@
from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyInvalidStatusError,
IdempotencyItemAlreadyExistsError,
IdempotencyKeyError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions

logger = logging.getLogger(__name__)

STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})


class DataRecord:
"""
Data Class for idempotency records.
"""

def __init__(
self,
idempotency_key: str,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: str = "",
payload_hash: str = "",
) -> None:
"""

Parameters
----------
idempotency_key: str
hashed representation of the idempotent data
status: str, optional
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
response data from previous executions using the record
"""
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

@property
def is_expired(self) -> bool:
"""
Check if data record is expired

Returns
-------
bool
Whether the record is currently expired or not
"""
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)

@property
def status(self) -> str:
"""
Get status of data record

Returns
-------
str
"""
if self.is_expired:
return STATUS_CONSTANTS["EXPIRED"]
elif self._status in STATUS_CONSTANTS.values():
return self._status
else:
raise IdempotencyInvalidStatusError(self._status)

def response_json_as_dict(self) -> Optional[dict]:
"""
Get response data deserialized to python dict

Returns
-------
Optional[dict]
previous response data deserialized
"""
return json.loads(self.response_data) if self.response_data else None


class BasePersistenceLayer(ABC):
"""
Expand Down Expand Up @@ -238,16 +160,20 @@ def _generate_hash(self, data: Any) -> str:
hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
return hashed_data.hexdigest()

def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None:
def _validate_payload(
self,
data_payload: Union[Dict[str, Any], DataRecord],
stored_data_record: DataRecord,
) -> None:
"""
Validate that the hashed payload matches data provided and stored data record

Parameters
----------
data: Dict[str, Any]
data_payload: Union[Dict[str, Any], DataRecord]
Payload
data_record: DataRecord
DataRecord instance
stored_data_record: DataRecord
DataRecord fetched from Dynamo or cache

Raises
----------
Expand All @@ -256,8 +182,12 @@ def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> No

"""
if self.payload_validation_enabled:
data_hash = self._get_hashed_payload(data=data)
if data_record.payload_hash != data_hash:
if isinstance(data_payload, DataRecord):
data_hash = data_payload.payload_hash
else:
data_hash = self._get_hashed_payload(data=data_payload)

if stored_data_record.payload_hash != data_hash:
raise IdempotencyValidationError("Payload does not match stored record for this event key")

def _get_expiry_timestamp(self) -> int:
Expand Down Expand Up @@ -448,14 +378,14 @@ def get_record(self, data: Dict[str, Any]) -> Optional[DataRecord]:
cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
if cached_record:
logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
self._validate_payload(data=data, data_record=cached_record)
self._validate_payload(data_payload=data, stored_data_record=cached_record)
return cached_record

record = self._get_record(idempotency_key=idempotency_key)

self._save_to_cache(data_record=record)

self._validate_payload(data=data, data_record=record)
self._validate_payload(data_payload=data, stored_data_record=record)
return record

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Data Class for idempotency records.
"""

import datetime
import json
import logging
from types import MappingProxyType
from typing import Optional

logger = logging.getLogger(__name__)

STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})


class DataRecord:
"""
Data Class for idempotency records.
"""

def __init__(
self,
idempotency_key: str,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: str = "",
payload_hash: str = "",
) -> None:
"""

Parameters
----------
idempotency_key: str
hashed representation of the idempotent data
status: str, optional
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
response data from previous executions using the record
"""
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

@property
def is_expired(self) -> bool:
"""
Check if data record is expired

Returns
-------
bool
Whether the record is currently expired or not
"""
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)

@property
def status(self) -> str:
"""
Get status of data record

Returns
-------
str
"""
if self.is_expired:
return STATUS_CONSTANTS["EXPIRED"]
if self._status in STATUS_CONSTANTS.values():
return self._status

from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyInvalidStatusError

raise IdempotencyInvalidStatusError(self._status)

def response_json_as_dict(self) -> Optional[dict]:
"""
Get response data deserialized to python dict

Returns
-------
Optional[dict]
previous response data deserialized
"""
return json.loads(self.response_data) if self.response_data else None
Loading