diff --git a/docs/architecture.data.rst b/docs/architecture.data.rst
new file mode 100644
index 00000000..89449477
--- /dev/null
+++ b/docs/architecture.data.rst
@@ -0,0 +1,125 @@
+Data architecture
+=================
+
+Pycytominer data architecture documentation.
+
+Distinct upstream data sources
+------------------------------
+
+Pycytominer's data flow is contingent on the upstream data source.
+Several projects generate different kinds of data, each of which is handled differently within Pycytominer.
+
+* `CellProfiler `_ generates `CSV `_ data used by Pycytominer.
+* `Cytominer-database `_ generates `SQLite `_ databases (which include table data based on the CellProfiler CSVs mentioned above) used by Pycytominer.
+* `DeepProfiler `_ generates `NPZ `_ data used by Pycytominer.
+
+SQLite data
+-----------
+
+Pycytominer consumes SQLite data sources in some areas.
+This data source is currently treated as deprecated for Pycytominer work.
+
+**SQLite data structure**
+
+.. mermaid::
+
+    erDiagram
+        Image ||--o{ Cytoplasm : contains
+        Image ||--o{ Cells : contains
+        Image ||--o{ Nuclei : contains
+
+Related SQLite databases have a structure loosely based around the above diagram.
+There are generally four tables: Image, Cytoplasm, Cells, and Nuclei.
+Each Image may contain zero to many Cells, Nuclei, or Cytoplasm data rows.
+
+**SQLite compartments**
+
+The tables Cytoplasm, Cells, and Nuclei are generally referenced as "compartments".
+While these are often included within related SQLite datasets, other compartments may be involved as well.
+
+**SQLite common fields**
+
+Each of the above tables includes ``TableNumber`` and ``ImageNumber`` fields, which are cross-related to data in other tables.
+``ObjectNumber`` is sometimes related to data across tables as well, but this is not guaranteed.
+
+**SQLite data production**
+
+.. mermaid::
+
+    flowchart LR
+        subgraph Data
+            direction LR
+            cellprofiler_data[(CSV Files)] -.-> cytominerdatabase_data[(SQLite File)]
+            cytominerdatabase_data[(SQLite File)]
+        end
+        subgraph Projects
+            direction LR
+            CellProfiler
+            Cytominer-database
+            Pycytominer
+        end
+        CellProfiler --> cellprofiler_data
+        cellprofiler_data --> Cytominer-database
+        Cytominer-database --> cytominerdatabase_data
+        cytominerdatabase_data --> Pycytominer
+
+Related SQLite data is originally created from `CellProfiler `_ CSV data exports.
+This CSV data is then converted to SQLite by `Cytominer-database `_.
+
+**Cytominer-database data transformations**
+
+* Cytominer-database adds a field labeled ``TableNumber`` to all CSV tables from CellProfiler.
+  This field addresses dataset uniqueness, as CellProfiler sometimes resets ``ImageNumber``.
+
+Parquet data
+------------
+
+Pycytominer currently provides capabilities to convert data into the `Apache Parquet `_ format.
+
+**Parquet from Cytominer-database SQLite data sources**
+
+.. mermaid::
+
+    flowchart LR
+        subgraph Data
+            direction LR
+            cellprofiler_data[(CSV Files)] -.-> cytominerdatabase_data[(SQLite File)]
+            cytominerdatabase_data[(SQLite File)] -.-> pycytominer_data[(Parquet File)]
+        end
+        subgraph Projects
+            direction LR
+            CellProfiler
+            Cytominer-database
+            subgraph Pycytominer
+                direction LR
+                Pycytominer_conversion[Parquet Conversion]
+                Pycytominer_work[Parquet-based Work]
+            end
+        end
+        CellProfiler --> cellprofiler_data
+        cellprofiler_data --> Cytominer-database
+        Cytominer-database --> cytominerdatabase_data
+        cytominerdatabase_data --> Pycytominer_conversion
+        Pycytominer_conversion --> pycytominer_data
+        pycytominer_data --> Pycytominer_work
+
+Pycytominer includes the capability to convert related `Cytominer-database `_ SQLite-based data into Parquet.
+The resulting format includes all SQLite table data in a single file, using the joinable keys ``TableNumber`` and ``ImageNumber``, with null values indicating where rows are unrelated.
+
+Conversion work may be performed using the following module: :ref:`sqliteconvert`
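+
+For example, a conversion might be invoked as follows (a minimal sketch; the SQLite and parquet filenames shown here are illustrative):
+
+.. code-block:: python
+
+    from prefect.executors import LocalExecutor
+    from prefect.storage import Local
+
+    from pycytominer.cyto_utils.sqlite.convert import flow_convert_sqlite_to_parquet
+
+    # runs a Prefect flow which chunks the SQLite tables by join keys,
+    # concatenates them together, and writes a single parquet file
+    result_file_path = flow_convert_sqlite_to_parquet(
+        sql_engine="sqlite:///example.sqlite",
+        flow_executor=LocalExecutor(),
+        flow_storage=Local(),
+        pq_filename="example.parquet",
+    )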
+
+An example of the resulting parquet data format for Pycytominer may be found below:
+
++-------------+-------------+------------------------+--------------------+---------------------+-----------------------+---------------------------+-----------------------+------------------------+
+| TableNumber | ImageNumber | Cytoplasm_ObjectNumber | Cells_ObjectNumber | Nuclei_ObjectNumber | Image_Fields...(many) | Cytoplasm_Fields...(many) | Cells_Fields...(many) | Nuclei_Fields...(many) |
++-------------+-------------+------------------------+--------------------+---------------------+-----------------------+---------------------------+-----------------------+------------------------+
+| 123abc      | 1           | Null                   | Null               | Null                | Image Data...         | Null                      | Null                  | Null                   |
++-------------+-------------+------------------------+--------------------+---------------------+-----------------------+---------------------------+-----------------------+------------------------+
+| 123abc      | 1           | 1                      | Null               | Null                | Null                  | Cytoplasm Data...         | Null                  | Null                   |
++-------------+-------------+------------------------+--------------------+---------------------+-----------------------+---------------------------+-----------------------+------------------------+
+| 123abc      | 1           | Null                   | 1                  | Null                | Null                  | Null                      | Cells Data...         | Null                   |
++-------------+-------------+------------------------+--------------------+---------------------+-----------------------+---------------------------+-----------------------+------------------------+
+| 123abc      | 1           | Null                   | Null               | 1                   | Null                  | Null                      | Null                  | Nuclei Data...         |
++-------------+-------------+------------------------+--------------------+---------------------+-----------------------+---------------------------+-----------------------+------------------------+
+
diff --git a/docs/architecture.rst b/docs/architecture.rst
new file mode 100644
index 00000000..03f7659d
--- /dev/null
+++ b/docs/architecture.rst
@@ -0,0 +1,9 @@
+Architecture
+============
+
+The following pages cover pycytominer architecture.
+
+.. toctree::
+   :maxdepth: 2
+
+   architecture.data
diff --git a/docs/conf.py b/docs/conf.py
index d994c9a7..495a3fc9 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -35,7 +35,7 @@
 # Add any Sphinx extension module names here, as strings. They can be
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.
-extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon"] +extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon", "sphinxcontrib.mermaid"] # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] diff --git a/docs/index.rst b/docs/index.rst index d33c2ad3..391b6df0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -15,6 +15,7 @@ Software for processing image-based profiling readouts. install tutorial modules + architecture Indices and tables ================== diff --git a/docs/pycytominer.cyto_utils.rst b/docs/pycytominer.cyto_utils.rst index 9a8e9334..15a7f271 100644 --- a/docs/pycytominer.cyto_utils.rst +++ b/docs/pycytominer.cyto_utils.rst @@ -52,6 +52,32 @@ pycytominer.cyto\_utils.util module :undoc-members: :show-inheritance: +pycytominer.cyto\_utils.sqlite.clean module +--------------------------------------------- + +.. automodule:: pycytominer.cyto_utils.sqlite.clean + :members: + :undoc-members: + :show-inheritance: + +.. _sqliteconvert: + +pycytominer.cyto\_utils.sqlite.convert module +--------------------------------------------- + +.. automodule:: pycytominer.cyto_utils.sqlite.convert + :members: + :undoc-members: + :show-inheritance: + +pycytominer.cyto\_utils.sqlite.meta module +--------------------------------------------- + +.. automodule:: pycytominer.cyto_utils.sqlite.meta + :members: + :undoc-members: + :show-inheritance: + pycytominer.cyto\_utils.write\_gct module ----------------------------------------- diff --git a/docs/requirements.txt b/docs/requirements.txt index d4644ac3..56271222 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -5,3 +5,4 @@ groundwork-sphinx-theme mock autodoc +sphinxcontrib-mermaid \ No newline at end of file diff --git a/pycytominer/cyto_utils/__init__.py b/pycytominer/cyto_utils/__init__.py index 6970a52a..47d92973 100644 --- a/pycytominer/cyto_utils/__init__.py +++ b/pycytominer/cyto_utils/__init__.py @@ -34,7 +34,8 @@ aggregate_fields_count, aggregate_image_features, ) -from .sqlite import ( +from .sqlite.meta import engine_from_str, collect_columns, LIKE_NULLS, SQLITE_AFF_REF +from .sqlite.clean import ( clean_like_nulls, collect_columns, contains_conflicting_aff_storage_class, @@ -43,3 +44,12 @@ update_columns_to_nullable, update_values_like_null_to_null, ) +from .sqlite.convert import ( + flow_convert_sqlite_to_parquet, + multi_to_single_parquet, + nan_data_fill, + sql_select_distinct_join_chunks, + sql_table_to_pd_dataframe, + table_concat_to_parquet, + to_unique_parquet, +) diff --git a/pycytominer/cyto_utils/sqlite/__init__.py b/pycytominer/cyto_utils/sqlite/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pycytominer/cyto_utils/sqlite.py b/pycytominer/cyto_utils/sqlite/clean.py similarity index 78% rename from pycytominer/cyto_utils/sqlite.py rename to pycytominer/cyto_utils/sqlite/clean.py index 3952184b..7f9e246c 100644 --- a/pycytominer/cyto_utils/sqlite.py +++ b/pycytominer/cyto_utils/sqlite/clean.py @@ -1,5 +1,5 @@ """ -Pycytominer SQLite utilities +Pycytominer SQLite utilities - cleaning functions """ import logging @@ -7,149 +7,11 @@ import sqlite3 from typing import Optional, Tuple, Union -from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine -logger = logging.getLogger(__name__) - -# A reference dictionary for SQLite affinity and storage class types -# See more here: https://www.sqlite.org/datatype3.html#affinity_name_examples -SQLITE_AFF_REF = { - "INTEGER": [ - "INT", - "INTEGER", - "TINYINT", - 
"SMALLINT", - "MEDIUMINT", - "BIGINT", - "UNSIGNED BIG INT", - "INT2", - "INT8", - ], - "TEXT": [ - "CHARACTER", - "VARCHAR", - "VARYING CHARACTER", - "NCHAR", - "NATIVE CHARACTER", - "NVARCHAR", - "TEXT", - "CLOB", - ], - "BLOB": ["BLOB"], - "REAL": [ - "REAL", - "DOUBLE", - "DOUBLE PRECISION", - "FLOAT", - ], - "NUMERIC": [ - "NUMERIC", - "DECIMAL", - "BOOLEAN", - "DATE", - "DATETIME", - ], -} - -# strings which may represent null values -LIKE_NULLS = ("null", "none", "nan") - - -def engine_from_str(sql_engine: Union[str, Engine]) -> Engine: - """ - Helper function to create engine from a string or return the engine - if it's already been created. - - Parameters - ---------- - sql_engine: str | sqlalchemy.engine.base.Engine - filename of the SQLite database or existing sqlalchemy engine - - Returns - ------- - sqlalchemy.engine.base.Engine - A SQLAlchemy engine - """ - - # check the type of sql_engine passed and create engine if we have a str - if isinstance(sql_engine, str): - # if we don't already have the sqlite filestring, add it - if "sqlite:///" not in sql_engine: - sql_engine = f"sqlite:///{sql_engine}" - engine = create_engine(sql_engine) - else: - engine = sql_engine - - return engine - +from .meta import LIKE_NULLS, SQLITE_AFF_REF, collect_columns, engine_from_str -def collect_columns( - sql_engine: Union[str, Engine], - table_name: Optional[str] = None, - column_name: Optional[str] = None, -) -> list: - """ - Collect a list of columns from the given engine's - database using optional table or column level - specification. - - Parameters - ---------- - sql_engine: str | sqlalchemy.engine.base.Engine - filename of the SQLite database or existing sqlalchemy engine - table_name: str - optional specific table name to check within database, by default None - column_name: str - optional specific column name to check within database, by default None - - Returns - ------- - list - Returns list, and if populated, contains tuples with values - similar to the following. These may also be accessed by name - similar to dictionaries, as they are SQLAlchemy Row objects. - [('table_name', 'column_name', 'column_type', 'notnull'),...] 
- """ - - # create column list for return result - column_list = [] - - # create an engine - engine = engine_from_str(sql_engine) - - with engine.connect() as connection: - if table_name is None: - # if no table name is provided, we assume all tables must be scanned - tables = connection.execute( - "SELECT name as table_name FROM sqlite_master WHERE type = 'table';" - ).fetchall() - else: - # otherwise we will focus on just the table name provided - tables = [{"table_name": table_name}] - - for table in tables: - - # if no column name is specified we will focus on all columns within the table - sql_stmt = """ - SELECT :table_name as table_name, - name as column_name, - type as column_type, - [notnull] - FROM pragma_table_info(:table_name) - """ - - if column_name is not None: - # otherwise we will focus on only the column name provided - sql_stmt = f"{sql_stmt} WHERE name = :col_name;" - - # append to column list the results - column_list += connection.execute( - sql_stmt, - {"table_name": str(table["table_name"]), "col_name": str(column_name)}, - ).fetchall() - - return column_list +logger = logging.getLogger(__name__) def contains_conflicting_aff_storage_class( diff --git a/pycytominer/cyto_utils/sqlite/convert.py b/pycytominer/cyto_utils/sqlite/convert.py new file mode 100644 index 00000000..f0ef7e65 --- /dev/null +++ b/pycytominer/cyto_utils/sqlite/convert.py @@ -0,0 +1,537 @@ +""" +Pycytominer SQLite utilities - conversion work for sqlite databases +""" + +import itertools +import logging +import pathlib +import uuid +from typing import Dict, List, Union + +import numpy as np +import pandas as pd +import pyarrow.parquet as pq +from prefect import Flow, Parameter, task, unmapped +from prefect.executors import Executor +from prefect.storage import Storage +from sqlalchemy import create_engine +from sqlalchemy.engine.base import Engine + +from .meta import collect_columns + +logger = logging.getLogger(__name__) + + +@task +def sql_select_distinct_join_chunks( + sql_engine: str, table_name: str, join_keys: List[str], chunk_size: int +) -> List[List[Dict]]: + """ + Selects distinct chunks of values from SQLite. + + Parameters + ---------- + sql_engine: str: + SQLite database engine url url + table_name: str: + Name of table to reference for this function + join_keys: List[str]: + Keys to use for building unique chunk sets + chunk_size: int: + Size of chunk sets to use + + Returns + ------- + List[List[Dict]] + A list of lists with dictionaries for sets of + unique join keys. + """ + + # form string for sql query based on join keys for chunking + join_keys_str = ", ".join(join_keys) + + # select distinct results from database based on join keys + sql_stmt = f""" + select distinct {join_keys_str} from {table_name} + """ + + # gather a dictionary from pandas dataframe based on results from query + result_dicts = pd.read_sql( + sql_stmt, + create_engine(sql_engine), + ).to_dict(orient="records") + + # build chunked result dict list + chunked_result_dicts = [ + result_dicts[i : i + chunk_size] + for i in range(0, len(result_dicts), chunk_size) + ] + + return chunked_result_dicts + + +@task +def sql_table_to_pd_dataframe( + sql_engine: str, + table_name: str, + prepend_tablename_to_cols: bool, + avoid_prepend_for: List[str], + chunk_list_dicts: list, + column_data: List[dict], +) -> pd.DataFrame: + """ + + Parameters + ---------- + sql_engine: str: + SQLite database engine url + table_name: str: + Name of table to reference for this function. 
+        Examples for this parameter: "Image", "Cells", "Cytoplasm", "Nuclei"
+    prepend_tablename_to_cols: bool:
+        Determines whether we prepend table name
+        to column name.
+    avoid_prepend_for: List[str]:
+        List of column names to avoid tablename prepend
+    chunk_list_dicts: list:
+        List of dictionaries for chunked querying
+    column_data: List[dict]:
+        Column metadata extracted from database.
+
+    Returns
+    -------
+    pd.DataFrame
+        DataFrame with results of query.
+
+    Examples
+    --------
+
+    .. code-block:: python
+
+        from prefect import Flow, Parameter, task
+
+        from pycytominer.cyto_utils.sqlite.convert import sql_table_to_pd_dataframe
+        from pycytominer.cyto_utils.sqlite.meta import collect_columns
+
+        sql_path = "test_SQ00014613.sqlite"
+        sql_url = f"sqlite:///{sql_path}"
+
+        with Flow("Example Flow") as flow:
+            param_sql_engine = Parameter("sql_engine", default="")
+
+            # form prefect task from sqlite meta util
+            task_collect_columns = task(collect_columns)
+
+            # gather sql column and table data flow operations
+            column_data = task_collect_columns(sql_engine=param_sql_engine)
+
+            dataframe_result = sql_table_to_pd_dataframe(
+                sql_engine=param_sql_engine,
+                table_name="Image",
+                prepend_tablename_to_cols=True,
+                avoid_prepend_for=["TableNumber", "ImageNumber"],
+                chunk_list_dicts=[
+                    {"TableNumber": "dd77885d07028e67dc9bcaaba4df34c6", "ImageNumber": "1"},
+                    {"TableNumber": "1e5d8facac7508cfd4086f3e3e950182", "ImageNumber": "2"},
+                ],
+                column_data=column_data,
+            )
+
+        # run the flow as outlined above
+        flow_state = flow.run(parameters=dict(sql_engine=sql_url))
+
+        # access the dataframe result of the flow
+        df = flow_state.result[dataframe_result].result
+
+        # print info from dataframe result
+        df.info()
+
+    """
+
+    # adds the tablename to the front of column name for query
+    if prepend_tablename_to_cols:
+        colnames = [
+            coldata["column_name"]
+            for coldata in column_data
+            if coldata["table_name"] == table_name
+        ]
+        # build aliased column names with table name prepended
+        colstring = ",".join(
+            [
+                f"{colname} as '{table_name}_{colname}'"
+                if colname not in avoid_prepend_for
+                else colname
+                for colname in colnames
+            ]
+        )
+        sql_stmt = f"select {colstring} from {table_name}"
+    else:
+        sql_stmt = f"select * from {table_name}"
+
+    # build sql query where clause based on chunk_list_dicts
+    chunk_list_dicts_str = " OR ".join(
+        [
+            f"({where_group})"
+            for where_group in [
+                " AND ".join([f"{key} = '{val}'" for key, val in list_dict.items()])
+                for list_dict in chunk_list_dicts
+            ]
+        ]
+    )
+
+    # append the where clause to the query
+    sql_stmt += f" where {chunk_list_dicts_str}"
+
+    # return the sql query as pandas dataframe
+    return pd.read_sql(sql_stmt, create_engine(sql_engine))
+
+
+@task
+def nan_data_fill(fill_into: pd.DataFrame, fill_from: pd.DataFrame) -> pd.DataFrame:
+    """
+    Add nan-filled columns to the fill_into dataframe
+    where they are missing relative to fill_from.
+
+    Parameters
+    ----------
+    fill_into: pd.DataFrame:
+        Dataframe to fill into.
+    fill_from: pd.DataFrame:
+        Dataframe to reference for fill.
+
+    Returns
+    -------
+    pd.DataFrame
+        New fill_into pd.DataFrame with added column(s)
+        and nan data.
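+
+    Examples
+    --------
+
+    A brief sketch with illustrative dataframes (any two
+    dataframes with differing columns apply here):
+
+    .. code-block:: python
+
+        import pandas as pd
+
+        from pycytominer.cyto_utils.sqlite.convert import nan_data_fill
+
+        image_df = pd.DataFrame({"ImageNumber": [1], "Image_Data": [1]})
+        cyto_df = pd.DataFrame({"ImageNumber": [1], "Cytoplasm_Data": [2]})
+
+        # note: .run() invokes the Prefect task's underlying function directly
+        filled = nan_data_fill.run(fill_into=image_df, fill_from=cyto_df)
+
+        # filled now also includes a "Cytoplasm_Data" column filled with np.nan
+        print(filled.columns.tolist())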
+ """ + + # gather columns and dtype missing in fill_into based on fill_from + colnames_and_types = { + # note: we replace int64 with float64 to accommodate np.nan + colname: str(fill_from[colname].dtype).replace("int64", "float64") + for colname in fill_from.columns + if colname not in fill_into.columns + } + + # append all columns not in fill_into table into fill_into + fill_into = pd.concat( + [ + fill_into, + # generate a dataframe with proper row-length and + # new columns (+matching datatype) which do not yet + # exist within fill_into based on fill_from + pd.DataFrame( + { + colname: pd.Series( + data=np.nan, + index=fill_into.index, + dtype=coltype, + ) + for colname, coltype in colnames_and_types.items() + }, + index=fill_into.index, + ), + ], + axis=1, + ) + + return fill_into + + +@task +def table_concat_to_parquet( + sql_engine: str, + column_data: List[dict], + prepend_tablename_to_cols: bool, + avoid_prepend_for: list, + chunk_list_dicts: list, + filename: str, +): + """ + Concatenate chunk of database tables together as + single dataframe, adding any missing columns along + the way, and then dumping to uniquely named parquet + file using a filename prefix. + + Parameters + ---------- + sql_engine: str: + SQLite database engine url + column_data: List[dict]: + Column metadata from database + prepend_tablename_to_cols: bool: + Determines whether we prepend table name + to column name. + avoid_prepend_for: list: + List of column names to avoid tablename prepend + chunk_list_dicts: list: + List of dictionaries for chunked querying + filename: str: + Filename to be used for parquet export + + Returns + ------- + str + Filename of parquet file created. + """ + + # gather a set of the table names + # note: coldata contains repeated tablenames and we gather unique + # tablenames only via set from the list. + table_list = set([coldata["table_name"] for coldata in column_data]) + + # build empty dataframe which will become the basis of concat operations + concatted = pd.DataFrame() + + for table in table_list: + # query data from table by join key chunks, ensuring unique column names where necessary + to_concat = sql_table_to_pd_dataframe.run( + sql_engine=sql_engine, + table_name=table, + prepend_tablename_to_cols=prepend_tablename_to_cols, + avoid_prepend_for=avoid_prepend_for, + chunk_list_dicts=chunk_list_dicts, + column_data=column_data, + ) + + # if concatted is empty, we set it to the first dataframe + if len(concatted) == 0: + concatted = to_concat + + # else we concat the existing dataframe with the new one + else: + # both the already concatted and target are prepared with matching columns nan's + # for data they do not contain. + concatted, to_concat = list( + itertools.starmap( + nan_data_fill.run, [[concatted, to_concat], [to_concat, concatted]] + ) + ) + + # bring the prepared dataframes together as the new concatted + concatted = pd.concat([concatted, to_concat]) + + # export concatted result from all tables to a uniquely name parquet file + filename_uuid = to_unique_parquet.run(df=concatted, filename=filename) + + # return the filename generated from to_unique_parquet + return filename_uuid + + +@task +def to_unique_parquet(df: pd.DataFrame, filename: str) -> str: + """ + Write a uniquely named parquet from provided dataframe. + + Parameters + ---------- + df: pd.DataFrame: + Dataframe to use for parquet write + filename: str: + Filename to use along with uuid for unique filenames + + Returns + ------- + str + Unique filename for parquet file created. 
+ """ + + # gather unique id for filename + file_uuid = str(uuid.uuid4().hex) + + # build a unique filename string + unique_filename = f"{filename}-{file_uuid}" + + # export the dataframe based on the unique filename + df.to_parquet(unique_filename) + + # return the unique filename generated + return unique_filename + + +@task +def multi_to_single_parquet( + pq_files: List[str], + filename: str, +) -> str: + """ + Take a list of parquet file paths and write them + as one single parquet file. Assumes exact same + data schema for all files. + + Parameters + ---------- + pq_files: List[str]: + List of parquet file paths + filename: str: + Filename to use for the parquet file. + + Returns + ------- + str + Filename of the single parquet file. + """ + + # if there's already a file remove it + path = pathlib.Path(filename) + if path.exists(): + path.unlink() + + # read first file for basis of schema and column order for all others + writer_basis = pq.read_table(pq_files[0]) + + # build a parquet file writer which will be used to append files from pq_files + # as a single concatted parquet file, referencing the first file's schema + # (all must be the same schema) + writer = pq.ParquetWriter(filename, writer_basis.schema) + + for tbl in pq_files: + # read the file from the list and write to the concatted parquet file + # note: we pass column order based on the first chunk file to help ensure schema + # compatibility for the writer + writer.write_table(pq.read_table(tbl, columns=writer_basis.column_names)) + # remove the file which was written in the concatted parquet file (we no longer need it) + pathlib.Path(tbl).unlink() + + # close the single concatted parquet file writer + writer.close() + + # return the concatted parquet filename + return filename + + +def flow_convert_sqlite_to_parquet( + sql_engine: Union[str, Engine], + flow_executor: Executor, + flow_storage: Storage, + sql_tbl_basis: str = "Image", + sql_join_keys: List[str] = ["TableNumber", "ImageNumber"], + sql_chunk_size: int = 10, + pq_filename: str = "combined.parquet", +) -> str: + """ + Run a Prefect Flow to convert Pycytominer SQLite data + to single parquet file with same data. + + Parameters + ---------- + sql_engine: Union[str, Engine]: + filename of the SQLite database or existing sqlalchemy engine + flow_executor: Executor: + Prefect flow executor + flow_storage: Storage: + Prefect flow storage + sql_tbl_basis: str: (Default value = "Image") + Database table to use as the basis of building + join keys and chunks + sql_join_keys: List[str]: (Default value = ["TableNumber","ImageNumber"]): + Database column name keys to be used as for + chunking and frame concatenation. + sql_chunk_size: int: (Default value = 10) + Chunk size for unique join key datasets. + Note: adjust to resource capabilities of + machine and provided dataset. Smaller + chunksizes may mean greater time duration + and lower memory consumption + pq_filename: str: (Default value = "combined.parquet") + Target parquet filename to be used for chunks + and also resulting converted filename. + + Returns + ------- + str + Single parquet filename of file which contains + all SQLite data based on the outcome of a + Prefect flow. + + Examples + -------- + + .. 
code-block:: python + + from prefect.executors import LocalExecutor + from prefect.storage import Local + from sqlalchemy.engine import create_engine + + from pycytominer.cyto_utils.sqlite.convert import flow_convert_sqlite_to_parquet + + sql_path = "test_SQ00014613.sqlite" + sql_url = f"sqlite:///{sql_path}" + sql_engine = create_engine(sql_url) + + # note: encapsulate the following within a __main__ block for + # dask compatibility if desired and set with executor + + result_file_path = flow_convert_sqlite_to_parquet( + sql_engine=sql_engine, + flow_executor=LocalExecutor(), + flow_storage=Local(), + sql_tbl_basis="Image", + sql_join_keys=["TableNumber", "ImageNumber"], + sql_chunk_size=10, + pq_filename="test_SQ00014613.parquet", + ) + + """ + + logger.info("Setting up Prefect flow for running SQLite to parquet conversion.") + + # cast the provided sql_engine to a string from sqlalchemy url if necessary + if not isinstance(sql_engine, str): + sql_engine = str(sql_engine.url) + + # build a prefect flow with explicit storage based on parameter + with Flow("flow_convert_sqlite_to_parquet", storage=flow_storage) as flow: + + # set flow parameters + param_sql_engine = Parameter("sql_engine", default="") + param_sql_tbl_basis = Parameter("sql_tbl_basis", default=sql_tbl_basis) + param_sql_join_keys = Parameter("sql_join_keys", default=sql_join_keys) + param_sql_chunk_size = Parameter("sql_chunk_size", default=sql_chunk_size) + param_pq_filename = Parameter("pq_filename", default=pq_filename) + + # form prefect tasks from sqlite meta utils + task_collect_columns = task(collect_columns) + + # gather sql column and table data flow operations + column_data = task_collect_columns(sql_engine=param_sql_engine) + + # chunk the dicts so as to create batches + chunk_dicts = sql_select_distinct_join_chunks( + sql_engine=param_sql_engine, + table_name=param_sql_tbl_basis, + join_keys=param_sql_join_keys, + chunk_size=param_sql_chunk_size, + ) + + # map to gather our concatted/merged pd dataframes as a list within this flow + pq_files = table_concat_to_parquet.map( + sql_engine=unmapped(param_sql_engine), + column_data=unmapped(column_data), + prepend_tablename_to_cols=unmapped(True), + avoid_prepend_for=unmapped(param_sql_join_keys), + chunk_list_dicts=chunk_dicts, + filename=unmapped(param_pq_filename), + ) + + # reduce to single pq "concatted" file from the map result list + reduced_pq_result = multi_to_single_parquet( + pq_files=pq_files, filename=param_pq_filename + ) + + # run the flow + flow_state = flow.run( + executor=flow_executor, + parameters=dict( + sql_engine=sql_engine, + sql_tbl_basis=sql_tbl_basis, + sql_join_keys=sql_join_keys, + sql_chunk_size=sql_chunk_size, + pq_filename=pq_filename, + ), + ) + + # return the result of reduced_pq_result, a path to the new file + return flow_state.result[reduced_pq_result].result diff --git a/pycytominer/cyto_utils/sqlite/meta.py b/pycytominer/cyto_utils/sqlite/meta.py new file mode 100644 index 00000000..1f2ecc05 --- /dev/null +++ b/pycytominer/cyto_utils/sqlite/meta.py @@ -0,0 +1,150 @@ +""" +Pycytominer SQLite utilities - meta database and data management +""" + +import logging +from typing import Optional, Union + +from sqlalchemy import create_engine +from sqlalchemy.engine.base import Engine + +logger = logging.getLogger(__name__) + +# A reference dictionary for SQLite affinity and storage class types +# See more here: https://www.sqlite.org/datatype3.html#affinity_name_examples +SQLITE_AFF_REF = { + "INTEGER": [ + "INT", + "INTEGER", + 
"TINYINT", + "SMALLINT", + "MEDIUMINT", + "BIGINT", + "UNSIGNED BIG INT", + "INT2", + "INT8", + ], + "TEXT": [ + "CHARACTER", + "VARCHAR", + "VARYING CHARACTER", + "NCHAR", + "NATIVE CHARACTER", + "NVARCHAR", + "TEXT", + "CLOB", + ], + "BLOB": ["BLOB"], + "REAL": [ + "REAL", + "DOUBLE", + "DOUBLE PRECISION", + "FLOAT", + ], + "NUMERIC": [ + "NUMERIC", + "DECIMAL", + "BOOLEAN", + "DATE", + "DATETIME", + ], +} + +# strings which may represent null values +LIKE_NULLS = ("null", "none", "nan") + + +def engine_from_str(sql_engine: Union[str, Engine]) -> Engine: + """ + Helper function to create engine from a string or return the engine + if it's already been created. + + Parameters + ---------- + sql_engine: str | sqlalchemy.engine.base.Engine + filename of the SQLite database or existing sqlalchemy engine + + Returns + ------- + sqlalchemy.engine.base.Engine + A SQLAlchemy engine + """ + + # check the type of sql_engine passed and create engine if we have a str + if isinstance(sql_engine, str): + # if we don't already have the sqlite filestring, add it + if "sqlite:///" not in sql_engine: + sql_engine = f"sqlite:///{sql_engine}" + engine = create_engine(sql_engine) + else: + engine = sql_engine + + return engine + + +def collect_columns( + sql_engine: Union[str, Engine], + table_name: Optional[str] = None, + column_name: Optional[str] = None, +) -> list: + """ + Collect a list of columns from the given engine's + database using optional table or column level + specification. + + Parameters + ---------- + sql_engine: str | sqlalchemy.engine.base.Engine + filename of the SQLite database or existing sqlalchemy engine + table_name: str + optional specific table name to check within database, by default None + column_name: str + optional specific column name to check within database, by default None + + Returns + ------- + list + Returns list, and if populated, contains tuples with values + similar to the following. These may also be accessed by name + similar to dictionaries, as they are SQLAlchemy Row objects. + [('table_name', 'column_name', 'column_type', 'notnull'),...] 
+ """ + + # create column list for return result + column_list = [] + + # create an engine + engine = engine_from_str(sql_engine) + + with engine.connect() as connection: + if table_name is None: + # if no table name is provided, we assume all tables must be scanned + tables = connection.execute( + "SELECT name as table_name FROM sqlite_master WHERE type = 'table';" + ).fetchall() + else: + # otherwise we will focus on just the table name provided + tables = [{"table_name": table_name}] + + for table in tables: + + # if no column name is specified we will focus on all columns within the table + sql_stmt = """ + SELECT :table_name as table_name, + name as column_name, + type as column_type, + [notnull] + FROM pragma_table_info(:table_name) + """ + + if column_name is not None: + # otherwise we will focus on only the column name provided + sql_stmt = f"{sql_stmt} WHERE name = :col_name;" + + # append to column list the results + column_list += connection.execute( + sql_stmt, + {"table_name": str(table["table_name"]), "col_name": str(column_name)}, + ).fetchall() + + return column_list diff --git a/pycytominer/tests/test_cyto_utils/sqlite/__init__.py b/pycytominer/tests/test_cyto_utils/sqlite/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pycytominer/tests/test_cyto_utils/sqlite/conftest.py b/pycytominer/tests/test_cyto_utils/sqlite/conftest.py new file mode 100644 index 00000000..78c0eadb --- /dev/null +++ b/pycytominer/tests/test_cyto_utils/sqlite/conftest.py @@ -0,0 +1,213 @@ +""" +Pytest conftest file for sqlite series of tests. +""" + +import os +import tempfile +from typing import Dict, List + +import pytest +from pycytominer.cyto_utils.sqlite.convert import sql_select_distinct_join_chunks +from pycytominer.cyto_utils.sqlite.meta import collect_columns +from sqlalchemy import create_engine +from sqlalchemy.engine.base import Engine + + +@pytest.fixture +def database_engine_for_testing() -> Engine: + """ + A database engine for testing as a fixture. + """ + + # get temporary directory + tmpdir = tempfile.gettempdir() + + # create a temporary sqlite connection + sql_path = f"sqlite:///{tmpdir}/test_sqlite.sqlite" + + engine = create_engine(sql_path) + + # statements for creating database with simple structure + create_stmts = [ + "drop table if exists tbl_a;", + """ + create table tbl_a ( + col_integer INTEGER NOT NULL + ,col_text TEXT + ,col_blob BLOB + ,col_real REAL + ); + """, + "drop table if exists tbl_b;", + """ + create table tbl_b ( + col_integer INTEGER + ,col_text TEXT + ,col_blob BLOB + ,col_real REAL + ); + """, + ] + + insert_vals = [1, "sample", b"sample_blob", 0.5] + + with engine.begin() as connection: + for stmt in create_stmts: + connection.execute(stmt) + + # insert statement with some simple values + # note: we use SQLAlchemy's parameters to insert data properly, esp. BLOB + connection.execute( + ( + "INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)" + "VALUES (?, ?, ?, ?);" + ), + insert_vals, + ) + connection.execute( + ( + "INSERT INTO tbl_b (col_integer, col_text, col_blob, col_real)" + "VALUES (?, ?, ?, ?);" + ), + insert_vals, + ) + + return engine + + +@pytest.fixture +def database_engine_for_conversion_testing() -> Engine: + """ + A database engine for conversion work testing as a fixture. 
+ """ + + # get temporary directory + tmpdir = tempfile.gettempdir() + + sql_filepath = f"{tmpdir}/test_conversion_sqlite.sqlite" + + # remove db if it exists + if os.path.exists(f"{tmpdir}/test_sqlite.sqlite"): + os.remove(f"{tmpdir}/test_sqlite.sqlite") + + # create a temporary sqlite connection + sql_path = f"sqlite:///{sql_filepath}" + + engine = create_engine(sql_path) + + # statements for creating database with simple structure + # intended to very roughly match existing cytomining + # community SQLite files. + create_stmts = [ + "drop table if exists Image;", + """ + create table Image ( + TableNumber INTEGER + ,ImageNumber INTEGER + ,ImageData INTEGER + ,RandomDate DATETIME + ); + """, + "drop table if exists Cells;", + """ + create table Cells ( + TableNumber INTEGER + ,ImageNumber INTEGER + ,ObjectNumber INTEGER + ,CellsData INTEGER + ); + """, + "drop table if exists Nuclei;", + """ + create table Nuclei ( + TableNumber INTEGER + ,ImageNumber INTEGER + ,ObjectNumber INTEGER + ,NucleiData INTEGER + ); + """, + "drop table if exists Cytoplasm;", + """ + create table Cytoplasm ( + TableNumber INTEGER + ,ImageNumber INTEGER + ,ObjectNumber INTEGER + ,Cytoplasm_Parent_Cells INTEGER + ,Cytoplasm_Parent_Nuclei INTEGER + ,CytoplasmData INTEGER + ); + """, + ] + + with engine.begin() as connection: + for stmt in create_stmts: + connection.execute(stmt) + + # images + connection.execute( + "INSERT INTO Image VALUES (?, ?, ?, ?);", + [1, 1, 1, "123-123"], + ) + connection.execute( + "INSERT INTO Image VALUES (?, ?, ?, ?);", + [2, 2, 2, "123-123"], + ) + + # cells + connection.execute( + "INSERT INTO Cells VALUES (?, ?, ?, ?);", + [1, 1, 2, 1], + ) + connection.execute( + "INSERT INTO Cells VALUES (?, ?, ?, ?);", + [2, 2, 3, 1], + ) + + # Nuclei + connection.execute( + "INSERT INTO Nuclei VALUES (?, ?, ?, ?);", + [1, 1, 4, 1], + ) + connection.execute( + "INSERT INTO Nuclei VALUES (?, ?, ?, ?);", + [2, 2, 5, 1], + ) + + # cytoplasm + connection.execute( + "INSERT INTO Cytoplasm VALUES (?, ?, ?, ?, ?, ?);", + [1, 1, 6, 2, 4, 1], + ) + connection.execute( + "INSERT INTO Cytoplasm VALUES (?, ?, ?, ?, ?, ?);", + [2, 2, 7, 3, 5, 1], + ) + + return str(engine.url) + + +@pytest.fixture +def database_distinct_join_chunks( + database_engine_for_conversion_testing, +) -> List[List[Dict]]: + """ + Fixture for database chunks + """ + + return sql_select_distinct_join_chunks.run( + sql_engine=database_engine_for_conversion_testing, + table_name="Image", + join_keys=["TableNumber", "ImageNumber"], + chunk_size=1, + ) + + +@pytest.fixture +def database_column_data( + database_engine_for_conversion_testing, +) -> List[List[Dict]]: + """ + Fixture for column data from database + """ + + return collect_columns(sql_engine=database_engine_for_conversion_testing) diff --git a/pycytominer/tests/test_cyto_utils/test_sqlite.py b/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_clean.py similarity index 76% rename from pycytominer/tests/test_cyto_utils/test_sqlite.py rename to pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_clean.py index aa5fdafb..fc23c40c 100644 --- a/pycytominer/tests/test_cyto_utils/test_sqlite.py +++ b/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_clean.py @@ -1,122 +1,18 @@ -""" Tests for pycytominer.cyto_utils.sqlite """ - -import tempfile +""" +Tests for: pycytominer.cyto_utils.sqlite.clean +""" import pytest -from sqlalchemy import create_engine -from sqlalchemy.engine.base import Engine -from sqlalchemy.exc import IntegrityError -from pycytominer.cyto_utils.sqlite import ( 
- LIKE_NULLS, +from pycytominer.cyto_utils.sqlite.clean import ( clean_like_nulls, - collect_columns, contains_conflicting_aff_storage_class, contains_str_like_null, - engine_from_str, update_columns_to_nullable, update_values_like_null_to_null, ) - - -@pytest.fixture -def database_engine_for_testing() -> Engine: - """ - A database engine for testing as a fixture to be passed - to other tests within this file. - """ - - # get temporary directory - tmpdir = tempfile.gettempdir() - - # create a temporary sqlite connection - sql_path = f"sqlite:///{tmpdir}/test_sqlite.sqlite" - engine = create_engine(sql_path) - - # statements for creating database with simple structure - create_stmts = [ - "drop table if exists tbl_a;", - """ - create table tbl_a ( - col_integer INTEGER NOT NULL - ,col_text TEXT - ,col_blob BLOB - ,col_real REAL - ); - """, - "drop table if exists tbl_b;", - """ - create table tbl_b ( - col_integer INTEGER - ,col_text TEXT - ,col_blob BLOB - ,col_real REAL - ); - """, - ] - - insert_vals = [1, "sample", b"sample_blob", 0.5] - - with engine.begin() as connection: - for stmt in create_stmts: - connection.execute(stmt) - - # insert statement with some simple values - # note: we use SQLAlchemy's parameters to insert data properly, esp. BLOB - connection.execute( - ( - "INSERT INTO tbl_a (col_integer, col_text, col_blob, col_real)" - "VALUES (?, ?, ?, ?);" - ), - insert_vals, - ) - connection.execute( - ( - "INSERT INTO tbl_b (col_integer, col_text, col_blob, col_real)" - "VALUES (?, ?, ?, ?);" - ), - insert_vals, - ) - - return engine - - -def test_engine_from_str(): - """ - Testing engine_from_str - """ - # test str functionality - engine = engine_from_str(":memory:") - assert isinstance(engine, Engine) - assert engine.execute("PRAGMA integrity_check").fetchone()[0] == "ok" - - # test sqlalchemy engine - engine = engine_from_str(create_engine("sqlite:///:memory:")) - assert isinstance(engine, Engine) - assert engine.execute("PRAGMA integrity_check").fetchone()[0] == "ok" - - -def test_collect_columns(database_engine_for_testing): - """ - Testing collect_columns - """ - - # test full database columns collected - assert len(collect_columns(database_engine_for_testing)) == 8 - - # test single database table collected - assert collect_columns(database_engine_for_testing, table_name="tbl_a") == [ - ("tbl_a", "col_integer", "INTEGER", 1), - ("tbl_a", "col_text", "TEXT", 0), - ("tbl_a", "col_blob", "BLOB", 0), - ("tbl_a", "col_real", "REAL", 0), - ] - - # test single column from single table collected - assert collect_columns( - database_engine_for_testing, - table_name="tbl_b", - column_name="col_integer", - ) == [("tbl_b", "col_integer", "INTEGER", 0)] +from pycytominer.cyto_utils.sqlite.meta import LIKE_NULLS +from sqlalchemy.engine.base import Engine +from sqlalchemy.exc import IntegrityError def test_contains_conflicting_aff_storage_class(database_engine_for_testing): diff --git a/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_convert.py b/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_convert.py new file mode 100644 index 00000000..85b1700d --- /dev/null +++ b/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_convert.py @@ -0,0 +1,307 @@ +""" +Tests for: pycytominer.cyto_utils.sqlite.convert +""" + +import os +import tempfile + +import numpy as np +import pandas as pd +from prefect.executors import LocalExecutor +from prefect.storage import Local +from pycytominer.cyto_utils.sqlite.convert import ( + flow_convert_sqlite_to_parquet, + multi_to_single_parquet, + 
nan_data_fill, + sql_table_to_pd_dataframe, + table_concat_to_parquet, + to_unique_parquet, +) + + +def test_sql_select_distinct_join_chunks(database_distinct_join_chunks): + """ + Testing sql_select_distinct_join_chunks + """ + assert database_distinct_join_chunks == [ + [{"TableNumber": 1, "ImageNumber": 1}], + [{"TableNumber": 2, "ImageNumber": 2}], + ] + + +def test_sql_table_to_pd_dataframe( + database_engine_for_conversion_testing, + database_distinct_join_chunks, + database_column_data, +): + """ + Testing sql_table_to_pd_dataframe + """ + + # test image demo table + df = sql_table_to_pd_dataframe.run( + sql_engine=database_engine_for_conversion_testing, + table_name="Image", + prepend_tablename_to_cols=True, + avoid_prepend_for=["TableNumber", "ImageNumber"], + chunk_list_dicts=database_distinct_join_chunks[0], + column_data=database_column_data, + ) + + assert df.to_dict(orient="dict") == { + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Image_ImageData": {0: 1}, + "Image_RandomDate": {0: "123-123"}, + } + + # test cytoplasm demo table + df = sql_table_to_pd_dataframe.run( + sql_engine=database_engine_for_conversion_testing, + table_name="Cytoplasm", + prepend_tablename_to_cols=True, + avoid_prepend_for=["TableNumber", "ImageNumber"], + chunk_list_dicts=database_distinct_join_chunks[0], + column_data=database_column_data, + ) + assert df.to_dict(orient="dict") == { + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Cytoplasm_ObjectNumber": {0: 6}, + "Cytoplasm_Cytoplasm_Parent_Cells": {0: 2}, + "Cytoplasm_Cytoplasm_Parent_Nuclei": {0: 4}, + "Cytoplasm_CytoplasmData": {0: 1}, + } + + +def test_nan_data_fill(): + """ + Testing nan_data_fill + """ + + image_df = pd.DataFrame( + data={ + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Image_ImageData": {0: 1}, + "Image_RandomDate": {0: "123-123"}, + } + ) + cyto_df = pd.DataFrame( + data={ + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Cytoplasm_ObjectNumber": {0: 6}, + "Cytoplasm_Cytoplasm_Parent_Cells": {0: 2}, + "Cytoplasm_Cytoplasm_Parent_Nuclei": {0: 4}, + "Cytoplasm_CytoplasmData": {0: 1}, + } + ) + + # testing filling into image_df from cyto_df + pd.testing.assert_frame_equal( + left=nan_data_fill.run(fill_into=image_df, fill_from=cyto_df), + right=pd.DataFrame( + data={ + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Image_ImageData": {0: 1}, + "Image_RandomDate": {0: "123-123"}, + "Cytoplasm_ObjectNumber": {0: np.nan}, + "Cytoplasm_Cytoplasm_Parent_Cells": {0: np.nan}, + "Cytoplasm_Cytoplasm_Parent_Nuclei": {0: np.nan}, + "Cytoplasm_CytoplasmData": {0: np.nan}, + } + ), + ) + + # testing filling into cyto_df from image_df + pd.testing.assert_frame_equal( + left=nan_data_fill.run(fill_into=cyto_df, fill_from=image_df), + right=pd.DataFrame( + data={ + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Cytoplasm_ObjectNumber": {0: 6}, + "Cytoplasm_Cytoplasm_Parent_Cells": {0: 2}, + "Cytoplasm_Cytoplasm_Parent_Nuclei": {0: 4}, + "Cytoplasm_CytoplasmData": {0: 1}, + "Image_ImageData": {0: np.nan}, + "Image_RandomDate": {0: None}, + } + ), + ) + + +def test_table_concat_to_parquet( + database_engine_for_conversion_testing, + database_distinct_join_chunks, + database_column_data, +): + + # parquet column order isn't guaranteed + # so we store that here for comparisons below. 
+ _test_cols_order = [ + "TableNumber", + "ImageNumber", + "Image_ImageData", + "Image_RandomDate", + "Cells_ObjectNumber", + "Cells_CellsData", + "Nuclei_ObjectNumber", + "Nuclei_NucleiData", + "Cytoplasm_ObjectNumber", + "Cytoplasm_Cytoplasm_Parent_Cells", + "Cytoplasm_Cytoplasm_Parent_Nuclei", + "Cytoplasm_CytoplasmData", + ] + # build a temporary dir for landing the chunked parquet file + with tempfile.TemporaryDirectory() as temp_dir: + chunk_file_path = table_concat_to_parquet.run( + sql_engine=database_engine_for_conversion_testing, + column_data=database_column_data, + prepend_tablename_to_cols=True, + avoid_prepend_for=["TableNumber", "ImageNumber"], + chunk_list_dicts=database_distinct_join_chunks[0], + filename=f"{temp_dir}/sqlite_convert_chunk_test", + ) + # compare two dictionaries of data based on + # parquet chunk output + pd.testing.assert_frame_equal( + # dataframe as dictionary data for assertion + pd.read_parquet(chunk_file_path)[_test_cols_order] + .sort_values(by=_test_cols_order) + .reset_index(drop=True) + .replace(np.nan, None), + # data should look like this + pd.DataFrame( + data={ + "Cells_CellsData": {0: np.nan, 1: 1.0, 2: np.nan, 3: np.nan}, + "Cells_ObjectNumber": {0: np.nan, 1: 2.0, 2: np.nan, 3: np.nan}, + "Cytoplasm_CytoplasmData": { + 0: np.nan, + 1: np.nan, + 2: np.nan, + 3: 1.0, + }, + "Cytoplasm_Cytoplasm_Parent_Cells": { + 0: np.nan, + 1: np.nan, + 2: np.nan, + 3: 2.0, + }, + "Cytoplasm_Cytoplasm_Parent_Nuclei": { + 0: np.nan, + 1: np.nan, + 2: np.nan, + 3: 4.0, + }, + "Cytoplasm_ObjectNumber": {0: np.nan, 1: np.nan, 2: np.nan, 3: 6.0}, + "ImageNumber": {0: 1, 1: 1, 2: 1, 3: 1}, + "Image_ImageData": {0: 1.0, 1: np.nan, 2: np.nan, 3: np.nan}, + "Image_RandomDate": {0: "123-123", 1: None, 2: None, 3: None}, + "Nuclei_NucleiData": {0: np.nan, 1: np.nan, 2: 1.0, 3: np.nan}, + "Nuclei_ObjectNumber": {0: np.nan, 1: np.nan, 2: 4.0, 3: np.nan}, + "TableNumber": {0: 1, 1: 1, 2: 1, 3: 1}, + } + )[_test_cols_order] + .sort_values(by=_test_cols_order) + .reset_index(drop=True) + .replace(np.nan, None), + ) + + +def test_to_unique_parquet(): + """ + Testing to_unique_parquet + """ + df = pd.DataFrame( + data={ + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Image_ImageData": {0: 1}, + "Image_RandomDate": {0: "123-123"}, + } + ) + with tempfile.TemporaryDirectory() as temp_dir: + unique_file_path1 = to_unique_parquet.run( + df=df, filename=f"{temp_dir}/unique_pq_file_test1" + ) + # intentionally leave the same filename for testing + # uniqueness in filename + unique_file_path2 = to_unique_parquet.run( + df=df, filename=f"{temp_dir}/unique_pq_file_test1" + ) + + assert unique_file_path1 != unique_file_path2 + assert os.path.isfile(unique_file_path1) + + +def test_multi_to_single_parquet(): + """ + Test multi_to_single_parquet + """ + + df = pd.DataFrame( + data={ + "TableNumber": {0: 1}, + "ImageNumber": {0: 1}, + "Image_ImageData": {0: 1}, + "Image_RandomDate": {0: "123-123"}, + } + ) + with tempfile.TemporaryDirectory() as temp_dir: + pq_files = [ + to_unique_parquet.run(df=df, filename=f"{temp_dir}/unique_pq_file_test1"), + to_unique_parquet.run(df=df, filename=f"{temp_dir}/unique_pq_file_test1"), + ] + + multi_file_path = multi_to_single_parquet.run( + pq_files=pq_files, filename="multi_pq_file_test" + ) + + assert os.path.isfile(multi_file_path) + assert len(pd.read_parquet(multi_file_path)) == 2 + + +def test_flow_convert_sqlite_to_parquet(database_engine_for_conversion_testing): + """ + Tests flow_convert_sqlite_to_parquet + """ + + with 
tempfile.TemporaryDirectory() as temp_dir: + result_file_path = flow_convert_sqlite_to_parquet( + sql_engine=database_engine_for_conversion_testing, + flow_executor=LocalExecutor(), + flow_storage=Local(), + sql_tbl_basis="Image", + sql_join_keys=["TableNumber", "ImageNumber"], + sql_chunk_size=1, + pq_filename="combined_test", + ) + + df = pd.read_parquet(result_file_path) + assert os.path.isfile(result_file_path) + + # test overall row length + assert len(df) == 8 + # test that all columns exist as they need to + assert list(df.columns.sort_values()) == [ + "Cells_CellsData", + "Cells_ObjectNumber", + "Cytoplasm_CytoplasmData", + "Cytoplasm_Cytoplasm_Parent_Cells", + "Cytoplasm_Cytoplasm_Parent_Nuclei", + "Cytoplasm_ObjectNumber", + "ImageNumber", + "Image_ImageData", + "Image_RandomDate", + "Nuclei_NucleiData", + "Nuclei_ObjectNumber", + "TableNumber", + ] + # test that despite our row length we have + # correct number of unique join keys per what + # was contained within the test database + assert len(df["TableNumber"].unique()) == 2 + assert len(df["ImageNumber"].unique()) == 2 diff --git a/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_meta.py b/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_meta.py new file mode 100644 index 00000000..4e37a327 --- /dev/null +++ b/pycytominer/tests/test_cyto_utils/sqlite/test_sqlite_meta.py @@ -0,0 +1,46 @@ +""" +Tests for: pycytominer.cyto_utils.sqlite.meta +""" + +from pycytominer.cyto_utils.sqlite.meta import collect_columns, engine_from_str +from sqlalchemy import create_engine +from sqlalchemy.engine.base import Engine + + +def test_engine_from_str(): + """ + Testing engine_from_str + """ + # test str functionality + engine = engine_from_str(":memory:") + assert isinstance(engine, Engine) + assert engine.execute("PRAGMA integrity_check").fetchone()[0] == "ok" + + # test sqlalchemy engine + engine = engine_from_str(create_engine("sqlite:///:memory:")) + assert isinstance(engine, Engine) + assert engine.execute("PRAGMA integrity_check").fetchone()[0] == "ok" + + +def test_collect_columns(database_engine_for_testing): + """ + Testing collect_columns + """ + + # test full database columns collected + assert len(collect_columns(database_engine_for_testing)) == 8 + + # test single database table collected + assert collect_columns(database_engine_for_testing, table_name="tbl_a") == [ + ("tbl_a", "col_integer", "INTEGER", 1), + ("tbl_a", "col_text", "TEXT", 0), + ("tbl_a", "col_blob", "BLOB", 0), + ("tbl_a", "col_real", "REAL", 0), + ] + + # test single column from single table collected + assert collect_columns( + database_engine_for_testing, + table_name="tbl_b", + column_name="col_integer", + ) == [("tbl_b", "col_integer", "INTEGER", 0)] diff --git a/requirements.txt b/requirements.txt index b4fce603..88397e18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ numpy>=1.16.5 scipy>=1.3.0 pandas>=1.2.0 +prefect>=1.2.4 +pyarrow>=8.0.0 scikit-learn>=0.21.2 sqlalchemy>=1.3.6 pytest>=5.0.1