Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions bedboss/bbuploader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
run_initial_qc,
)
from bedboss.utils import standardize_pep as pep_standardizer
from bedboss.bedstat.r_service import RServiceManager
from bedboss.bedstat.backends import StatBackend, build_backend
from bedboss._version import __version__

_LOGGER = logging.getLogger(PKG_NAME)
Expand Down Expand Up @@ -140,11 +140,11 @@ def upload_all(
)

if not lite:
_LOGGER.info("Initializing R service for statistics")
r_service = RServiceManager()
_LOGGER.info("Initializing stats backend")
stat_backend = build_backend(bbagent.config.config.analysis.backend)
else:
_LOGGER.info("Lite mode: R service disabled")
r_service = None
_LOGGER.info("Lite mode: stats backend disabled")
stat_backend = None

for gse_pep in pep_annotation_list.results:
count += 1
Expand Down Expand Up @@ -205,7 +205,7 @@ def upload_all(
overwrite=overwrite,
overwrite_bedset=overwrite_bedset,
lite=lite,
r_service=r_service,
backend=stat_backend,
pm=pm,
)
except Exception as err:
Expand All @@ -224,6 +224,8 @@ def upload_all(
break

pm.stop_pipeline()
if stat_backend is not None:
stat_backend.cleanup()

return None

Expand Down Expand Up @@ -532,7 +534,7 @@ def _upload_gse(
preload: bool = True,
lite=False,
max_file_size: int = 20 * 1000000,
r_service: RServiceManager = None,
backend: StatBackend = None,
pm: pypiper.PipelineManager = None,
) -> ProjectProcessingStatus:
"""
Expand All @@ -557,7 +559,9 @@ def _upload_gse(
:param lite: lite mode, where skipping statistic processing for memory optimization and time saving
:param max_file_size: maximum file size in bytes. Default: 20MB
:param pypiper.PipelineManager pm: pypiper object
:param r_service: RServiceManager object
:param StatBackend backend: pre-built stats backend (reused across all samples
in the GSE). When None, one is built locally — use a pre-built backend
when processing many GSEs so persistent resources are amortized.
:return: None
"""
if isinstance(bedbase_config, str):
Expand Down Expand Up @@ -607,12 +611,12 @@ def _upload_gse(
else:
stop_pipeline = False

if not lite and not r_service:
r_service = RServiceManager()
owns_backend = False
if not lite and backend is None:
backend = build_backend(bedbase_config.config.config.analysis.backend)
owns_backend = True
elif lite:
r_service = None
else:
r_service = r_service
backend = None

for counter, project_sample in enumerate(project.samples):
_LOGGER.info(f">> Processing {counter+1} / {total_sample_number}")
Expand Down Expand Up @@ -757,7 +761,7 @@ def _upload_gse(
force_overwrite=overwrite,
lite=lite,
pm=pm,
r_service=r_service,
backend=backend,
reference_genome_validator=reference_validator,
)
_LOGGER.info(
Expand Down Expand Up @@ -852,6 +856,9 @@ def _upload_gse(
if stop_pipeline:
pm.stop_pipeline()

if owns_backend and backend is not None:
backend.cleanup()

_LOGGER.info(
f"Processing of '{gse_id}' completed: "
f"{project_status.number_of_processed}/{project_status.number_of_samples} processed, "
Expand Down
67 changes: 44 additions & 23 deletions bedboss/bedboss.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
run_initial_qc,
)
from bedboss.utils import standardize_pep as pep_standardizer
from bedboss.bedstat.r_service import RServiceManager
from bedboss.bedstat.backends import StatBackend, build_backend

_LOGGER = logging.getLogger(PKG_NAME)

Expand Down Expand Up @@ -90,7 +90,7 @@ def run_all(
universe_method: str = None,
universe_bedset: str = None,
pm: pypiper.PipelineManager = None,
r_service: RServiceManager = None,
backend: StatBackend = None,
reference_genome_validator: ReferenceValidator = None,
) -> str:
"""
Expand Down Expand Up @@ -125,7 +125,11 @@ def run_all(
:param str universe_method: method used to create the universe [Default: None]
:param str universe_bedset: bedset identifier for the universe [Default: None]
:param pypiper.PipelineManager pm: pypiper object
:param RServiceManager r_service: RServiceManager object that will run R services
:param StatBackend backend: pre-built statistics backend instance (reused across
calls by batch orchestrators). When None, one is built from the bbconf
config's ``analysis.backend`` value and cleaned up before return — use
that path for single-file invocations; batch callers should build and
reuse one via :func:`bedboss.bedstat.backends.build_backend`.
:param reference_genome_validator: ReferenceValidator object that will validate reference genome compatibility
:return str bed_digest: bed digest
"""
Expand Down Expand Up @@ -192,18 +196,27 @@ def run_all(
statistics_dict = {}
statistics_dict["number_of_regions"] = len(bed_metadata.bed_object)
else:
statistics_dict = bedstat(
bedfile=bed_metadata.bed_file,
outfolder=outfolder,
genome=genome,
ensdb=ensdb,
bed_digest=bed_metadata.bed_digest,
open_signal_matrix=open_signal_matrix,
just_db_commit=just_db_commit,
rfg_config=rfg_config,
pm=pm,
r_service=r_service,
)
# Build a backend for this call if none was passed in (single-file use).
# Batch orchestrators pass a pre-built backend to amortize setup costs.
owns_backend = backend is None
if owns_backend:
backend = build_backend(bbagent.config.config.analysis.backend)
try:
statistics_dict = bedstat(
bedfile=bed_metadata.bed_file,
outfolder=outfolder,
genome=genome,
ensdb=ensdb,
bed_digest=bed_metadata.bed_digest,
open_signal_matrix=open_signal_matrix,
just_db_commit=just_db_commit,
rfg_config=rfg_config,
pm=pm,
backend=backend,
)
finally:
if owns_backend:
backend.cleanup()

if "mean_region_width" not in statistics_dict:
statistics_dict["mean_region_width"] = (
Expand Down Expand Up @@ -418,10 +431,12 @@ def insert_pep(
if rerun:
skipper.reinitialize()

if not lite:
r_service = RServiceManager()
else:
r_service = None
# Build the stats backend once for the whole batch. The backend holds
# per-backend resources (persistent R service, gtars reference caches,
# etc.) that should be reused across files.
stat_backend = (
build_backend(bbagent.config.config.analysis.backend) if not lite else None
)

for i, pep_sample in enumerate(pep.samples):
is_processed = skipper.is_processed(pep_sample.sample_name)
Expand Down Expand Up @@ -475,7 +490,7 @@ def insert_pep(
universe_bedset=pep_sample.get("universe_bedset"),
lite=lite,
pm=pm,
r_service=r_service,
backend=stat_backend,
)

processed_ids.append(bed_id)
Expand All @@ -486,6 +501,9 @@ def insert_pep(
failed_samples.append(pep_sample.sample_name)
skipper.add_failed(pep_sample.sample_name, f"{e}")

if stat_backend is not None:
stat_backend.cleanup()

if create_bedset:
_LOGGER.info(f"Creating bedset from {pep.name}")
run_bedbuncher(
Expand Down Expand Up @@ -554,15 +572,16 @@ def reprocess_all(
else:
stop_pipeline = False

r_service = RServiceManager()

if isinstance(bedbase_config, str):
bbagent = BedBaseAgent(config=bedbase_config)
elif isinstance(bedbase_config, bbconf.BedBaseAgent):
bbagent = bedbase_config
else:
raise BedBossException("Incorrect bedbase_config type. Exiting...")

# Build the stats backend once for the reprocess batch.
stat_backend = build_backend(bbagent.config.config.analysis.backend)

unprocessed_beds = bbagent.bed.get_unprocessed(
limit=limit, genome=["hg38", "hg19", "mm10"]
)
Expand Down Expand Up @@ -601,7 +620,7 @@ def reprocess_all(
universe_method=None,
universe_bedset=None,
pm=pm,
r_service=r_service,
backend=stat_backend,
)
except Exception as e:
_LOGGER.error(f"Failed to process {bed_annot.name}. See {e}")
Expand All @@ -615,6 +634,8 @@ def reprocess_all(
}
)

stat_backend.cleanup()

if failed_samples:
date_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
with open(
Expand Down
45 changes: 45 additions & 0 deletions bedboss/bedstat/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from bedboss.bedstat.backends.base import StatBackend
from bedboss.bedstat.backends.r_backend import RStatBackend
from bedboss.const import BACKEND_GTARS, BACKEND_R

__all__ = ["StatBackend", "RStatBackend", "create_backend", "build_backend"]


def create_backend(name: str, **kwargs) -> StatBackend:
"""Create a statistics computation backend by name.

Low-level factory: pass backend-specific kwargs directly. Most callers
should use :func:`build_backend` instead, which handles backend-specific
prerequisites (e.g. starting an RServiceManager for the R backend).

:param name: Backend name (BACKEND_R or BACKEND_GTARS)
:param kwargs: Backend-specific keyword arguments
:return: StatBackend instance
"""
if name == BACKEND_R:
return RStatBackend(**kwargs)
elif name == BACKEND_GTARS:
raise NotImplementedError("gtars backend not yet available. Install via PR 2a.")
else:
raise ValueError(f"Unknown analysis backend: {name!r}. Use 'r' or 'gtars'.")


def build_backend(name: str) -> StatBackend:
"""Build a backend with any backend-specific prerequisites attached.

High-level constructor used by batch orchestrators. Handles the lifetime
of backend-internal resources (e.g. starts an RServiceManager for the R
backend so the single R process is reused across all files in a batch).

Callers are responsible for calling :meth:`StatBackend.cleanup` when
done to release backend-held resources. See `StatBackend` as a context
manager for automatic cleanup.

:param name: Backend name (BACKEND_R or BACKEND_GTARS)
:return: StatBackend instance ready for batch processing
"""
if name == BACKEND_R:
from bedboss.bedstat.r_service import RServiceManager

return create_backend(name, r_service=RServiceManager())
return create_backend(name)
40 changes: 40 additions & 0 deletions bedboss/bedstat/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Union

import pypiper


class StatBackend(ABC):
"""Interface for BED file statistics computation backends."""

@abstractmethod
def compute(
self,
bedfile: str,
genome: str,
outfolder: str,
bed_digest: str = None,
ensdb: str = None,
open_signal_matrix: str = None,
just_db_commit: bool = False,
rfg_config: Union[str, Path] = None,
pm: pypiper.PipelineManager = None,
) -> dict:
"""Compute statistics for a single BED file.

Returns a dict with at minimum scalar keys matching BedStatsModel
field names. May also include plot dicts and/or a 'distributions' key.
"""
...

def cleanup(self):
"""Release any resources held by this backend."""
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
return False
Loading