Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3693f5f
test
Flosckow Aug 25, 2025
ea41b9f
drop
Flosckow Aug 26, 2025
c0f739f
Add first version, need set queue only in start methods
Flosckow Aug 27, 2025
5937362
remove debug content
Flosckow Aug 27, 2025
62b158a
remove debug file
Flosckow Aug 27, 2025
87e14a0
Try add settign for all params
Flosckow Sep 2, 2025
dc1900c
Try resolve all params(include build in factories)
Flosckow Sep 8, 2025
5f536e1
test
Flosckow Sep 11, 2025
d53228c
refactor
Flosckow Sep 12, 2025
975c83a
YA refactor
Flosckow Sep 12, 2025
ac39be8
Add checking settings
Flosckow Sep 15, 2025
0b47a40
Fix: tests
Flosckow Sep 17, 2025
51f6163
refactor code
Flosckow Sep 22, 2025
1dac0ca
refactor: remove useless logic
Lancetnik Oct 3, 2025
0a72d9a
lint: fix ruff
Lancetnik Oct 3, 2025
7d7d4b6
refactor: move resolve method to parent
Lancetnik Oct 3, 2025
6aa87fa
refactor: change public settings type to dict
Lancetnik Oct 3, 2025
e13eb05
refactor: change public settings type to mapping
Lancetnik Oct 3, 2025
c0bb0a6
refactor: remove dynamic settings resolution
Lancetnik Oct 3, 2025
29b4f36
refactor: revert changes
Lancetnik Oct 3, 2025
078541f
refactor: revert changes
Lancetnik Oct 3, 2025
3f8a4e4
chore: pull main
Lancetnik Oct 3, 2025
fa5be8c
chore: pull main
Lancetnik Oct 3, 2025
7a4cf45
feat: support deep nested settings
Lancetnik Oct 3, 2025
4ec26e1
feat: support mapping resolve
Lancetnik Oct 4, 2025
9933fa5
Need correct last tests
Flosckow Oct 17, 2025
02acbc8
Fix: tests
Flosckow Oct 22, 2025
425851a
Fix: remove unused code
Flosckow Oct 22, 2025
ae3eb41
Remove resolve settings in [pub,sub].start
Flosckow Oct 27, 2025
ed6bde0
some fixes
Flosckow Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions faststream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""A Python framework for building services interacting with Apache Kafka, RabbitMQ, NATS and Redis."""

from faststream._internal.configs.settings import Settings
from faststream._internal.testing.app import TestApp
from faststream._internal.utils import apply_types
from faststream.annotations import ContextRepo, Logger
Expand Down Expand Up @@ -27,6 +28,7 @@
"PublishCommand",
"PublishType",
"Response",
"Settings",
"SourceType",
"StreamMessage",
"TestApp",
Expand Down
11 changes: 10 additions & 1 deletion faststream/_internal/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,20 @@ def _setup_logger(self) -> None:

self.config.logger._setup(self.config.fd_config.context)

def setup_logger(self) -> None:
self._setup_logger()

def resolve_settings(self) -> None:
for sub in self.subscribers:
sub.resolve_settings()

for pub in self.publishers:
pub.resolve_settings()

async def connect(self) -> ConnectionType:
"""Connect to a remote server."""
if self._connection is None:
self._connection = await self._connect()
self._setup_logger()

return self._connection

Expand Down
3 changes: 3 additions & 0 deletions faststream/_internal/configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .broker import BrokerConfig, BrokerConfigType, ConfigComposition
from .endpoint import PublisherUsecaseConfig, SubscriberUsecaseConfig
from .settings import Settings, make_settings_container
from .specification import (
PublisherSpecificationConfig,
SpecificationConfig as SubscriberSpecificationConfig,
Expand All @@ -11,6 +12,8 @@
"ConfigComposition",
"PublisherSpecificationConfig",
"PublisherUsecaseConfig",
"Settings",
"SubscriberSpecificationConfig",
"SubscriberUsecaseConfig",
"make_settings_container",
)
4 changes: 4 additions & 0 deletions faststream/_internal/configs/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from faststream._internal.logger import LoggerState
from faststream._internal.producer import ProducerProto, ProducerUnset

from .settings import SettingsContainer, make_settings_container

if TYPE_CHECKING:
from fast_depends.dependencies import Dependant

Expand All @@ -19,6 +21,8 @@ class BrokerConfig:
prefix: str = ""
include_in_schema: bool | None = True

settings: SettingsContainer = field(default_factory=make_settings_container)

broker_middlewares: Sequence["BrokerMiddleware[Any]"] = ()
broker_parser: Optional["CustomCallable"] = None
broker_decoder: Optional["CustomCallable"] = None
Expand Down
64 changes: 64 additions & 0 deletions faststream/_internal/configs/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from collections.abc import Mapping, MutableMapping
from dataclasses import dataclass
from typing import Any, Protocol


@dataclass(slots=True)
class Settings:
key: str


class SettingsContainer(Protocol):
def resolve(self, item: Any) -> Any:
pass


class RealSettingsContainer(SettingsContainer):
def __init__(self, settings: Mapping[str, Any]) -> None:
self._items = settings

def resolve(self, item: Any) -> Any:
if isinstance(item, Settings):
return self._items[item.key]
self._resolve_child(item)
return item

def _resolve_child(
self,
item: Any,
seen: set[Any] | None = None,
) -> None:
if seen is None:
seen = set()

if id(item) in seen:
return

seen.add(id(item))

if isinstance(item, MutableMapping):
for key, value in item.items():
if isinstance(value, Settings):
item[key] = self._items[value.key]
self._resolve_child(value, seen)

else:
for attr_name in dir(item):
if not attr_name.startswith("_"):
attr = getattr(item, attr_name)
if isinstance(attr, Settings):
setattr(item, attr_name, self._items[attr.key])
self._resolve_child(attr, seen)


class FakeSettingsContainer(SettingsContainer):
def resolve(self, item: Any) -> Any:
return item


def make_settings_container(
settings: Mapping[str, Any] | None = None,
) -> SettingsContainer:
if not settings:
return FakeSettingsContainer()
return RealSettingsContainer(settings)
9 changes: 8 additions & 1 deletion faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from collections.abc import Iterable, Sequence
from collections.abc import Iterable, Mapping, Sequence
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -16,6 +16,7 @@

from faststream.__about__ import SERVICE_NAME
from faststream._internal.broker import BrokerUsecase
from faststream._internal.configs import make_settings_container
from faststream._internal.constants import EMPTY
from faststream._internal.context.repository import ContextRepo
from faststream._internal.di import FastDependsConfig
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(
# FastDepends args
apply_types: bool = True,
serializer: Optional["SerializerProto"] = EMPTY,
settings: Mapping[str, Any] | None = None,
provider: Optional["Provider"] = None,
context: Optional["ContextRepo"] = None,
) -> None:
Expand Down Expand Up @@ -141,6 +143,7 @@ def __init__(
log_level: Service messages log level.
apply_types: Whether to use FastDepends or not.
serializer: FastDepends-compatible serializer to validate incoming messages.
settings: Container for configuration publisher and subscriber.
provider: Provider for FastDepends.
context: Context for FastDepends.
"""
Expand Down Expand Up @@ -185,6 +188,7 @@ def __init__(
# Basic args
routers=routers,
config=RabbitBrokerConfig(
settings=make_settings_container(settings),
channel_manager=cm,
producer=producer,
declarer=declarer,
Expand Down Expand Up @@ -274,6 +278,9 @@ async def close(
async def start(self) -> None:
"""Connect broker to RabbitMQ and startup all subscribers."""
await self.connect()
# can merge it into one operation, something like br.initialize or br.initialize_pipe
self.resolve_settings()
self.setup_logger()
await self.declare_queue(RABBIT_REPLY)
await super().start()

Expand Down
37 changes: 19 additions & 18 deletions faststream/rabbit/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing_extensions import deprecated, override

from faststream._internal.broker.registrator import Registrator
from faststream._internal.configs import Settings
from faststream._internal.constants import EMPTY
from faststream.exceptions import SetupError
from faststream.middlewares import AckPolicy
Expand Down Expand Up @@ -38,11 +39,11 @@ class RabbitRegistrator(Registrator[IncomingMessage, RabbitBrokerConfig]):
@override
def subscriber( # type: ignore[override]
self,
queue: Union[str, "RabbitQueue"],
exchange: Union[str, "RabbitExchange", None] = None,
queue: Union[str, "RabbitQueue", Settings],
exchange: Union[str, "RabbitExchange", Settings, None] = None,
*,
channel: Optional["Channel"] = None,
consume_args: dict[str, Any] | None = None,
channel: Optional["Channel"] | Settings = None,
consume_args: dict[str, Any] | Settings | None = None,
no_ack: Annotated[
bool,
deprecated(
Expand Down Expand Up @@ -120,16 +121,16 @@ def subscriber( # type: ignore[override]
@override
def publisher( # type: ignore[override]
self,
queue: Union["RabbitQueue", str] = "",
exchange: Union["RabbitExchange", str, None] = None,
queue: Union["RabbitQueue", str, Settings] = "",
exchange: Union["RabbitExchange", str, Settings, None] = None,
*,
routing_key: str = "",
mandatory: bool = True,
immediate: bool = False,
routing_key: str | Settings = "",
mandatory: bool | Settings = True,
immediate: bool | Settings = False,
persist: bool | Settings = False,
reply_to: str | Settings | None = None,
priority: int | Settings | None = None,
timeout: "TimeoutType" = None,
persist: bool = False,
reply_to: str | None = None,
priority: int | None = None,
persistent: bool = True,
# specific
middlewares: Annotated[
Expand All @@ -145,12 +146,12 @@ def publisher( # type: ignore[override]
schema: Any | None = None,
include_in_schema: bool = True,
# message args
headers: Optional["HeadersType"] = None,
content_type: str | None = None,
content_encoding: str | None = None,
expiration: Optional["DateType"] = None,
message_type: str | None = None,
user_id: str | None = None,
headers: Optional["HeadersType"] | Settings = None,
content_type: str | Settings | None = None,
content_encoding: str | Settings | None = None,
expiration: Optional["DateType"] | Settings = None,
message_type: str | Settings | None = None,
user_id: str | Settings | None = None,
) -> "RabbitPublisher":
"""Creates long-living and AsyncAPI-documented publisher object.

Expand Down
12 changes: 7 additions & 5 deletions faststream/rabbit/publisher/factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Union

from faststream._internal.configs.settings import Settings

from .config import RabbitPublisherConfig, RabbitPublisherSpecificationConfig
from .specification import RabbitPublisherSpecification
Expand All @@ -15,10 +17,10 @@

def create_publisher(
*,
routing_key: str,
queue: "RabbitQueue",
exchange: "RabbitExchange",
message_kwargs: "PublishKwargs",
routing_key: str | Settings,
queue: Union["RabbitQueue", Settings],
exchange: Union["RabbitExchange", Settings],
message_kwargs: Union["PublishKwargs", Settings],
# Broker args
config: "RabbitBrokerConfig",
# Publisher args
Expand Down
12 changes: 11 additions & 1 deletion faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from typing_extensions import Unpack, override

Expand Down Expand Up @@ -79,7 +79,17 @@ def routing(

return routing_key

def resolve_settings(self) -> None:
resolver: Callable[..., Any] = self._outer_config.settings.resolve
self.routing_key = resolver(self.routing_key)
self.queue = RabbitQueue.validate(resolver(self.queue))
self.exchange = RabbitExchange.validate(resolver(self.exchange))
self.headers = resolver(self.headers)
self.reply_to = resolver(self.reply_to)
self.timeout = resolver(self.timeout)

async def start(self) -> None:
"""Starts the consumer for the RabbitMQ queue."""
if self.exchange is not None:
await self._outer_config.declarer.declare_exchange(self.exchange)
return await super().start()
Expand Down
15 changes: 10 additions & 5 deletions faststream/rabbit/schemas/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ def add_prefix(self, prefix: str) -> "RabbitQueue":

return new_q

def set_routing(self):
re, routing_key = compile_path(
self.routing_key,
replace_symbol="*",
patch_regex=lambda x: x.replace(r"\#", ".+"),
)
self.path_regex = re
self.routing_key = routing_key

@overload
def __init__(
self,
Expand Down Expand Up @@ -164,11 +173,7 @@ def __init__(
:param bind_arguments: Queue-exchange binding options.
:param routing_key: Explicit binding routing key. Uses name if not present.
"""
re, routing_key = compile_path(
routing_key,
replace_symbol="*",
patch_regex=lambda x: x.replace(r"\#", ".+"),
)
re = None

if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM:
if durable is EMPTY:
Expand Down
11 changes: 6 additions & 5 deletions faststream/rabbit/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import warnings
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Union

from faststream._internal.configs.settings import Settings
from faststream._internal.constants import EMPTY
from faststream._internal.endpoint.subscriber.call_item import CallsCollection
from faststream.exceptions import SetupError
Expand All @@ -20,10 +21,10 @@

def create_subscriber(
*,
queue: "RabbitQueue",
exchange: "RabbitExchange",
consume_args: dict[str, Any] | None,
channel: Optional["Channel"],
queue: Union["RabbitQueue", Settings],
exchange: Union["RabbitExchange", Settings],
consume_args: dict[str, Any] | Settings | None,
channel: Optional["Channel"] | Settings,
# Subscriber args
no_reply: bool,
ack_policy: "AckPolicy",
Expand Down
Loading
Loading