Skip to content

Commit e5f0daf

Browse files
authored
Merge pull request #899 from sgsmob/indicator_runner
Create indicator runner main function
2 parents f06c3c8 + b2485d2 commit e5f0daf

File tree

4 files changed

+215
-75
lines changed

4 files changed

+215
-75
lines changed

_delphi_utils_python/delphi_utils/archive.py

Lines changed: 56 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
Created: 2020-08-06
2727
"""
2828

29-
from argparse import ArgumentParser
3029
from contextlib import contextmanager
3130
import filecmp
3231
from glob import glob
@@ -98,43 +97,57 @@ def diff_export_csv(
9897
after_df.loc[added_idx, :])
9998

10099

101-
def run_module(archive_type: str,
102-
cache_dir: str,
103-
export_dir: str,
104-
**kwargs):
105-
"""Build and run an ArchiveDiffer.
100+
def archiver_from_params(params):
101+
"""Build an ArchiveDiffer from `params`.
102+
103+
The type of ArchiveDiffer constructed is inferred from the parameters.
106104
107105
Parameters
108106
----------
109-
archive_type: str
110-
Type of ArchiveDiffer to run. Must be one of ["git", "s3"] which correspond to
111-
`GitArchiveDiffer` and `S3ArchiveDiffer`, respectively.
112-
cache_dir: str
113-
The directory for storing most recent archived/uploaded CSVs to start diffing from.
114-
export_dir: str
115-
The directory with most recent exported CSVs to diff to.
116-
**kwargs:
117-
Keyword arguments corresponding to constructor arguments for the respective ArchiveDiffers.
107+
params: Dict[str, Dict[str, Any]]
108+
Dictionary of user-defined parameters with the following structure:
109+
- "common":
110+
- "export_dir": str, directory to which indicator output files have been exported
111+
- "archive":
112+
- "cache_dir": str, directory containing cached data from previous indicator runs
113+
- "branch_name" (required for git archiver): str, name of git branch
114+
- "override_dirty" (optional for git archiver): bool, whether to allow overwriting of
115+
untracked & uncommitted changes in `cache_dir`
116+
- "commit_partial_success" (optional for git archiver): bool, whether to still commit
117+
even if some files were not archived and staged due to `override_dirty=False`
118+
- "commit_message" (optional for git archiver): str, commit message to use
119+
- "bucket_name" (required for S3 archiver): str, name of S3 bucket to which to upload
120+
files
121+
- "indicator_prefix" (required for S3 archiver): str, S3 prefix for files from this
122+
indicator
123+
- "aws_credentials" (required for S3 archiver): Dict[str, str], authentication
124+
parameters for S3 to create a boto3.Session
125+
126+
Returns
127+
-------
128+
ArchiveDiffer of the inferred type.
118129
"""
119-
if archive_type == "git":
120-
arch_diff = GitArchiveDiffer(cache_dir,
121-
export_dir,
122-
kwargs["branch_name"],
123-
kwargs["override_dirty"],
124-
kwargs["commit_partial_success"],
125-
kwargs["commit_message"])
126-
elif archive_type == "s3":
127-
arch_diff = S3ArchiveDiffer(cache_dir,
128-
export_dir,
129-
kwargs["bucket_name"],
130-
kwargs["indicator_prefix"],
131-
kwargs["aws_credentials"])
132-
elif archive_type == "filesystem":
133-
arch_diff = FilesystemArchiveDiffer(cache_dir,
134-
export_dir)
135-
else:
136-
raise ValueError(f"No archive type named '{archive_type}'")
137-
arch_diff.run()
130+
if "archive" not in params:
131+
return None
132+
133+
# Copy to kwargs to take advantage of default arguments to archiver
134+
kwargs = params["archive"]
135+
kwargs["export_dir"] = params["common"]["export_dir"]
136+
137+
if "branch_name" in kwargs:
138+
return GitArchiveDiffer(**kwargs)
139+
140+
if "bucket_name" in kwargs:
141+
assert "indicator_prefix" in kwargs, "Missing indicator_prefix in params"
142+
assert "aws_credentials" in kwargs, "Missing aws_credentials in params"
143+
return S3ArchiveDiffer(**kwargs)
144+
145+
# Don't run the filesystem archiver if the user misspecified the archiving params
146+
assert set(kwargs.keys()) == set(["cache_dir", "export_dir"]),\
147+
'If you intended to run a filesystem archiver, please remove all options other than '\
148+
'"cache_dir" from the "archive" params. Otherwise, please include either "branch_name" '\
149+
'or "bucket_name" to run the git or S3 archivers, respectively.'
150+
return FilesystemArchiveDiffer(**kwargs)
138151

