Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into 782-add-rbac-to-workf…
Browse files Browse the repository at this point in the history
…lows
  • Loading branch information
Sparrow1029 committed Mar 5, 2025
2 parents 43aeacb + 1a5a433 commit d3c3a68
Show file tree
Hide file tree
Showing 23 changed files with 402 additions and 90 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 3.0.0rc1
current_version = 3.1.1
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/run-codspeed-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
name: Run benchmarks
runs-on: ubuntu-latest
container:
image: python:3.11
image: python:3.13
options: --privileged
services:
postgres:
Expand Down Expand Up @@ -61,7 +61,7 @@ jobs:

- uses: CodSpeedHQ/action@v3
with:
run: CACHE_URI=redis://redis DATABASE_URI=postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST/$POSTGRES_DB pytest test/unit_tests --codspeed
run: CACHE_URI=redis://redis DATABASE_URI=postgresql+psycopg://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST/$POSTGRES_DB pytest test/unit_tests --codspeed
token: ${{ secrets.CODSPEED_TOKEN }}
env:
POSTGRES_DB: orchestrator-core-test
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
env:
FLIT_ROOT_INSTALL: 1
- name: Run Unit tests
run: CACHE_URI=redis://redis DATABASE_URI=postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST/$POSTGRES_DB pytest --cov-branch --cov=orchestrator --cov-report=xml --ignore=test --ignore=orchestrator/devtools --ignore=examples --ignore=docs --ignore=orchestrator/vendor
run: CACHE_URI=redis://redis DATABASE_URI=postgresql+psycopg://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST/$POSTGRES_DB pytest --cov-branch --cov=orchestrator --cov-report=xml --ignore=test --ignore=orchestrator/devtools --ignore=examples --ignore=docs --ignore=orchestrator/vendor
env:
POSTGRES_DB: orchestrator-core-test
POSTGRES_USER: nwa
Expand Down
4 changes: 2 additions & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ nav:
- Callbacks: reference-docs/workflows/callbacks.md
- Websockets: reference-docs/websockets.md
- Migration guides:
- 2.0: migration-guide/2.0.md
- 3.0: migration-guide/3.0.md
- 2.x: migration-guide/2.0.md
- 3.x: migration-guide/3.0.md

- Workshops:
# - Beginner:
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

"""This is the orchestrator workflow engine."""

__version__ = "3.0.0rc1"
__version__ = "3.1.1"

from orchestrator.app import OrchestratorCore
from orchestrator.settings import app_settings
Expand Down
3 changes: 2 additions & 1 deletion orchestrator/api/api_v1/endpoints/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from orchestrator.settings import ExecutorType, app_settings
from orchestrator.utils.json import json_dumps
from orchestrator.utils.redis import delete_keys_matching_pattern
from orchestrator.utils.redis_client import create_redis_asyncio_client
from orchestrator.websocket import WS_CHANNELS, broadcast_invalidate_cache, websocket_manager

router = APIRouter()
Expand All @@ -41,7 +42,7 @@

@router.delete("/cache/{name}")
async def clear_cache(name: str) -> int | None:
cache: AIORedis = AIORedis.from_url(str(app_settings.CACHE_URI))
cache: AIORedis = create_redis_asyncio_client(app_settings.CACHE_URI)
if name not in CACHE_FLUSH_OPTIONS:
raise_status(HTTPStatus.BAD_REQUEST, "Invalid cache name")

Expand Down
7 changes: 5 additions & 2 deletions orchestrator/cli/generator/generator/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
sort_product_blocks_by_dependencies,
)
from orchestrator.cli.generator.generator.settings import product_generator_settings as settings
from orchestrator.settings import convert_database_uri

logger = structlog.getLogger(__name__)


