Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3ecefaf
PR 1 for appio-authn changes using tokens/interceptors
msheller Mar 16, 2026
8c8aa0d
Reduced scope to ServerAppIo and fixed mypy pylint issues
msheller Mar 17, 2026
466c1a2
Merge branch 'main' into appio-authn/1-shared-interceptor-foundation
msheller Mar 17, 2026
ade51e4
Apply suggestion from @panh99
panh99 Mar 18, 2026
cd6a276
Apply suggestion from @panh99
panh99 Mar 18, 2026
c666a09
Update framework/py/flwr/supercore/interceptors/appio_token_intercept…
msheller Mar 18, 2026
92dbcd0
Update framework/py/flwr/supercore/interceptors/appio_token_intercept…
msheller Mar 18, 2026
10d58a7
Merge branch 'main' into appio-authn/1-shared-interceptor-foundation
msheller Mar 18, 2026
d08b1b1
Fixed factory name
msheller Mar 18, 2026
c9d4bc0
Placed policy code into new auth module
msheller Mar 18, 2026
079077c
Removed use of request token in interceptor
msheller Mar 18, 2026
a6c5f37
No longer relies on fixed list of methods for interceptor test
msheller Mar 19, 2026
1021ae8
Merge branch 'main' into appio-authn/1-shared-interceptor-foundation
msheller Mar 19, 2026
19dd511
Update policy.py
msheller Mar 19, 2026
dee3250
Update __init__.py
msheller Mar 19, 2026
ac98c4b
Merge branch 'main' into appio-authn/1-shared-interceptor-foundation
msheller Mar 19, 2026
5cf4261
Initial draft
msheller Mar 19, 2026
4630760
Merge branch 'main' into appio-authn/2-wire-serverappio-auth-interceptor
msheller Mar 19, 2026
2a50444
Updated initial draft to include PR3 as well (ClientAppIo wiring)
msheller Mar 19, 2026
9092eec
Disallow empty token in GrpcGrid init
msheller Mar 19, 2026
9bbc1c7
Added tests recommended by Codex
msheller Mar 19, 2026
d862946
Initial draft of simulation wiring (client-side)
msheller Mar 20, 2026
eefd34d
Update to fix token lifecycle logic and force simulationio_connection…
msheller Mar 20, 2026
8d6649e
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 23, 2026
3a8e941
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 23, 2026
9340dcb
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 25, 2026
0f95540
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 26, 2026
2d3bf9f
Added new run-type parameter to create_run calls
msheller Mar 26, 2026
5049be1
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 26, 2026
ab0975e
Added temporary test change to assist diagnosing CI failure
msheller Mar 26, 2026
a43fa90
Revert "Added temporary test change to assist diagnosing CI failure"
msheller Mar 26, 2026
cce72c9
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 26, 2026
8502bf1
Merge branch 'main' into appio-authn/2-and-3-wire-serverappio-and-cli…
msheller Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion framework/py/flwr/server/grid/grpc_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
pull_objects,
push_objects,
)
from flwr.supercore.interceptors import AppIoTokenClientInterceptor

from .grid import Grid

Expand Down Expand Up @@ -100,7 +101,7 @@
"""


class GrpcGrid(Grid):
class GrpcGrid(Grid): # pylint: disable=too-many-instance-attributes
"""`GrpcGrid` provides an interface to the ServerAppIo API.

