From 01c52476f1b20cf48cbbea36d327d82d48c296a4 Mon Sep 17 00:00:00 2001 From: Raj Sinha Date: Wed, 10 Jul 2024 15:26:35 +0000 Subject: [PATCH] Add the ability to use CSV files on GCS as data input/output/test sources. Include miscellaneous bugfixes. Update the README file. PiperOrigin-RevId: 651030738 --- CHANGELOG.md | 11 +- README.md | 96 +-- pyproject.toml | 2 + spade_anomaly_detection/__init__.py | 2 +- spade_anomaly_detection/csv_data_loader.py | 549 ++++++++++++++++++ .../csv_data_loader_test.py | 423 ++++++++++++++ spade_anomaly_detection/data_loader.py | 32 +- spade_anomaly_detection/data_loader_test.py | 3 + .../example_data/covertype_pnu_10000.csv | 1 + spade_anomaly_detection/occ_ensemble.py | 8 +- spade_anomaly_detection/parameters.py | 38 +- spade_anomaly_detection/parameters_test.py | 35 +- spade_anomaly_detection/performance_test.py | 180 +++++- spade_anomaly_detection/requirements.txt | 1 + spade_anomaly_detection/runner.py | 370 +++++++++--- spade_anomaly_detection/runner_test.py | 226 ++++++- .../scripts/run_cloud_spade_experiment.sh | 47 +- spade_anomaly_detection/task.py | 64 +- 18 files changed, 1894 insertions(+), 194 deletions(-) create mode 100644 spade_anomaly_detection/csv_data_loader.py create mode 100644 spade_anomaly_detection/csv_data_loader_test.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ab6a411..10236fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,17 @@ To release a new version (e.g. from `1.0.0` -> `2.0.0`): * Define the new link url: `[2.0.0]: https://github.com/google-research/spade_anomaly_detection/compare/v1.0.0...v2.0.0` * Update the `[Unreleased]` url: `v1.0.0...HEAD` -> `v2.0.0...HEAD` - +* If updating the PyPi version, also update the `__version__` variable in the + `__init__.py` file at the root of the module. --> ## [Unreleased] +## [0.3.0] - 2024-07-10 + +* Add the ability to use CSV files on GCS as data input/output/test sources. +* Miscellaneous bugfixes. + ## [0.2.2] - 2024-05-19 * Update the OCC training to use negative and unlabeled samples for training. @@ -39,7 +45,8 @@ To release a new version (e.g. from `1.0.0` -> `2.0.0`): * Initial release -[Unreleased]: https://github.com/google-research/spade_anomaly_detection/compare/v0.2.2...HEAD +[Unreleased]: https://github.com/google-research/spade_anomaly_detection/compare/v0.3.0...HEAD +[0.3.0]: https://github.com/google-research/spade_anomaly_detection/compare/v0.2.2...v0.3.0 [0.2.2]: https://github.com/google-research/spade_anomaly_detection/compare/v0.2.1...v0.2.2 [0.2.1]: https://github.com/google-research/spade_anomaly_detection/compare/v0.2.0...v0.2.1 [0.2.0]: https://github.com/google-research/spade_anomaly_detection/compare/v0.1.0...v0.2.0 diff --git a/README.md b/README.md index a23cee7..5e9c9b9 100644 --- a/README.md +++ b/README.md @@ -64,9 +64,11 @@ The metric reported by the pipeline is model [AUC](https://developers.google.com **train_setting (string)**: The 'PNU' (Positive, Negative, and Unlabeled) setting will train the supervised model using ground truth negative and positive data, as well as pseudo-labeled samples from the pseudo-labeler. The 'PU' (Positive and Unlabeled) setting will only use negative data from the pseudo-labeler along with the rest of the positive data (ground truth plus pseudo labeled) to train the supervised model. For model evaluation, we still require ground truth negative data to be in the BigQuery dataset, it just won't be used during training. Default is PNU. -input_bigquery_table_path (string): A BigQuery table path in the format 'project.dataset.table'. If this is the only BigQuery path provided, this will be used in conjunction with the test_dataset_holdout_fraction parameter to create a train/test split. +input_bigquery_table_path (string): A BigQuery table path in the format 'project.dataset.table'. If this is the only BigQuery path provided, this will be used in conjunction with the test_dataset_holdout_fraction parameter to create a train/test split. Use exactly one of BigQuery locations or GCS locations. -output_gcs_uri (string): Cloud Storage location to store the supervised model assets. The location should be in the form gs://bucketname/foldername. A timestamp will be added to the end of the folder so that multiple runs of this won't overwrite previous runs. +input_gcs_uri (string): Cloud Storage location to store the input data. If this is the only Cloud Storage location provided, this will be used in conjunction with test_dataset_holdout_fraction parameter to create a train/test split. Use exactly one of BigQuery locations or GCS locations. + +output_gcs_uri (string): Cloud Storage location to store the supervised model assets. The location should be in the form gs://bucketname/foldername. A timestamp will be added to the end of the folder so that multiple runs of this won't overwrite previous runs. Supervised model assets are always stored to GCS. label_col_name (string): The name of the label column in the input BigQuery table. @@ -89,6 +91,14 @@ one class classifier ensemble to label a point as negative. The higher this valu test_dataset_holdout_fraction: This setting is used if test_bigquery_table_path is not provided. Float between 0 and 1 representing the fraction of samples to hold out as a test set. Default is 0.2, meaning 20% of the data is used for training. In the PU setting, this means that 20% of the positive labels and 100% of the negative data (Since we do not use any ground truth negative data for the supervised mode training) will be used for creating the test sets. For the PNU setting, it is just 20% of positive and negative samples, sampled uniformly at random, all other data would be used for training. +test_gcs_uri: Cloud Storage location to store the CSV data to be used for evaluating the supervised model. Note that the positive and negative label values must also be the same in this testing set. It is okay to have your test labels in that form, or use 1 for positive and 0 for negative. Use exactly one of BigQuery locations or GCS locations. + +upload_only: Use this setting in conjunction with `output_bigquery_table_path` or `data_output_gcs_uri`. When `True`, the algorithm will just upload the pseudo labeled data to the specified table, and will skip training a supervised model. When set to `False`, the algorithm will also train a supervised model and upload it to a GCS location. Default is `False`. + +output_bigquery_table_path: A complete BigQuery path in the form of 'project.dataset.table' to be used for uploading the pseudo labeled data. This includes features and new labels. By default, we will use the column names from the input_bigquery_table_path BigQuery table. Use exactly one of BigQuery locations or GCS locations. + +data_output_gcs_uri: Cloud Storage location used for uploading the pseudo labeled data as CSV. This includes features and new labels. By default, we will use the column names from the data_input_gcs_uri table. Use exactly one of BigQuery locations or GCS locations. + alpha (float): Sample weights for weighting the loss function, only for pseudo-labeled data from the occ ensemble. Original data that is labeled will have a weight of 1.0. By default, we use alpha = 1.0. ensemble_count: Integer representing the number of one class classifiers in the ensemble used for pseudo labeling unlabeled data points. The more models in the ensemble, the less likely it is for all the models to gain consensus, and thus will reduce the amount of labeled data points. By default, we use 5 one class classifiers. @@ -154,28 +164,37 @@ echo "Built and pushed ${IMAGE_URI_ML}" set -x PROJECT_ID=${1:-"project_id"} -#Args, maintain same order as runner.run and task.py. -TRAIN_SETTING=${15:-"PNU"} - -TRIAL_NAME="prod_spade_credit_${TRAIN_SETTING}_${USER}" +DATETIME=$(date '+%Y%m%d_%H%M%S') -INPUT_BIGQUERY_TABLE_PATH=${2:-"bq_table_path"} -OUTPUT_GCS_URI=${14:-"gs://[your-gcs-bucket]/models/model_experiment_$(date '+%Y%m%d_%H%M%S')"} -LABEL_COL_NAME=${3:-"y"} +# Give a unique name to your training job. +TRIAL_NAME="spade_${USER}_${DATETIME}" + +#Args +TRAIN_SETTING=${2:-"PNU"} + +# Use either Bigquery or GCS for input/output/test data. +INPUT_BIGQUERY_TABLE_PATH=${3:-"${PROJECT_ID}.[bq-dataset].[bq-input-table]"} +DATA_INPUT_GCS_URI=${4:-""} +OUTPUT_BIGQUERY_TABLE_PATH=${5:-"${PROJECT_ID}.[bq-dataset].[bq-output-table]"} +DATA_OUTPUT_GCS_URI=${6:-""} +OUTPUT_GCS_URI=${7:-"gs://[gcs-bucket]/[model-folder]"} +LABEL_COL_NAME=${8:-"y"} # The label column is of type float, these must match in order for array # filtering to work correctly. -POSITIVE_DATA_VALUE=${4:-"1"} -NEGATIVE_DATA_VALUE=${5:-"0"} -UNLABELED_DATA_VALUE=${6:-"-1"} -POSITIVE_THRESHOLD=${7:-".1"} -NEGATIVE_THRESHOLD=${8:-"95"} -TEST_BIGQUERY_TABLE_PATH=${16:-"table_path"} -TEST_LABEL_COL_NAME=${17:-"y"} -ALPHA=${10:-"1.0"} -ENSEMBLE_COUNT=${12:-"5"} -VERBOSE=${13:-"True"} - -PROD_IMAGE_URI="us-docker.pkg.dev/[project_id]/spade-anomaly-detection/spade:latest" +POSITIVE_DATA_VALUE=${9:-"1"} +NEGATIVE_DATA_VALUE=${10:-"0"} +UNLABELED_DATA_VALUE=${11:-"-1"} +POSITIVE_THRESHOLD=${12:-".1"} +NEGATIVE_THRESHOLD=${13:-"95"} +TEST_BIGQUERY_TABLE_PATH=${14:-"${PROJECT_ID}.[bq-dataset].[bq-test-table]"} +DATA_TEST_GCS_URI=${15:-""} +TEST_LABEL_COL_NAME=${16:-"y"} +ALPHA=${17:-"1.0"} +ENSEMBLE_COUNT=${19:-"5"} +VERBOSE=${22:-"True"} +UPLOAD_ONLY=${23:-"False"} + +IMAGE_URI="us-docker.pkg.dev/[project_id]/spade-anomaly-detection/spade:latest" REGION="us-central1" @@ -187,21 +206,26 @@ gcloud ai custom-jobs create \ --region="${REGION}" \ --project="${PROJECT_ID}" \ --display-name="${TRIAL_NAME}" \ - --worker-pool-spec="${WORKER_MACHINE}",replica-count=1,container-image-uri="${PROD_IMAGE_URI}" \ - --args=--train_setting="${TRAIN_SETTING}" \ - --args=--input_bigquery_table_path="${INPUT_BIGQUERY_TABLE_PATH}" \ - --args=--output_gcs_uri="${OUTPUT_GCS_URI}" \ - --args=--label_col_name="${LABEL_COL_NAME}" \ - --args=--positive_data_value="${POSITIVE_DATA_VALUE}" \ - --args=--negative_data_value="${NEGATIVE_DATA_VALUE}" \ - --args=--unlabeled_data_value="${UNLABELED_DATA_VALUE}" \ - --args=--positive_threshold="${POSITIVE_THRESHOLD}" \ - --args=--negative_threshold="${NEGATIVE_THRESHOLD}" \ - --args=--test_bigquery_table_path="${TEST_BIGQUERY_TABLE_PATH}" \ - --args=--test_label_col_name="${TEST_LABEL_COL_NAME}" \ - --args=--alpha="${ALPHA}" \ - --args=--ensemble_count="${ENSEMBLE_COUNT}" \ - --args=--verbose="${VERBOSE}" + --worker-pool-spec="${WORKER_MACHINE}",replica-count=1,container-image-uri="${IMAGE_URI}" \ + --args=--train_setting="${TRAIN_SETTING}" \ + --args=--input_bigquery_table_path="${INPUT_BIGQUERY_TABLE_PATH}" \ + --args=--data_input_gcs_uri="${DATA_INPUT_GCS_URI}" \ + --args=--output_bigquery_table_path="${OUTPUT_BIGQUERY_TABLE_PATH}" \ + --args=--data_output_gcs_uri="${DATA_OUTPUT_GCS_URI}" \ + --args=--output_gcs_uri="${OUTPUT_GCS_URI}" \ + --args=--label_col_name="${LABEL_COL_NAME}" \ + --args=--positive_data_value="${POSITIVE_DATA_VALUE}" \ + --args=--negative_data_value="${NEGATIVE_DATA_VALUE}" \ + --args=--unlabeled_data_value="${UNLABELED_DATA_VALUE}" \ + --args=--positive_threshold="${POSITIVE_THRESHOLD}" \ + --args=--negative_threshold="${NEGATIVE_THRESHOLD}" \ + --args=--test_bigquery_table_path="${TEST_BIGQUERY_TABLE_PATH}" \ + --args=--data_test_gcs_uri="${DATA_TEST_GCS_URI}" \ + --args=--test_label_col_name="${TEST_LABEL_COL_NAME}" \ + --args=--alpha="${ALPHA}" \ + --args=--ensemble_count="${ENSEMBLE_COUNT}" \ + --args=--upload_only="${UPLOAD_ONLY}" \ + --args=--verbose="${VERBOSE}" ~~~ ## Example Datasets and their Licenses diff --git a/pyproject.toml b/pyproject.toml index 605b686..5923319 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,8 @@ dependencies = [ "pyarrow==14.0.1", "retry==0.9.2", "scikit-learn==1.4.2", + "tensorflow", + "tensorflow-datasets==4.9.6", "parameterized==0.8.1", "pytest==7.1.2", "fastavro[codecs]==1.4.12", diff --git a/spade_anomaly_detection/__init__.py b/spade_anomaly_detection/__init__.py index 0cf4ec9..f9d8ac9 100644 --- a/spade_anomaly_detection/__init__.py +++ b/spade_anomaly_detection/__init__.py @@ -31,4 +31,4 @@ # A new PyPI release will be pushed every time `__version__` is increased. # When changing this, also update the CHANGELOG.md. -__version__ = '0.2.2' +__version__ = '0.3.0' diff --git a/spade_anomaly_detection/csv_data_loader.py b/spade_anomaly_detection/csv_data_loader.py new file mode 100644 index 0000000..bfee840 --- /dev/null +++ b/spade_anomaly_detection/csv_data_loader.py @@ -0,0 +1,549 @@ +# Copyright 2024 The spade_anomaly_detection Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# coding=utf-8 +# Copyright 2024 The SPADE Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Implements a CSV data loader.""" + +import collections +import dataclasses +import os +from typing import Callable, Dict, Final, Mapping, Optional, Sequence, Tuple + +from absl import logging +from google.cloud import storage +import numpy as np +import pandas as pd +from spade_anomaly_detection import parameters +import tensorflow as tf + + +# Types are from spade_anomaly_detection/data_utils/feature_metadata.py +_FEATURES_TYPE: Final[str] = 'FLOAT64' +_LABEL_TYPE: Final[str] = 'INT64' + +# Setting the shuffle buffer size to 1M seems to be necessary to get the CSV +# reader to provide a diversity of data to the model. +_SHUFFLE_BUFFER_SIZE: Final[int] = 1_000_000 + + +def _get_header_from_input_file(inputs_file: str) -> str: + """Gets the header from a file of data inputs.""" + # Separate this logic so that it can be mocked easily in unit tests. + with tf.io.gfile.GFile(inputs_file, mode='r') as f: + header = f.readline() # Assume that first line is the header. + return header + + +def _list_files( + bucket_name: str, + input_blob_prefix: str, + input_blob_suffix: Optional[str] = None, +) -> Sequence[str]: + """Lists all files in `bucket_name` matching a prefix and an optional suffix. + + Args: + bucket_name: GCS bucket in which to list files. + input_blob_prefix: Prefix of files to list (inside `bucket_name`). + input_blob_suffix: Suffix of files to list (inside `bucket_name`). + + Returns: + Listed files, formatted as "gs://`bucket_name`/`blob.name`. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + + filenames = [] + blobs = bucket.list_blobs(prefix=input_blob_prefix) + for blob in blobs: + if blob.name.endswith('/'): + # Skip any folders. + continue + filenames.append('gs://' + bucket_name + '/' + blob.name) + if input_blob_suffix is not None: + filenames = [ + filename + for filename in filenames + if filename.endswith(input_blob_suffix) + ] + return filenames + + +def _parse_gcs_uri(gcs_uri: str) -> tuple[str, str]: + """Parses a GCS URI into bucket name, prefix and suffix. + + Args: + gcs_uri: GCS URI to parse. + + Returns: + Bucket name and prefix. + + Raises: + ValueError: If the GCS URI is not valid. + """ + gcs_uri_prefix = 'gs://' + if not gcs_uri.startswith(gcs_uri_prefix): + raise ValueError(f'GCS URI {gcs_uri} does not start with "gs://".') + # Paths must be to folders, not files. + gcs_uri = f'{gcs_uri}/' if not gcs_uri.endswith('/') else gcs_uri + gcs_uri = gcs_uri.removeprefix(gcs_uri_prefix) + bucket_name = gcs_uri.split('/')[0] + rest = gcs_uri.removeprefix(f'{bucket_name}/') + return bucket_name, rest + + +@dataclasses.dataclass +class ColumnNamesInfo: + """Information about the column names.""" + + header: str + label_column_name: str + column_names_dict: collections.OrderedDict[str, str] + num_features: int + + @classmethod + def from_inputs_file( + cls, + inputs_file: str, + label_column_name: str, + ) -> 'ColumnNamesInfo': + """Reads the column names information from one CSV file of input data. + + Inputs are in multiple CSV files on GCS. Get the column names from one of + those CSV files. The returned `ColumnNamesInfo` instance contains the + inputs CSV file header, the label column name and a dictionary of column + names to column types. + + Args: + inputs_file: Inputs file from which to read the column names. + label_column_name: The name of the label column. + + Returns: + An instance of ColumnNamesInfo containing the original header, the name of + the label column, a dictionary of column names to column types and the + number of features. + """ + header = _get_header_from_input_file(inputs_file=inputs_file) + header = header.replace('\n', '') + all_columns = header.split(',') + if label_column_name not in all_columns: + raise ValueError( + f'Label column {label_column_name} not found in the header: {header}' + ) + num_features = len(all_columns) - 1 + features_types = [_FEATURES_TYPE] * len(all_columns) + column_names_dict = collections.OrderedDict( + zip(all_columns, features_types) + ) + column_names_dict[label_column_name] = _LABEL_TYPE + return ColumnNamesInfo( + column_names_dict=column_names_dict, + header=header, + label_column_name=label_column_name, + num_features=num_features, + ) + + +@dataclasses.dataclass +class InputFilesMetadata: + """Metadata about the set of CSV files containing the input data. + + Attributes: + location_prefix: Prefix of location pattern on GCS where the CSV files + containing input data are located. All CSV files recursively matching this + patten will be selected. + files: Names for each of the found CSV files. + column_names_info: Instance of ColumnNamesInfo Dataclass. + """ + + location_prefix: str + files: Sequence[str] + column_names_info: ColumnNamesInfo + + +class CsvDataLoader: + """Contains methods for interacting with CSV files using RunnerParameters.""" + + def __init__(self, runner_parameters: parameters.RunnerParameters): + self.runner_parameters = runner_parameters + if self.runner_parameters.data_input_gcs_uri is None: + raise ValueError( + 'Data input GCS URI is not set in the runner parameters. Please set ' + 'the `data_input_gcs_uri` field in the runner parameters.' + ) + self.all_labels = [ + self.runner_parameters.positive_data_value, + self.runner_parameters.negative_data_value, + self.runner_parameters.unlabeled_data_value, + ] + self._label_counts = None + self._last_read_metadata = None + + @property + def label_counts(self) -> Dict[int | str, int]: + """Returns the label counts.""" + if not self._label_counts: + raise ValueError( + 'Label counts have not been computed yet, ensure that you have made ' + 'a call to load_tf_dataset_from_csv() before this property is called.' + ) + return self._label_counts + + def get_inputs_metadata( + self, + bucket_name: str, + location_prefix: str, + label_column_name: str, + ) -> 'InputFilesMetadata': + """Gets information about the CSVs containing the input data. + + Args: + bucket_name: Name of the GCS bucket where the CSV files are located. + location_prefix: The prefix of location of the CSV files, excluding any + trailing unique identifiers. + label_column_name: The name of the label column. + + Returns: + Return a InputFilesMetadata instance. + """ + # Get the names of the CSV files containing the input data. + csv_filenames = _list_files( + bucket_name=bucket_name, input_blob_prefix=location_prefix + ) + logging.info( + 'Collecting metadata for %d files at %s', + len(csv_filenames), + location_prefix, + ) + # Get information about the columns. + column_names_info = ColumnNamesInfo.from_inputs_file( + csv_filenames[0], label_column_name + ) + logging.info( + 'Obtained metadata for data with CSV prefix %s (number of features=%d)', + location_prefix, + column_names_info.num_features, + ) + return InputFilesMetadata( + location_prefix=location_prefix, + files=csv_filenames, + column_names_info=column_names_info, + ) + + def _get_filter_by_label_value_func( + self, + label_column_filter_value: int | list[int] | None, + exclude_label_value: bool = False, + ) -> Callable[[tf.Tensor, tf.Tensor], bool]: + """Returns a function that filters a record based on the label column value. + + Args: + label_column_filter_value: The value of the label column to use as a + filter. If None, all records are included. + exclude_label_value: If True, exclude records with the label column value. + If False, include records with the label column value. + + Returns: + A function that returns True if the label column value is equal to the + label_column_filter_value parameter(s). If exclude_label_value is True, + the function returns True if the label column value is not equal to the + label_column_filter_value parameter(s). + """ + + def filter_func(features: tf.Tensor, label: tf.Tensor) -> bool: # pylint: disable=unused-argument + if not label_column_filter_value: + return True + label_cast = tf.cast(label[0], tf.dtypes.as_dtype(_LABEL_TYPE.lower())) + label_column_filter_value_cast = tf.cast( + label_column_filter_value, label_cast.dtype + ) + broadcast_equal = tf.equal(label_column_filter_value_cast, label_cast) + broadcast_all = tf.reduce_any(broadcast_equal) + if exclude_label_value: + return tf.logical_not(broadcast_all) + return broadcast_all + + return filter_func + + def load_tf_dataset_from_csv( + self, + input_path: str, + label_col_name: str, + batch_size: Optional[int] = None, + label_column_filter_value: Optional[int | list[int]] = None, + exclude_label_value: bool = False, + ) -> tf.data.Dataset: + """Convert multiple CSV files to a tf.data.Dataset. + + Multiple CSV files are read from a specified location. They are streamed + using a tf.data.Dataset. + + Args: + input_path: The path to the CSV files. + label_col_name: The name of the label column. + batch_size: The batch size to use for the dataset. If None, the batch size + will be set to 1. + label_column_filter_value: The value of the label column to use as a + filter. If None, all records are included. + exclude_label_value: If True, exclude records with the label column value. + If False, include records with the label column value. + + Returns: + A tf.data.Dataset. + """ + bucket, prefix = _parse_gcs_uri(input_path) + # Since we are reading a new set of CSV files, we need to get the metadata + # again. + self._last_read_metadata = self.get_inputs_metadata( + bucket_name=bucket, + location_prefix=prefix, + label_column_name=label_col_name, + ) + logging.info('Last read metadata: %s', self._last_read_metadata) + # Get the names of the CSV files containing the data and other metadata + filenames = self._last_read_metadata.files + logging.info('Found %d CSV files.', len(filenames)) + + column_names = list( + self._last_read_metadata.column_names_info.column_names_dict.keys() + ) + # Setting dtypes in the column defaults makes the columns required. + # TODO(sinharaj): Add support for optional columns. + column_defaults = [ + d.lower() + for d in list( + self._last_read_metadata.column_names_info.column_names_dict.values() + ) + ] + + # Construct a single dataset out of multiple CSV files. + # TODO(sinharaj): Remove the determinism after testing. + dataset = tf.data.experimental.make_csv_dataset( + filenames, + batch_size=1, # Initial Dataset is created with one sample at a time. + column_names=column_names, + column_defaults=column_defaults, + label_name=label_col_name, + select_columns=None, + field_delim=',', + use_quote_delim=True, + na_value='', + header=True, + num_epochs=1, + shuffle=True, + shuffle_buffer_size=_SHUFFLE_BUFFER_SIZE, + shuffle_seed=self.runner_parameters.random_seed, + prefetch_buffer_size=tf.data.AUTOTUNE, + num_parallel_reads=tf.data.AUTOTUNE, + sloppy=False, # Set to True for non-deterministic ordering for speed. + num_rows_for_inference=100, + compression_type=None, + ignore_errors=False, + encoding='utf-8', + ) + if not dataset: + raise ValueError( + f'Dataset with prefix {self._last_read_metadata.location_prefix} not ' + 'created.' + ) + ds_filter_func = self._get_filter_by_label_value_func( + label_column_filter_value=label_column_filter_value, + exclude_label_value=exclude_label_value, + ) + dataset = dataset.filter(ds_filter_func) + + # The Dataset returns features as a dict. Combine the features into a single + # tensor. Also cast the features and labels to correct types. + def combine_features_dict_into_tensor( + features: Mapping[str, tf.Tensor], + label: tf.Tensor, + ) -> Tuple[tf.Tensor, tf.Tensor]: + feature_matrix = tf.squeeze(tf.stack(list(features.values()), axis=1)) + feature_matrix = tf.reshape(feature_matrix, (-1,)) + feature_matrix = tf.cast( + feature_matrix, + tf.dtypes.as_dtype(_FEATURES_TYPE.lower()), + name='features', + ) + label = tf.squeeze(label) + label = tf.reshape(label, (-1,)) + label = tf.cast( + label, tf.dtypes.as_dtype(_LABEL_TYPE.lower()), name='label' + ) + return feature_matrix, label + + dataset = dataset.map( + combine_features_dict_into_tensor, + num_parallel_calls=tf.data.AUTOTUNE, + deterministic=True, + ) + + dataset = dataset.repeat(1) # One repeat of the dataset during creation. + if batch_size: + dataset = dataset.batch(batch_size, deterministic=True) + dataset = dataset.prefetch(tf.data.AUTOTUNE) + + # This Dataset was just created. Calculate the label distribution. + self._label_counts = self.counts_by_label(dataset) + logging.info('Label counts: %s', self._label_counts) + + return dataset + + def counts_by_label( + self, + dataset: tf.data.Dataset, + ) -> Dict[int | str, int]: + """Counts the number of samples in each label class in the dataset.""" + + @tf.function + def count_class( + counts: Dict[int, tf.Tensor], + batch: Tuple[tf.Tensor, tf.Tensor], + ) -> Dict[int, tf.Tensor]: + _, labels = batch + new_counts = counts.copy() + for i in self.all_labels: + cc = tf.cast(labels == i, tf.int32) + if i in list(new_counts.keys()): + new_counts[i] += tf.reduce_sum(cc) + else: + new_counts[i] = tf.reduce_sum(cc) + return new_counts + + initial_state = dict((i, 0) for i in self.all_labels) + counts = dataset.reduce( + initial_state=initial_state, reduce_func=count_class + ) + return counts + + def get_label_thresholds(self) -> Mapping[str, float]: + """Computes positive and negative thresholds based on label ratios. + + This method is useful for setting percentile thresholds when performing + inference on the OCC ensemble. Feature vectors can be labeled as normal + or anomalous depending on these values. This method requires that the label + column, and positive and negative values are set in RunnerParameters. + + Args: None. + + Returns: + A dictionary containing 'positive_threshold' and 'negative_threshold'. + + Raises: + ValueError: If the label counts have not been computed yet. + """ + if not self._label_counts: + raise ValueError( + 'Label counts have not been computed yet, ' + 'ensure that you have made a call to ' + 'load_tf_dataset_from_csv() before this method ' + 'is called.' + ) + + positive_count = self._label_counts[ + self.runner_parameters.positive_data_value + ] + + labeled_data_record_count = ( + self._label_counts[self.runner_parameters.positive_data_value] + + self._label_counts[self.runner_parameters.negative_data_value] + ) + + positive_threshold = 100 * (positive_count / labeled_data_record_count) + label_thresholds = { + 'positive_threshold': positive_threshold, + 'negative_threshold': 100 - positive_threshold, + } + + if self.runner_parameters.verbose: + logging.info('Computed label thresholds: %s', label_thresholds) + + return label_thresholds + + def upload_dataframe_to_gcs( + self, + batch: int, + features: np.ndarray, + labels: np.ndarray, + ) -> None: + """Uploads the dataframe to BigQuery, create or replace table. + + Args: + batch: The batch number of the pseudo-labeled data. + features: Numpy array of features. + labels: Numpy array of labels. + + Returns: + None. + + Raises: + ValueError: If the metadata has not been read yet or if the data output + GCS URI is not set in the runner parameters. + """ + if not self._last_read_metadata: + raise ValueError( + 'No metadata has been read yet, ensure that you have made a call to ' + 'load_tf_dataset_from_csv(), trained the model and performed ' + ' pseudo-labeling before this method is called.' + ) + if not self.runner_parameters.data_output_gcs_uri: + raise ValueError( + 'Data output GCS URI is not set in the runner parameters. Please set ' + 'the `data_output_gcs_uri` field in the runner parameters.' + ) + combined_data = np.concatenate( + [features, labels.reshape(len(features), 1)], axis=1 + ) + + column_names = list( + self._last_read_metadata.column_names_info.column_names_dict.keys() + ) + # Make sure the label column is the last column. + # TODO(b/347332980): Add support for the pseudolabel flag. + column_names.remove(self.runner_parameters.label_col_name) + column_names.append(self.runner_parameters.label_col_name) + + complete_dataframe = pd.DataFrame(data=combined_data, columns=column_names) + + # Adjust label column type so that users can go straight to BigQuery or + # AutoML without having to adjust data. Both of these products require a + # boolean or string target column, not integer. + complete_dataframe[self.runner_parameters.label_col_name] = ( + complete_dataframe[self.runner_parameters.label_col_name].astype('bool') + ) + + output_path = os.path.join( + self.runner_parameters.data_output_gcs_uri, + f'pseudo_labeled_batch_{batch}.csv', + ) + with tf.io.gfile.GFile( + output_path, + 'w', + ) as f: + complete_dataframe.to_csv(f, index=False, header=True) + if self.runner_parameters.verbose: + logging.info('Uploaded pseudo-labeled data to %s', output_path) diff --git a/spade_anomaly_detection/csv_data_loader_test.py b/spade_anomaly_detection/csv_data_loader_test.py new file mode 100644 index 0000000..e2b4a93 --- /dev/null +++ b/spade_anomaly_detection/csv_data_loader_test.py @@ -0,0 +1,423 @@ +# Copyright 2024 The spade_anomaly_detection Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# coding=utf-8 +# Copyright 2024 The SPADE Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the CSV data loader.""" + +import collections +import dataclasses +import os +from typing import Set, Tuple +from unittest import mock + +from absl.testing import parameterized +import numpy as np +import pandas as pd +from spade_anomaly_detection import csv_data_loader +from spade_anomaly_detection import parameters +import tensorflow as tf + +import tensorflow_datasets as tfds + +import pytest + +# Test Config. +_NUMBER_OF_DECIMALS_TO_ROUND = 2 + + +def _dataset_to_set_of_nested_tuples( + ds: tf.data.Dataset, +) -> Set[Tuple[Tuple[float, ...], Tuple[float]]]: # pylint: disable=g-one-element-tuple + """Helper to convert a dataset to a tuple of tuples of tuples for tests.""" + ds_list = list(ds.as_numpy_iterator()) + new_ds_list = [] + for elem in ds_list: + # Convert the first element to a tuple, which is the features. + if elem[0].ndim == 1: + new_elem_0 = tuple( + np.round(elem[0], decimals=_NUMBER_OF_DECIMALS_TO_ROUND).tolist() + ) + else: + new_elem_0 = tuple( + np.round(elem[0], decimals=_NUMBER_OF_DECIMALS_TO_ROUND) + .reshape((-1,)) + .tolist() + ) + # Convert second element to a tuple, which is the label. + if elem[1].ndim == 1: + new_elem_1 = tuple(elem[1].tolist()) + else: + new_elem_1 = tuple(elem[1].reshape((-1,)).tolist()) + new_elem = (new_elem_0, new_elem_1) + new_ds_list.append(new_elem) + return set(new_ds_list) + + +@dataclasses.dataclass(frozen=True) +class FakeBlob: + """Represents a fake GCS blob to be returned by bucket.list_blobs. + + Attributes: + name: Name of the blob. + contents: Contents to be returned when download_as_string is called. + """ + + name: str + contents: str + + def download_as_string(self): + return self.contents + + +class CsvDataUtilsTest(tf.test.TestCase, parameterized.TestCase): + + def setUp(self): + super().setUp() + self.storage_client_mock = mock.MagicMock() + self.bucket_mock = mock.MagicMock() + self.storage_client_mock.return_value.bucket.return_value = self.bucket_mock + + self.header = ["x1", "x2", "y"] + self.data1 = [[0.6, 0.2, -1], [0.1, 0.8, 0], [0.6, 0.9, 1]] + self.data1_df = pd.DataFrame(data=self.data1, columns=self.header) + self.csv_file1 = "/dir1/data1.csv" + self.csv_file1_content = self.data1_df.to_csv(header=True, index=False) + + # Params to test: gcs_uri. + @parameterized.named_parameters( + ("single_file", "gs://bucket/dir/file.csv", "bucket", "dir/file.csv/"), + ("folder_no_slash", "gs://bucket/dir", "bucket", "dir/"), + ("folder_with_slash", "gs://bucket/dir/", "bucket", "dir/"), + ) + def test_parse_gcs_uri_returns_bucket_name_and_prefix( + self, gcs_uri, expected_bucket, expected_prefix + ): + bucket_name, prefix = csv_data_loader._parse_gcs_uri(gcs_uri=gcs_uri) + self.assertEqual(bucket_name, expected_bucket) + self.assertEqual(prefix, expected_prefix) + + def test_parse_gcs_uri_incorrect_uri_raises(self): + gcs_uri = "bucket/dir/" + with self.assertRaises(ValueError): + _, _ = csv_data_loader._parse_gcs_uri(gcs_uri=gcs_uri) + + def test_list_files_returns_listed_files(self): + self.bucket_mock.list_blobs.return_value = [ + FakeBlob(name="dir/", contents=""), + FakeBlob( + name="dir/file1.csv", contents="x1,x2,y\n0.1,0.2,1\n0.2,0.3,0" + ), + FakeBlob( + name="dir/file2.csv", contents="x1,x2,y\n0.4,0.6,1\n0.5,0.5,0" + ), + FakeBlob(name="dir/file2.txt", contents="doesn't matter"), + ] + with mock.patch("google.cloud.storage.Client", self.storage_client_mock): + all_files = csv_data_loader._list_files( + bucket_name="bucket", input_blob_prefix="dir/file" + ) + expected_files = [ + "gs://bucket/dir/file1.csv", + "gs://bucket/dir/file2.csv", + "gs://bucket/dir/file2.txt", + ] + + self.assertCountEqual(all_files, expected_files) + + def test_get_header_from_input_file_returns_header(self): + with tfds.testing.MockFs() as fs: + fs.add_file(f"{self.csv_file1}", self.csv_file1_content) + header = csv_data_loader._get_header_from_input_file(self.csv_file1) + expected_header = "x1,x2,y\n" + self.assertEqual(header, expected_header) + + def test_column_names_info_from_inputs_file_returns_column_names_info(self): + with tfds.testing.MockFs() as fs: + fs.add_file(f"{self.csv_file1}", self.csv_file1_content) + column_names_info = csv_data_loader.ColumnNamesInfo.from_inputs_file( + inputs_file=self.csv_file1, label_column_name="y" + ) + expected_column_names_info = csv_data_loader.ColumnNamesInfo( + header="x1,x2,y", + label_column_name="y", + column_names_dict=collections.OrderedDict( + [("x1", "FLOAT64"), ("x2", "FLOAT64"), ("y", "INT64")] + ), + num_features=2, + ) + self.assertEqual(column_names_info, expected_column_names_info) + + +class CsvDataLoaderTest(tf.test.TestCase, parameterized.TestCase): + + def setUp(self): + super().setUp() + self.header = ["x1", "x2", "y"] + self.dir = "dir1/" + self.data1 = [[0.6, 0.2, -1], [0.1, 0.8, 0], [0.6, 0.9, 1]] + self.data1_df = pd.DataFrame(data=self.data1, columns=self.header) + self.csv_file1 = f"{self.dir}data1.csv" + self.csv_file1_content = self.data1_df.to_csv(header=True, index=False) + self.data2 = [[0.6, 0.7, 1], [0.6, 0.5, 0], [0.6, 0.9, 1], [0.6, 0.2, 1]] + self.data2_df = pd.DataFrame(data=self.data2, columns=self.header) + self.csv_file2 = f"{self.dir}data2.csv" + self.csv_file2_content = self.data2_df.to_csv(header=True, index=False) + self.data_df = pd.concat([self.data1_df, self.data2_df]) + self.data_df = self.data_df.astype({"y": "bool"}) + + @parameterized.named_parameters( + dict( + testcase_name="no_label_value_no_exclude", + label_column_filter_value=None, + exclude_label_value=False, + inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], + expected=[True, True, True], + ), + dict( + testcase_name="positive_label_value_no_exclude", + label_column_filter_value=1, + exclude_label_value=False, + inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], + expected=[False, False, True], + ), + dict( + testcase_name="positive_label_value_exclude", + label_column_filter_value=1, + exclude_label_value=True, + inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], + expected=[True, True, False], + ), + dict( + testcase_name="pos_and_0_label_value_no_exclude", + label_column_filter_value=[0, 1], + exclude_label_value=False, + inputs=[([0.6, 0.2], [-1]), ([0.1, 0.8], [0]), ([0.6, 0.9], [1])], + expected=[False, True, True], + ), + ) + def test_get_filter_by_label_value_func( + self, label_column_filter_value, exclude_label_value, inputs, expected + ): + runner_parameters = parameters.RunnerParameters( + train_setting="PNU", + input_bigquery_table_path=None, + data_input_gcs_uri="gs://bucket/input_path", + output_gcs_uri="gs://bucket/output_path", + label_col_name="y", + positive_data_value=1, + negative_data_value=0, + unlabeled_data_value=-1, + positive_threshold=5, + negative_threshold=95, + ) + instance = csv_data_loader.CsvDataLoader( + runner_parameters=runner_parameters + ) + filter_func = instance._get_filter_by_label_value_func( + label_column_filter_value=label_column_filter_value, + exclude_label_value=exclude_label_value, + ) + for i, ((input_f, input_l), keep) in enumerate(zip(inputs, expected)): + with self.subTest(msg=f"input_{i}"): + got = filter_func(input_f, input_l) + self.assertEqual(keep, got) + + # Test the creation of a Dataset from CSV files. Only tests batch_size=1. + @pytest.mark.skip(reason="create_tempdir is broken in pytest") + @mock.patch.object(csv_data_loader, "_list_files", autospec=True) + @mock.patch.object(csv_data_loader, "_parse_gcs_uri", autospec=True) + @mock.patch.object(tf.io.gfile.GFile, "readline", autospec=True) + def test_load_tf_dataset_from_csv_returns_expected_dataset( + self, mock_readline, mock_parse_gcs_uri, mock_file_reader + ): + mock_readline.return_value = ",".join(self.header) + tmp_dir = self.create_tempdir("tmp") + input_path = os.path.join(tmp_dir.full_path, self.dir) + tf.io.gfile.makedirs(input_path) + mock_parse_gcs_uri.return_value = ("doesnt_matter", input_path) + mock_file_reader.return_value = [ + os.path.join(tmp_dir.full_path, self.csv_file1), + os.path.join(tmp_dir.full_path, self.csv_file2), + ] + # Write the test CSV files to temporary files. These CSV files will be + # re-read when the Dataset is created. Their metadata will also be recorded + # in the InputFilesMetadata object. + self.data1_df.to_csv( + os.path.join(tmp_dir.full_path, self.csv_file1), + header=True, + index=False, + ) + self.data2_df.to_csv( + os.path.join(tmp_dir.full_path, self.csv_file2), + header=True, + index=False, + ) + runner_parameters = parameters.RunnerParameters( + train_setting="PNU", + input_bigquery_table_path=None, + data_input_gcs_uri=input_path, + output_gcs_uri=f"{input_path}/output", + label_col_name="y", + positive_data_value=1, + negative_data_value=0, + unlabeled_data_value=-1, + positive_threshold=5, + negative_threshold=95, + verbose=True, + ) + + data_loader = csv_data_loader.CsvDataLoader( + runner_parameters=runner_parameters + ) + dataset = data_loader.load_tf_dataset_from_csv( + input_path=runner_parameters.data_input_gcs_uri, + label_col_name=runner_parameters.label_col_name, + batch_size=1, + label_column_filter_value=None, + exclude_label_value=False, + ) + + expected_dataset = tf.data.Dataset.from_tensor_slices(( + tf.constant( + [ + [0.6, 0.2], + [0.1, 0.8], + [0.6, 0.9], + [0.6, 0.7], + [0.6, 0.5], + [0.6, 0.9], + [0.6, 0.2], + ], + dtype=tf.float64, + ), + tf.constant([[-1], [0], [1], [1], [0], [1], [1]], dtype=tf.float64), + )) + + expected_element_spec = ( + tf.TensorSpec(shape=(None, None), dtype=tf.float64), # features + tf.TensorSpec(shape=(None, None), dtype=tf.int64), # label + ) + with self.subTest(msg="check_spec_equal"): + self.assertTupleEqual(dataset.element_spec, expected_element_spec) + + expected_counts = { + -1: 1, + 0: 2, + 1: 4, + } + with self.subTest(msg="check_dataset_class_counts"): + counts = data_loader.counts_by_label(dataset) + self.assertDictEqual(counts, expected_counts) + + expected_label_thresholds = { + "positive_threshold": 66.6666, + "negative_threshold": 33.3333, + } + with self.subTest(msg="check_dataset_label_thresholds"): + got_label_thresholds = data_loader.get_label_thresholds() + self.assertDictEqual(got_label_thresholds, expected_label_thresholds) + + with self.subTest(msg="check_datasets_have_same_elements"): + self.assertSetEqual( + _dataset_to_set_of_nested_tuples(dataset), + _dataset_to_set_of_nested_tuples(expected_dataset), + ) + + @pytest.mark.skip(reason="create_tempdir is broken in pytest") + def test_upload_dataframe_to_gcs(self): + tmp_dir = self.create_tempdir("tmp") + output_dir = os.path.join(tmp_dir.full_path, self.dir, "output_path") + runner_parameters = parameters.RunnerParameters( + train_setting="PNU", + input_bigquery_table_path=None, + data_input_gcs_uri="gs://bucket/input_path", + output_gcs_uri="gs://bucket/model_path", + label_col_name="y", + positive_data_value=1, + negative_data_value=0, + unlabeled_data_value=-1, + positive_threshold=5, + negative_threshold=95, + data_output_gcs_uri=output_dir, + ) + data_loader = csv_data_loader.CsvDataLoader( + runner_parameters=runner_parameters + ) + tf.io.gfile.makedirs(output_dir) + col_names_info = csv_data_loader.ColumnNamesInfo( + header="x1,x2,y", + label_column_name="y", + column_names_dict=collections.OrderedDict( + [("x1", "FLOAT64"), ("x2", "FLOAT64"), ("y", "INT64")] + ), + num_features=2, + ) + data_loader._last_read_metadata = csv_data_loader.InputFilesMetadata( + column_names_info=col_names_info, + location_prefix="doesnt_matter_for_uploader", + files=["doesnt_matter_for_uploader"], + ) + all_features = self.data_df[["x1", "x2"]].to_numpy() + all_labels = self.data_df["y"].to_numpy() + # Create 2 batches of features and labels. + # TODO(b/347332980): Update test when pseudolabel flag is added. + features1 = all_features[0:2] + labels1 = all_labels[0:2] + features2 = all_features[2:] + labels2 = all_labels[2:] + # Upload batch 1. + data_loader.upload_dataframe_to_gcs( + batch=1, + features=features1, + labels=labels1, + ) + # Upload batch 2. + data_loader.upload_dataframe_to_gcs( + batch=2, + features=features2, + labels=labels2, + ) + # Sorting means batch 1 file will be first. + files_list = sorted(tf.io.gfile.listdir(output_dir)) + self.assertLen(files_list, 2) + expected_dfs = [ + self.data_df.iloc[0:2].reset_index(drop=True), + self.data_df.iloc[2:].reset_index(drop=True), + ] + for i, file_name in enumerate(files_list): + with self.subTest(msg=f"file_{i}"): + file_path = os.path.join(output_dir, file_name) + with tf.io.gfile.GFile(file_path, "r") as f: + got_df = pd.read_csv(f, header=0) + pd.testing.assert_frame_equal( + got_df, expected_dfs[i], check_exact=False, check_like=True + ) + + +if __name__ == "__main__": + tf.test.main() diff --git a/spade_anomaly_detection/data_loader.py b/spade_anomaly_detection/data_loader.py index 46fa1be..68299a5 100644 --- a/spade_anomaly_detection/data_loader.py +++ b/spade_anomaly_detection/data_loader.py @@ -38,7 +38,7 @@ import os import pathlib # TODO(b/247116870): Change to collections when Vertex supports python 3.9 -from typing import Any, List, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Final, List, Mapping, Optional, Sequence, Tuple, Union from absl import logging from google.cloud import bigquery @@ -46,21 +46,21 @@ from google.cloud.storage import transfer_manager import numpy as np import pandas as pd - from spade_anomaly_detection import parameters from spade_anomaly_detection.data_utils import bq_dataset from spade_anomaly_detection.data_utils import bq_utils from spade_anomaly_detection.data_utils import feature_metadata - import tensorflow as tf -_DATA_ROOT = 'spade_anomaly_detection/example_data/' +_DATA_ROOT: Final[str] = 'spade_anomaly_detection/example_data/' def load_dataframe( dataset_name: str, label_col_index: int = -1, - filter_label_value: Optional[Union[str, int]] = None, + filter_label_value: Optional[Union[str, int, list[str], list[int]]] = None, + index_col: Optional[int] = 0, + skiprows: Optional[int] = 1, ) -> Sequence[Union[pd.DataFrame, pd.Series]]: """Loads csv data located in the ./example_data directory for unit tests. @@ -70,11 +70,16 @@ def load_dataframe( filter_label_value: Value to filter the label column on. Could be a string or an integer. If there is no value specified, no filtering will be performed. + index_col: Column to use for the index. If None, the index will be the + default index. + skiprows: Number of rows to skip when reading the CSV file. Returns: A tuple (features, labels), both are dataframes corresponding to the features and labels of the dataset respectively. """ + if isinstance(filter_label_value, (str, int)): + filter_label_value = [filter_label_value] file_path = os.path.join(_DATA_ROOT, f'{dataset_name}.csv') @@ -87,7 +92,9 @@ def load_dataframe( ]: raise ValueError(f'Unknown dataset_name: {dataset_name}') - dataframe = pd.read_csv(file_path, delimiter=',', skiprows=1, index_col=0) + dataframe = pd.read_csv( + file_path, delimiter=',', skiprows=skiprows, index_col=index_col + ) if len(dataframe.shape) != 2: raise ValueError( @@ -102,7 +109,7 @@ def load_dataframe( if filter_label_value is not None: dataframe = dataframe[ - dataframe.iloc[:, label_col_index] == filter_label_value + dataframe.iloc[:, label_col_index].isin(filter_label_value) ] features = dataframe.drop(dataframe.columns[label_col_index], axis=1) @@ -116,6 +123,8 @@ def load_tf_dataset_from_csv( label_col_index: int = -1, batch_size: Optional[int] = None, filter_label_value: Optional[Any] = None, + index_col: Optional[int] = 0, + skiprows: Optional[int] = 1, return_feature_count: bool = False, ) -> Union[tf.data.Dataset, Tuple[tf.data.Dataset, int]]: """Loads a TensorFlow dataset from the ./example_data directory. @@ -130,6 +139,9 @@ def load_tf_dataset_from_csv( yield one record per call instead of a batch of records. filter_label_value: Value to filter the label column on. Could be a string or an integer. + index_col: Column to use for the index. If None, the index will be the + default index. + skiprows: Number of rows to skip when reading the CSV file. return_feature_count: If True, returns a tuple of the Dataset and the number of features. @@ -138,7 +150,11 @@ def load_tf_dataset_from_csv( number of features, """ features, labels = load_dataframe( - dataset_name, label_col_index, filter_label_value + dataset_name, + label_col_index, + filter_label_value, + index_col=index_col, + skiprows=skiprows, ) feature_tensors = tf.convert_to_tensor(features, dtype=tf.dtypes.float32) diff --git a/spade_anomaly_detection/data_loader_test.py b/spade_anomaly_detection/data_loader_test.py index d858c11..3ca3b30 100644 --- a/spade_anomaly_detection/data_loader_test.py +++ b/spade_anomaly_detection/data_loader_test.py @@ -57,6 +57,7 @@ def setUp(self): self.runner_parameters = parameters.RunnerParameters( train_setting='PNU', input_bigquery_table_path='project.dataset.table', + data_input_gcs_uri=None, output_gcs_uri='gs://test_bucket/test_folder', label_col_name='label', positive_data_value=5, @@ -67,7 +68,9 @@ def setUp(self): test_bigquery_table_path='', test_label_col_name='', test_dataset_holdout_fraction=0.3, + data_test_gcs_uri='', output_bigquery_table_path='', + data_output_gcs_uri='', alpha=1.0, batches_per_model=1, labeling_and_model_training_batch_size=None, diff --git a/spade_anomaly_detection/example_data/covertype_pnu_10000.csv b/spade_anomaly_detection/example_data/covertype_pnu_10000.csv index e7fe937..8c899df 100644 --- a/spade_anomaly_detection/example_data/covertype_pnu_10000.csv +++ b/spade_anomaly_detection/example_data/covertype_pnu_10000.csv @@ -1,3 +1,4 @@ +0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52 0.8279139569784891,0.6861111111111111,0.36363636363636365,0.4051539012168933,0.5581395348837209,0.4263032176478853,0.6377952755905512,0.984251968503937,0.8622047244094488,0.10065523490868535,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,-1.0 0.3676838419209604,0.4138888888888889,0.16666666666666669,0.17680744452397995,0.2868217054263566,0.2263594211043979,0.9212598425196851,0.9448818897637795,0.5314960629921259,0.2396486825595985,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0 0.6703351675837917,0.5416666666666667,0.21212121212121213,0.0,0.2235142118863049,0.3918786005339328,0.8543307086614174,0.9881889763779528,0.6496062992125984,0.355778614247874,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0 diff --git a/spade_anomaly_detection/occ_ensemble.py b/spade_anomaly_detection/occ_ensemble.py index afcc292..61f7028 100644 --- a/spade_anomaly_detection/occ_ensemble.py +++ b/spade_anomaly_detection/occ_ensemble.py @@ -32,7 +32,7 @@ # Using typing instead of collections due to Vertex training containers # not supporting them. -from typing import MutableMapping, Sequence, Optional +from typing import Final, MutableMapping, Optional, Sequence from absl import logging import numpy as np @@ -40,6 +40,9 @@ import tensorflow as tf +_RANDOM_SEED: Final[int] = 42 + + # TODO(b/247116870): Create abstract class for templating out future OCC models. class GmmEnsemble: """Class for creating and training a Gaussian mixture model ensemble. @@ -86,6 +89,7 @@ def __init__( ensemble_count: int = 5, positive_threshold: float = 1.0, negative_threshold: float = 95.0, + random_seed: int = _RANDOM_SEED, verbose: bool = False, ) -> None: self.n_components = n_components @@ -95,6 +99,7 @@ def __init__( self.ensemble_count = ensemble_count self.positive_threshold = positive_threshold self.negative_threshold = negative_threshold + self._random_seed = random_seed self.verbose = verbose self.ensemble = [] @@ -113,6 +118,7 @@ def _get_model(self) -> mixture.GaussianMixture: init_params=self.init_params, warm_start=self._warm_start, max_iter=self.max_iter, + random_state=self._random_seed, ) def fit( diff --git a/spade_anomaly_detection/parameters.py b/spade_anomaly_detection/parameters.py index f34d6ef..395bff1 100644 --- a/spade_anomaly_detection/parameters.py +++ b/spade_anomaly_detection/parameters.py @@ -32,8 +32,10 @@ import dataclasses import enum +from typing import Final, Optional, Sequence -from typing import Optional, Sequence + +_RANDOM_SEED: Final[int] = 42 @enum.unique @@ -60,6 +62,10 @@ class RunnerParameters: 'project.dataset.table'. If this is the only BigQuery path provided, this will be used in conjunction with test_dataset_holdout_fraction parameter to create a train/test split. + data_input_gcs_uri: Cloud Storage location to store the input data. If this + is the only Cloud Storage location provided, this will be used in + conjunction with test_dataset_holdout_fraction parameter to create a + train/test split. output_gcs_uri: Cloud Storage location to store the supervised model assets. The location should be in the form gs://bucketname/foldername. A timestamp will be added to the end of the folder so that multiple runs of this won't @@ -104,15 +110,22 @@ class RunnerParameters: test_label_col_name: The label column name in the test dataset. test_dataset_holdout_fraction: Float between 0 and 1 representing the fraction of samples to hold out as a test set. - upload_only: Use this setting in conjunction with - output_bigquery_table_path. When True, the algorithm will just upload the + data_test_gcs_uri: Cloud Storage location to store the CSV data to be used + for evaluating the supervised model. Note that the positive and negative + label values must also be the same in this testing set. It is okay to have + your test labels in that form, or use 1 for positive and 0 for negative. + upload_only: Use this setting in conjunction with output_bigquery_table_path + or data_output_gcs_uri. When True, the algorithm will just upload the pseudo labeled data to the specified table, and will skip training a supervised model. When set to False, the algorithm will also train a - supervised model and upload to a GCS endpoint. Default is False. + supervised model and upload it to a GCS location. Default is False. output_bigquery_table_path: A complete BigQuery path in the form of 'project.dataset.table' to be used for uploading the pseudo labeled data. This includes features and new labels. By default, we will use the column names from the input_bigquery_table_path BigQuery table. + data_output_gcs_uri: Cloud Storage location used for uploading the pseudo + labeled data as CSV. This includes features and new labels. By default, we + will use the column names from the data_input_gcs_uri table. alpha: Sample weights for weighting the loss function, only for pseudo-labeled data from the occ ensemble. Original data that is labeled will have a weight of 1.0. @@ -133,6 +146,8 @@ class RunnerParameters: the less likely it is for all the models to gain consensus, and thus will reduce the amount of labeled data points. By default, we use 5 one class classifiers. + random_seed: The random seed to use for all random number generators in the + algorithm. verbose: The amount of console logs to display during training. Use False to show few messages, and True for displaying many aspects of model training and scoring. This is useful for debugging model performance. @@ -140,6 +155,7 @@ class RunnerParameters: train_setting: TrainSetting input_bigquery_table_path: str + data_input_gcs_uri: str output_gcs_uri: str label_col_name: str positive_data_value: int @@ -152,18 +168,28 @@ class RunnerParameters: test_bigquery_table_path: Optional[str] = None test_label_col_name: Optional[str] = None test_dataset_holdout_fraction: float = 0.2 + data_test_gcs_uri: Optional[str] = None upload_only: bool = False output_bigquery_table_path: Optional[str] = None + data_output_gcs_uri: Optional[str] = None alpha: float = 1.0 batches_per_model: int = 1 max_occ_batch_size: int = 50000 labeling_and_model_training_batch_size: Optional[int] = None ensemble_count: int = 5 + random_seed: int = _RANDOM_SEED verbose: bool = False def __post_init__(self): - if not self.input_bigquery_table_path: - raise ValueError('`input_bigquery_table_path` must be set.') + if not (self.input_bigquery_table_path or self.data_input_gcs_uri): + raise ValueError( + '`input_bigquery_table_path` or `data_input_gcs_uri` must be set.' + ) + if self.input_bigquery_table_path and self.data_input_gcs_uri: + raise ValueError( + 'Both`input_bigquery_table_path` and `data_input_gcs_uri` should not ' + 'be set.' + ) if not self.train_setting: raise ValueError('`train_setting` must be set.') if not self.output_gcs_uri: diff --git a/spade_anomaly_detection/parameters_test.py b/spade_anomaly_detection/parameters_test.py index 6671188..baccc79 100644 --- a/spade_anomaly_detection/parameters_test.py +++ b/spade_anomaly_detection/parameters_test.py @@ -38,22 +38,37 @@ class ParametersTest(tf.test.TestCase): def test_none_required_parameter_raises(self): - with self.assertRaises(ValueError): - _ = parameters.RunnerParameters( - train_setting=parameters.TrainSetting.PNU, - input_bigquery_table_path=None, - output_gcs_uri='gs://some_bucket/some_path', - label_col_name='y', - positive_data_value=1, - negative_data_value=0, - unlabeled_data_value=-1, - ) + with self.subTest(name='two_input_sources_specified'): + with self.assertRaises(ValueError): + _ = parameters.RunnerParameters( + train_setting=parameters.TrainSetting.PNU, + input_bigquery_table_path='some_project.some_dataset.some_table', + data_input_gcs_uri='gs://some_bucket/some_data_input_path', + output_gcs_uri='gs://some_bucket/some_path', + label_col_name='y', + positive_data_value=1, + negative_data_value=0, + unlabeled_data_value=-1, + ) + with self.subTest(name='no_input_sources_specified'): + with self.assertRaises(ValueError): + _ = parameters.RunnerParameters( + train_setting=parameters.TrainSetting.PNU, + input_bigquery_table_path=None, + data_input_gcs_uri=None, + output_gcs_uri='gs://some_bucket/some_path', + label_col_name='y', + positive_data_value=1, + negative_data_value=0, + unlabeled_data_value=-1, + ) def test_equal_data_value_parameter_raises(self): with self.assertRaises(ValueError): _ = parameters.RunnerParameters( train_setting=parameters.TrainSetting.PNU, input_bigquery_table_path='some_project.some_dataset.some_table', + data_input_gcs_uri=None, output_gcs_uri='gs://some_bucket/some_path', label_col_name='y', positive_data_value=1, diff --git a/spade_anomaly_detection/performance_test.py b/spade_anomaly_detection/performance_test.py index 1934dac..1232f06 100644 --- a/spade_anomaly_detection/performance_test.py +++ b/spade_anomaly_detection/performance_test.py @@ -36,15 +36,15 @@ from unittest import mock +from spade_anomaly_detection import csv_data_loader from spade_anomaly_detection import data_loader from spade_anomaly_detection import parameters from spade_anomaly_detection import runner from spade_anomaly_detection import supervised_model - import tensorflow as tf -class PerformanceTest(tf.test.TestCase): +class PerformanceTestOnBQData(tf.test.TestCase): def setUp(self): super().setUp() @@ -57,6 +57,7 @@ def setUp(self): self.runner_parameters = parameters.RunnerParameters( train_setting='PNU', input_bigquery_table_path='project_id.dataset.table_name', + data_input_gcs_uri=None, output_gcs_uri='gs://test_bucket/test_folder', label_col_name='label', positive_data_value=1, @@ -76,26 +77,38 @@ def setUp(self): self.unlabeled_features, self.unlabeled_labels = data_loader.load_dataframe( dataset_name=csv_path, filter_label_value=self.runner_parameters.unlabeled_data_value, + index_col=None, + skiprows=0, ) self.unlabeled_record_count = len(self.unlabeled_labels) _, negative_labels = data_loader.load_dataframe( dataset_name=csv_path, filter_label_value=self.runner_parameters.negative_data_value, + index_col=None, + skiprows=0, ) self.negative_record_count = len(negative_labels) self.occ_fit_batch_size = ( - self.unlabeled_record_count // self.runner_parameters.ensemble_count + (self.unlabeled_record_count + self.negative_record_count) + // self.runner_parameters.ensemble_count ) // self.runner_parameters.batches_per_model self.unlabeled_tensorflow_dataset = data_loader.load_tf_dataset_from_csv( dataset_name=csv_path, batch_size=self.occ_fit_batch_size, - filter_label_value=self.runner_parameters.unlabeled_data_value, + filter_label_value=[ + self.runner_parameters.unlabeled_data_value, + self.runner_parameters.negative_data_value, + ], + index_col=None, + skiprows=0, ) self.complete_tensorflow_dataset = data_loader.load_tf_dataset_from_csv( dataset_name=csv_path, batch_size=self.runner_parameters.labeling_and_model_training_batch_size, + index_col=None, + skiprows=0, ) self.mock_get_total_records = self.enter_context( @@ -142,7 +155,7 @@ def test_spade_auc_performance_pnu_single_batch(self): auc = runner_object.supervised_model_metrics['Supervised_Model_AUC'] # See SPADE performance for Covertype PNU Setting in the design document for # setting and adjusting the AUC here. 0.9251 roughly equates to the - # performance seen on the ~580k row Coertype dataset in the PNU setting. + # performance seen on the ~580k row Covertype dataset in the PNU setting. self.assertAlmostEqual(auc, 0.9251, delta=0.02) def test_spade_auc_performance_pu_single_batch(self): @@ -166,7 +179,162 @@ def test_spade_auc_performance_pu_single_batch(self): auc = runner_object.supervised_model_metrics['Supervised_Model_AUC'] # See SPADE performance for Covertype PU Setting in the design document for # setting and adjusting the AUC here. 0.8870 represents the performance seen - # on the ~580k row Coertype dataset in the PU setting. + # on the ~580k row Covertype dataset in the PU setting. + self.assertAlmostEqual(auc, 0.8870, delta=0.02) + + +class PerformanceTestOnCSVData(tf.test.TestCase): + + def setUp(self): + super().setUp() + + # Using the 10k row covertype dataset here to speed up testing time. The + # 100k version was timing out on blaze. + self.total_record_count = 10000 + csv_path = f'covertype_pnu_{self.total_record_count}' + + self.runner_parameters = parameters.RunnerParameters( + train_setting='PNU', + input_bigquery_table_path='', + data_input_gcs_uri='gs://some_bucket/input_folder', + output_gcs_uri='gs://test_bucket/test_folder', + label_col_name='label', + positive_data_value=1, + negative_data_value=0, + unlabeled_data_value=-1, + positive_threshold=10, + negative_threshold=90, + test_dataset_holdout_fraction=0.3, + data_test_gcs_uri=None, + upload_only=False, + output_bigquery_table_path='', + data_output_gcs_uri=None, + alpha=1.0, + batches_per_model=1, + max_occ_batch_size=50000, + labeling_and_model_training_batch_size=self.total_record_count, + ensemble_count=5, + verbose=True, + ) + + _, self.unlabeled_labels = data_loader.load_dataframe( + dataset_name=csv_path, + filter_label_value=self.runner_parameters.unlabeled_data_value, + index_col=None, + skiprows=0, + ) + self.unlabeled_record_count = len(self.unlabeled_labels) + _, positive_labels = data_loader.load_dataframe( + dataset_name=csv_path, + filter_label_value=self.runner_parameters.positive_data_value, + index_col=None, + skiprows=0, + ) + self.positive_record_count = len(positive_labels) + _, negative_labels = data_loader.load_dataframe( + dataset_name=csv_path, + filter_label_value=self.runner_parameters.negative_data_value, + index_col=None, + skiprows=0, + ) + self.negative_record_count = len(negative_labels) + + self.label_counts = { + self.runner_parameters.positive_data_value: self.positive_record_count, + self.runner_parameters.negative_data_value: self.negative_record_count, + self.runner_parameters.unlabeled_data_value: ( + self.unlabeled_record_count + ), + } + + self.occ_fit_batch_size = ( + (self.unlabeled_record_count + self.negative_record_count) + // self.runner_parameters.ensemble_count + ) // self.runner_parameters.batches_per_model + + self.unlabeled_tensorflow_dataset = data_loader.load_tf_dataset_from_csv( + dataset_name=csv_path, + batch_size=self.occ_fit_batch_size, + filter_label_value=[ + self.runner_parameters.unlabeled_data_value, + self.runner_parameters.negative_data_value, + ], + index_col=None, + skiprows=0, + ) + + self.complete_tensorflow_dataset = data_loader.load_tf_dataset_from_csv( + dataset_name=csv_path, + batch_size=self.runner_parameters.labeling_and_model_training_batch_size, + index_col=None, + skiprows=0, + ) + + self.mock_label_counts = self.enter_context( + mock.patch.object( + csv_data_loader.CsvDataLoader, + 'label_counts', + new_callable=mock.PropertyMock, + ) + ) + + self.mock_load_tf_dataset_from_csv = self.enter_context( + mock.patch.object( + csv_data_loader.CsvDataLoader, + 'load_tf_dataset_from_csv', + autospec=True, + ) + ) + + self.mock_supervised_model_save = self.enter_context( + mock.patch.object( + supervised_model.RandomForestModel, + 'save', + autospec=True, + instance=True, + ) + ) + + def test_spade_auc_performance_pnu_single_batch(self): + self.runner_parameters.train_setting = parameters.TrainSetting.PNU + self.runner_parameters.positive_threshold = 10 + self.runner_parameters.negative_threshold = 90 + + self.mock_load_tf_dataset_from_csv.side_effect = [ + self.complete_tensorflow_dataset, + self.unlabeled_tensorflow_dataset, + self.complete_tensorflow_dataset, + ] + self.mock_label_counts.return_value = self.label_counts + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + auc = runner_object.supervised_model_metrics['Supervised_Model_AUC'] + # See SPADE performance for Covertype PNU Setting in the design document for + # setting and adjusting the AUC here. 0.9251 roughly equates to the + # performance seen on the ~580k row Coertype dataset in the PNU setting. + self.assertAlmostEqual(auc, 0.9251, delta=0.02) + + def test_spade_auc_performance_pu_single_batch(self): + self.runner_parameters.train_setting = parameters.TrainSetting.PU + self.runner_parameters.positive_threshold = 10 + self.runner_parameters.negative_threshold = 50 + + self.mock_load_tf_dataset_from_csv.side_effect = [ + self.complete_tensorflow_dataset, + self.unlabeled_tensorflow_dataset, + self.complete_tensorflow_dataset, + ] + self.mock_label_counts.return_value = self.label_counts + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + auc = runner_object.supervised_model_metrics['Supervised_Model_AUC'] + # See SPADE performance for Covertype PU Setting in the design document for + # setting and adjusting the AUC here. 0.8870 represents the performance seen + # on the ~580k row Covertype dataset in the PU setting. self.assertAlmostEqual(auc, 0.8870, delta=0.02) diff --git a/spade_anomaly_detection/requirements.txt b/spade_anomaly_detection/requirements.txt index 11ac821..3e9535c 100644 --- a/spade_anomaly_detection/requirements.txt +++ b/spade_anomaly_detection/requirements.txt @@ -5,6 +5,7 @@ pandas==1.3.5 pyarrow==14.0.1 retry==0.9.2 scikit-learn==1.4.2 +tensorflow-datasets==4.9.6 parameterized==0.8.1 # These are test dependencies and should be added as part of a separate layer # in the container that isn't included in the base container. diff --git a/spade_anomaly_detection/runner.py b/spade_anomaly_detection/runner.py index 59e20b1..a40951f 100644 --- a/spade_anomaly_detection/runner.py +++ b/spade_anomaly_detection/runner.py @@ -35,17 +35,27 @@ data set to batch fit a supervised model. """ +import enum # TODO(b/247116870): Change to collections when Vertex supports python 3.9 -from typing import Optional, Tuple +from typing import Mapping, Optional, Tuple, cast from absl import logging import numpy as np import pandas as pd - +from spade_anomaly_detection import csv_data_loader from spade_anomaly_detection import data_loader from spade_anomaly_detection import occ_ensemble from spade_anomaly_detection import parameters from spade_anomaly_detection import supervised_model +import tensorflow as tf + + +@enum.unique +class DataFormat(enum.Enum): + """Data format for the input, output and test data.""" + + CSV = 'csv' + BIGQUERY = 'bigquery' class Runner: @@ -60,15 +70,50 @@ class Runner: instantiate a supervised model object to train on after pseudo labeling. supervised_model_metrics: Metrics for the supervised model to be evaluated on. By default, this is AUC. - data_loader: An instance of the data loader, useful for performing a number - of operations such as loading, filtering, and uploading of bigquery data. + data_format: The data format for the input, output and test data. This is + used to determine which data loader to use. + input_data_loader: An instance of the data loader, useful for performing a + number of operations such as loading, filtering, and uploading of bigquery + or CSV input and output data. + test_data_loader: An instance of the data loader, useful for performing a + number of operations such as loading, filtering, and uploading of bigquery + or CSV test data. """ def __init__(self, runner_parameters: parameters.RunnerParameters): self.runner_parameters = runner_parameters logging.info('Runner parameters: %s', self.runner_parameters) - self.data_loader = data_loader.DataLoader(self.runner_parameters) + # Exactly one of `input_bigquery_table_path` or `data_input_gcs_uri` are set + # in the runner parameters. + if self.runner_parameters.input_bigquery_table_path: + self.data_format = DataFormat.BIGQUERY + else: + self.data_format = DataFormat.CSV + + if self.data_format == DataFormat.BIGQUERY: + # BigQuery data loaders are the same for input, output and test data. + self.input_data_loader = data_loader.DataLoader(self.runner_parameters) + # Type hint to prevent linter errors. + self.input_data_loader = cast( + data_loader.DataLoader, self.input_data_loader + ) + self.test_data_loader = self.input_data_loader + else: + self.input_data_loader = csv_data_loader.CsvDataLoader( + self.runner_parameters + ) + # Type hint to prevent linter errors. + self.input_data_loader = cast( + csv_data_loader.CsvDataLoader, self.input_data_loader + ) + self.test_data_loader = csv_data_loader.CsvDataLoader( + self.runner_parameters + ) + # Type hint to prevent linter errors. + self.test_data_loader = cast( + csv_data_loader.CsvDataLoader, self.test_data_loader + ) # TODO(b/247116870): Evaluate performance implications of using a global # testing array - the machine may not have enough memory to store the test @@ -89,10 +134,7 @@ def __init__(self, runner_parameters: parameters.RunnerParameters): self.runner_parameters.positive_threshold is None or self.runner_parameters.negative_threshold is None ): - input_table_statistics = self.data_loader.get_label_thresholds( - self.runner_parameters.input_bigquery_table_path - ) - + input_table_statistics = self._get_table_statistics() self.runner_parameters.positive_threshold = ( input_table_statistics['positive_threshold'] if self.runner_parameters.positive_threshold is None @@ -104,8 +146,27 @@ def __init__(self, runner_parameters: parameters.RunnerParameters): else self.runner_parameters.negative_threshold ) + def _get_table_statistics(self) -> Mapping[str, float]: + """Gets the statistics for the input table.""" + if self.data_format == DataFormat.BIGQUERY: + input_table_statistics = self.input_data_loader.get_label_thresholds( + self.runner_parameters.input_bigquery_table_path + ) + else: + stats_data_loader = csv_data_loader.CsvDataLoader(self.runner_parameters) + # Type hint to prevent linter errors. + stats_data_loader = cast(csv_data_loader.CsvDataLoader, stats_data_loader) + _ = stats_data_loader.load_tf_dataset_from_csv( + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + batch_size=1, + label_column_filter_value=[], + ) + input_table_statistics = stats_data_loader.get_label_thresholds() + return input_table_statistics + def _get_record_count_based_on_labels(self, label_value: int) -> int: - """Gets the number of records in the table. + """Gets the number of records in the input table. Args: label_value: The value of the label to use as the filter for records. @@ -113,20 +174,31 @@ def _get_record_count_based_on_labels(self, label_value: int) -> int: Returns: The count of records. """ - label_record_count_filter = ( - f'{self.runner_parameters.label_col_name} = {label_value}' - ) - if self.runner_parameters.where_statements: - label_record_count_where_statements = [ - self.runner_parameters.where_statements - ] + [label_record_count_filter] - else: - label_record_count_where_statements = [label_record_count_filter] + if self.data_format == DataFormat.BIGQUERY: + label_record_count_filter = ( + f'{self.runner_parameters.label_col_name} = {label_value}' + ) + if self.runner_parameters.where_statements: + label_record_count_where_statements = [ + self.runner_parameters.where_statements + ] + [label_record_count_filter] + else: + label_record_count_where_statements = [label_record_count_filter] - label_record_count = self.data_loader.get_query_record_result_length( - input_path=self.runner_parameters.input_bigquery_table_path, - where_statements=label_record_count_where_statements, - ) + self.input_data_loader = cast( + data_loader.DataLoader, self.input_data_loader + ) + label_record_count = ( + self.input_data_loader.get_query_record_result_length( + input_path=self.runner_parameters.input_bigquery_table_path, + where_statements=label_record_count_where_statements, + ) + ) + else: + self.input_data_loader = cast( + csv_data_loader.CsvDataLoader, self.input_data_loader + ) + label_record_count = self.input_data_loader.label_counts[label_value] return label_record_count def check_data_tables( @@ -207,6 +279,8 @@ def instantiate_and_fit_ensemble( ensemble_count=self.runner_parameters.ensemble_count, positive_threshold=self.runner_parameters.positive_threshold, negative_threshold=self.runner_parameters.negative_threshold, + random_seed=self.runner_parameters.random_seed, + verbose=self.runner_parameters.verbose, ) training_record_count = unlabeled_record_count + negative_record_count @@ -216,18 +290,38 @@ def instantiate_and_fit_ensemble( logging.info('Batch size for OCC ensemble: %s', batch_size) - unlabeled_data = self.data_loader.load_tf_dataset_from_bigquery( - input_path=self.runner_parameters.input_bigquery_table_path, - label_col_name=self.runner_parameters.label_col_name, - where_statements=self.runner_parameters.where_statements, - ignore_columns=self.runner_parameters.ignore_columns, - batch_size=batch_size, - # Train using negative labeled data and unlabeled data. - label_column_filter_value=[ - self.runner_parameters.unlabeled_data_value, - self.runner_parameters.negative_data_value, - ], - ) + if self.data_format == DataFormat.BIGQUERY: + logging.info('Loading training data from BigQuery.') + self.input_data_loader = cast( + data_loader.DataLoader, self.input_data_loader + ) + unlabeled_data = self.input_data_loader.load_tf_dataset_from_bigquery( + input_path=self.runner_parameters.input_bigquery_table_path, + label_col_name=self.runner_parameters.label_col_name, + where_statements=self.runner_parameters.where_statements, + ignore_columns=self.runner_parameters.ignore_columns, + batch_size=batch_size, + # Train using negative labeled data and unlabeled data. + label_column_filter_value=[ + self.runner_parameters.unlabeled_data_value, + self.runner_parameters.negative_data_value, + ], + ) + else: + logging.info('Loading training data from CSV.') + self.input_data_loader = cast( + csv_data_loader.CsvDataLoader, self.input_data_loader + ) + unlabeled_data = self.input_data_loader.load_tf_dataset_from_csv( + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + batch_size=batch_size, + # Train using negative labeled data and unlabeled data. + label_column_filter_value=[ + self.runner_parameters.unlabeled_data_value, + self.runner_parameters.negative_data_value, + ], + ) logging.info('Fitting ensemble.') ensemble_object.fit( @@ -255,10 +349,9 @@ def write_verbose_logs( updated_label_counts = pd.DataFrame(labels).value_counts() logging.info('Updated label counts %s', updated_label_counts) - logging.info('Test features shape: %s', - self.test_x.shape if self.test_x is not None else None) - logging.info('Test labels shape: %s', - self.test_y.shape if self.test_y is not None else None) + if self.test_x is not None and self.test_y is not None: + logging.info('Test features shape: %s', self.test_x.shape) + logging.info('Test labels shape: %s', self.test_y.shape) logging.info('Updated features shape: %s', features.shape) logging.info('Updated labels shape: %s', labels.shape) @@ -343,20 +436,21 @@ def _check_runner_parameters(self) -> None: ), self.runner_parameters.test_dataset_holdout_fraction, ) - if ( - self.runner_parameters.test_dataset_holdout_fraction > 0 - and self.runner_parameters.test_bigquery_table_path + if self.runner_parameters.test_dataset_holdout_fraction > 0 and ( + self.runner_parameters.test_bigquery_table_path + or self.runner_parameters.data_test_gcs_uri ): logging.warning( - 'Only a test holdout fraction and a single input table ' - 'or an input and test table may be specified. Using the ' - 'test table instead of the specified holdout fraction.' + 'Only a test holdout fraction and a single input source ' + 'or an input and test source may be specified. Using the ' + 'test source instead of the specified holdout fraction.' ) self.runner_parameters.test_dataset_holdout_fraction = 0 if self.runner_parameters.upload_only and ( self.runner_parameters.test_dataset_holdout_fraction or self.runner_parameters.test_bigquery_table_path + or self.runner_parameters.data_test_gcs_uri ): logging.warning( 'A test set is not needed in upload only mode, ' @@ -365,17 +459,75 @@ def _check_runner_parameters(self) -> None: ) self.runner_parameters.test_dataset_holdout_fraction = 0 self.runner_parameters.test_bigquery_table_path = '' + self.runner_parameters.data_test_gcs_uri = '' self.runner_parameters.output_gcs_uri = '' if ( self.runner_parameters.upload_only and not self.runner_parameters.output_bigquery_table_path + and not self.runner_parameters.data_output_gcs_uri ): raise ValueError( - 'output_bigquery_table_path needs to be specified in ' - 'upload_only mode.' + 'output_bigquery_table_path or data_output_gcs_uri needs to be ' + 'specified in upload_only mode.' ) + def _get_test_data(self) -> tf.data.Dataset: + """Gets the test data from the test table or from the test CSVs.""" + if self.data_format == DataFormat.BIGQUERY: + # Remove any unlabeled samples that may be in the test set. + unlabeled_sample_filter = ( + f'{self.runner_parameters.test_label_col_name} != ' + f'{self.runner_parameters.unlabeled_data_value}' + ) + if self.runner_parameters.where_statements is not None: + unlabeled_sample_where_statements = list( + self.runner_parameters.where_statements + ) + [unlabeled_sample_filter] + else: + unlabeled_sample_where_statements = [unlabeled_sample_filter] + self.test_data_loader = cast( + data_loader.DataLoader, self.test_data_loader + ) + test_dataset_size = self.test_data_loader.get_query_record_result_length( + input_path=self.runner_parameters.test_bigquery_table_path, + where_statements=unlabeled_sample_where_statements, + ) + test_tf_dataset = self.test_data_loader.load_tf_dataset_from_bigquery( + input_path=self.runner_parameters.test_bigquery_table_path, + label_col_name=self.runner_parameters.test_label_col_name, + where_statements=unlabeled_sample_where_statements, + ignore_columns=self.runner_parameters.ignore_columns, + batch_size=test_dataset_size, + ) + else: + logging.info('Loading test data from CSV.') + self.test_data_loader = cast( + csv_data_loader.CsvDataLoader, self.test_data_loader + ) + test_tf_dataset = self.test_data_loader.load_tf_dataset_from_csv( + input_path=self.runner_parameters.data_test_gcs_uri, + label_col_name=self.runner_parameters.test_label_col_name, + batch_size=None, + label_column_filter_value=[ + self.runner_parameters.unlabeled_data_value, + ], + exclude_label_value=True, + ) + test_dataset_size = ( + self.test_data_loader.label_counts[ + self.runner_parameters.positive_data_value + ] + + self.test_data_loader.label_counts[ + self.runner_parameters.negative_data_value + ] + ) + test_tf_dataset = test_tf_dataset.batch( + tf.cast(test_dataset_size, tf.int64) + ) + test_tf_dataset = test_tf_dataset.prefetch(tf.data.AUTOTUNE) + return test_tf_dataset + def preprocess_train_test_split( self, features: np.ndarray, @@ -467,30 +619,11 @@ def preprocess_train_test_split( # TODO(b/247116870): Implement a dedicated function in the runner class # to load BQ test sets before evaluating the supervised model. - if self.runner_parameters.test_bigquery_table_path: - # Remove any unlabeled samples that may be in the test set. - unlabeled_sample_filter = ( - f'{self.runner_parameters.test_label_col_name} != ' - f'{self.runner_parameters.unlabeled_data_value}' - ) - if self.runner_parameters.where_statements is not None: - unlabeled_sample_where_statements = ( - self.runner_parameters.where_statements + [unlabeled_sample_filter] - ) - else: - unlabeled_sample_where_statements = [unlabeled_sample_filter] - test_dataset_size = self.data_loader.get_query_record_result_length( - input_path=self.runner_parameters.test_bigquery_table_path, - where_statements=unlabeled_sample_where_statements, - ) - test_tf_dataset = self.data_loader.load_tf_dataset_from_bigquery( - input_path=self.runner_parameters.test_bigquery_table_path, - label_col_name=self.runner_parameters.test_label_col_name, - where_statements=unlabeled_sample_where_statements, - ignore_columns=self.runner_parameters.ignore_columns, - batch_size=test_dataset_size, - ) - + if ( + self.runner_parameters.test_bigquery_table_path + or self.runner_parameters.data_test_gcs_uri + ): + test_tf_dataset = self._get_test_data() test_x, test_y = test_tf_dataset.as_numpy_iterator().next() self.test_x = np.array(test_x) self.test_y = np.array(test_y) @@ -535,13 +668,10 @@ def train_supervised_model( features: Pseudo labeled and ground truth features. labels: Pseudo and ground truth labels. weights: Weights for pseudo labeled data. - - Raises: - ValueError if there is the supervised model object is None. """ - if self.supervised_model_object is None: - raise ValueError('No supervised model to train.') logging.info('Supervised model training started.') + if self.supervised_model_object is None: + raise ValueError('supervised_model_object is None.') self.supervised_model_object.train( features=features, labels=labels, weights=weights ) @@ -553,11 +683,36 @@ def run(self) -> None: self._check_runner_parameters() - total_record_count = self.data_loader.get_query_record_result_length( - input_path=self.runner_parameters.input_bigquery_table_path, - where_statements=self.runner_parameters.where_statements, - ) - + if self.data_format == DataFormat.BIGQUERY: + # Type hint to prevent linter errors. + self.input_data_loader = cast( + data_loader.DataLoader, self.input_data_loader + ) + total_record_count = ( + self.input_data_loader.get_query_record_result_length( + input_path=self.runner_parameters.input_bigquery_table_path, + where_statements=self.runner_parameters.where_statements, + ) + ) + else: + # Type hint to prevent linter errors. + self.input_data_loader = cast( + csv_data_loader.CsvDataLoader, self.input_data_loader + ) + # Call the data loader to read all the files. This is needed to get the + # label counts. + _ = self.input_data_loader.load_tf_dataset_from_csv( + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + batch_size=1, + label_column_filter_value=[], + ) + # TODO(sinharaj): This is not ideal, we should not need to read the files + # again. Find a way to get the label counts without reading the files. + # Assumes that data loader has already been used to read the input table. + total_record_count = sum(self.input_data_loader.label_counts.values()) + + logging.info('Total record count: %s', total_record_count) unlabeled_record_count = self._get_record_count_based_on_labels( self.runner_parameters.unlabeled_data_value ) @@ -579,13 +734,26 @@ def run(self) -> None: self.runner_parameters.labeling_and_model_training_batch_size or total_record_count ) - tf_dataset = self.data_loader.load_tf_dataset_from_bigquery( - input_path=self.runner_parameters.input_bigquery_table_path, - label_col_name=self.runner_parameters.label_col_name, - where_statements=self.runner_parameters.where_statements, - ignore_columns=self.runner_parameters.ignore_columns, - batch_size=batch_size, - ) + if self.data_format == DataFormat.BIGQUERY: + self.input_data_loader = cast( + data_loader.DataLoader, self.input_data_loader + ) + tf_dataset = self.input_data_loader.load_tf_dataset_from_bigquery( + input_path=self.runner_parameters.input_bigquery_table_path, + label_col_name=self.runner_parameters.label_col_name, + where_statements=self.runner_parameters.where_statements, + ignore_columns=self.runner_parameters.ignore_columns, + batch_size=batch_size, + ) + else: + self.input_data_loader = cast( + csv_data_loader.CsvDataLoader, self.input_data_loader + ) + tf_dataset = self.input_data_loader.load_tf_dataset_from_csv( + input_path=self.runner_parameters.data_input_gcs_uri, + label_col_name=self.runner_parameters.label_col_name, + batch_size=batch_size, + ) tf_dataset = tf_dataset.as_numpy_iterator() for batch_number, (features, labels) in enumerate(tf_dataset): @@ -616,10 +784,30 @@ def run(self) -> None: logging.info('Labeling completed.') # Upload batch of pseudo labels, will append when called more than once. - if self.runner_parameters.output_bigquery_table_path: - self.data_loader.upload_dataframe_as_bigquery_table( + if ( + self.runner_parameters.output_bigquery_table_path + and self.data_format == DataFormat.BIGQUERY + ): + self.input_data_loader = cast( + data_loader.DataLoader, self.input_data_loader + ) + self.input_data_loader.upload_dataframe_as_bigquery_table( features=updated_features, labels=updated_labels ) + elif ( + self.runner_parameters.data_output_gcs_uri + and self.data_format == DataFormat.CSV + ): + self.input_data_loader = cast( + csv_data_loader.CsvDataLoader, self.input_data_loader + ) + self.input_data_loader.upload_dataframe_to_gcs( + batch=batch_number, + features=updated_features, + labels=updated_labels, + ) + else: + logging.info('No output path specified, skipping upload.') # TODO(b/247116870): Create two logging functions, one for batch and one # for the end of SPADE training (reporting job level metrics such as AUC). @@ -638,9 +826,9 @@ def run(self) -> None: ) if not self.runner_parameters.upload_only: + self.evaluate_model() if self.supervised_model_object is None: raise ValueError('Supervised model was not created and trained.') - self.evaluate_model() self.supervised_model_object.save( save_location=self.runner_parameters.output_gcs_uri ) diff --git a/spade_anomaly_detection/runner_test.py b/spade_anomaly_detection/runner_test.py index c84ca89..85d664b 100644 --- a/spade_anomaly_detection/runner_test.py +++ b/spade_anomaly_detection/runner_test.py @@ -35,7 +35,7 @@ import numpy as np from parameterized import parameterized - +from spade_anomaly_detection import csv_data_loader from spade_anomaly_detection import data_loader from spade_anomaly_detection import occ_ensemble from spade_anomaly_detection import parameters @@ -45,7 +45,7 @@ import tensorflow_decision_forests as tfdf -class RunnerTest(tf.test.TestCase): +class RunnerBQTest(tf.test.TestCase): def setUp(self): super().setUp() @@ -53,6 +53,7 @@ def setUp(self): self.runner_parameters = parameters.RunnerParameters( train_setting='PNU', input_bigquery_table_path='project.dataset.table', + data_input_gcs_uri=None, output_gcs_uri='gs://test_bucket/test_folder', label_col_name='label', positive_data_value=5, @@ -65,8 +66,10 @@ def setUp(self): test_bigquery_table_path='', test_label_col_name='', test_dataset_holdout_fraction=0.3, + data_test_gcs_uri=None, upload_only=False, output_bigquery_table_path='', + data_output_gcs_uri=None, alpha=1.0, batches_per_model=1, max_occ_batch_size=50000, @@ -169,14 +172,14 @@ def _create_mock_datasets(self) -> None: ).batch(batch_size_all_data, drop_remainder=True) records_per_occ = ( - self.unlabeled_examples // self.runner_parameters.ensemble_count - ) + self.unlabeled_examples + self.negative_examples + ) // self.runner_parameters.ensemble_count unlabeled_batch_size = ( records_per_occ // self.runner_parameters.batches_per_model ) self.unlabeled_data_tf_dataset = tf.data.Dataset.from_tensor_slices( (unlabeled_features, unlabeled_labels) - ).batch(unlabeled_batch_size, drop_remainder=True) + ).batch(unlabeled_batch_size) if self.runner_parameters.test_bigquery_table_path: self.test_labels = np.repeat( @@ -194,7 +197,7 @@ def _create_mock_datasets(self) -> None: ).astype(np.float32) self.test_tf_dataset = tf.data.Dataset.from_tensor_slices( (self.test_features, self.test_labels) - ).batch(self.total_test_records, drop_remainder=True) + ).batch(self.total_test_records) self.mock_load_tf_dataset_from_bigquery.side_effect = [ self.unlabeled_data_tf_dataset, @@ -647,7 +650,7 @@ def test_bigquery_testing_set_throws_warning(self): self._assert_regex_in( training_logs.output, - r'Only a test holdout fraction and a single input table', + r'Only a test holdout fraction and a single input source', ) def test_dataset_creation_function_calls_no_error(self): @@ -735,10 +738,13 @@ def test_upload_only_setting_true_no_error(self): def test_upload_only_setting_true_throw_error_no_table(self): self.runner_parameters.upload_only = True self.runner_parameters.output_bigquery_table_path = '' + self.runner_parameters.data_output_gcs_uri = '' runner_object = runner.Runner(self.runner_parameters) with self.assertRaisesRegex( - ValueError, r'output_bigquery_table_path needs to be specified in' + ValueError, + r'output_bigquery_table_path or data_output_gcs_uri needs to be ' + r'specified in', ): runner_object.run() @@ -776,7 +782,7 @@ def test_supervised_model_not_instantiated_throw_error(self): ): runner_object.evaluate_model() - def test_evaluatiom_dataset_batch_training(self): + def test_evaluation_dataset_batch_training(self): self.runner_parameters.test_dataset_holdout_fraction = 0.5 # Set batch size to ensure that we are building a test set over multiple # batches from the entire dataset. @@ -839,5 +845,207 @@ def test_threshold_parameter_initialization_positive_threshold_set_no_error( ) +class RunnerCSVTest(tf.test.TestCase): + + def setUp(self): + super().setUp() + + self.runner_parameters = parameters.RunnerParameters( + train_setting='PNU', + input_bigquery_table_path='', + data_input_gcs_uri='gs://some_bucket/input_folder', + output_gcs_uri='gs://test_bucket/test_folder', + label_col_name='label', + positive_data_value=5, + negative_data_value=3, + unlabeled_data_value=-100, + positive_threshold=5, + negative_threshold=95, + ignore_columns=None, + where_statements=None, + test_bigquery_table_path='', + test_label_col_name='', + test_dataset_holdout_fraction=0.3, + data_test_gcs_uri=None, + upload_only=False, + output_bigquery_table_path='', + data_output_gcs_uri=None, + alpha=1.0, + batches_per_model=1, + max_occ_batch_size=50000, + labeling_and_model_training_batch_size=None, + ensemble_count=5, + verbose=False, + ) + + self.mock_label_counts = self.enter_context( + mock.patch.object( + csv_data_loader.CsvDataLoader, + 'label_counts', + new_callable=mock.PropertyMock, + ) + ) + + self.mock_load_tf_dataset_from_csv = self.enter_context( + mock.patch.object( + csv_data_loader.CsvDataLoader, + 'load_tf_dataset_from_csv', + autospec=True, + ) + ) + self.mock_supervised_model_train = self.enter_context( + mock.patch.object( + supervised_model.RandomForestModel, + 'train', + autospec=True, + ) + ) + self.mock_supervised_model_save = self.enter_context( + mock.patch.object( + supervised_model.RandomForestModel, + 'save', + autospec=True, + ) + ) + self.mock_supervised_model_evaluate = self.enter_context( + mock.patch.object( + tfdf.keras.RandomForestModel, + 'evaluate', + autospec=True, + ) + ) + self.mock_csv_upload = self.enter_context( + mock.patch.object( + csv_data_loader.CsvDataLoader, + 'upload_dataframe_to_gcs', + autospec=True, + ) + ) + + self._create_mock_datasets() + + def _create_mock_datasets(self) -> None: + num_features = 2 + self.per_class_labeled_example_count = 10 + self.unlabeled_examples = 200 + self.all_examples = ( + self.per_class_labeled_example_count * 2 + ) + self.unlabeled_examples + self.total_test_records = self.per_class_labeled_example_count * 2 + self.negative_examples = self.per_class_labeled_example_count * 1 + self.label_counts = { + self.runner_parameters.positive_data_value: ( + self.per_class_labeled_example_count + ), + self.runner_parameters.negative_data_value: self.negative_examples, + self.runner_parameters.unlabeled_data_value: self.unlabeled_examples, + } + unlabeled_features = np.random.rand( + self.unlabeled_examples, num_features + ).astype(np.float32) + unlabeled_labels = np.repeat( + self.runner_parameters.unlabeled_data_value, self.unlabeled_examples + ) + + all_features = np.random.rand(self.all_examples, num_features).astype( + np.float32 + ) + + all_labels = np.concatenate( + [ + np.repeat( + self.runner_parameters.positive_data_value, + self.per_class_labeled_example_count, + ), + np.repeat( + self.runner_parameters.negative_data_value, + self.per_class_labeled_example_count, + ), + unlabeled_labels, + ], + axis=0, + ).astype(np.int8) + + if self.runner_parameters.labeling_and_model_training_batch_size is None: + batch_size_all_data = self.all_examples + else: + batch_size_all_data = ( + self.runner_parameters.labeling_and_model_training_batch_size + ) + + self.all_data_tf_dataset = tf.data.Dataset.from_tensor_slices( + (all_features, all_labels) + ).batch(batch_size_all_data, drop_remainder=True) + + records_per_occ = ( + self.unlabeled_examples + self.negative_examples + ) // self.runner_parameters.ensemble_count + unlabeled_batch_size = ( + records_per_occ // self.runner_parameters.batches_per_model + ) + self.unlabeled_data_tf_dataset = tf.data.Dataset.from_tensor_slices( + (unlabeled_features, unlabeled_labels) + ).batch(unlabeled_batch_size) + + if ( + self.runner_parameters.test_bigquery_table_path + or self.runner_parameters.data_test_gcs_uri + ): + self.test_labels = np.repeat( + [ + self.runner_parameters.positive_data_value, + self.runner_parameters.negative_data_value, + ], + [ + self.per_class_labeled_example_count, + self.per_class_labeled_example_count, + ], + ) + self.test_features = np.random.rand( + self.total_test_records, num_features + ).astype(np.float32) + self.test_tf_dataset = tf.data.Dataset.from_tensor_slices( + (self.test_features, self.test_labels) + ).batch(self.total_test_records) + + self.mock_load_tf_dataset_from_csv.side_effect = [ + self.all_data_tf_dataset, + self.unlabeled_data_tf_dataset, + self.all_data_tf_dataset, + self.test_tf_dataset, + ] + self.mock_label_counts.return_value = self.label_counts + else: + self.mock_load_tf_dataset_from_csv.side_effect = [ + self.all_data_tf_dataset, + self.unlabeled_data_tf_dataset, + self.all_data_tf_dataset, + ] + self.mock_label_counts.return_value = self.label_counts + + def test_runner_supervised_model_fit_with_csv_data(self): + self.runner_parameters.alpha = 0.8 + self.runner_parameters.negative_threshold = 0 + + runner_object = runner.Runner(self.runner_parameters) + runner_object.run() + + supervised_model_actual_kwargs = ( + self.mock_supervised_model_train.call_args.kwargs + ) + + with self.subTest('NoUnlabeledData'): + self.assertNotIn( + self.runner_parameters.unlabeled_data_value, + supervised_model_actual_kwargs['labels'], + msg='Unlabeled data was used to fit the supervised model.', + ) + with self.subTest('LabelWeights'): + self.assertIn( + self.runner_parameters.alpha, + supervised_model_actual_kwargs['weights'], + ) + + if __name__ == '__main__': tf.test.main() diff --git a/spade_anomaly_detection/scripts/run_cloud_spade_experiment.sh b/spade_anomaly_detection/scripts/run_cloud_spade_experiment.sh index b20678d..da83aea 100644 --- a/spade_anomaly_detection/scripts/run_cloud_spade_experiment.sh +++ b/spade_anomaly_detection/scripts/run_cloud_spade_experiment.sh @@ -26,40 +26,44 @@ PROJECT_ID=${1:-"[insert-project-id]"} DATETIME=$(date '+%Y%m%d_%H%M%S') #Args -TRAIN_SETTING=${15:-"PNU"} +TRAIN_SETTING=${2:-"PNU"} -INPUT_BIGQUERY_TABLE_PATH=${2:-"${PROJECT_ID}.[bq-dataset].[bq-input-table]"} -OUTPUT_BIGQUERY_TABLE_PATH=${23:-"${PROJECT_ID}.[bq-dataset].[bq-output-table]"} -OUTPUT_GCS_URI=${14:-"gs://[gcs-bucket]/[model-folder]"} -LABEL_COL_NAME=${3:-"y"} +# Use either Bigquery or GCS for input/output/test data. +INPUT_BIGQUERY_TABLE_PATH=${3:-"${PROJECT_ID}.[bq-dataset].[bq-input-table]"} +DATA_INPUT_GCS_URI=${4:-""} +OUTPUT_BIGQUERY_TABLE_PATH=${5:-"${PROJECT_ID}.[bq-dataset].[bq-output-table]"} +DATA_OUTPUT_GCS_URI=${6:-""} +OUTPUT_GCS_URI=${7:-"gs://[gcs-bucket]/[model-folder]"} +LABEL_COL_NAME=${8:-"y"} # The label column is of type float, these must match in order for array # filtering to work correctly. -POSITIVE_DATA_VALUE=${4:-"1"} -NEGATIVE_DATA_VALUE=${5:-"0"} -UNLABELED_DATA_VALUE=${6:-"-1"} -POSITIVE_THRESHOLD=${7:-".1"} -NEGATIVE_THRESHOLD=${8:-"95"} -TEST_BIGQUERY_TABLE_PATH=${16:-"${PROJECT_ID}.[bq-dataset].[bq-test-table]"} -TEST_LABEL_COL_NAME=${17:-"y"} -ALPHA=${10:-"1.0"} -BATCHES_PER_MODEL=${11:-"1"} -ENSEMBLE_COUNT=${12:-"5"} -MAX_OCC_BATCH_SIZE=${18:-"50000"} +POSITIVE_DATA_VALUE=${9:-"1"} +NEGATIVE_DATA_VALUE=${10:-"0"} +UNLABELED_DATA_VALUE=${11:-"-1"} +POSITIVE_THRESHOLD=${12:-".1"} +NEGATIVE_THRESHOLD=${13:-"95"} +TEST_BIGQUERY_TABLE_PATH=${14:-"${PROJECT_ID}.[bq-dataset].[bq-test-table]"} +DATA_TEST_GCS_URI=${15:-""} +TEST_LABEL_COL_NAME=${16:-"y"} +ALPHA=${17:-"1.0"} +BATCHES_PER_MODEL=${18:-"1"} +ENSEMBLE_COUNT=${19:-"5"} +MAX_OCC_BATCH_SIZE=${20:-"50000"} LABELING_AND_MODEL_TRAINING_BATCH_SIZE=${21:-"100000"} -VERBOSE=${13:-"True"} -UPLOAD_ONLY=${22:-"False"} +VERBOSE=${22:-"True"} +UPLOAD_ONLY=${23:-"False"} # Give a unique name to your training job. TRIAL_NAME="spade_${USER}_${DATETIME}" # Image name and location IMAGE_NAME="spade" -IMAGE_TAG=${7:-"latest-oss"} +IMAGE_TAG=${24:-"latest-oss"} # Project image (use this for testing) IMAGE_URI="us-docker.pkg.dev/${PROJECT_ID}/spade/${IMAGE_NAME}:${IMAGE_TAG}" echo "IMAGE_URI = ${IMAGE_URI}" -BUILD=${14:-"TRUE"} +BUILD=${25:-"TRUE"} if [[ "${BUILD}" == "TRUE" ]]; then /bin/bash ./scripts/build_and_push_image.sh "${IMAGE_TAG}" "${IMAGE_NAME}" "${PROJECT_ID}" || exit @@ -77,7 +81,9 @@ gcloud ai custom-jobs create \ --worker-pool-spec="${WORKER_MACHINE}",replica-count=1,container-image-uri="${IMAGE_URI}" \ --args=--train_setting="${TRAIN_SETTING}" \ --args=--input_bigquery_table_path="${INPUT_BIGQUERY_TABLE_PATH}" \ + --args=--data_input_gcs_uri="${DATA_INPUT_GCS_URI}" \ --args=--output_bigquery_table_path="${OUTPUT_BIGQUERY_TABLE_PATH}" \ + --args=--data_output_gcs_uri="${DATA_OUTPUT_GCS_URI}" \ --args=--output_gcs_uri="${OUTPUT_GCS_URI}" \ --args=--label_col_name="${LABEL_COL_NAME}" \ --args=--positive_data_value="${POSITIVE_DATA_VALUE}" \ @@ -86,6 +92,7 @@ gcloud ai custom-jobs create \ --args=--positive_threshold="${POSITIVE_THRESHOLD}" \ --args=--negative_threshold="${NEGATIVE_THRESHOLD}" \ --args=--test_bigquery_table_path="${TEST_BIGQUERY_TABLE_PATH}" \ + --args=--data_test_gcs_uri="${DATA_TEST_GCS_URI}" \ --args=--test_label_col_name="${TEST_LABEL_COL_NAME}" \ --args=--alpha="${ALPHA}" \ --args=--batches_per_model="${BATCHES_PER_MODEL}" \ diff --git a/spade_anomaly_detection/task.py b/spade_anomaly_detection/task.py index 55a28d9..18e3e05 100644 --- a/spade_anomaly_detection/task.py +++ b/spade_anomaly_detection/task.py @@ -32,14 +32,22 @@ See spade/scripts for examples on launching a Vertex training job and using this script as an entry point. """ + import logging -from typing import Sequence +import os +import random +from typing import Final, Sequence from absl import app from absl import flags - +import numpy as np from spade_anomaly_detection import parameters from spade_anomaly_detection import runner +import tensorflow as tf + + +_RANDOM_SEED: Final[int] = 42 + _TRAIN_SETTING = flags.DEFINE_enum_class( "train_setting", @@ -67,6 +75,16 @@ ), ) +_DATA_INPUT_GCS_URI = flags.DEFINE_string( + "data_input_gcs_uri", + default=None, + required=False, + help=( + "Cloud Storage location to store the CSV input data used for training " + "the anomaly detection model." + ), +) + _OUTPUT_GCS_URI = flags.DEFINE_string( "output_gcs_uri", default=None, @@ -146,7 +164,7 @@ ), ) -_WHERE_STAEMENTS = flags.DEFINE_list( +_WHERE_STATEMENTS = flags.DEFINE_list( "where_statements", default=None, required=False, @@ -187,6 +205,19 @@ ), ) +_DATA_TEST_GCS_URI = flags.DEFINE_string( + "data_test_gcs_uri", + default=None, + required=False, + help=( + "Cloud Storage location to store the CSV input data used for evaluating" + " the supervised model. Note that the positive and negative label " + "values must also be the same in this testing set. It is okay to have " + "your test labels in that form, or use 1 for positive and 0 for " + "negative." + ), +) + _OUTPUT_BIGQUERY_TABLE_PATH = flags.DEFINE_string( "output_bigquery_table_path", default=None, @@ -199,6 +230,17 @@ ), ) +_DATA_OUTPUT_GCS_URI = flags.DEFINE_string( + "data_output_gcs_uri", + default=None, + required=False, + help=( + "Cloud Storage location to be used for uploaded the pseudo labeled data" + " as CSV. This includes features and new labels. By default, we will " + "use the column names from the data_input_gcs_uri table." + ), +) + _ALPHA = flags.DEFINE_float( "alpha", default=1.0, @@ -286,13 +328,24 @@ # TODO(b/247116870) Implement the rest of the input parameters. +def _set_seeds(seed: int = _RANDOM_SEED): + """Sets the random seed in the relevant modules.""" + os.environ["PYTHONHASHSEED"] = str(seed) + random.seed(seed) + np.random.seed(seed) + tf.random.set_seed(seed) + + def main(argv: Sequence[str]) -> None: if len(argv) > 1: raise app.UsageError("Too many command-line arguments.") + _set_seeds(seed=_RANDOM_SEED) + runner_parameters = parameters.RunnerParameters( train_setting=_TRAIN_SETTING.value, input_bigquery_table_path=_INPUT_BIGQUERY_TABLE_PATH.value, + data_input_gcs_uri=_DATA_INPUT_GCS_URI.value, output_gcs_uri=_OUTPUT_GCS_URI.value, label_col_name=_LABEL_COL_NAME.value, positive_data_value=_POSITIVE_DATA_VALUE.value, @@ -301,17 +354,20 @@ def main(argv: Sequence[str]) -> None: positive_threshold=_POSITIVE_THRESHOLD.value, negative_threshold=_NEGATIVE_THRESHOLD.value, ignore_columns=_IGNORE_COLUMNS.value, - where_statements=_WHERE_STAEMENTS.value, + where_statements=_WHERE_STATEMENTS.value, test_bigquery_table_path=_TEST_BIGQUERY_TABLE_PATH.value, test_label_col_name=_TEST_LABEL_COL_NAME.value, test_dataset_holdout_fraction=_TEST_DATASET_HOLDOUT_FRACTION.value, + data_test_gcs_uri=_DATA_TEST_GCS_URI.value, upload_only=_UPLOAD_ONLY.value, output_bigquery_table_path=_OUTPUT_BIGQUERY_TABLE_PATH.value, + data_output_gcs_uri=_DATA_OUTPUT_GCS_URI.value, alpha=_ALPHA.value, batches_per_model=_BATCHES_PER_MODEL.value, max_occ_batch_size=_MAX_OCC_BATCH_SIZE.value, labeling_and_model_training_batch_size=_BATCH_SIZE.value, ensemble_count=_ENSEMBLE_COUNT.value, + random_seed=_RANDOM_SEED, verbose=_VERBOSE.value, ) runner_obj = runner.Runner(runner_parameters)