Skip to content

Commit 9cd41b2

Browse files
committed
feat: make FailedWrite, ConflictedWrite exceptions
Rather than throwing the lower-level errors from datapath, we'll now use one of these two higher-level errors.
1 parent 3343b41 commit 9cd41b2

5 files changed

+296
-53
lines changed

src/denokv/_kv_writes.py

+110-39
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
from denokv._pycompat.typing import Any
3636
from denokv._pycompat.typing import ClassVar
3737
from denokv._pycompat.typing import Container
38+
from denokv._pycompat.typing import Final
3839
from denokv._pycompat.typing import Generic
40+
from denokv._pycompat.typing import Iterable
3941
from denokv._pycompat.typing import Mapping
4042
from denokv._pycompat.typing import MutableSequence
4143
from denokv._pycompat.typing import Never
@@ -60,6 +62,7 @@
6062
from denokv.datapath import AnyKvKey
6163
from denokv.datapath import CheckFailure
6264
from denokv.datapath import pack_key
65+
from denokv.errors import DenoKvError
6366
from denokv.kv_keys import KvKey
6467
from denokv.result import AnyFailure
6568
from denokv.result import AnySuccess
@@ -1330,65 +1333,126 @@ def _enqueue(self, enqueue: Enqueue, /) -> Self:
13301333
return self
13311334

13321335

1333-
@dataclass(init=False, unsafe_hash=True, **slots_if310())
1334-
class ConflictedWrite(FrozenAfterInitDataclass, AnyFailure):
1336+
EMPTY_MAP: Final[Mapping[Any, Any]] = MappingProxyType({})
1337+
1338+
1339+
# TODO: Support capturing retries in the FailedWrite/CommittedWrite?
1340+
@dataclass(init=False, unsafe_hash=True)
1341+
class FailedWrite(FrozenAfterInitDataclass, AnyFailure, DenoKvError):
13351342
if TYPE_CHECKING:
13361343

13371344
def _AnyFailure_marker(self, no_call: Never) -> Never: ...
13381345

1339-
ok: Literal[False]
1340-
conflicts: Mapping[AnyKvKey, CheckRepresentation]
1341-
versionstamp: None
1342-
checks: Sequence[CheckRepresentation]
1343-
mutations: Sequence[MutationRepresentation]
1344-
enqueues: Sequence[EnqueueRepresentation]
1345-
endpoint: EndpointInfo
1346+
checks: Final[Sequence[CheckRepresentation]] = field()
1347+
failed_checks: Final[Sequence[int]] = field()
1348+
mutations: Final[Sequence[MutationRepresentation]] = field()
1349+
enqueues: Final[Sequence[EnqueueRepresentation]] = field()
1350+
endpoint: Final[EndpointInfo] = field()
1351+
ok: Final[Literal[False]] = False # noqa: PYI064
1352+
versionstamp: Final[None] = None
13461353

