Skip to content

Commit fb85c8a

Browse files
N-giveNathan Givens
andcommitted
IWF-808 use Unset object for empty data output (#106)
* IWF-808 use `Unset` object for empty data output --------- Co-authored-by: Nathan Givens <[email protected]>
1 parent f843561 commit fb85c8a

8 files changed

+79
-59
lines changed

iwf/communication.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
WorkflowWorkerRpcRequestInternalChannelInfos,
88
WorkflowWorkerRpcRequestSignalChannelInfos,
99
)
10+
from iwf.iwf_api.types import Unset
1011
from iwf.object_encoder import ObjectEncoder
1112
from iwf.state_movement import StateMovement
1213
from iwf.type_store import TypeStore
@@ -16,7 +17,7 @@ class Communication:
1617
_internal_channel_type_store: TypeStore
1718
_signal_channel_type_store: dict[str, Optional[type]]
1819
_object_encoder: ObjectEncoder
19-
_to_publish_internal_channel: dict[str, list[EncodedObject]]
20+
_to_publish_internal_channel: dict[str, list[Union[EncodedObject, Unset]]]
2021
_state_movements: list[StateMovement]
2122
_internal_channel_infos: Optional[WorkflowWorkerRpcRequestInternalChannelInfos]
2223
_signal_channel_infos: Optional[WorkflowWorkerRpcRequestSignalChannelInfos]

iwf/communication_schema.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ class CommunicationMethod:
1616
is_prefix: bool
1717

1818
@classmethod
19-
def signal_channel_def(cls, name: str, value_type: type):
19+
def signal_channel_def(cls, name: str, value_type: Union[type, None]):
2020
return CommunicationMethod(
21-
name, CommunicationMethodType.SignalChannel, value_type, False
21+
name,
22+
CommunicationMethodType.SignalChannel,
23+
value_type if value_type is not None else type(None),
24+
False,
2225
)
2326

2427
@classmethod

iwf/data_attributes.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@
22

33
from iwf.errors import WorkflowDefinitionError
44
from iwf.iwf_api.models import EncodedObject
5+
from iwf.iwf_api.types import Unset
56
from iwf.object_encoder import ObjectEncoder
67
from iwf.type_store import TypeStore
78

89

910
class DataAttributes:
1011
_type_store: TypeStore
1112
_object_encoder: ObjectEncoder
12-
_current_values: dict[str, Union[EncodedObject, None]]
13-
_updated_values_to_return: dict[str, EncodedObject]
13+
_current_values: dict[str, Union[EncodedObject, None, Unset]]
14+
_updated_values_to_return: dict[str, Union[EncodedObject, Unset]]
1415

1516
def __init__(
1617
self,
1718
type_store: TypeStore,
1819
object_encoder: ObjectEncoder,
19-
current_values: dict[str, Union[EncodedObject, None]],
20+
current_values: dict[str, Union[EncodedObject, None, Unset]],
2021
):
2122
self._object_encoder = object_encoder
2223
self._type_store = type_store
@@ -56,5 +57,5 @@ def set_data_attribute(self, key: str, value: Any):
5657
self._current_values[key] = encoded_value
5758
self._updated_values_to_return[key] = encoded_value
5859

59-
def get_updated_values_to_return(self) -> dict[str, EncodedObject]:
60+
def get_updated_values_to_return(self) -> dict[str, Union[EncodedObject, Unset]]:
6061
return self._updated_values_to_return

iwf/object_encoder.py

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from typing_extensions import Literal
3434

3535
from iwf.iwf_api.models import EncodedObject
36-
from iwf.iwf_api.types import Unset
36+
from iwf.iwf_api.types import UNSET, Unset
3737

