diff --git a/crossplane/function/resource.py b/crossplane/function/resource.py index 039af46..c9bfc69 100644 --- a/crossplane/function/resource.py +++ b/crossplane/function/resource.py @@ -45,8 +45,8 @@ def update(r: fnv1.Resource, source: dict | structpb.Struct | pydantic.BaseModel # apiVersion is set to its default value 's3.aws.upbound.io/v1beta2' # (and not explicitly provided during initialization), it will be # excluded from the serialized output. - data['apiVersion'] = source.apiVersion - data['kind'] = source.kind + data["apiVersion"] = source.apiVersion + data["kind"] = source.kind r.resource.update(data) case structpb.Struct(): # TODO(negz): Use struct_to_dict and update to match other semantics? diff --git a/crossplane/function/runtime.py b/crossplane/function/runtime.py index f91f392..ddee602 100644 --- a/crossplane/function/runtime.py +++ b/crossplane/function/runtime.py @@ -16,6 +16,7 @@ import asyncio import os +import signal import grpc from grpc_reflection.v1alpha import reflection @@ -31,6 +32,8 @@ fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name, ) +SHUTDOWN_GRACE_PERIOD_SECONDS = 5 + def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials: """Load TLS credentials for a composition function gRPC server. @@ -90,6 +93,11 @@ def serve( server = grpc.aio.server() + loop.add_signal_handler( + signal.SIGTERM, + lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)), + ) + grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server) grpcv1beta1.add_FunctionRunnerServiceServicer_to_server( BetaFunctionRunner(wrapped=function), server @@ -116,7 +124,7 @@ async def start(): try: loop.run_until_complete(start()) finally: - loop.run_until_complete(server.stop(grace=5)) + loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)) loop.close()