13471354
def __init__(
13481355
self,
1349-
failed_checks: Sequence[int],
1350-
checks: Sequence[CheckRepresentation],
1351-
mutations: Sequence[MutationRepresentation],
1352-
enqueues: Sequence[EnqueueRepresentation],
1356+
checks: Iterable[CheckRepresentation],
1357+
mutations: Iterable[MutationRepresentation],
1358+
enqueues: Iterable[EnqueueRepresentation],
13531359
endpoint: EndpointInfo,
1360+
*,
1361+
cause: BaseException | None = None,
13541362
) -> None:
1355-
self.ok = False
1356-
try:
1357-
self.conflicts = MappingProxyType(
1358-
{checks[i].key: checks[i] for i in failed_checks}
1359-
)
1360-
except IndexError as e:
1361-
raise ValueError("failed_checks contains out-of-bounds index") from e
1362-
self.versionstamp = None
1363-
self.checks = tuple(checks)
1364-
self.mutations = tuple(mutations)
1365-
self.enqueues = tuple(enqueues)
1366-
self.endpoint = endpoint
1363+
super(FailedWrite, self).__init__()
1364+
self.checks = tuple(checks) # type: ignore[misc] # Cannot assign to final
1365+
# Allow subclass to initialise failed_checks
1366+
if not hasattr(self, "failed_checks"):
1367+
self.failed_checks = tuple() # type: ignore[misc] # Cannot assign to final
1368+
self.mutations = tuple(mutations) # type: ignore[misc] # Cannot assign to final
1369+
self.enqueues = tuple(enqueues) # type: ignore[misc] # Cannot assign to final
1370+
self.endpoint = endpoint # type: ignore[misc] # Cannot assign to final
1371+
self.__cause__ = cause
1372+
1373+
@property
1374+
def conflicts(self) -> Mapping[AnyKvKey, CheckRepresentation]:
1375+
checks = self.checks
1376+
return {checks[i].key: checks[i] for i in self.failed_checks}
1377+
1378+
def _get_cause_description(self) -> str:
1379+
if self.__cause__:
1380+
return type(self.__cause__).__name__
1381+
return "unspecified cause"
1382+
1383+
@property
1384+
def message(self) -> str:
1385+
# TODO: after xxx attempts?
1386+
return (
1387+
f"to {str(self.endpoint.url)!r} "
1388+
f"due to {self._get_cause_description()}, "
1389+
f"with {len(self.checks)} checks, "
1390+
f"{len(self.mutations)} mutations, "
1391+
f"{len(self.enqueues)} enqueues"
1392+
)
1393+
1394+
def __str__(self) -> str:
1395+
return f"Write failed {self.message}"
13671396

13681397
def __repr__(self) -> str:
1398+
return f"<{type(self).__name__} {self.message}>"
1399+
1400+
1401+
def _normalise_failed_checks(
1402+
failed_checks: Iterable[int], check_count: int
1403+
) -> tuple[int, ...]:
1404+
failed_checks = tuple(sorted(failed_checks))
1405+
if failed_checks and (failed_checks[0] < 0 or failed_checks[-1] >= check_count):
1406+
raise ValueError("failed_checks contains out-of-bounds index")
1407+
return failed_checks
1408+
1409+
1410+
class ConflictedWrite(FailedWrite):
1411+
def __init__(
1412+
self,
1413+
failed_checks: Iterable[int],
1414+
checks: Iterable[CheckRepresentation],
1415+
mutations: Iterable[MutationRepresentation],
1416+
enqueues: Iterable[EnqueueRepresentation],
1417+
endpoint: EndpointInfo,
1418+
*,
1419+
cause: BaseException | None = None,
1420+
) -> None:
1421+
_checks = tuple(checks)
1422+
self.failed_checks = _normalise_failed_checks( # type: ignore[misc] # Cannot assign to final attribute "failed_checks"
1423+
failed_checks,
1424+
check_count=len(_checks),
1425+
)
1426+
super(ConflictedWrite, self).__init__(
1427+
_checks, mutations, enqueues, endpoint, cause=cause
1428+
)
1429+
1430+
@property
1431+
def message(self) -> str:
13691432
return (
1370-
f"<{type(self).__name__} "
13711433
f"NOT APPLIED to {str(self.endpoint.url)!r} with "
13721434
f"{len(self.conflicts)}/{len(self.checks)} checks CONFLICTING, "
13731435
f"{len(self.mutations)} mutations, "
13741436
f"{len(self.enqueues)} enqueues"
1375-
f">"
13761437
)
13771438

1439+
def __str__(self) -> str:
1440+
return f"Write {self.message}"
1441+
13781442

13791443
@dataclass(init=False, unsafe_hash=True, **slots_if310())
13801444
class CommittedWrite(FrozenAfterInitDataclass, AnySuccess):
13811445
if TYPE_CHECKING:
13821446

13831447
def _AnySuccess_marker(self, no_call: Never) -> Never: ...
13841448

