|
| 1 | +from datetime import datetime, timedelta |
| 2 | +from pathlib import Path |
| 3 | +import pandas as pd |
| 4 | +from delphi_utils import covidcast_wrapper |
| 5 | +import covidcast |
| 6 | +from freezegun import freeze_time |
| 7 | +import os |
| 8 | +from pandas.testing import assert_frame_equal |
| 9 | + |
| 10 | +TEST_DIR = Path(__file__).parent |
| 11 | +API_KEY = os.environ.get('DELPHI_API_KEY') |
| 12 | +covidcast.use_api_key(API_KEY) |
| 13 | +class TestCovidcastWrapper: |
| 14 | + API_KEY = os.environ.get('DELPHI_API_KEY') |
| 15 | + covidcast.use_api_key(API_KEY) |
| 16 | + def test_metadata(self): |
| 17 | + expected_df = pd.read_pickle(f"{TEST_DIR}/test_data/covidcast_metadata.pkl") |
| 18 | + df = covidcast_wrapper.metadata() |
| 19 | + assert_frame_equal(expected_df, df) |
| 20 | + |
| 21 | + @freeze_time("2024-07-29") |
| 22 | + def test_signal(self): |
| 23 | + meta_df = covidcast_wrapper.metadata() |
| 24 | + data_filter = ((meta_df["max_time"] >= datetime(year=2024, month=6, day=1)) & (meta_df["time_type"] == "day")) |
| 25 | + signal_df = meta_df[data_filter].groupby("data_source")["signal"].agg(['unique']) |
| 26 | + enddate = datetime.today() |
| 27 | + startdate = enddate - timedelta(days=15) |
| 28 | + for data_source, row in signal_df.iterrows(): |
| 29 | + signals = list(row[0]) |
| 30 | + for signal in signals: |
| 31 | + # expected_df = covidcast.signal(data_source, signal, start_day=startdate, end_day=enddate, geo_type="state") |
| 32 | + expected_df = pd.read_pickle(f"{TEST_DIR}/test_data/{data_source}_{signal}.pkl") |
| 33 | + if expected_df is None: |
| 34 | + print("%s %s %s %s not existing", data_source, signal, startdate, enddate) |
| 35 | + continue |
| 36 | + df = covidcast_wrapper.signal(data_source, signal, start_day=startdate, end_day=enddate, geo_type="state") |
| 37 | + |
| 38 | + check = df.merge(expected_df, indicator=True) |
| 39 | + assert (check["_merge"] == "both").all() |
| 40 | + |
0 commit comments