139152

140153
class ArchiveDiffer:
@@ -621,46 +634,17 @@ def update_cache(self):
621634
self._cache_updated = True
622635

623636
if __name__ == "__main__":
624-
parser = ArgumentParser()
625-
parser.add_argument("--archive_type", required=True, type=str,
626-
choices=["git", "s3", "filesystem"],
627-
help="Type of archive differ to use.")
628-
parser.add_argument("--indicator_prefix", type=str, default="",
629-
help="The prefix for S3 keys related to this indicator."
630-
" Required for `archive_type = 's3'")
631-
parser.add_argument("--branch_name", type=str, default="",
632-
help=" Branch to use for `archive_type` = 'git'.")
633-
parser.add_argument("--override_dirty", action="store_true",
634-
help="Whether to allow overwriting of untracked &"
635-
" uncommitted changes for `archive_type` = 'git'")
636-
parser.add_argument("--commit_partial_success", action="store_true",
637-
help="Whether to still commit for `archive_type` = "
638-
"'git' even if some files were not archived and "
639-
"staged due to `override_dirty` = False.")
640-
parser.add_argument("--commit_message", type=str, default="",
641-
help="Commit message for `archive_type` = 'git'")
642-
args = parser.parse_args()
643-
params = read_params()
637+
_params = read_params()
644638

645639
# Autodetect whether parameters have been factored hierarchically or not
646640
# See https://github.com/cmu-delphi/covidcast-indicators/issues/847
647641
# Once all indicators have their parameters factored in to "common", "indicator", "validation",
648642
# and "archive", this code will be obsolete.
649-
if "archive" in params:
650-
archive_params = params["archive"]
651-
common_params = params["common"]
652-
else:
653-
archive_params = params
654-
common_params = params
655-
656-
run_module(args.archive_type,
657-
archive_params["cache_dir"],
658-
common_params["export_dir"],
659-
aws_credentials=archive_params.get("aws_credentials", {}),
660-
branch_name=args.branch_name,
661-
bucket_name=archive_params.get("bucket_name", ""),
662-
commit_message=args.commit_message,
663-
commit_partial_success=args.commit_partial_success,
664-
indicator_prefix=args.indicator_prefix,
665-
override_dirty=args.override_dirty
666-
)
643+
#
644+
# We assume that by virtue of invoking this module from the command line that the user intends
645+
# to run validation. Thus if the "archive" sub-object is not found, we interpret that to mean
646+
# the parameters have not be hierarchically refactored.
647+
if "archive" not in _params:
648+
_params = {"archive": _params, "common": _params}
649+
650+
archiver_from_params(_params).run()

_delphi_utils_python/delphi_utils/runner.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
"""Indicator running utilities."""
2+
import argparse as ap
3+
import importlib
24
from typing import Any, Callable, Dict, Optional
3-
from .archive import ArchiveDiffer
5+
from .archive import ArchiveDiffer, archiver_from_params
46
from .utils import read_params
57
from .validator.validate import Validator
8+
from .validator.run import validator_from_params
69

710
Params = Dict[str, Any]
811

@@ -33,3 +36,14 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
3336
validation_report = validator.validate()
3437
if archiver and (not validator or validation_report.success()):
3538
archiver.archive()
39+
40+
41+
if __name__ == "__main__":
42+
parser = ap.ArgumentParser()
43+
parser.add_argument("indicator_name",
44+
type=str,
45+
help="Name of the Python package containing the indicator. This package "
46+
"must export a `run_module(params)` function.")
47+
args = parser.parse_args()
48+
indicator_module = importlib.import_module(args.indicator_name)
49+
run_indicator_pipeline(indicator_module.run_module, validator_from_params, archiver_from_params)

_delphi_utils_python/delphi_utils/validator/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def run_module():
1414
validator.validate().print_and_exit()
1515

1616