Parameters
Expand All @@ -111,6 +112,8 @@ class GrpcGrid(Grid):
The PEM-encoded root certificates as a byte string.
If provided, a secure connection using the certificates will be
established to an SSL-enabled Flower server.
token : str
Executor token used for ServerAppIo authentication.
"""

_deprecation_warning_logged = False
Expand All @@ -119,9 +122,14 @@ def __init__( # pylint: disable=too-many-arguments
self,
serverappio_service_address: str = SERVERAPPIO_API_DEFAULT_CLIENT_ADDRESS,
root_certificates: bytes | None = None,
*,
token: str,
) -> None:
if token == "":
raise ValueError("`token` must be a non-empty string")
self._addr = serverappio_service_address
self._cert = root_certificates
self._token = token
self._run: Run | None = None
self._grpc_stub: ServerAppIoStub | None = None
self._channel: grpc.Channel | None = None
Expand All @@ -146,6 +154,7 @@ def _connect(self) -> None:
server_address=self._addr,
insecure=(self._cert is None),
root_certificates=self._cert,
interceptors=[AppIoTokenClientInterceptor(token=self._token)],
)
self._channel.subscribe(on_channel_state_change)
self._grpc_stub = ServerAppIoStub(self._channel)
Expand Down
32 changes: 31 additions & 1 deletion framework/py/flwr/server/grid/grpc_grid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
get_all_nested_objects,
get_object_tree,
)
from flwr.supercore.interceptors import AppIoTokenClientInterceptor

from .grpc_grid import GrpcGrid

Expand All @@ -58,7 +59,7 @@ def setUp(self) -> None:
self.mock_run.fab_id = "mock/mock"
self.mock_run.fab_version = "v1.0.0"
self.mock_run.fab_hash = "9f86d08"
self.grid = GrpcGrid()
self.grid = GrpcGrid(token="test-token")
self.grid._grpc_stub = self.mock_stub # pylint: disable=protected-access
self.grid._channel = self.mock_channel # pylint: disable=protected-access
self.grid.set_run(self.mock_run)
Expand Down Expand Up @@ -260,6 +261,35 @@ def test_set_run_rejects_non_run_type(self) -> None:
with self.assertRaises(TypeError):
self.grid.set_run(61016) # type: ignore[arg-type]

@patch("flwr.server.grid.grpc_grid.wrap_stub")
@patch("flwr.server.grid.grpc_grid.ServerAppIoStub")
@patch("flwr.server.grid.grpc_grid.create_channel")
def test_connect_adds_client_interceptor_when_token_is_set(
self,
mock_create_channel: Mock,
_mock_serverappio_stub: Mock,
_mock_wrap_stub: Mock,
) -> None:
"""`_connect` should pass the token client interceptor to create_channel."""
mock_create_channel.return_value = Mock()
grid = GrpcGrid(token="test-token")

grid._connect() # pylint: disable=protected-access

kwargs = mock_create_channel.call_args.kwargs
interceptors = kwargs["interceptors"]
self.assertIsNotNone(interceptors)
assert interceptors is not None
self.assertEqual(len(interceptors), 1)
self.assertIsInstance(interceptors[0], AppIoTokenClientInterceptor)

def test_init_rejects_empty_token(
self,
) -> None:
"""`GrpcGrid` should reject empty token values."""
with self.assertRaises(ValueError):
GrpcGrid(token="")

def test_simple_retry_mechanism_get_nodes(self) -> None:
"""Test retry mechanism with the get_node_ids method."""
# Prepare
Expand Down
1 change: 1 addition & 0 deletions framework/py/flwr/server/serverapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def on_exit() -> None:
grid = GrpcGrid(
serverappio_service_address=serverappio_api_address,
root_certificates=certificates,
token=token,
)

# Pull ServerAppInputs from LinkState
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2026 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""ServerAppIo auth interceptor integration tests."""


import tempfile
import unittest

import grpc

from flwr.common import ConfigRecord
from flwr.common.constant import SERVERAPPIO_API_DEFAULT_SERVER_ADDRESS, Status
from flwr.common.typing import RunStatus
from flwr.proto.appio_pb2 import ( # pylint: disable=E0611
ListAppsToLaunchRequest,
ListAppsToLaunchResponse,
)
from flwr.proto.serverappio_pb2 import ( # pylint: disable=E0611
GetNodesRequest,
GetNodesResponse,
)
from flwr.server.superlink.linkstate.linkstate_factory import LinkStateFactory
from flwr.server.superlink.serverappio.serverappio_grpc import run_serverappio_api_grpc
from flwr.supercore.constant import FLWR_IN_MEMORY_DB_NAME, NOOP_FEDERATION, RunType
from flwr.supercore.ffs import FfsFactory
from flwr.supercore.interceptors import APP_TOKEN_HEADER, AUTHENTICATION_FAILED_MESSAGE
from flwr.supercore.object_store import ObjectStoreFactory
from flwr.superlink.federation import NoOpFederationManager


