Skip to content

Commit 72cc538

Browse files
authored
Added python methods for Distributed Lock API (#431)
* Initial implementation for distributed lock API. Adds initial API support and unit tests. Missing: * DocStrings and documentation support * Context Manager-friendly wrapper or API. Signed-off-by: Tiago Alves Macambira <[email protected]> * Doc strings and converts ProtBuff enum to python native. Signed-off-by: Tiago Alves Macambira <[email protected]> * try_lock returns TryLockResponse objects. gRPC's client `try_lock()` method now returns an TryLockResponse object that can be used both: * on boolean context (if-clause) directly, keeping behaviour from previous commits, and * as a ContextManager (with-clause), which would ease auto-unlock and make this implementation similar to DotNet's one. Updated tests to account for new functionality. Signed-off-by: Tiago Alves Macambira <[email protected]> * Updates exemples to acount for new Distributed Locks API. This should also work as an e2e test. Signed-off-by: Tiago Alves Macambira <[email protected]> * Reponse objects inherit from DaprResponse. * Updated the example's output to be deterministic. * Updated example's README.md to account for the new output. Signed-off-by: Tiago Alves Macambira <[email protected]> * Updates to address PR comments. Signed-off-by: Tiago Alves Macambira <[email protected]> * Addressing PR comments (cont'ed.) Signed-off-by: Tiago Alves Macambira <[email protected]> * Removing dead test code. Signed-off-by: Tiago Alves Macambira <[email protected]> * Fixes "Alpha version" msg and adds error message to `assert` clause. Signed-off-by: Tiago Alves Macambira <[email protected]> * Fix type annotation for dapr client and remove deprecated field. Signed-off-by: Tiago Alves Macambira <[email protected]> * Fixes some lint issues. Seems that the lint configuration/version used by the validation steps in GitHub are not matching the ones available in the devcontainer, as GitHub is reporting lint errors that are not being returned by tox validation tasks. Regardless, fixing the issues reported to get the PR approved. Signed-off-by: Tiago Alves Macambira <[email protected]> * Register distributed lock mechanical markdown tests. Signed-off-by: Tiago Alves Macambira <[email protected]> * Move input validation to _helpers, fix mention of "Alpha version". Signed-off-by: Tiago Alves Macambira <[email protected]> * Fix typo in last commit. Signed-off-by: Tiago Alves Macambira <[email protected]>
1 parent cf9cbeb commit 72cc538

File tree

11 files changed

+517
-9
lines changed

11 files changed

+517
-9
lines changed

dapr/clients/grpc/_helpers.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"""
1515

1616
from collections import namedtuple
17-
from typing import Dict, List, Union, Tuple
17+
from typing import Dict, List, Union, Tuple, Optional
1818

1919
from google.protobuf.any_pb2 import Any as GrpcAny
2020
from google.protobuf.message import Message as GrpcMessage
@@ -66,7 +66,7 @@ def to_bytes(data: Union[str, bytes]) -> bytes:
6666
elif isinstance(data, str):
6767
return data.encode('utf-8')
6868
else:
69-
raise(f'invalid data type {type(data)}')
69+
raise f'invalid data type {type(data)}'
7070

7171

7272
def to_str(data: Union[str, bytes]) -> str:
@@ -76,7 +76,7 @@ def to_str(data: Union[str, bytes]) -> str:
7676
elif isinstance(data, bytes):
7777
return data.decode('utf-8')
7878
else:
79-
raise(f'invalid data type {type(data)}')
79+
raise f'invalid data type {type(data)}'
8080

8181

8282
class _ClientCallDetails(
@@ -166,3 +166,12 @@ def intercept_unary_unary(
166166
# Call continuation
167167
response = continuation(new_call_details, request)
168168
return response
169+
170+
171+
# Data validation helpers
172+
173+
174+
def validateNotBlankString(**kwargs: Optional[str]):
175+
for field_name, value in kwargs.items():
176+
if not value or not value.strip():
177+
raise ValueError(f"{field_name} name cannot be empty or blank")

dapr/clients/grpc/_request.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ def __init__(
289289
data: Union[bytes, str],
290290
etag: Optional[str] = None,
291291
operation_type: TransactionOperationType = TransactionOperationType.upsert):
292-
"""Initializes TransactionalStateOperation item from :obj:`runtime_v1.TransactionalStateOperation`.
292+
"""Initializes TransactionalStateOperation item from
293+
:obj:`runtime_v1.TransactionalStateOperation`.
293294
294295
Args:
295296
key (str): state's key.

dapr/clients/grpc/_response.py

+108-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
limitations under the License.
1414
"""
1515

16+
from __future__ import annotations
17+
18+
import contextlib
1619
import threading
1720
from enum import Enum
18-
from typing import Dict, Optional, Union, Sequence, List
21+
from typing import Dict, Optional, Union, Sequence, List, TYPE_CHECKING
1922

2023
from google.protobuf.any_pb2 import Any as GrpcAny
2124
from google.protobuf.message import Message as GrpcMessage
@@ -36,6 +39,11 @@
3639
from dapr.proto import api_v1
3740
from dapr.proto import api_service_v1
3841

42+
# Avoid circular import dependency by only importing DaprGrpcClient
43+
# for type checking
44+
if TYPE_CHECKING:
45+
from dapr.clients.grpc.client import DaprGrpcClient
46+
3947

4048
class DaprResponse:
4149
"""A base class for Dapr Response.
@@ -744,3 +752,102 @@ def __init__(
744752
def status(self) -> TopicEventResponseStatus:
745753
"""Gets the status."""
746754
return self._status
755+
756+
757+
class UnlockResponseStatus(Enum):
758+
success = api_v1.UnlockResponse.Status.SUCCESS
759+
'''The Unlock operation for the referred lock was successful.'''
760+
761+
lock_does_not_exist = api_v1.UnlockResponse.Status.LOCK_UNEXIST
762+
''''The unlock operation failed: the referred lock does not exist.'''
763+
764+
lock_belongs_to_others = api_v1.UnlockResponse.Status.LOCK_BELONG_TO_OTHERS
765+
'''The unlock operation failed: the referred lock belongs to another owner.'''
766+
767+
internal_error = api_v1.UnlockResponse.Status.INTERNAL_ERROR
768+
'''An internal error happened while handling the Unlock operation'''
769+
770+
771+
class UnlockResponse(DaprResponse):
772+
'''The response of an unlock operation.
773+
774+
This inherits from DaprResponse
775+
776+
Attributes:
777+
status (UnlockResponseStatus): the status of the unlock operation.
778+
'''
779+
780+
def __init__(
781+
self,
782+
status: UnlockResponseStatus,
783+
headers: MetadataTuple = (),
784+
):
785+
"""Initializes a UnlockResponse.
786+
787+
Args:
788+
status (UnlockResponseStatus): The status of the response.
789+
headers (Tuple, optional): the headers from Dapr gRPC response.
790+
"""
791+
super().__init__(headers)
792+
self._status = status
793+
794+
@property
795+
def status(self) -> UnlockResponseStatus:
796+
"""Gets the status."""
797+
return self._status
798+
799+
800+
class TryLockResponse(contextlib.AbstractContextManager, DaprResponse):
801+
'''The response of a try_lock operation.
802+
803+
This inherits from DaprResponse and AbstractContextManager.
804+
805+
Attributes:
806+
success (bool): the result of the try_lock operation.
807+
'''
808+
def __init__(
809+
self,
810+
success: bool,
811+
client: DaprGrpcClient,
812+
store_name: str,
813+
resource_id: str,
814+
lock_owner: str,
815+
headers: MetadataTuple = (),
816+
):
817+
"""Initializes a TryLockResponse.
818+
819+
Args:
820+
success (bool): the result of the try_lock operation.
821+
client (DaprClient): a reference to the dapr client used for the TryLock request.
822+
store_name (str): the lock store name used in the TryLock request.
823+
resource_id (str): the lock key or identifier used in the TryLock request.
824+
lock_owner (str): the lock owner identifier used in the TryLock request.
825+
headers (Tuple, optional): the headers from Dapr gRPC response.
826+
"""
827+
super().__init__(headers)
828+
self._success = success
829+
self._client = client
830+
self._store_name = store_name
831+
self._resource_id = resource_id
832+
self._lock_owner = lock_owner
833+
834+
def __bool__(self) -> bool:
835+
return self._success
836+
837+
@property
838+
def success(self) -> bool:
839+
"""Gets the response success status."""
840+
return self._success
841+
842+
def __exit__(self, *exc) -> None:
843+
''''Automatically unlocks the lock if this TryLockResponse was used as
844+
a ContextManager / `with` statement.
845+
846+
Notice: we are not checking the result of the unlock operation.
847+
If this is something you care about it might be wiser creating
848+
your own ContextManager that logs or otherwise raises exceptions
849+
if unlock doesn't return `UnlockResponseStatus.success`.
850+
'''
851+
if self._success:
852+
self._client.unlock(self._store_name, self._resource_id, self._lock_owner)
853+
# else: there is no point unlocking a lock we did not acquire.

dapr/clients/grpc/client.py

+106-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@
3838
from dapr.proto import api_v1, api_service_v1, common_v1
3939
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
4040

41-
from dapr.clients.grpc._helpers import MetadataTuple, DaprClientInterceptor, to_bytes
41+
from dapr.clients.grpc._helpers import (
42+
DaprClientInterceptor,
43+
MetadataTuple,
44+
to_bytes,
45+
validateNotBlankString,
46+
)
4247
from dapr.clients.grpc._request import (
4348
InvokeMethodRequest,
4449
BindingRequest,
@@ -50,14 +55,17 @@
5055
GetSecretResponse,
5156
GetBulkSecretResponse,
5257
InvokeMethodResponse,
58+
UnlockResponseStatus,
5359
StateResponse,
5460
BulkStatesResponse,
5561
BulkStateItem,
5662
ConfigurationResponse,
5763
ConfigurationItem,
5864
QueryResponse,
5965
QueryResponseItem,
60-
ConfigurationWatcher
66+
ConfigurationWatcher,
67+
TryLockResponse,
68+
UnlockResponse,
6169
)
6270

6371

@@ -984,6 +992,102 @@ def unsubscribe_configuration(
984992
response: UnsubscribeConfigurationResponse = self._stub.UnsubscribeConfigurationAlpha1(req)
985993
return response.ok
986994

995+
def try_lock(
996+
self,
997+
store_name: str,
998+
resource_id: str,
999+
lock_owner: str,
1000+
expiry_in_seconds: int) -> TryLockResponse:
1001+
"""Tries to get a lock with an expiry.
1002+
1003+
You can use the result of this operation directly on an `if` statement:
1004+
1005+
if client.try_lock(store_name, resource_id, first_client_id, expiry_s):
1006+
# lock acquired successfully...
1007+
1008+
You can also inspect the response's `success` attribute:
1009+
1010+
response = client.try_lock(store_name, resource_id, first_client_id, expiry_s)
1011+
if response.success:
1012+
# lock acquired successfully...
1013+
1014+
Finally, you can use this response with a `with` statement, and have the lock
1015+
be automatically unlocked after the with-statement scope ends
1016+
1017+
with client.try_lock(store_name, resource_id, first_client_id, expiry_s) as lock:
1018+
if lock:
1019+
# lock acquired successfully...
1020+
# Lock automatically unlocked at this point, no need to call client->unlock(...)
1021+
1022+
Args:
1023+
store_name (str): the lock store name, e.g. `redis`.
1024+
resource_id (str): the lock key. e.g. `order_id_111`.
1025+
It stands for "which resource I want to protect".
1026+
lock_owner (str): indicates the identifier of lock owner.
1027+
expiry_in_seconds (int): The length of time (in seconds) for which this lock
1028+
will be held and after which it expires.
1029+
1030+
Returns:
1031+
:class:`TryLockResponse`: With the result of the try-lock operation.
1032+
"""
1033+
# Warnings and input validation
1034+
warn('The Distributed Lock API is an Alpha version and is subject to change.',
1035+
UserWarning, stacklevel=2)
1036+
validateNotBlankString(store_name=store_name,
1037+
resource_id=resource_id,
1038+
lock_owner=lock_owner)
1039+
if not expiry_in_seconds or expiry_in_seconds < 1:
1040+
raise ValueError("expiry_in_seconds must be a positive number")
1041+
# Actual tryLock invocation
1042+
req = api_v1.TryLockRequest(
1043+
store_name=store_name,
1044+
resource_id=resource_id,
1045+
lock_owner=lock_owner,
1046+
expiryInSeconds=expiry_in_seconds)
1047+
response, call = self._stub.TryLockAlpha1.with_call(req)
1048+
return TryLockResponse(
1049+
success=response.success,
1050+
client=self,
1051+
store_name=store_name,
1052+
resource_id=resource_id,
1053+
lock_owner=lock_owner,
1054+
headers=call.initial_metadata())
1055+
1056+
def unlock(
1057+
self,
1058+
store_name: str,
1059+
resource_id: str,
1060+
lock_owner: str) -> UnlockResponse:
1061+
"""Unlocks a lock.
1062+
1063+
Args:
1064+
store_name (str): the lock store name, e.g. `redis`.
1065+
resource_id (str): the lock key. e.g. `order_id_111`.
1066+
It stands for "which resource I want to protect".
1067+
lock_owner (str): indicates the identifier of lock owner.
1068+
metadata (tuple, optional, DEPRECATED): gRPC custom metadata
1069+
1070+
Returns:
1071+
:class:`UnlockResponseStatus`: Status of the request,
1072+
`UnlockResponseStatus.success` if it was successful of some other
1073+
status otherwise.
1074+
"""
1075+
# Warnings and input validation
1076+
warn('The Distributed Lock API is an Alpha version and is subject to change.',
1077+
UserWarning, stacklevel=2)
1078+
validateNotBlankString(store_name=store_name,
1079+
resource_id=resource_id,
1080+
lock_owner=lock_owner)
1081+
# Actual unlocking invocation
1082+
req = api_v1.UnlockRequest(
1083+
store_name=store_name,
1084+
resource_id=resource_id,
1085+
lock_owner=lock_owner)
1086+
response, call = self._stub.UnlockAlpha1.with_call(req)
1087+
1088+
return UnlockResponse(status=UnlockResponseStatus(response.status),
1089+
headers=call.initial_metadata())
1090+
9871091
def wait(self, timeout_s: float):
9881092
"""Waits for sidecar to be available within the timeout.
9891093

examples/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ These examples demonstrate how to use the Dapr Python SDK:
1212
| [Virtual actors](./demo_actor) | Try Dapr virtual actor features
1313
| [Secrets](./secret_store) | Get secrets from a defined secret store
1414
| [Distributed tracing](./w3c-tracing) | Leverage Dapr's built-in tracing support
15+
| [Distributed lock](./distributed_lock) | Keep your application safe from race conditions by using distributed locks
1516

1617
## More information
1718

examples/distributed_lock/README.md

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Example - Acquire and release distributed locks
2+
3+
This example demonstrates the [Distributed Lock component] APIs in Dapr.
4+
It demonstrates the following APIs:
5+
- **try_lock**: Attempts to acquire a distributed lock from the lock store.
6+
- **unlock**: Attempts to release (a previously acquired) distributed lock
7+
8+
It creates a client using `DaprClient`, uses a local lock store defined in
9+
[`./components/lockstore.yaml`](./components/lockstore.yaml) and invokes
10+
all the distributed lock API methods available as example.
11+
12+
## Pre-requisites
13+
14+
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
15+
- [Install Python 3.7+](https://www.python.org/downloads/)
16+
17+
## Install Dapr python-SDK
18+
19+
<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->
20+
21+
```bash
22+
pip3 install dapr dapr-ext-grpc
23+
```
24+
25+
## Run the example
26+
27+
To run this example, the following code can be utilized:
28+
29+
<!-- STEP
30+
name: Run state store example
31+
expected_stdout_lines:
32+
- "== APP == Will try to acquire a lock from lock store named [lockstore]"
33+
- "== APP == The lock is for a resource named [example-lock-resource]"
34+
- "== APP == The client identifier is [example-client-id]"
35+
- "== APP == The lock will will expire in 60 seconds."
36+
- "== APP == Lock acquired successfully!!!"
37+
- "== APP == We already released the lock so unlocking will not work."
38+
- "== APP == We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist]"
39+
timeout_seconds: 5
40+
-->
41+
42+
```bash
43+
dapr run --app-id=locksapp --app-protocol grpc --components-path components/ python3 lock.py
44+
```
45+
<!-- END_STEP -->
46+
47+
The output should be as follows:
48+
49+
```
50+
== APP == Will try to acquire a lock from lock store named [lockstore]
51+
== APP == The lock is for a resource named [example-lock-resource]
52+
== APP == The client identifier is [example-client-id]
53+
== APP == The lock will will expire in 60 seconds.
54+
== APP == Lock acquired successfully!!!
55+
== APP == We already released the lock so unlocking will not work.
56+
== APP == We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist]
57+
```
58+
59+
## Error Handling
60+
61+
The Dapr python-sdk will pass through errors that it receives from the Dapr runtime.
62+
63+
[Distributed Lock component]: https://docs.dapr.io/developing-applications/building-blocks/distributed-lock/

0 commit comments

Comments
 (0)