1385-
ok: Literal[True]
1386-
conflicts: Mapping[KvKey, CheckRepresentation] # empty
1387-
versionstamp: VersionStamp
1388-
checks: Sequence[CheckRepresentation]
1389-
mutations: Sequence[MutationRepresentation]
1390-
enqueues: Sequence[EnqueueRepresentation]
1391-
endpoint: EndpointInfo
1449+
ok: Final[Literal[True]] # noqa: PYI064
1450+
conflicts: Final[Mapping[KvKey, CheckRepresentation]] # empty
1451+
versionstamp: Final[VersionStamp]
1452+
checks: Final[Sequence[CheckRepresentation]]
1453+
mutations: Final[Sequence[MutationRepresentation]]
1454+
enqueues: Final[Sequence[EnqueueRepresentation]]
1455+
endpoint: Final[EndpointInfo]
13921456

13931457
def __init__(
13941458
self,
@@ -1399,23 +1463,28 @@ def __init__(
13991463
endpoint: EndpointInfo,
14001464
) -> None:
14011465
self.ok = True
1402-
self.conflicts = MappingProxyType({})
1466+
self.conflicts = EMPTY_MAP
14031467
self.versionstamp = versionstamp
14041468
self.checks = tuple(checks)
14051469
self.mutations = tuple(mutations)
14061470
self.enqueues = tuple(enqueues)
14071471
self.endpoint = endpoint
14081472

1409-
def __repr__(self) -> str:
1473+
@property
1474+
def _message(self) -> str:
14101475
return (
1411-
f"<{type(self).__name__} "
14121476
f"version 0x{self.versionstamp} to {str(self.endpoint.url)!r} with "
14131477
f"{len(self.checks)} checks, "
14141478
f"{len(self.mutations)} mutations, "
14151479
f"{len(self.enqueues)} enqueues"
1416-
f">"
14171480
)
14181481

1482+
def __str__(self) -> str:
1483+
return f"Write committed {self._message}"
1484+
1485+
def __repr__(self) -> str:
1486+
return f"<{type(self).__name__} {self._message}>"
1487+
14191488

14201489
CompletedWrite: TypeAlias = Union[CommittedWrite, ConflictedWrite]
14211490

@@ -2078,4 +2147,6 @@ def _evaluate_backoff_schedule(self) -> Sequence[int]:
20782147
return [int(delay * 1000) for delay in delay_seconds]
20792148

20802149

2081-
WriteOperation: TypeAlias = Union[Check, Set, Sum, Min, Max, Delete, Enqueue]
2150+
WriteOperation: TypeAlias = Union[
2151+
CheckRepresentation, MutationRepresentation, EnqueueRepresentation
2152+
]

test/test__kv_writes__CommittedWrite.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ def test_str_repr() -> None:
6161
endpoint=EP,
6262
)
6363
assert (
64-
str(instance) == "<CommittedWrite version 0x00000000000000000001 "
64+
str(instance) == "Write committed version 0x00000000000000000001 "
65+
"to 'https://example.com/' with 1 checks, 1 mutations, 1 enqueues"
66+
)
67+
assert (
68+
repr(instance) == "<CommittedWrite version 0x00000000000000000001 "
6569
"to 'https://example.com/' with 1 checks, 1 mutations, 1 enqueues>"
6670
)
67-
assert str(instance) == repr(instance)

test/test__kv_writes__ConflictedWrite.py

+62-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1+
import traceback
12
from datetime import datetime
23

34
import pytest
45
from yarl import URL
56

7+
from denokv import _datapath_pb2 as datapath_pb2
68
from denokv._kv_writes import Check
79
from denokv._kv_writes import ConflictedWrite
810
from denokv._kv_writes import Enqueue
911
from denokv._kv_writes import Set
1012
from denokv.auth import ConsistencyLevel
1113
from denokv.auth import EndpointInfo
14+
from denokv.datapath import CheckFailure
1215
from denokv.kv_keys import KvKey
1316
from denokv.result import is_err
1417

@@ -18,17 +21,32 @@
1821