class TestServerAppIoAuthIntegration(unittest.TestCase):
"""Integration tests for ServerAppIo token-auth interceptor behavior."""

def setUp(self) -> None:
"""Start the ServerAppIo gRPC API without client-side auth helpers."""
self.temp_dir = tempfile.TemporaryDirectory() # pylint: disable=R1732
self.addCleanup(self.temp_dir.cleanup)

objectstore_factory = ObjectStoreFactory()
state_factory = LinkStateFactory(
FLWR_IN_MEMORY_DB_NAME, NoOpFederationManager(), objectstore_factory
)
ffs_factory = FfsFactory(self.temp_dir.name)

self.state = state_factory.state()
node_id = self.state.create_node("mock_owner", "fake_name", b"pk", 30)
self.state.acknowledge_node_heartbeat(node_id, 1e3)

self._server: grpc.Server = run_serverappio_api_grpc(
SERVERAPPIO_API_DEFAULT_SERVER_ADDRESS,
state_factory,
ffs_factory,
objectstore_factory,
None,
)

channel = grpc.insecure_channel("localhost:9091")
self._get_nodes = channel.unary_unary(
"/flwr.proto.ServerAppIo/GetNodes",
request_serializer=GetNodesRequest.SerializeToString,
response_deserializer=GetNodesResponse.FromString,
)
self._list_apps_to_launch = channel.unary_unary(
"/flwr.proto.ServerAppIo/ListAppsToLaunch",
request_serializer=ListAppsToLaunchRequest.SerializeToString,
response_deserializer=ListAppsToLaunchResponse.FromString,
)

def tearDown(self) -> None:
"""Stop the gRPC API server."""
self._server.stop(None)

def _create_running_run(self) -> int:
run_id = self.state.create_run(
"", "", "", {}, NOOP_FEDERATION, ConfigRecord(), "", RunType.SERVER_APP
)
_ = self.state.update_run_status(run_id, RunStatus(Status.STARTING, "", ""))
_ = self.state.update_run_status(run_id, RunStatus(Status.RUNNING, "", ""))
return run_id

def test_get_nodes_denied_without_metadata_token(self) -> None:
"""Protected RPC should deny requests missing metadata token."""
run_id = self._create_running_run()

with self.assertRaises(grpc.RpcError) as err:
self._get_nodes.with_call(request=GetNodesRequest(run_id=run_id))
assert err.exception.code() == grpc.StatusCode.UNAUTHENTICATED
assert err.exception.details() == AUTHENTICATION_FAILED_MESSAGE

def test_get_nodes_denied_with_invalid_metadata_token(self) -> None:
"""Protected RPC should deny requests with invalid metadata token."""
run_id = self._create_running_run()

with self.assertRaises(grpc.RpcError) as err:
self._get_nodes.with_call(
request=GetNodesRequest(run_id=run_id),
metadata=((APP_TOKEN_HEADER, "invalid-token"),),
)
assert err.exception.code() == grpc.StatusCode.UNAUTHENTICATED
assert err.exception.details() == AUTHENTICATION_FAILED_MESSAGE

def test_get_nodes_allows_with_valid_metadata_token(self) -> None:
"""Protected RPC should allow requests with a valid metadata token."""
run_id = self._create_running_run()
token = self.state.create_token(run_id)
assert token is not None

response, call = self._get_nodes.with_call(
request=GetNodesRequest(run_id=run_id),
metadata=((APP_TOKEN_HEADER, token),),
)

assert isinstance(response, GetNodesResponse)
assert call.code() == grpc.StatusCode.OK

def test_list_apps_to_launch_allows_without_metadata_token(self) -> None:
"""No-auth RPC should be callable without metadata token."""
response, call = self._list_apps_to_launch.with_call(
request=ListAppsToLaunchRequest()
)

assert isinstance(response, ListAppsToLaunchResponse)
assert call.code() == grpc.StatusCode.OK
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from flwr.server.superlink.linkstate import LinkStateFactory
from flwr.supercore.ffs import FfsFactory
from flwr.supercore.interceptors import create_serverappio_token_auth_server_interceptor
from flwr.supercore.object_store import ObjectStoreFactory