3838
# StrEnum is available in 3.11+
3939
if sys.version_info >= (3, 11):
@@ -50,14 +50,15 @@ class PayloadConverter(ABC):
5050
def to_payload(
5151
self,
5252
value: Any,
53-
) -> EncodedObject:
53+
) -> Union[EncodedObject, Unset]:
5454
"""Encode values into payloads.
5555
5656
Args:
5757
value: value to be converted
5858
5959
Returns:
60-
Converted payload.
60+
A boolean to indicate if the payload was converted and the converted value
61+
or Unset
6162
6263
Raises:
6364
Exception: Any issue during conversion.
@@ -90,19 +91,20 @@ class EncodingPayloadConverter(ABC):
9091

9192
@property
9293
@abstractmethod
93-
def encoding(self) -> str:
94+
def encoding(self) -> Union[str, Unset]:
9495
"""Encoding for the payload this converter works with."""
9596
raise NotImplementedError
9697

9798
@abstractmethod
98-
def to_payload(self, value: Any) -> Optional[EncodedObject]:
99+
def to_payload(self, value: Any) -> tuple[bool, Union[EncodedObject, Unset]]:
99100
"""Encode a single value to a payload or None.
100101
101102
Args:
102103
value: Value to be converted.
103104
104105
Returns:
105-
Payload of the value or None if unable to convert.
106+
A boolean to indicate if the payload was converted and the converted value
107+
or Unset
106108
107109
Raises:
108110
TypeError: Value is not the expected type.
@@ -145,7 +147,7 @@ class CompositePayloadConverter(PayloadConverter):
145147
converters: List of payload converters to delegate to, in order.
146148
"""
147149

148-
converters: Mapping[str, EncodingPayloadConverter]
150+
converters: Mapping[Union[str, Unset], EncodingPayloadConverter]
149151

150152
def __init__(self, *converters: EncodingPayloadConverter) -> None:
151153
"""Initializes the data converter.
@@ -159,7 +161,7 @@ def __init__(self, *converters: EncodingPayloadConverter) -> None:
159161
def to_payload(
160162
self,
161163
value: Any,
162-
) -> EncodedObject:
164+
) -> Union[EncodedObject, Unset]:
163165
"""Encode values trying each converter.
164166
165167
See base class. Always returns the same number of payloads as values.
@@ -169,12 +171,13 @@ def to_payload(
169171
"""
170172
# We intentionally attempt these serially just in case a stateful
171173
# converter may rely on the previous values
172-
payload = None
174+
payload: Union[EncodedObject, Unset] = Unset()
175+
is_encoded = False
173176
for converter in self.converters.values():
174-
payload = converter.to_payload(value)
175-
if payload is not None:
177+
is_encoded, payload = converter.to_payload(value)
178+
if is_encoded:
176179
break
177-
if payload is None:
180+
if not is_encoded:
178181
raise RuntimeError(
179182
f"Value of type {type(value)} has no known converter",
180183
)
@@ -194,7 +197,7 @@ def from_payload(
194197
RuntimeError: Error during decode
195198
"""
196199
encoding = payload.encoding
197-
assert isinstance(encoding, str)
200+
assert isinstance(encoding, (str, Unset))
198201
converter = self.converters.get(encoding)
199202
if converter is None:
200203
raise KeyError(f"Unknown payload encoding {encoding}")
@@ -229,17 +232,15 @@ class BinaryNullPayloadConverter(EncodingPayloadConverter):
229232
"""Converter for 'binary/null' payloads supporting None values."""
230233

231234
@property
232-
def encoding(self) -> str:
235+
def encoding(self) -> Union[str, Unset]:
233236
"""See base class."""
234-
return "binary/null"
237+
return UNSET
235238

236-
def to_payload(self, value: Any) -> Optional[EncodedObject]:
239+
def to_payload(self, value: Any) -> tuple[bool, Union[EncodedObject, Unset]]:
237240
"""See base class."""
238241
if value is None:
239-
return EncodedObject(
240-
encoding=self.encoding,
241-
)
242-
return None
242+
return (True, UNSET)
243+
return (False, UNSET)
243244

244245
def from_payload(
245246
self,
@@ -256,18 +257,21 @@ class BinaryPlainPayloadConverter(EncodingPayloadConverter):
256257
"""Converter for 'binary/plain' payloads supporting bytes values."""
257258

258259
@property
259-
def encoding(self) -> str:
260+
def encoding(self) -> Union[str, Unset]:
260261
"""See base class."""
261262
return "binary/plain"
262263

263-
def to_payload(self, value: Any) -> Optional[EncodedObject]:
264+
def to_payload(self, value: Any) -> tuple[bool, Union[EncodedObject, Unset]]:
264265
"""See base class."""
265266
if isinstance(value, bytes):
266-
return EncodedObject(
267-
encoding=self.encoding,
268-
data=str(value),
267+
return (
268+
True,
269+
EncodedObject(
270+
encoding=self.encoding,
271+
data=str(value),
272+
),
269273
)
270-
return None
274+
return (False, UNSET)
271275

272276
def from_payload(
273277
self,
@@ -345,11 +349,11 @@ def __init__(
345349
self._custom_type_converters = custom_type_converters
346350

347351
@property
348-
def encoding(self) -> str:
352+
def encoding(self) -> Union[str, Unset]:
349353
"""See base class."""
350354
return self._encoding
351355

352-
def to_payload(self, value: Any) -> Optional[EncodedObject]:
356+
def to_payload(self, value: Any) -> tuple[bool, Union[EncodedObject, Unset]]:
353357
"""See base class."""
354358
# Check for pydantic then send warning
355359
if hasattr(value, "parse_obj"):
@@ -358,13 +362,16 @@ def to_payload(self, value: Any) -> Optional[EncodedObject]:
358362
"https://github.com/temporalio/samples-python/tree/main/pydantic_converter for better support",
359363
)
360364
# We let JSON conversion errors be thrown to caller
361-
return EncodedObject(
362-
encoding=self.encoding,
363-
data=json.dumps(
364-
value,
365-
cls=self._encoder,
366-
separators=(",", ":"),
367-
sort_keys=True,
365+
return (
366+
True,
367+
EncodedObject(
368+
encoding=self.encoding,
369+
data=json.dumps(
370+
value,
371+
cls=self._encoder,
372+
separators=(",", ":"),
373+
sort_keys=True,
374+
),
368375
),
369376
)
370377

@@ -428,7 +435,7 @@ class PayloadCodec(ABC):
428435
@abstractmethod
429436
def encode(
430437
self,
431-
payload: EncodedObject,
438+
payload: Union[EncodedObject, Unset],
432439
) -> EncodedObject:
433440
"""Encode the given payloads.
434441
@@ -486,7 +493,7 @@ def __post_init__(self) -> None: # noqa: D105
486493
def encode(
487494
self,
488495
value: Any,
489-
) -> EncodedObject:
496+
) -> Union[EncodedObject, Unset]:
490497
"""Encode values into payloads.
491498
492499
First converts values to payload then encodes payload using codec.

iwf/state_execution_locals.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
from typing import Any, List, Tuple
1+
from typing import Any, List, Tuple, Union
22

33
from iwf.errors import WorkflowDefinitionError
44
from iwf.iwf_api.models import EncodedObject, KeyValue
5+
from iwf.iwf_api.types import Unset
56
from iwf.object_encoder import ObjectEncoder
67

78

89
class StateExecutionLocals:
9-
_record_events: dict[str, EncodedObject]
10-
_attribute_name_to_encoded_object_map: dict[str, EncodedObject]
11-
_upsert_attributes_to_return_to_server: dict[str, EncodedObject]
10+
_record_events: dict[str, Union[EncodedObject, Unset]]
11+
_attribute_name_to_encoded_object_map: dict[str, Union[EncodedObject, Unset]]
12+
_upsert_attributes_to_return_to_server: dict[str, Union[EncodedObject, Unset]]
1213
_object_encoder: ObjectEncoder
1314

1415
def __init__(
1516
self,
16-
attribute_name_to_encoded_object_map: dict[str, EncodedObject],
17+
attribute_name_to_encoded_object_map: dict[str, Union[EncodedObject, Unset]],
1718
object_encoder: ObjectEncoder,
1819
):
1920
self._object_encoder = object_encoder

iwf/tests/test_rpc_with_memo_duplicate_java_tests.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ class TestRpcWithMemo(unittest.TestCase):
2222
def setUpClass(cls):
2323
cls.client = Client(registry)
2424

25-
@unittest.skip("Currently broken: difference in behavior with the iwf-java-sdk")
2625
def test_rpc_memo_workflow_func1(self):
2726
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
2827
run_id = self.client.start_workflow(

iwf/tests/workflows/wait_signal_workflow.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,11 @@ class WaitSignalWorkflow(ObjectWorkflow):
125125
def get_communication_schema(self) -> CommunicationSchema:
126126
return CommunicationSchema.create(
127127
CommunicationMethod.signal_channel_def(test_channel_int, int),
128-
CommunicationMethod.signal_channel_def(test_channel_none, type(None)),
128+
CommunicationMethod.signal_channel_def(test_channel_none, None),
129129
CommunicationMethod.signal_channel_def(test_channel_str, str),
130-
CommunicationMethod.signal_channel_def(test_idle_channel_none, type(None)),
131-
CommunicationMethod.signal_channel_def(test_channel1, type(None)),
132-
CommunicationMethod.signal_channel_def(test_channel2, type(None)),
130+
CommunicationMethod.signal_channel_def(test_idle_channel_none, None),
131+
CommunicationMethod.signal_channel_def(test_channel1, None),
132+
CommunicationMethod.signal_channel_def(test_channel2, None),
133133
)
134134

135135
def get_workflow_states(self) -> StateSchema:

iwf/worker_service.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ def handle_workflow_worker_rpc(
8585
unset_to_none(request.input_), rpc_info.input_type
8686
)
8787

88-
current_data_attributes: dict[str, typing.Union[EncodedObject, None]] = {}
88+
current_data_attributes: dict[str, typing.Union[EncodedObject, None, Unset]] = (
89+
{}
90+
)
8991
if not isinstance(request.data_attributes, Unset):
9092
current_data_attributes = {
9193
assert_not_unset(attr.key): unset_to_none(attr.value)
@@ -184,7 +186,9 @@ def handle_workflow_state_wait_until(
184186
unset_to_none(request.state_input), get_input_type(state)
185187
)
186188

187-
current_data_attributes: dict[str, typing.Union[EncodedObject, None]] = {}
189+
current_data_attributes: dict[str, typing.Union[EncodedObject, None, Unset]] = (
190+
{}
191+
)
188192
if not isinstance(request.data_objects, Unset):
189193
current_data_attributes = {
190194
assert_not_unset(attr.key): unset_to_none(attr.value)
@@ -267,7 +271,9 @@ def handle_workflow_state_execute(
267271
unset_to_none(request.state_input), get_input_type(state)
268272
)
269273

270-
current_data_attributes: dict[str, typing.Union[EncodedObject, None]] = {}
274+
current_data_attributes: dict[str, typing.Union[EncodedObject, None, Unset]] = (
275+
{}
276+
)
271277
if not isinstance(request.data_objects, Unset):
272278
current_data_attributes = {
273279
assert_not_unset(attr.key): unset_to_none(attr.value)
@@ -407,9 +413,11 @@ def _create_upsert_search_attributes(
407413
return sas
408414

409415

410-
def to_map(key_values: Union[None, Unset, List[KeyValue]]) -> dict[str, EncodedObject]:
416+
def to_map(
417+
key_values: Union[None, Unset, List[KeyValue]],
418+
) -> dict[str, Union[EncodedObject, Unset]]:
411419
key_values = unset_to_none(key_values) or []
412-
kvs = {}
420+
kvs: dict[str, Union[EncodedObject, Unset]] = {}
413421
for kv in key_values:
414422
k = unset_to_none(kv.key)
415423
v = unset_to_none(kv.value)

0 commit comments

Comments
 (0)