20
20
from .weekday import Weekday
21
21
22
22
23
- def write_to_csv (df , geo_level , write_se , day_shift , out_name , output_path = "." , start_date = None , end_date = None ):
23
+ def write_to_csv (df , geo_level , write_se , day_shift , out_name , logger , output_path = "." , start_date = None , end_date = None ):
24
24
"""Write sensor values to csv.
25
25
26
26
Args:
@@ -47,15 +47,15 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
47
47
assert df [suspicious_se_mask ].empty , " se contains suspiciously large values"
48
48
assert not df ["se" ].isna ().any (), " se contains nan values"
49
49
if write_se :
50
- logging .info ("========= WARNING: WRITING SEs TO {0} =========" .format (out_name ))
50
+ logger .info ("========= WARNING: WRITING SEs TO {0} =========" .format (out_name ))
51
51
else :
52
52
df .loc [:, "se" ] = np .nan
53
53
54
54
assert not df ["val" ].isna ().any (), " val contains nan values"
55
55
suspicious_val_mask = df ["val" ].gt (90 )
56
56
if not df [suspicious_val_mask ].empty :
57
57
for geo in df .loc [suspicious_val_mask , "geo_id" ]:
58
- logging .warning ("value suspiciously high, {0}: {1}" .format (
58
+ logger .warning ("value suspiciously high, {0}: {1}" .format (
59
59
geo , out_name
60
60
))
61
61
@@ -68,10 +68,10 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, output_path=".",
68
68
sensor = out_name ,
69
69
write_empty_days = True
70
70
)
71
- logging .debug ("wrote {0} rows for {1} {2}" .format (
71
+ logger .debug ("wrote {0} rows for {1} {2}" .format (
72
72
df .size , df ["geo_id" ].unique ().size , geo_level
73
73
))
74
- logging .debug ("wrote files to {0}" .format (output_path ))
74
+ logger .debug ("wrote files to {0}" .format (output_path ))
75
75
return dates
76
76
77
77
@@ -87,7 +87,8 @@ def __init__(self,
87
87
weekday ,
88
88
numtype ,
89
89
se ,
90
- wip_signal ):
90
+ wip_signal ,
91
+ logger ):
91
92
"""Init Sensor Updator.
92
93
93
94
Args:
@@ -100,7 +101,9 @@ def __init__(self,
100
101
numtype: type of count data used, one of ["covid", "cli"]
101
102
se: boolean to write out standard errors, if true, use an obfuscated name
102
103
wip_signal: Prefix for WIP signals
104
+ logger: the structured logger
103
105
"""
106
+ self .logger = logger
104
107
self .startdate , self .enddate , self .dropdate = [
105
108
pd .to_datetime (t ) for t in (startdate , enddate , dropdate )]
106
109
# handle dates
@@ -149,7 +152,7 @@ def geo_reindex(self, data):
149
152
geo = self .geo
150
153
gmpr = GeoMapper ()
151
154
if geo not in {"county" , "state" , "msa" , "hrr" , "nation" , "hhs" }:
152
- logging .error ("{0} is invalid, pick one of 'county', "
155
+ self . logger .error ("{0} is invalid, pick one of 'county', "
153
156
"'state', 'msa', 'hrr', 'hss','nation'" .format (geo ))
154
157
return False
155
158
if geo == "county" :
@@ -201,12 +204,12 @@ def update_sensor(self,
201
204
sub_data .reset_index (level = 0 ,inplace = True )
202
205
if self .weekday :
203
206
sub_data = Weekday .calc_adjustment (wd_params , sub_data )
204
- res = CHCSensor .fit (sub_data , self .burnindate , geo_id )
207
+ res = CHCSensor .fit (sub_data , self .burnindate , geo_id , self . logger )
205
208
res = pd .DataFrame (res ).loc [final_sensor_idxs ]
206
209
dfs .append (res )
207
210
else :
208
211
n_cpu = min (10 , cpu_count ())
209
- logging .debug ("starting pool with {0} workers" .format (n_cpu ))
212
+ self . logger .debug ("starting pool with {0} workers" .format (n_cpu ))
210
213
with Pool (n_cpu ) as pool :
211
214
pool_results = []
212
215
for geo_id , sub_data in data_frame .groupby (level = 0 ,as_index = False ):
@@ -215,7 +218,7 @@ def update_sensor(self,
215
218
sub_data = Weekday .calc_adjustment (wd_params , sub_data )
216
219
pool_results .append (
217
220
pool .apply_async (
218
- CHCSensor .fit , args = (sub_data , self .burnindate , geo_id ,),
221
+ CHCSensor .fit , args = (sub_data , self .burnindate , geo_id , self . logger ),
219
222
)
220
223
)
221
224
pool_results = [proc .get () for proc in pool_results ]
@@ -244,7 +247,8 @@ def update_sensor(self,
244
247
write_se = self .se ,
245
248
day_shift = Config .DAY_SHIFT ,
246
249
out_name = signal ,
247
- output_path = output_path
250
+ output_path = output_path ,
251
+ logger = self .logger
248
252
)
249
253
if len (dates ) > 0 :
250
254
stats .append ((max (dates ), len (dates )))
0 commit comments