17-
def from_params(params):
17+
def validator_from_params(params):
1818
"""Construct a validator from `params`.
1919
2020
Arguments

_delphi_utils_python/tests/test_archive.py

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55

66
from boto3 import Session
77
from git import Repo, exc
8+
import mock
89
from moto import mock_s3
910
import numpy as np
1011
import pandas as pd
1112
from pandas.testing import assert_frame_equal
1213
import pytest
1314

14-
from delphi_utils import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
15+
from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
16+
archiver_from_params
1517

1618
CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
1719

@@ -467,3 +469,143 @@ def test_run(self, tmp_path):
467469
assert_frame_equal(
468470
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
469471
csv1_diff)
472+
473+
474+
class TestFromParams:
475+
"""Tests for creating archive differs from params."""
476+
477+
def test_null_creation(self):
478+
"""Test that a None object is created with no "archive" params."""
479+
assert archiver_from_params({"common": {}}) is None
480+
481+
@mock.patch("delphi_utils.archive.GitArchiveDiffer")
482+
def test_get_git_archiver(self, mock_archiver):
483+
"""Test that GitArchiveDiffer is created successfully."""
484+
params = {
485+
"common": {
486+
"export_dir": "dir"
487+
},
488+
"archive": {
489+
"cache_dir": "cache",
490+
"branch_name": "branch",
491+
"override_dirty": True,
492+
"commit_partial_success": True,
493+
"commit_message": "msg"
494+
}
495+
}
496+
497+
archiver_from_params(params)
498+
mock_archiver.assert_called_once_with(
499+
export_dir="dir",
500+
cache_dir="cache",
501+
branch_name="branch",
502+
override_dirty=True,
503+
commit_partial_success=True,
504+
commit_message="msg"
505+
)
506+
507+
@mock.patch("delphi_utils.archive.GitArchiveDiffer")
508+
def test_get_git_archiver_with_defaults(self, mock_archiver):
509+
"""Test that GitArchiveDiffer is created successfully without optional arguments."""
510+
params = {
511+
"common": {
512+
"export_dir": "dir"
513+
},
514+
"archive": {
515+
"cache_dir": "cache",
516+
"branch_name": "branch",
517+
"commit_message": "msg"
518+
}
519+
}
520+
521+
archiver_from_params(params)
522+
mock_archiver.assert_called_once_with(
523+
export_dir="dir",
524+
cache_dir="cache",
525+
branch_name="branch",
526+
commit_message="msg"
527+
)
528+
@mock.patch("delphi_utils.archive.S3ArchiveDiffer")
529+
def test_get_s3_archiver(self, mock_archiver):
530+
"""Test that S3ArchiveDiffer is created successfully."""
531+
params = {
532+
"common": {
533+
"export_dir": "dir"
534+
},
535+
"archive": {
536+
"cache_dir": "cache",
537+
"bucket_name": "bucket",
538+
"indicator_prefix": "ind",
539+
"aws_credentials": {"pass": "word"}
540+
}
541+
}
542+
543+
archiver_from_params(params)
544+
mock_archiver.assert_called_once_with(
545+
export_dir="dir",
546+
cache_dir="cache",
547+
bucket_name="bucket",
548+
indicator_prefix="ind",
549+
aws_credentials={"pass": "word"}
550+
)
551+
552+
def test_get_s3_archiver_without_required(self):
553+
"""Test that S3ArchiveDiffer is not created without required arguments."""
554+
params = {
555+
"common": {
556+
"export_dir": "dir"
557+
},
558+
"archive": {
559+
"cache_dir": "cache",
560+
"bucket_name": "bucket"
561+
}
562+
}
563+
564+
with pytest.raises(AssertionError,
565+
match="Missing indicator_prefix in params"):
566+
archiver_from_params(params)
567+
568+
params["archive"]["indicator_prefix"] = "prefix"
569+
with pytest.raises(AssertionError,
570+
match="Missing aws_credentials in params"):
571+
archiver_from_params(params)
572+
573+
@mock.patch("delphi_utils.archive.FilesystemArchiveDiffer")
574+
def test_get_filesystem_archiver(self, mock_archiver):
575+
"""Test that FilesystemArchiveDiffer is created successfully."""
576+
params = {
577+
"common": {
578+
"export_dir": "dir"
579+
},
580+
"archive": {
581+
"cache_dir": "cache"
582+
}
583+
}
584+
585+
archiver_from_params(params)
586+
mock_archiver.assert_called_once_with(
587+
export_dir="dir",
588+
cache_dir="cache"
589+
)
590+
591+
def test_get_filesystem_archiver_with_extra_params(self):
592+
"""Test that FilesystemArchiveDiffer is not created with extra parameters."""
593+
params = {
594+
"common": {
595+
"export_dir": "dir"
596+
},
597+
"archive": {
598+
"cache_dir": "cache",
599+
"indicator_prefix": "prefix"
600+
}
601+
}
602+
603+
with pytest.raises(AssertionError,
604+
match="If you intended to run"):
605+
archiver_from_params(params)
606+
607+
del params["archive"]["cache_dir"]
608+
del params["archive"]["indicator_prefix"]
609+
with pytest.raises(AssertionError,
610+
match="If you intended to run"):
611+
archiver_from_params(params)

0 commit comments

Comments
 (0)