from .serverappio_servicer import ServerAppIoServicer
Expand All @@ -46,6 +47,9 @@ def run_serverappio_api_grpc(
ffs_factory=ffs_factory,
objectstore_factory=objectstore_factory,
)
auth_interceptor = create_serverappio_token_auth_server_interceptor(
state_provider=state_factory.state
)
serverappio_add_servicer_to_server_fn = add_ServerAppIoServicer_to_server
serverappio_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(
Expand All @@ -55,6 +59,7 @@ def run_serverappio_api_grpc(
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
certificates=certificates,
interceptors=[auth_interceptor],
)

address = serverappio_grpc_server.bound_address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def PushAppOutputs(
log(DEBUG, "ServerAppIoServicer.PushAppOutputs")

# Validate the token
run_id = self._verify_token(request.token, context)
_ = self._verify_token(request.token, context)

# Init state and store
state = self.state_factory.state()
Expand All @@ -373,9 +373,6 @@ def PushAppOutputs(
)

state.set_serverapp_context(request.run_id, context_from_proto(request.context))

# Remove the token
state.delete_token(run_id)
return PushAppOutputsResponse()

def UpdateRunStatus(
Expand All @@ -398,6 +395,8 @@ def UpdateRunStatus(

# If the run is finished, delete the run from ObjectStore
if request.run_status.status == Status.FINISHED:
# Remove the token once the run completes.
state.delete_token(request.run_id)
# Delete all objects related to the run
store.delete_objects_in_run(request.run_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# ==============================================================================
"""ServerAppIoServicer tests."""

# pylint: disable=too-many-lines


import tempfile
import unittest
Expand Down Expand Up @@ -86,6 +88,7 @@
get_object_tree,
iterate_object_tree,
)
from flwr.supercore.interceptors import APP_TOKEN_HEADER, AppIoTokenClientInterceptor
from flwr.supercore.object_store import ObjectStoreFactory
from flwr.superlink.federation import NoOpFederationManager

Expand Down Expand Up @@ -169,7 +172,24 @@ def setUp(self) -> None:
None,
)

self._channel = grpc.insecure_channel("localhost:9091")
# Provide a valid metadata token on the default test channel so existing
# servicer behavior tests continue to exercise business logic paths.
self._auth_run_id = self.state.create_run(
"", "", "", {}, NOOP_FEDERATION, ConfigRecord(), "", RunType.SERVER_APP
)
auth_token = self.state.create_token(self._auth_run_id)
assert auth_token is not None
self._auth_token = auth_token
_ = self.state.update_run_status(
self._auth_run_id, RunStatus(Status.STARTING, "", "")
)
_ = self.state.update_run_status(
self._auth_run_id, RunStatus(Status.RUNNING, "", "")
)
self._channel = grpc.intercept_channel(
grpc.insecure_channel("localhost:9091"),
AppIoTokenClientInterceptor(token=self._auth_token),
)
self._get_nodes = self._channel.unary_unary(
"/flwr.proto.ServerAppIo/GetNodes",
request_serializer=GetNodesRequest.SerializeToString,
Expand Down Expand Up @@ -530,10 +550,20 @@ def test_pull_message_from_expired_message_error(self) -> None:
with patch("datetime.datetime") as mock_dt:
mock_dt.now.return_value = future_dt # over TTL limit

token = self.state.create_token(run_id)
assert token is not None
request = PullAppMessagesRequest(message_ids=[str(msg_id)], run_id=run_id)
pull_messages_plain = grpc.insecure_channel("localhost:9091").unary_unary(
"/flwr.proto.ServerAppIo/PullMessages",
request_serializer=PullAppMessagesRequest.SerializeToString,
response_deserializer=PullAppMessagesResponse.FromString,
)

# Execute
response, call = self._pull_messages.with_call(request=request)
response, call = pull_messages_plain.with_call(
request=request,
metadata=((APP_TOKEN_HEADER, token),),
)

# Assert
assert isinstance(response, PullAppMessagesResponse)
Expand Down
Loading
Loading