Create staggered hierarchy for CellArr dataset for flexibility for power users #81

Draft · wants to merge 1 commit into base: master
272 changes: 192 additions & 80 deletions src/cellarr/CellArrDataset.py
@@ -27,6 +27,7 @@
print(result1)
"""

from collections.abc import Mapping
from functools import lru_cache
from typing import List, Optional, Sequence, Union

@@ -93,99 +94,56 @@ def __next__(self):
raise StopIteration


class CellArrDataset:
"""A class that represent a collection of cells and their associated metadata in a TileDB backed store."""

class _CellArrDatasetBase:
"""
Base class for CellArr datasets. This class does not manage the tiledb arrays it is given and will not close them on __del__.
Can we abstract away tiledb.Array here so that np.ndarray inputs are supported directly? I expect that
we rely on too much of the tiledb API for that, but we may want to explore that option.

This is a nice-to-have for creating CellArr datasets that combine data that do not live under the same prefix,
e.g. when running a pipeline that modifies the data matrices while the metadata never changes. It even allows
metadata to be held in memory while cell data is streamed from disk. Note that this is a power-user interface for which
we do not provide support, i.e. power users must know what they are doing and operate at their own risk (hence the leading underscore).
"""

def __init__(
self,
dataset_path: str,
assay_tiledb_group: str = "assays",
assay_uri: Union[str, List[str]] = "counts",
gene_annotation_uri: str = "gene_annotation",
cell_metadata_uri: str = "cell_metadata",
sample_metadata_uri: str = "sample_metadata",
config_or_context: Optional[Union[tiledb.Config, tiledb.Ctx]] = None,
assays: Union[tiledb.Array, Sequence[tiledb.Array], dict[str, tiledb.Array]],
gene_annotations: tiledb.Array,
cell_metadata: tiledb.Array,
sample_metadata: tiledb.Array,
):
"""Initialize a ``CellArrDataset``.

Args:
dataset_path:
Path to the directory containing the TileDB stores.
Usually the ``output_path`` from the
:py:func:`~cellarr.build_cellarrdataset.build_cellarrdataset`.

You may provide any tiledb compatible base path (e.g. local
directory, S3, minio etc.).

assay_tiledb_group:
TileDB group containing the assay matrices.

If the provided build process was used, the matrices are stored
in the "assay" TileDB group.

May be an empty string or `None` to specify no group. This is
mostly for backwards compatibility of cellarr builds for versions
before 0.3.

assay_uri:
Relative path to matrix store.
Must be in tiledb group specified by ``assay_tiledb_group``.

gene_annotation_uri:
Relative path to gene annotation store.

cell_metadata_uri:
Relative path to cell metadata store.

sample_metadata_uri:
Relative path to sample metadata store.

config_or_context:
Custom TileDB configuration or context.
If None, default TileDB Config will be used.
"""
if config_or_context is None:
config_or_context = tiledb.Config()

if isinstance(config_or_context, tiledb.Config):
ctx = tiledb.Ctx(config_or_context)
elif isinstance(config_or_context, tiledb.Ctx):
ctx = config_or_context
if isinstance(assays, tiledb.Array):
assays = [assays]
if isinstance(assays, Mapping):
self._matrix_tdb = dict(assays)
else:
raise Exception("'config_or_context' must be either TileDB config or a context object.")

self._dataset_path = dataset_path

if isinstance(assay_uri, str):
assay_uri = [assay_uri]
# TODO: Maybe switch to on-demand loading of these objects
self._matrix_tdb = {}
_asy_path = dataset_path
if assay_tiledb_group is not None and len(assay_tiledb_group) > 0:
_asy_path = f"{dataset_path}/{assay_tiledb_group}"
for mtdb in assay_uri:
self._matrix_tdb[mtdb] = tiledb.open(f"{_asy_path}/{mtdb}", "r", ctx=ctx)
self._gene_annotation_tdb = tiledb.open(f"{dataset_path}/{gene_annotation_uri}", "r", ctx=ctx)
self._cell_metadata_tdb = tiledb.open(f"{dataset_path}/{cell_metadata_uri}", "r", ctx=ctx)
self._sample_metadata_tdb = tiledb.open(f"{dataset_path}/{sample_metadata_uri}", "r", ctx=ctx)
self._matrix_tdb = {assay.uri.split("/")[-1]: assay for assay in assays}
self._gene_annotation_tdb = gene_annotations
self._cell_metadata_tdb = cell_metadata
self._sample_metadata_tdb = sample_metadata

self._validate()

def _validate(self):
num_cells = self._cell_metadata_tdb.nonempty_domain()[0][1]
num_rows = self._gene_annotation_tdb.nonempty_domain()[0][1]

for mname, muri in self._matrix_tdb.items():
dom = muri.nonempty_domain()
for mname, marray in self._matrix_tdb.items():
self._validate_read_only(marray, mname)
dom = marray.nonempty_domain()
if dom[0][1] != num_cells or dom[1][1] != num_rows:
raise RuntimeError(f"Matrix {mname} has incorrect dimensions")

def __del__(self):
self._gene_annotation_tdb.close()
self._cell_metadata_tdb.close()
self._sample_metadata_tdb.close()
for tobj in self._matrix_tdb.values():
tobj.close()
self._validate_read_only(self._gene_annotation_tdb)
self._validate_read_only(self._cell_metadata_tdb)
self._validate_read_only(self._sample_metadata_tdb)

@staticmethod
def _validate_read_only(array: tiledb.Array, name: str = ""):
assert not array.iswritable, f"Arrays must be read-only but found writable array {name}: {array}"
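
# A minimal usage sketch for the base class, assuming pre-opened read-only arrays at
# hypothetical paths (metadata from one prefix, an assay matrix from another):
#
#   import tiledb
#
#   ctx = tiledb.Ctx(tiledb.Config())
#   ds = _CellArrDatasetBase(
#       assays={"counts": tiledb.open("s3://my-bucket/run2/assays/counts", "r", ctx=ctx)},
#       gene_annotations=tiledb.open("/data/base_build/gene_annotation", "r", ctx=ctx),
#       cell_metadata=tiledb.open("/data/base_build/cell_metadata", "r", ctx=ctx),
#       sample_metadata=tiledb.open("/data/base_build/sample_metadata", "r", ctx=ctx),
#   )
#   # The caller keeps ownership of the handles and is responsible for closing them.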


####
## Subset methods for the `cell_metadata` TileDB file.
@@ -595,7 +553,6 @@ def __repr__(self) -> str:
"""
output = f"{type(self).__name__}(number_of_rows={self.shape[0]}"
output += f", number_of_columns={self.shape[1]}"
output += ", at path=" + self._dataset_path

output += ")"
return output
@@ -609,7 +566,6 @@ def __str__(self) -> str:

output += f"number_of_rows: {self.shape[0]}\n"
output += f"number_of_columns: {self.shape[1]}\n"
output += f"path: '{self._dataset_path}'\n"

return output

@@ -668,3 +624,159 @@ def itersamples(self) -> CellArrSampleIterator:
def itercells(self) -> CellArrCellIterator:
"""Iterator over samples."""
return CellArrCellIterator(self)


class _CellArrDatasetUri(_CellArrDatasetBase):
"""
An extension of _CellArrDatasetBase that manages the underlying tiledb arrays. Unlike the base class,
this accepts only URIs, not tiledb.Array objects, and takes ownership of the opened tiledb.Array objects:
it opens them (in read-only mode) upon creation and closes them upon deletion via an override of __del__.

This is a nice-to-have for creating CellArr datasets that combine data that do not live under the same prefix,
e.g. when running a pipeline that modifies the data matrices while the metadata never changes.
Note that this is a power-user interface for which we do not provide support, i.e. power users must know what
they are doing and operate at their own risk (hence the leading underscore).
"""

def __init__(
self,
assay_uris: Union[str, Sequence[str], Mapping[str, str]],
gene_annotation_uri: str,
cell_metadata_uri: str,
sample_metadata_uri: str,
config_or_context: Optional[Union[tiledb.Config, tiledb.Ctx]] = None,
):

if config_or_context is None:
config_or_context = tiledb.Config()

if isinstance(config_or_context, tiledb.Config):
self._ctx = tiledb.Ctx(config_or_context)
elif isinstance(config_or_context, tiledb.Ctx):
self._ctx = config_or_context
else:
raise Exception("'config_or_context' must be either TileDB config or a context object.")

if isinstance(assay_uris, str):
assay_uris = [assay_uris]
if not isinstance(assay_uris, Mapping):
assay_uris = {uri.split("/")[-1]: uri for uri in assay_uris}
def _open(uri):
return tiledb.open(uri=uri, mode="r", ctx=self._ctx)
assays = {name: _open(uri=uri) for name, uri in assay_uris.items()}

super().__init__(
assays=assays,
gene_annotations=_open(gene_annotation_uri),
cell_metadata=_open(cell_metadata_uri),
sample_metadata=_open(sample_metadata_uri),
)

def __del__(self):
self._gene_annotation_tdb.close()
self._cell_metadata_tdb.close()
self._sample_metadata_tdb.close()
for tobj in self._matrix_tdb.values():
tobj.close()
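
# A minimal usage sketch for the URI-based variant, assuming hypothetical store locations;
# the arrays are opened read-only on construction and closed again when the object is deleted:
#
#   ds = _CellArrDatasetUri(
#       assay_uris={"counts": "s3://my-bucket/run2/assays/counts"},
#       gene_annotation_uri="/data/base_build/gene_annotation",
#       cell_metadata_uri="/data/base_build/cell_metadata",
#       sample_metadata_uri="/data/base_build/sample_metadata",
#       config_or_context=tiledb.Config({"vfs.s3.region": "us-east-1"}),
#   )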


class CellArrDataset(_CellArrDatasetUri):
"""A class that represent a collection of cells and their associated metadata in a TileDB backed store."""

def __init__(
self,
dataset_path: str,
assay_tiledb_group: str = "assays",
assay_uri: Union[str, List[str]] = "counts",
gene_annotation_uri: str = "gene_annotation",
cell_metadata_uri: str = "cell_metadata",
sample_metadata_uri: str = "sample_metadata",
config_or_context: Optional[Union[tiledb.Config, tiledb.Ctx]] = None,
):
"""Initialize a ``CellArrDataset``.

Args:
dataset_path:
Path to the directory containing the TileDB stores.
Usually the ``output_path`` from the
:py:func:`~cellarr.build_cellarrdataset.build_cellarrdataset`.

You may provide any TileDB-compatible base path (e.g. local
directory, S3, MinIO, etc.).

assay_tiledb_group:
TileDB group containing the assay matrices.

If the provided build process was used, the matrices are stored
in the "assays" TileDB group.

May be an empty string or `None` to specify no group. This is
mostly for backwards compatibility with cellarr builds from versions
before 0.3.

assay_uri:
Relative path to matrix store.
Must be in tiledb group specified by ``assay_tiledb_group``.

gene_annotation_uri:
Relative path to gene annotation store.

cell_metadata_uri:
Relative path to cell metadata store.

sample_metadata_uri:
Relative path to sample metadata store.

config_or_context:
Custom TileDB configuration or context.
If None, default TileDB Config will be used.
"""


self._dataset_path = dataset_path

if isinstance(assay_uri, str):
assay_uri = [assay_uri]

def _prefix(uri, *prefixes):
# Skip empty path components so an unset assay_tiledb_group does not produce double slashes.
parts = [dataset_path, *(p for p in prefixes if p), uri]
return "/".join(parts)

assay_uris = {name: _prefix(name, assay_tiledb_group or "") for name in assay_uri}
super().__init__(
assay_uris=assay_uris,
gene_annotation_uri=_prefix(gene_annotation_uri),
cell_metadata_uri=_prefix(cell_metadata_uri),
sample_metadata_uri=_prefix(sample_metadata_uri),
config_or_context=config_or_context
)
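
# With the defaults, the URIs handed to _CellArrDatasetUri resolve as follows
# (dataset_path below is hypothetical):
#
#   CellArrDataset(dataset_path="/data/cellarr_build")
#   # assays:          /data/cellarr_build/assays/counts
#   # gene annotation: /data/cellarr_build/gene_annotation
#   # cell metadata:   /data/cellarr_build/cell_metadata
#   # sample metadata: /data/cellarr_build/sample_metadata
#
# Pre-0.3 builds without an assay group can pass assay_tiledb_group="" (or None),
# which drops the group component, e.g. /data/cellarr_build/counts.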

####
## Printing.
####

def __repr__(self) -> str:
"""
Returns:
A string representation.
"""
output = f"{type(self).__name__}(number_of_rows={self.shape[0]}"
output += f", number_of_columns={self.shape[1]}"
output += ", at path=" + self._dataset_path

output += ")"
return output

def __str__(self) -> str:
"""
Returns:
A pretty-printed string containing the contents of this object.
"""
output = f"class: {type(self).__name__}\n"

output += f"number_of_rows: {self.shape[0]}\n"
output += f"number_of_columns: {self.shape[1]}\n"
output += f"path: '{self._dataset_path}'\n"

return output
