Skip to content

Commit 90d42f8

Browse files
committed
Fix passing Nexus context headers/request ID from worker
1 parent 466228f commit 90d42f8

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

temporalio/worker/_nexus.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import (
1010
Any,
1111
Callable,
12+
Mapping,
1213
NoReturn,
1314
Optional,
1415
Sequence,
@@ -105,7 +106,9 @@ async def raise_from_exception_queue() -> NoReturn:
105106
# tasks as we do start operation tasks?
106107
asyncio.create_task(
107108
self._handle_cancel_operation_task(
108-
task.request.cancel_operation, task.task_token
109+
task.task_token,
110+
task.request.cancel_operation,
111+
dict(task.request.header),
109112
)
110113
)
111114
else:
@@ -155,17 +158,22 @@ async def wait_all_completed(self) -> None:
155158
# "Any call up to this function and including this one will be trimmed out of stack traces.""
156159

157160
async def _handle_cancel_operation_task(
158-
self, request: temporalio.api.nexus.v1.CancelOperationRequest, task_token: bytes
161+
self,
162+
task_token: bytes,
163+
request: temporalio.api.nexus.v1.CancelOperationRequest,
164+
headers: Mapping[str, str],
159165
) -> None:
160166
"""
161167
Handle a cancel operation task.
162168
163169
Attempt to execute the user cancel_operation method. Handle errors and send the
164170
task completion.
165171
"""
172+
# TODO(nexus-prerelease): headers
166173
ctx = CancelOperationContext(
167174
service=request.service,
168175
operation=request.operation,
176+
headers=headers,
169177
)
170178
_temporal_operation_context.set(
171179
_TemporalNexusOperationContext(
@@ -174,7 +182,6 @@ async def _handle_cancel_operation_task(
174182
client=self._client,
175183
)
176184
)
177-
# TODO(nexus-prerelease): headers
178185
try:
179186
await self._handler.cancel_operation(ctx, request.operation_token)
180187
except Exception as err:
@@ -202,7 +209,7 @@ async def _handle_start_operation_task(
202209
self,
203210
task_token: bytes,
204211
start_request: temporalio.api.nexus.v1.StartOperationRequest,
205-
headers: dict[str, str],
212+
headers: Mapping[str, str],
206213
) -> None:
207214
"""
208215
Handle a start operation task.
@@ -243,7 +250,7 @@ async def _handle_start_operation_task(
243250
async def _start_operation(
244251
self,
245252
start_request: temporalio.api.nexus.v1.StartOperationRequest,
246-
headers: dict[str, str],
253+
headers: Mapping[str, str],
247254
) -> temporalio.api.nexus.v1.StartOperationResponse:
248255
"""
249256
Invoke the Nexus handler's start_operation method and construct the StartOperationResponse.

0 commit comments

Comments
 (0)