1922
@pytest.fixture
2023
def instance() -> ConflictedWrite:
24+
pb_checks = [
25+
datapath_pb2.Check(key=bytes(KvKey("a")), versionstamp=None),
26+
datapath_pb2.Check(key=bytes(KvKey("b")), versionstamp=None),
27+
datapath_pb2.Check(key=bytes(KvKey("c")), versionstamp=None),
28+
]
2129
checks = [
2230
Check.for_key_not_set(KvKey("a")),
2331
Check.for_key_not_set(KvKey("b")),
2432
Check.for_key_not_set(KvKey("c")),
2533
]
34+
failed_checks = [0, 2]
35+
36+
cause = CheckFailure(
37+
"Not all checks required by the Atomic Write passed",
38+
all_checks=pb_checks,
39+
failed_check_indexes=failed_checks,
40+
endpoint=EP,
41+
)
42+
2643
return ConflictedWrite(
27-
failed_checks=[0, 2],
28-
checks=list(checks),
44+
failed_checks=failed_checks,
45+
checks=checks,
2946
mutations=[Set(KvKey("a"), 42)],
3047
enqueues=[Enqueue("Hi")],
3148
endpoint=EP,
49+
cause=cause,
3250
)
3351

3452

@@ -53,13 +71,9 @@ def test_constructor(instance: ConflictedWrite) -> None:
5371
assert instance.enqueues == (Enqueue("Hi"),)
5472
assert instance.endpoint is EP
5573

56-
assert dict(instance.conflicts) == {KvKey("a"): checks[0], KvKey("c"): checks[2]}
74+
assert instance.conflicts == {KvKey("a"): checks[0], KvKey("c"): checks[2]}
5775
assert instance.conflicts[KvKey("a")] is checks[0]
5876

59-
# conflicts is immutable
60-
with pytest.raises(TypeError):
61-
del instance.conflicts[KvKey("a")] # type: ignore[attr-defined]
62-
6377
with pytest.raises(ValueError, match=r"failed_checks contains out-of-bounds index"):
6478
ConflictedWrite(
6579
failed_checks=[0, 10],
@@ -70,13 +84,51 @@ def test_constructor(instance: ConflictedWrite) -> None:
7084
)
7185

7286

87+
def test_changes_to_conflicts_do_not_persist(instance: ConflictedWrite) -> None:
88+
assert isinstance(instance.conflicts, dict)
89+
# Changes to conflicts do not persist
90+
assert KvKey("a") in instance.conflicts
91+
del instance.conflicts[KvKey("a")]
92+
assert KvKey("a") in instance.conflicts
93+
94+
7395
def test_is_AnyFailure(instance: ConflictedWrite) -> None:
7496
assert is_err(instance)
7597

7698

77-
def test_str_repr(instance: ConflictedWrite) -> None:
99+
@pytest.mark.parametrize("with_cause", [True, False])
100+
def test_str(instance: ConflictedWrite, with_cause: bool) -> None:
101+
assert instance.__cause__
102+
if not with_cause:
103+
instance.__cause__ = None
78104
assert (
79-
str(instance) == "<ConflictedWrite NOT APPLIED to 'https://example.com/' "
105+
str(instance) == "Write NOT APPLIED to 'https://example.com/' "
106+
"with 2/3 checks CONFLICTING, 1 mutations, 1 enqueues"
107+
)
108+
109+
110+
@pytest.mark.parametrize("with_cause", [True, False])
111+
def test_repr(instance: ConflictedWrite, with_cause: bool) -> None:
112+
assert instance.__cause__
113+
if not with_cause:
114+
instance.__cause__ = None
115+
116+
assert (
117+
repr(instance) == "<ConflictedWrite NOT APPLIED to 'https://example.com/' "
80118
"with 2/3 checks CONFLICTING, 1 mutations, 1 enqueues>"
81119
)
82-
assert str(instance) == repr(instance)
120+
121+
122+
@pytest.mark.parametrize("with_cause", [True, False])
123+
def test_traceback_presentation(instance: ConflictedWrite, with_cause: bool) -> None:
124+
assert instance.__cause__
125+
if not with_cause:
126+
instance.__cause__ = None
127+
128+
assert "\n".join(
129+
traceback.format_exception_only(type(instance), instance)
130+
).strip() == (
131+
"denokv._kv_writes.ConflictedWrite: "
132+
"Write NOT APPLIED to 'https://example.com/' "
133+
"with 2/3 checks CONFLICTING, 1 mutations, 1 enqueues"
134+
)

0 commit comments

Comments
 (0)