Skip to content

Commit 9269621

Browse files
committed
removing wrapper and directly converting in other places and moving test as a script
1 parent a2a149f commit 9269621

File tree

8 files changed

+137
-102
lines changed

8 files changed

+137
-102
lines changed

_delphi_utils_python/delphi_utils/__init__.py

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from __future__ import absolute_import
55

66
from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
7-
from .covidcast_wrapper import metadata, signal
87
from .export import create_export_csv
98
from .geomap import GeoMapper
109
from .logger import get_structured_logger

_delphi_utils_python/delphi_utils/validator/datafetcher.py

+58-4
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,42 @@
66
import warnings
77
from os import listdir
88
from os.path import isfile, join
9+
from typing import Union
910

1011
import numpy as np
1112
import pandas as pd
1213
import requests
13-
from delphi_utils import covidcast_wrapper
14+
from delphi_epidata import Epidata
15+
from epiweeks import Week
1416

1517
from .errors import APIDataFetchError, ValidationFailure
1618

1719
FILENAME_REGEX = re.compile(
1820
r'^(?P<date>\d{8})_(?P<geo_type>\w+?)_(?P<signal>\w+)\.csv$')
1921

22+
def _parse_datetimes(date_int: int, time_type: str, date_format: str = "%Y%m%d") -> Union[pd.Timestamp, None]:
23+
"""Convert a date or epiweeks string into timestamp objects.
24+
25+
Datetimes (length 8) are converted to their corresponding date, while epiweeks (length 6)
26+
are converted to the date of the start of the week. Returns nan otherwise
27+
28+
Epiweeks use the CDC format.
29+
30+
date_int: Int representation of date.
31+
time_type: The temporal resolution to request this data. Most signals
32+
are available at the "day" resolution (the default); some are only
33+
available at the "week" resolution, representing an MMWR week ("epiweek").
34+
date_format: String of the date format to parse.
35+
:returns: Timestamp.
36+
"""
37+
date_str = str(date_int)
38+
if time_type == "day":
39+
return pd.to_datetime(date_str, format=date_format)
40+
if time_type == "week":
41+
epiwk = Week(int(date_str[:4]), int(date_str[-2:]))
42+
return pd.to_datetime(epiwk.startdate())
43+
return None
44+
2045
def make_date_filter(start_date, end_date):
2146
"""
2247
Create a function to filter dates in the specified date range (inclusive).
@@ -118,7 +143,16 @@ def get_geo_signal_combos(data_source, api_key):
118143
source_signal_mappings = {i['source']:i['db_source'] for i in
119144
meta_response.json()}
120145

121-
meta = covidcast_wrapper.metadata()
146+
response = Epidata.covidcast_meta()
147+
148+
if response["result"] != 1:
149+
# Something failed in the API and we did not get real metadata
150+
raise RuntimeError("Error when fetching metadata from the API", response["message"])
151+
152+
meta = pd.DataFrame.from_dict(response["epidata"])
153+
meta["min_time"] = meta.apply(lambda x: _parse_datetimes(x.min_time, x.time_type), axis=1)
154+
meta["max_time"] = meta.apply(lambda x: _parse_datetimes(x.max_time, x.time_type), axis=1)
155+
meta["last_update"] = pd.to_datetime(meta["last_update"], unit="s")
122156

123157
source_meta = meta[meta['data_source'] == data_source]
124158
# Need to convert np.records to tuples so they are hashable and can be used in sets and dicts.
@@ -162,8 +196,28 @@ def fetch_api_reference(data_source, start_date, end_date, geo_type, signal_type
162196
163197
Formatting is changed to match that of source data CSVs.
164198
"""
165-
with warnings.catch_warnings():
166-
api_df = covidcast_wrapper.signal(data_source, signal_type, start_date, end_date, geo_type)
199+
if start_date > end_date:
200+
raise ValueError(
201+
"end_day must be on or after start_day, but " f"start_day = '{start_date}', end_day = '{end_date}'"
202+
)
203+
response = Epidata.covidcast(
204+
data_source,
205+
signal_type,
206+
time_type="day",
207+
geo_type=geo_type,
208+
time_values=Epidata.range(start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")),
209+
geo_value=geo_type,
210+
)
211+
if response["result"] != 1:
212+
# Something failed in the API and we did not get real metadata
213+
raise RuntimeError("Error when fetching signal data from the API", response["message"])
214+
215+
api_df = pd.DataFrame.from_dict(response["epidata"])
216+
api_df["issue"] = pd.to_datetime(api_df["issue"], format="%Y%m%d")
217+
api_df["time_value"] = pd.to_datetime(api_df["time_value"], format="%Y%m%d")
218+
api_df.drop("direction", axis=1, inplace=True)
219+
api_df["data_source"] = data_source
220+
api_df["signal"] = signal_type
167221

168222

169223
error_context = f"when fetching reference data from {start_date} to {end_date} " +\

_delphi_utils_python/tests/test_covidcast_wrapper.py

-38
This file was deleted.
Binary file not shown.

google_symptoms/delphi_google_symptoms/run.py

+6-11
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,16 @@
55
when the module is run with `python -m delphi_google_symptoms`.
66
"""
77
import time
8-
from datetime import datetime, date
8+
from datetime import date, datetime
99
from itertools import product
10-
import covidcast
1110

1211
import numpy as np
13-
from pandas import to_datetime
14-
from delphi_utils import (
15-
create_export_csv,
16-
get_structured_logger
17-
)
12+
from delphi_epidata import Epidata
13+
from delphi_utils import create_export_csv, get_structured_logger
1814
from delphi_utils.validator.utils import lag_converter
15+
from pandas import to_datetime
1916

20-
from .constants import (COMBINED_METRIC,
21-
GEO_RESOLUTIONS, SMOOTHERS, SMOOTHERS_MAP)
17+
from .constants import COMBINED_METRIC, GEO_RESOLUTIONS, SMOOTHERS, SMOOTHERS_MAP
2218
from .geo import geo_map
2319
from .pull import pull_gs_data
2420

@@ -66,8 +62,7 @@ def run_module(params):
6662
)
6763

6864
# Fetch metadata to check how recent each signal is
69-
covidcast.use_api_key(params["indicator"]["api_credentials"])
70-
metadata = covidcast.metadata()
65+
metadata = Epidata.covidcast_meta()
7166
# Filter to only those we currently want to produce, ignore any old or deprecated signals
7267
gs_metadata = metadata[(metadata.data_source == "google-symptoms") &
7368
(metadata.signal.isin(sensor_names))]

sir_complainsalot/delphi_sir_complainsalot/check_source.py

+23-8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import covidcast
88
import pandas as pd
9+
from delphi_epidata import Epidata
910

1011
covidcast.covidcast._ASYNC_CALL = True # pylint: disable=protected-access
1112

@@ -64,22 +65,36 @@ def check_source(data_source, meta, params, grace, logger): # pylint: disable=t
6465
age_complaints = {}
6566
gap_complaints = {}
6667

68+
start_date = datetime.now() - timedelta(days=14)
69+
end_date = datetime.now()
70+
6771
for _, row in signals.iterrows():
6872
logger.info("Retrieving signal",
6973
data_source=data_source,
7074
signal=row["signal"],
71-
start_day=(datetime.now() - timedelta(days = 14)).strftime("%Y-%m-%d"),
72-
end_day=datetime.now().strftime("%Y-%m-%d"),
75+
start_day=start_date.strftime("%Y-%m-%d"),
76+
end_day=end_date.strftime("%Y-%m-%d"),
7377
geo_type=row["geo_type"])
7478

75-
latest_data = covidcast.signal(
76-
data_source, row["signal"],
77-
start_day=datetime.now() - timedelta(days = 14),
78-
end_day=datetime.now(),
79-
geo_type=row["geo_type"]
79+
response = Epidata.covidcast(
80+
data_source,
81+
row["signal"],
82+
time_type=row["time_type"],
83+
geo_type=row["geo_type"],
84+
time_values=Epidata.range(start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")),
85+
geo_value="*",
8086
)
8187

82-
current_lag_in_days = (now - row["max_time"]).days
88+
if response["result"] != 1:
89+
# Something failed in the API and we did not get real metadata
90+
raise RuntimeError("Error when fetching signal data from the API", response["message"])
91+
92+
latest_data = pd.DataFrame.from_dict(response["epidata"])
93+
latest_data["issue"] = pd.to_datetime(latest_data["issue"], format="%Y%m%d")
94+
latest_data["time_value"] = pd.to_datetime(latest_data["time_value"], format="%Y%m%d")
95+
latest_data.drop("direction", axis=1, inplace=True)
96+
97+
current_lag_in_days = (now - datetime.strptime(str(row["max_time"]), "%Y%m%d")).days
8398
lag_calculated_from_api = False
8499

85100
if latest_data is not None:

sir_complainsalot/delphi_sir_complainsalot/run.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@
88
import time
99
from itertools import groupby
1010

11-
import covidcast
12-
from delphi_utils import SlackNotifier
13-
from delphi_utils import get_structured_logger
14-
from delphi_utils import read_params
11+
import pandas as pd
12+
from delphi_epidata import Epidata
13+
from delphi_utils import SlackNotifier, get_structured_logger, read_params
1514

1615
from .check_source import check_source
1716

@@ -29,8 +28,8 @@ def run_module():
2928
"""Run SirCAL."""
3029
start_time = time.time()
3130
params = read_params()
32-
covidcast.use_api_key(params["api_credentials"])
33-
meta = covidcast.metadata()
31+
Epidata.auth = ("epidata", params["api_credentials"])
32+
meta = pd.DataFrame.from_dict(Epidata.covidcast_meta().get("epidata", dict()))
3433
slack_notifier = None
3534
if "channel" in params and "slack_token" in params:
3635
slack_notifier = SlackNotifier(params["channel"], params["slack_token"])

_delphi_utils_python/delphi_utils/covidcast_wrapper.py renamed to testing_utils/check_covidcast_port.py

+45-34
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,20 @@
1-
"""module for covidcast api call wrapper."""
2-
3-
from datetime import date, timedelta
4-
from typing import Iterable, Union
5-
1+
"""
2+
script to check converting covidcast api calls with Epidata.covidcast Epidata.covidcast_meta
3+
"""
4+
from typing import Union, Iterable
5+
from datetime import datetime, timedelta, date
66
import pandas as pd
7+
import covidcast
78
from delphi_epidata import Epidata
9+
from pandas.testing import assert_frame_equal
10+
import os
811
from epiweeks import Week
912

13+
API_KEY = os.environ.get('DELPHI_API_KEY')
14+
covidcast.use_api_key(API_KEY)
1015

11-
def date_generator(startdate: date, enddate: date, time_type: str) -> Iterable[date]:
12-
"""
13-
Take start date and end date and generates date string.
14-
15-
Parameters
16-
----------
17-
startdate: date
18-
enddate: date
19-
time_type: str
20-
21-
Returns
22-
-------
23-
generator of str
24-
"""
25-
if time_type.lower() == "day":
26-
while startdate <= enddate:
27-
yield startdate.strftime("%Y-%m-%d")
28-
startdate = startdate + timedelta(days=1)
29-
elif time_type.lower() == "week":
30-
while startdate <= enddate:
31-
epiweek = Week.fromdate(startdate)
32-
yield epiweek
33-
startdate = startdate + timedelta(days=7)
34-
16+
Epidata.debug = True
17+
Epidata.auth = ("epidata", API_KEY)
3518

3619
def _parse_datetimes(date_int: int, time_type: str, date_format: str = "%Y%m%d") -> Union[pd.Timestamp, None]:
3720
"""Convert a date or epiweeks string into timestamp objects.
@@ -57,7 +40,7 @@ def _parse_datetimes(date_int: int, time_type: str, date_format: str = "%Y%m%d")
5740
return None
5841

5942

60-
def metadata() -> Union[pd.DataFrame, None]:
43+
def ported_metadata() -> Union[pd.DataFrame, None]:
6144
"""
6245
Make covidcast metadata api call.
6346
@@ -78,7 +61,7 @@ def metadata() -> Union[pd.DataFrame, None]:
7861
return df
7962

8063

81-
def signal(
64+
def ported_signal(
8265
data_source: str,
8366
signal: str, # pylint: disable=W0621
8467
start_day: date = None,
@@ -132,14 +115,12 @@ def signal(
132115
"end_day must be on or after start_day, but " f"start_day = '{start_day}', end_day = '{end_day}'"
133116
)
134117

135-
time_values = list(date_generator(start_day, end_day))
136-
137118
response = Epidata.covidcast(
138119
data_source,
139120
signal,
140121
time_type=time_type,
141122
geo_type=geo_type,
142-
time_values=time_values,
123+
time_values=Epidata.range(start_day.strftime("%Y%m%d"), end_day.strftime("%Y%m%d")),
143124
geo_value=geo_values,
144125
as_of=as_of,
145126
lag=lag,
@@ -156,3 +137,33 @@ def signal(
156137
api_df["signal"] = signal
157138

158139
return api_df
140+
141+
def check_metadata():
142+
expected_df = covidcast.metadata()
143+
df = ported_metadata().metadata()
144+
assert_frame_equal(expected_df, df)
145+
146+
def check_signal():
147+
meta_df = covidcast.metadata()
148+
startdate = datetime(year=2022, month=2, day=1)
149+
data_filter = (meta_df["max_time"] >= startdate)
150+
signal_df = meta_df[data_filter].groupby("data_source")["signal"].agg(['unique'])
151+
enddate = startdate + timedelta(days=3)
152+
153+
for data_source, row in signal_df.iterrows():
154+
signals = list(row[0])
155+
for signal in signals:
156+
expected_df = covidcast.signal(data_source, signal, start_day=startdate, end_day=enddate, geo_type="state")
157+
if expected_df is None:
158+
print("%s %s %s %s not existing", data_source, signal, startdate, enddate)
159+
160+
df = ported_signal(data_source, signal, start_day=startdate, end_day=enddate, geo_type="state")
161+
162+
check = df.merge(expected_df, indicator=True)
163+
assert (check["_merge"] == "both").all()
164+
assert check["_left_only"].empty
165+
assert check["_right_only"].empty
166+
167+
if __name__ == "__main__":
168+
# check_metadata()
169+
check_signal()

0 commit comments

Comments
 (0)