|
| 1 | +""" |
| 2 | +Loads weather data from NOAA and uploads it to BigQuery. |
| 3 | +""" |
| 4 | + |
| 5 | +from datetime import datetime, timedelta |
| 6 | +from ftplib import FTP |
| 7 | +from io import StringIO |
| 8 | + |
| 9 | +import google.auth |
| 10 | +import pandas as pd |
| 11 | +import pandas_gbq |
| 12 | +from tqdm import tqdm |
| 13 | + |
# Resolve Google Application Default Credentials once, when the module is
# loaded; both names are module-level globals used by the BigQuery upload.
credentials, project = google.auth.default()
| 15 | + |
| 16 | + |
def _fetch_text(ftp, path):
    """Download one text file over an open FTP connection.
    @param ftp: A logged-in ftplib.FTP instance
    @param path: Path of the file, relative to the current FTP directory
    @return: The file contents as a single string, one '\\n' after each line
    """

    lines = []
    ftp.retrlines(f'RETR {path}', lines.append)

    # retrlines() hands over each line without its terminator, so put one
    # '\n' back after every line (including the last).
    return ''.join(line + '\n' for line in lines)


def get_weather_data():
    """Get weather data from NOAA.
    @return: Tuple of (nbh, nbs) where nbh is the weather data for the next 24 hours
    and nbs is the weather data for the next 72 hours (in 3hr increments).
    Both are returned as raw text exactly as downloaded.
    """

    print('Accessing NOAA FTP server...')

    # The context manager closes the connection even if a command fails
    # part-way through, so the socket is never leaked.
    with FTP('ftp.ncep.noaa.gov') as ftp:
        ftp.login()
        ftp.cwd('pub/data/nccf/com/blend/prod')

        # NLST ordering is up to the server, so sort before taking the last
        # entry. NOTE(review): assumes directory names sort chronologically
        # when compared as strings -- confirm against the server layout.
        forecast_day = sorted(ftp.nlst())[-1]
        ftp.cwd(forecast_day)

        forecast_hour = sorted(ftp.nlst())[-1]

        print(f'Downloading NBH forecast {forecast_day} {forecast_hour}Z...')
        nbh = _fetch_text(ftp, f'{forecast_hour}/text/blend_nbhtx.t{forecast_hour}z')

        print(f'Downloading NBS forecast {forecast_day} {forecast_hour}Z...')
        nbs = _fetch_text(ftp, f'{forecast_hour}/text/blend_nbstx.t{forecast_hour}z')

    return nbh, nbs
| 54 | + |
| 55 | + |
def _parse_cell(cell):
    """Convert one fixed-width cell to an int, or None if it holds no integer."""

    text = cell.strip()
    return int(text) if text.lstrip('-').isdigit() else None


def parse_weather_data(data, fmt):
    """Parse weather data for a specific location
    @param data: The weather data to parse (the text chunk for one location)
    @param fmt: The format of the data. Either 'nbh' or 'nbs'
    @return: A pandas DataFrame containing the weather data: one column per
    variable row in the text, plus 'Location', 'Forecast_Time' and 'Time'
    @raise ValueError: If fmt is not 'nbh' or 'nbs', if the chunk has no data
    rows, or if the header date cannot be parsed
    """

    # Reject unknown formats up front; otherwise the start date would never be
    # set and the function would fail much later with an unrelated error.
    if fmt not in ('nbh', 'nbs'):
        raise ValueError(f"Unsupported format {fmt!r}; expected 'nbh' or 'nbs'")

    all_lines = data.strip().split('\n')
    header = all_lines[0]

    location = header.split(' ')[0]

    # The header ends with three tokens: date, HHMM and the time zone name.
    forecast_date_str = ' '.join(header.split()[-3:])
    forecast_date = datetime.strptime(forecast_date_str, '%m/%d/%Y %H%M %Z')

    # first_date only seeds the calendar day (and minutes) of the first
    # column; its hour is overwritten from the UTC row further down.
    if fmt == 'nbh':
        first_date = forecast_date + timedelta(hours=1)
        skip_lines = 1
    else:
        # Offset that always lands on a UTC hour divisible by 3.
        first_date = forecast_date + timedelta(hours=6 - (forecast_date.hour % 3))
        # 'nbs' chunks have one extra line after the header that is skipped.
        skip_lines = 2

    lines = all_lines[skip_lines:]
    if not lines:
        raise ValueError(f'No data rows found for location {location!r}')

    # Every row is read out to the width of the longest one so that all
    # columns end up with the same number of entries (short rows pad with None).
    max_len = max(len(line) for line in lines)

    parsed_data = {}
    for line in lines:
        # First 5 characters hold the variable name; the rest is a sequence
        # of 3-character-wide cells.
        var_name = line[:5].strip()
        value_str = line[5:]
        parsed_data[var_name] = [
            _parse_cell(value_str[i:i + 3]) for i in range(0, max_len - 5, 3)
        ]

    df = pd.DataFrame(parsed_data)
    df['Location'] = location
    df['Forecast_Time'] = forecast_date

    # Rebuild full timestamps from the hour-of-day row: whenever the hour
    # decreases from one column to the next, the forecast rolled past midnight.
    date = first_date
    dates = []
    prev_hr = None
    for hr in df['UTC']:
        date = date.replace(hour=hr)

        if prev_hr is not None and hr < prev_hr:
            date += timedelta(days=1)

        dates.append(date)
        prev_hr = hr

    df['Time'] = dates

    return df
| 115 | + |
| 116 | + |
def _parse_forecasts(forecasts, fmt):
    """Parse every per-location text chunk and stack the results.
    @param forecasts: List of text chunks, one per location
    @param fmt: The format of the chunks. Either 'nbh' or 'nbs'
    @return: A single pandas DataFrame with the rows of all locations
    """

    frames = [parse_weather_data(forecast, fmt) for forecast in tqdm(forecasts)]

    # Concatenate once at the end: growing a DataFrame inside the loop copies
    # all accumulated rows on every iteration. pd.concat() rejects an empty
    # list, so fall back to an empty frame when there is nothing to parse.
    return pd.concat(frames) if frames else pd.DataFrame()


if __name__ == '__main__':
    nbh, nbs = get_weather_data()

    # Split the data into separate locations: chunks are delimited by a run
    # of 50 spaces, and whatever precedes the first delimiter is dropped.
    nbh = nbh.strip().split(' ' * 50)[1:]
    nbs = nbs.strip().split(' ' * 50)[1:]

    print('Parsing NBH forecast data...')
    df_nbh = _parse_forecasts(nbh, 'nbh')
    df_nbh.to_csv('wx_nbh.csv', index=False)

    print('Parsing NBS forecast data...')
    df_nbs = _parse_forecasts(nbs, 'nbs')
    df_nbs.to_csv('wx_nbs.csv', index=False)

    print('Uploading to BigQuery...')

    # Upload to BigQuery, replacing any existing table contents
    pandas_gbq.to_gbq(df_nbh, 'weather.nbh', project, if_exists='replace', credentials=credentials)
    pandas_gbq.to_gbq(df_nbs, 'weather.nbs', project, if_exists='replace', credentials=credentials)
0 commit comments