-
Notifications
You must be signed in to change notification settings - Fork 0
Live data path #357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Live data path #357
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,11 +3,14 @@ | |
| import asyncio | ||
| import contextlib | ||
| import logging | ||
| import os | ||
| import typing | ||
| from pathlib import Path | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| PRODUCTION = os.environ.get("PRODUCTION", "False").lower() == "true" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably a rethink about if we need this as above. |
||
|
|
||
|
|
||
| def get_file_snapshot(directory: Path) -> dict[str, float]: | ||
| """Get a snapshot of all files in a directory with their modification times. | ||
|
|
@@ -108,7 +111,8 @@ def get_live_data_directory(instrument: str, ceph_dir: str) -> Path | None: | |
| :return: Path to live data directory, or None if it doesn't exist | ||
| """ | ||
| instrument_upper = instrument.upper() | ||
| live_data_path = Path(ceph_dir) / "GENERIC" / "livereduce" / instrument_upper | ||
| generic_dir = "GENERIC" if PRODUCTION else "GENERIC-staging" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we end up keeping this generic dir strategy that is determined from PRODUCTION env var, we should just define the path as the global variable instead of doing it here. We do that in the the live_data.py file (and other places) so let's maintain the same methodology. |
||
| live_data_path = Path(ceph_dir) / generic_dir / "livereduce" / instrument_upper | ||
|
|
||
| if not (live_data_path.exists() and live_data_path.is_dir()): | ||
| return None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| import importlib | ||
| from unittest import mock | ||
|
|
||
| import pytest | ||
|
|
||
| import plotting_service.routers.live_data as live_data_router | ||
| from plotting_service.services import live_data_service | ||
|
|
||
|
|
||
| def reload_live_data_modules(monkeypatch: pytest.MonkeyPatch, production: bool | None) -> tuple[object, object]: | ||
| if production is None: | ||
| monkeypatch.delenv("PRODUCTION", raising=False) | ||
| else: | ||
| monkeypatch.setenv("PRODUCTION", str(production).lower()) | ||
|
|
||
| reloaded_service = importlib.reload(live_data_service) | ||
| reloaded_router = importlib.reload(live_data_router) | ||
| return reloaded_service, reloaded_router | ||
|
|
||
|
|
||
| def test_get_live_data_directory_uses_staging_path_when_production_is_unset(tmp_path, monkeypatch): | ||
| """Use the staging live-data directory when the PRODUCTION flag is | ||
| unset.""" | ||
| live_data_service_module, _ = reload_live_data_modules(monkeypatch, production=None) | ||
| live_data_path = tmp_path / "GENERIC-staging" / "livereduce" / "LOQ" | ||
| live_data_path.mkdir(parents=True) | ||
|
|
||
| assert live_data_service_module.get_live_data_directory("loq", str(tmp_path)) == live_data_path | ||
|
|
||
|
|
||
| def test_get_live_data_directory_uses_generic_path_when_production_is_true(tmp_path, monkeypatch): | ||
| """Use the production live-data directory when the PRODUCTION flag is | ||
| true.""" | ||
| live_data_service_module, _ = reload_live_data_modules(monkeypatch, production=True) | ||
| live_data_path = tmp_path / "GENERIC" / "livereduce" / "LOQ" | ||
| live_data_path.mkdir(parents=True) | ||
|
|
||
| assert live_data_service_module.get_live_data_directory("loq", str(tmp_path)) == live_data_path | ||
|
|
||
|
|
||
| def test_get_live_data_directory_returns_none_when_selected_path_is_missing(tmp_path, monkeypatch): | ||
| """Return None when the environment-selected live-data directory is | ||
| absent.""" | ||
| live_data_service_module, _ = reload_live_data_modules(monkeypatch, production=False) | ||
| (tmp_path / "GENERIC" / "livereduce" / "LOQ").mkdir(parents=True) | ||
|
|
||
| assert live_data_service_module.get_live_data_directory("loq", str(tmp_path)) is None | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_get_live_data_files_validates_staging_base_path(tmp_path, monkeypatch): | ||
| """Validate listed files against the staging live-data base path.""" | ||
| _, live_data_router_module = reload_live_data_modules(monkeypatch, production=False) | ||
| live_data_path = tmp_path / "GENERIC-staging" / "livereduce" / "LOQ" | ||
| live_data_path.mkdir(parents=True) | ||
| (live_data_path / "second.txt").write_text("second") | ||
| (live_data_path / "first.txt").write_text("first") | ||
|
|
||
| with ( | ||
| mock.patch.object(live_data_router_module, "CEPH_DIR", str(tmp_path)), | ||
| mock.patch.object(live_data_router_module, "safe_check_filepath") as safe_check_filepath, | ||
| ): | ||
| files = await live_data_router_module.get_live_data_files("loq") | ||
|
|
||
| assert files == ["first.txt", "second.txt"] | ||
| safe_check_filepath.assert_called_once_with(live_data_path, str(tmp_path) + "/GENERIC-staging/livereduce") | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_live_data_validates_production_base_path(tmp_path, monkeypatch): | ||
| """Validate streamed live-data events against the production base path.""" | ||
| _, live_data_router_module = reload_live_data_modules(monkeypatch, production=True) | ||
| live_data_path = tmp_path / "GENERIC" / "livereduce" / "LOQ" | ||
| live_data_path.mkdir(parents=True) | ||
|
|
||
| with ( | ||
| mock.patch.object(live_data_router_module, "CEPH_DIR", str(tmp_path)), | ||
| mock.patch.object(live_data_router_module, "safe_check_filepath") as safe_check_filepath, | ||
| mock.patch.object( | ||
| live_data_router_module, | ||
| "generate_file_change_events", | ||
| return_value=iter([b"event: connected\ndata: {}\n\n"]), | ||
| ) as generate_file_change_events, | ||
| ): | ||
| response = await live_data_router_module.live_data("loq") | ||
|
|
||
| safe_check_filepath.assert_called_once_with(live_data_path, str(tmp_path) + "/GENERIC/livereduce") | ||
| generate_file_change_events.assert_called_once_with(live_data_path, str(tmp_path), "loq", 30, 2) | ||
| assert response.media_type == "text/event-stream" | ||
| assert response.headers["X-Accel-Buffering"] == "no" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably rename this, and rethink about the implementation so we can change the path from gitops.