def create_migration_file(message: str, head: str) -> Path | None:
if not environ.get("DATABASE_URI"):
environ.update({"DATABASE_URI": "postgresql://nwa:nwa@localhost/orchestrator-core"})
if environ.get("DATABASE_URI"):
environ.update({"DATABASE_URI": convert_database_uri(environ["DATABASE_URI"])})
else:
environ.update({"DATABASE_URI": "postgresql+psycopg://nwa:nwa@localhost/orchestrator-core"})
if not environ.get("PYTHONPATH"):
environ.update({"PYTHONPATH": "."})
logger.info(
Expand Down
5 changes: 3 additions & 2 deletions orchestrator/distlock/managers/redis_distlock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from structlog import get_logger

from orchestrator.settings import app_settings
from orchestrator.utils.redis_client import create_redis_asyncio_client, create_redis_client

logger = get_logger(__name__)

Expand All @@ -37,7 +38,7 @@ def __init__(self, redis_address: RedisDsn):
self.redis_address = redis_address

async def connect_redis(self) -> None:
self.redis_conn = AIORedis.from_url(str(self.redis_address))
self.redis_conn = create_redis_asyncio_client(self.redis_address)

async def disconnect_redis(self) -> None:
if self.redis_conn:
Expand Down Expand Up @@ -78,7 +79,7 @@ async def release_lock(self, lock: Lock) -> None:
def release_sync(self, lock: Lock) -> None:
redis_conn: Redis | None = None
try:
redis_conn = Redis.from_url(str(app_settings.CACHE_URI))
redis_conn = create_redis_client(app_settings.CACHE_URI)
sync_lock: SyncLock = SyncLock(
redis=redis_conn,
name=lock.name, # type: ignore
Expand Down
3 changes: 2 additions & 1 deletion orchestrator/graphql/resolvers/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from orchestrator.services.settings import get_engine_settings, get_engine_settings_for_update, post_update_to_slack
from orchestrator.settings import ExecutorType, app_settings
from orchestrator.utils.redis import delete_keys_matching_pattern
from orchestrator.utils.redis_client import create_redis_asyncio_client

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -57,7 +58,7 @@ def resolve_settings(info: OrchestratorInfo) -> StatusType:

# Mutations
async def clear_cache(info: OrchestratorInfo, name: str) -> CacheClearSuccess | Error:
cache: AIORedis = AIORedis.from_url(str(app_settings.CACHE_URI))
cache: AIORedis = create_redis_asyncio_client(app_settings.CACHE_URI)
if name not in CACHE_FLUSH_OPTIONS:
return Error(message="Invalid cache name")

Expand Down
21 changes: 19 additions & 2 deletions orchestrator/graphql/schemas/product.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import TYPE_CHECKING, Annotated
from typing import TYPE_CHECKING, Annotated, Iterable

import strawberry
from strawberry import UNSET
from strawberry.federation.schema_directives import Key

from oauth2_lib.strawberry import authenticated_field
from orchestrator.db import ProductTable
from orchestrator.db import ProductBlockTable, ProductTable
from orchestrator.domain.base import ProductModel
from orchestrator.graphql.pagination import Connection
from orchestrator.graphql.schemas.fixed_input import FixedInput
Expand Down Expand Up @@ -51,6 +51,23 @@ async def subscriptions(
filter_by_with_related_subscriptions = (filter_by or []) + [GraphqlFilter(field="product", value=self.name)]
return await resolve_subscriptions(info, filter_by_with_related_subscriptions, sort_by, first, after)

@strawberry.field(description="Returns list of all nested productblock names")  # type: ignore
async def all_pb_names(self) -> list[str]:
    """Collect the names of every product block reachable from this product.

    Walks the product's product blocks and, recursively, their ``depends_on``
    relations. A ``visited`` set skips blocks that were already seen, which
    prevents infinite recursion when the dependency graph contains a cycle and
    keeps each name in the result only once even when several blocks share a
    dependency.

    Returns:
        Alphabetically sorted list of unique product block names.
    """
    model = get_original_model(self, ProductTable)

    def get_all_pb_names(product_blocks: list[ProductBlockTable], visited: set[str]) -> Iterable[str]:
        # Depth-first traversal; `visited` doubles as cycle guard and de-duplicator.
        for product_block in product_blocks:
            if product_block.name in visited:
                continue
            visited.add(product_block.name)
            yield product_block.name

            if product_block.depends_on:
                yield from get_all_pb_names(product_block.depends_on, visited)

    return sorted(get_all_pb_names(model.product_blocks, set()))

@strawberry.field(description="Return product blocks") # type: ignore
async def product_blocks(self) -> list[Annotated["ProductBlock", strawberry.lazy(".product_block")]]:
from orchestrator.graphql.schemas.product_block import ProductBlock
Expand Down
10 changes: 5 additions & 5 deletions orchestrator/migrations/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,10 +880,10 @@ def delete_product(conn: sa.engine.Connection, name: str) -> None:
RETURNING product_id
),
deleted_p_pb AS (
DELETE FROM product_product_blocks WHERE product_id IN (SELECT product_id FROM deleted_p)
DELETE FROM product_product_blocks WHERE product_id = ANY(SELECT product_id FROM deleted_p)
),
deleted_pb_rt AS (
DELETE FROM products_workflows WHERE product_id IN (SELECT product_id FROM deleted_p)
DELETE FROM products_workflows WHERE product_id = ANY(SELECT product_id FROM deleted_p)
)
SELECT * from deleted_p;
"""
Expand Down Expand Up @@ -911,10 +911,10 @@ def delete_product_block(conn: sa.engine.Connection, name: str) -> None:
RETURNING product_block_id
),
deleted_p_pb AS (
DELETE FROM product_product_blocks WHERE product_block_id IN (SELECT product_block_id FROM deleted_pb)
DELETE FROM product_product_blocks WHERE product_block_id =ANY(SELECT product_block_id FROM deleted_pb)
),
deleted_pb_rt AS (
DELETE FROM product_block_resource_types WHERE product_block_id IN (SELECT product_block_id FROM deleted_pb)
DELETE FROM product_block_resource_types WHERE product_block_id =ANY(SELECT product_block_id FROM deleted_pb)
)
SELECT * from deleted_pb;
"""
Expand Down Expand Up @@ -968,7 +968,7 @@ def delete_resource_type(conn: sa.engine.Connection, resource_type: str) -> None
RETURNING resource_type_id
),
deleted_pb_rt AS (
DELETE FROM product_block_resource_types WHERE resource_type_id IN (SELECT resource_type_id FROM deleted_pb)
DELETE FROM product_block_resource_types WHERE resource_type_id =ANY(SELECT resource_type_id FROM deleted_pb)
)
SELECT * from deleted_pb;
"""
Expand Down
28 changes: 26 additions & 2 deletions orchestrator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@

import secrets
import string
import warnings
from pathlib import Path
from typing import Literal

from pydantic import PostgresDsn, RedisDsn
from pydantic import Field, NonNegativeInt, PostgresDsn, RedisDsn
from pydantic_settings import BaseSettings

from oauth2_lib.settings import oauth2lib_settings
from pydantic_forms.types import strEnum


class OrchestratorDeprecationWarning(DeprecationWarning):
    """Deprecation warning category specific to orchestrator-core.

    Raised (and explicitly un-silenced via ``warnings.filterwarnings``) so that
    operators see deprecation notices about configuration that must be updated,
    even though plain ``DeprecationWarning`` is hidden by default.
    """

    pass


class ExecutorType(strEnum):
WORKER = "celery"
THREADPOOL = "threadpool"
Expand All @@ -49,14 +54,17 @@ class AppSettings(BaseSettings):
EXECUTOR: str = ExecutorType.THREADPOOL
WORKFLOWS_SWAGGER_HOST: str = "localhost"
WORKFLOWS_GUI_URI: str = "http://localhost:3000"
DATABASE_URI: PostgresDsn = "postgresql://nwa:nwa@localhost/orchestrator-core" # type: ignore
DATABASE_URI: PostgresDsn = "postgresql+psycopg://nwa:nwa@localhost/orchestrator-core" # type: ignore
MAX_WORKERS: int = 5
MAIL_SERVER: str = "localhost"
MAIL_PORT: int = 25
MAIL_STARTTLS: bool = False
CACHE_URI: RedisDsn = "redis://localhost:6379/0" # type: ignore
CACHE_DOMAIN_MODELS: bool = False
CACHE_HMAC_SECRET: str | None = None # HMAC signing key, used when pickling results in the cache
REDIS_RETRY_COUNT: NonNegativeInt = Field(
2, description="Number of retries for redis connection errors/timeouts, 0 to disable"
) # More info: https://redis-py.readthedocs.io/en/stable/retry.html
ENABLE_DISTLOCK_MANAGER: bool = True
DISTLOCK_BACKEND: str = "memory"
CC_NOC: int = 0
Expand Down Expand Up @@ -85,6 +93,22 @@ class AppSettings(BaseSettings):
VALIDATE_OUT_OF_SYNC_SUBSCRIPTIONS: bool = False
FILTER_BY_MODE: Literal["partial", "exact"] = "exact"

def __init__(self) -> None:
    """Load settings from the environment, then normalize ``DATABASE_URI``.

    After pydantic populates the fields, legacy ``postgresql://`` and
    ``postgresql+psycopg2://`` URIs are rewritten to the
    ``postgresql+psycopg://`` form so SQLAlchemy uses the psycopg (v3) driver.
    """
    # Zero-argument super() is the modern (Py3) idiom; behavior is identical.
    super().__init__()
    self.DATABASE_URI = PostgresDsn(convert_database_uri(str(self.DATABASE_URI)))


def convert_database_uri(db_uri: str) -> str:
    """Return *db_uri* rewritten to use the ``postgresql+psycopg://`` scheme.

    Legacy ``postgresql://`` and ``postgresql+psycopg2://`` URIs are converted
    to the psycopg (v3) SQLAlchemy driver form, and an
    :class:`OrchestratorDeprecationWarning` is emitted so operators know to
    update their configuration. Any other URI is returned unchanged.

    Args:
        db_uri: Database connection URI, e.g. from the DATABASE_URI env var.

    Returns:
        The (possibly rewritten) database URI.
    """
    if db_uri.startswith(("postgresql://", "postgresql+psycopg2://")):
        # Keep everything from "://" onward; only the scheme/driver prefix changes.
        db_uri = "postgresql+psycopg" + db_uri[db_uri.find("://") :]
        # DeprecationWarning is hidden by default; force this category to be shown.
        warnings.filterwarnings("always", category=OrchestratorDeprecationWarning)
        warnings.warn(
            "DATABASE_URI converted to postgresql+psycopg:// format, please update your environment variable",
            OrchestratorDeprecationWarning,
            stacklevel=2,
        )
    return db_uri


app_settings = AppSettings()

Expand Down
17 changes: 6 additions & 11 deletions orchestrator/utils/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@
from typing import Any, Callable
from uuid import UUID

import redis.exceptions
from anyio import CancelScope, get_cancelled_exc_class
from redis import Redis
from redis.asyncio import Redis as AIORedis
from redis.asyncio.client import Pipeline, PubSub
from redis.asyncio.retry import Retry
from redis.backoff import EqualJitterBackoff
from structlog import get_logger

from orchestrator.services.subscriptions import _generate_etag
from orchestrator.settings import app_settings
from orchestrator.utils.json import PY_JSON_TYPES, json_dumps, json_loads
from orchestrator.utils.redis_client import (
create_redis_asyncio_client,
create_redis_client,
)

logger = get_logger(__name__)

cache = Redis.from_url(str(app_settings.CACHE_URI))
cache = create_redis_client(app_settings.CACHE_URI)

ONE_WEEK = 3600 * 24 * 7

Expand Down Expand Up @@ -136,12 +136,7 @@ class RedisBroadcast:
client: AIORedis

def __init__(self, redis_url: str):
self.client = AIORedis.from_url(
redis_url,
retry_on_error=[redis.exceptions.ConnectionError],
retry_on_timeout=True,
retry=Retry(EqualJitterBackoff(base=0.05), 2),
)
self.client = create_redis_asyncio_client(redis_url)
self.redis_url = redis_url

@asynccontextmanager
Expand Down
35 changes: 35 additions & 0 deletions orchestrator/utils/redis_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import redis.asyncio
import redis.client
import redis.exceptions
from pydantic import RedisDsn
from redis import Redis
from redis.asyncio import Redis as AIORedis
from redis.asyncio.retry import Retry as AIORetry
from redis.backoff import EqualJitterBackoff
from redis.retry import Retry

from orchestrator.settings import app_settings

# Shared retry configuration for both the sync and async Redis clients below.
REDIS_RETRY_ON_ERROR = [redis.exceptions.ConnectionError]  # error types that trigger a retry
REDIS_RETRY_ON_TIMEOUT = True  # also retry on timeouts
REDIS_RETRY_BACKOFF = EqualJitterBackoff(base=0.05)  # equal-jitter backoff, 50 ms base


def create_redis_client(redis_url: str | RedisDsn) -> redis.client.Redis:
    """Build a synchronous Redis client for the given Redis DSN.

    The client retries connection errors and timeouts using the module-level
    backoff policy and the configured ``REDIS_RETRY_COUNT``.
    """
    retry_policy = Retry(REDIS_RETRY_BACKOFF, app_settings.REDIS_RETRY_COUNT)
    return Redis.from_url(
        str(redis_url),
        retry=retry_policy,
        retry_on_error=REDIS_RETRY_ON_ERROR,  # type: ignore[arg-type]
        retry_on_timeout=REDIS_RETRY_ON_TIMEOUT,
    )


def create_redis_asyncio_client(redis_url: str | RedisDsn) -> redis.asyncio.client.Redis:
    """Build an asyncio Redis client for the given Redis DSN.

    Mirrors :func:`create_redis_client`, but constructs the async client with
    the asyncio-aware retry helper.
    """
    retry_policy = AIORetry(REDIS_RETRY_BACKOFF, app_settings.REDIS_RETRY_COUNT)
    return AIORedis.from_url(
        str(redis_url),
        retry=retry_policy,
        retry_on_error=REDIS_RETRY_ON_ERROR,  # type: ignore[arg-type]
        retry_on_timeout=REDIS_RETRY_ON_TIMEOUT,
    )
14 changes: 7 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ dependencies = [
"itsdangerous",
"Jinja2==3.1.5",
"orjson==3.10.15",
"psycopg2-binary==2.9.10",
"pydantic[email]~=2.8.2",
"pydantic-settings~=2.7.1",
"psycopg[binary]==3.2.5",
"pydantic[email]~=2.10.6",
"pydantic-settings~=2.8.0",
"python-dateutil==2.8.2",
"python-rapidjson>=1.18,<1.21",
"pytz==2025.1",
Expand All @@ -59,13 +59,13 @@ dependencies = [
"SQLAlchemy==2.0.38",
"SQLAlchemy-Utils==0.41.2",
"structlog",
"typer==0.15.1",
"uvicorn[standard]~=0.32.0",
"typer==0.15.2",
"uvicorn[standard]~=0.34.0",
"nwa-stdlib~=1.9.0",
"oauth2-lib~=2.4.0",
"tabulate==0.9.0",
"strawberry-graphql>=0.246.2",
"pydantic-forms~=1.3.0",
"pydantic-forms~=1.4.0",
]

description-file = "README.md"
Expand All @@ -89,7 +89,7 @@ test = [
"jsonref",
"mypy==1.9",
"pyinstrument",
"pytest==8.3.4",
"pytest==8.3.5",
"pytest-asyncio==0.21.2",
"pytest-codspeed",
"pytest-cov",
Expand Down
Loading

0 comments on commit d3c3a68

Please sign in to comment.