-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathcausal_chambers.py
More file actions
294 lines (239 loc) · 11.1 KB
/
causal_chambers.py
File metadata and controls
294 lines (239 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import numpy as np
import pandas as pd
from abc import abstractmethod
from causalchamber.datasets import Dataset
from collections import namedtuple
from ..base import UnivariateCRPSTask
from ..config import DATA_STORAGE_PATH
from ..metrics.constraints import MinConstraint, MaxConstraint, ListConstraint
from . import WeightCluster
# One evaluation window inside an experiment:
#   seed          -> selects the experiment (used as "load_in_seed_{seed}")
#   history_start -> first row of the history slice
#   future_start  -> row where the history ends and the forecast horizon begins
#   time_end      -> row where the forecast horizon ends (exclusive slice bound)
Window = namedtuple(
    "Window",
    "seed history_start future_start time_end",
)
class WindTunnelTask(UnivariateCRPSTask):
    """
    Base class for forecasting tasks built on wind tunnel experiments.

    An instance forecasts ``target_name`` and receives a textual description of
    ``covariate_name`` over the whole window (history and future).

    Subclasses are expected to:

    * define ``self.possible_windows`` (a list of ``Window``), which is read by
      ``_get_number_instances`` and ``_get_instance_by_idx``;
    * implement ``_interval_descriptions`` to turn the covariate and its change
      points into text.
    """

    _context_sources = UnivariateCRPSTask._context_sources + ["c_cov"]
    __version__ = "0.0.4"  # Modification will trigger re-caching

    def __init__(
        self,
        target_name: str,
        covariate_name: str = "load_in",
        seed: int = None,
        fixed_config: dict = None,
        dataset_name: str = "wt_changepoints_v1",
        datadir: str = DATA_STORAGE_PATH,
    ):
        """
        Load the dataset and store the task configuration.

        Parameters
        ----------
        target_name : str
            Column to forecast. The special value "pressure_gap" is computed on
            the fly in ``_get_instance_by_idx``.
        covariate_name : str, optional
            Column that gets verbalized, by default "load_in".
        seed : int, optional
            Seed forwarded to the parent class; also used by ``random_instance``
            to pick the window deterministically.
        fixed_config : dict, optional
            Forwarded unchanged to the parent class.
        dataset_name : str, optional
            Name passed to ``causalchamber.datasets.Dataset``.
        datadir : str, optional
            Root directory passed to ``Dataset``, by default DATA_STORAGE_PATH.
        """
        self.dataset = Dataset(dataset_name, root=datadir, download=True)
        self.seed = seed
        self.covariate_name = covariate_name
        self.target_name = target_name

        # NOTE(review): the attributes above are set before calling the parent
        # constructor, presumably because it already needs them - confirm.
        super().__init__(seed=seed, fixed_config=fixed_config)

    def _get_number_instances(self):
        """
        Returns number of different instances/windows this task comprises
        """
        return len(self.possible_windows)

    def _get_instance_by_idx(self, idx: int, downsample: str = None):
        """
        Returns instance corresponding to specified index, downsampled if required.

        Parameters
        ----------
        idx : int
            Instance index.
        downsample : str, optional
            Downsampling rule (see https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.resample.html), by default None

        Returns
        -------
        tuple
            ``(window, past_time, future_time, text_covariates)`` where
            ``past_time`` and ``future_time`` are data frames holding the
            covariate and target columns, and ``text_covariates`` is the string
            produced by ``verbalize_covariate``.
        """
        window = self.possible_windows[idx]

        experiment = self.dataset.get_experiment(name=f"load_in_seed_{window.seed}")
        observations = experiment.as_pandas_dataframe()

        # The pressure gap is not a raw column: derive it from the two pressures
        if self.target_name == "pressure_gap":
            observations["pressure_gap"] = (
                observations["pressure_downwind"] - observations["pressure_ambient"]
            )

        selected_variables = observations[
            [self.covariate_name, self.target_name]
        ].copy()
        selected_variables.index = pd.to_datetime(observations.timestamp, unit="s")

        # select past and future
        past_time = selected_variables[window.history_start : window.future_start]
        future_time = selected_variables[window.future_start : window.time_end]

        # get verbalized covariates
        text_covariates = self.verbalize_covariate(
            observations.iloc[window.history_start : window.time_end]
        )

        if downsample is not None:
            # downsample numerical variates, not averaging to avoid introducing new values
            past_time = past_time.resample(downsample).min().ffill()
            future_time = future_time.resample(downsample).min().ffill()

            # A hack to avoid overlapping past and future timestamps introduced by the resampling
            # (this can happen if window.future_start happens in the middle of the new time interval)
            if past_time.index[-1] == future_time.index[0]:
                future_time = future_time[1:]

        return window, past_time, future_time, text_covariates

    @abstractmethod
    def _interval_descriptions(self, covariate, change_points, timestamps):
        """
        Build the textual description of the covariate over the window.

        The signature matches the call made in ``verbalize_covariate``.

        Parameters
        ----------
        covariate : pd.Series
            Values of the covariate over the window.
        change_points : list
            Index labels at which the intervention flag equals 1
            (label 0 is removed by the caller).
        timestamps : pd.Series
            Time of day of each observation, sharing the index of ``covariate``.

        Returns
        -------
        str
            verbalized covariate
        """
        pass

    @property
    def seasonal_period(self) -> int:
        """
        This returns the period which should be used by statistical models for this task.
        If negative, this means that the data either has no period, or the history is shorter than the period.
        """
        # Not enough history for a single period
        return -1

    def random_instance(self, downsample: str = "1s"):
        """
        Sets random downsampled instance/window as task instance.

        Stores the result in ``self.window``, ``self.past_time``,
        ``self.future_time`` and ``self.scenario``.

        Parameters
        ----------
        downsample : str, optional
            Downsampling rule (see https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.resample.html), by default "1s".
        """
        # with random choice we are not sure to sample different windows at evaluation
        if self.seed is not None:
            window_idx = self.seed % self._get_number_instances()
        else:
            window_idx = self.random.choice(self._get_number_instances())

        (
            self.window,
            self.past_time,
            self.future_time,
            self.scenario,
        ) = self._get_instance_by_idx(window_idx, downsample)

    def verbalize_covariate(self, observations: pd.DataFrame, round_freq: str = "s"):
        """
        Verbalizes the numerical covariate given time series where change points are marked in the intervention variate.

        Parameters
        ----------
        observations : pd.DataFrame
            Time series that contains the covariate and intervention variate over time
        round_freq : str, optional
            Frequency to which rounding time to, to avoid too long strings, by default "s"

        Returns
        -------
        str
            verbalized covariate
        """
        # intervention column = 0 when load is constant and = 1 when it changes wrt previous timestep
        change_points = list(observations[observations.intervention == 1].index)

        # timestep 0 is always tagged as change point, but we don't need it
        # timestep 0 is not present when window starts at a later timestep
        if 0 in change_points:
            change_points.remove(0)

        # get datetimes from unix times, drop date as it is constant
        timestamps = (
            pd.to_datetime(observations.timestamp, unit="s")
            .dt.round(freq=round_freq)
            .dt.time
        )
        covariate = observations[self.covariate_name]

        return self._interval_descriptions(covariate, change_points, timestamps)
class SpeedFromLoadTask(WindTunnelTask):
    """
    Forecast the fan speed ("rpm_in") given a verbalized fan load ("load_in").

    The load is described as a sequence of constant levels delimited by the
    change points found in the experiment.
    """

    _context_sources = WindTunnelTask._context_sources + ["c_causal", "c_i"]
    _skills = WindTunnelTask._skills + [
        "reasoning: causal",
        "reasoning: math",
        "instruction following",
    ]
    __version__ = "0.0.4"  # Modification will trigger re-caching

    def __init__(
        self,
        seed: int = None,
        fixed_config: dict = None,
        datadir: str = DATA_STORAGE_PATH,
    ):
        """
        Parameters
        ----------
        seed : int, optional
            Forwarded to the parent class.
        fixed_config : dict, optional
            Forwarded to the parent class.
        datadir : str, optional
            Root directory of the dataset, by default DATA_STORAGE_PATH.
        """
        # Candidate windows: (seed, history_start, future_start, time_end).
        # Must exist before the parent constructor runs.
        self.possible_windows = [
            Window(4, 0, 952, 1100),
            Window(7, 0, 613, 1000),
            Window(3, 300, 807, 1420),
            Window(4, 0, 1886, 2000),
            Window(5, 0, 502, 600),
            Window(6, 0, 686, 880),
            Window(2, 0, 440, 700),
            Window(0, 0, 1159, 1300),
            Window(1, 0, 779, 900),
            Window(1, 0, 779, 1400),
        ]

        super().__init__(
            target_name="rpm_in",
            covariate_name="load_in",
            seed=seed,
            fixed_config=fixed_config,
            dataset_name="wt_changepoints_v1",
            datadir=datadir,
        )

        self.background = "The wind tunnel is a chamber with one controllable fan that pushes air through it. We can control the load of the fan (corresponding to the duty cycle of the pulse-width-modulation signal) and measure its speed (in revolutions per minute). The fan is designed so its steady-state speed scales broadly linearly with the load. Unless completely powered off, the fan never operates below a certain speed, corresponding to a minimum effective load between 0.1 and 0.2. The task is to forecast the speed of the fan."
        self.constraints = "The load is between 0 and 1. At full load (=1), the fan turns at a maximum speed of 3000 rpm."
        # Forecasts are constrained to the [0, 3000] range stated above
        self.metric_constraint = ListConstraint([MinConstraint(0), MaxConstraint(3000)])

    def _interval_descriptions(self, covariate, change_points, timestamps):
        """
        Describe the load as consecutive constant levels.

        Parameters
        ----------
        covariate : pd.Series
            Load values over the window.
        change_points : list
            Index labels where the load changes.
        timestamps : pd.Series
            Time of day of each observation, sharing the index of ``covariate``.

        Returns
        -------
        str
            Sentence of the form
            "The load is set to: A until T1, B from T1 until ... until Tend."
        """
        pieces = [f"The load is set to: {covariate.iloc[0]:.1f}"]
        for point in change_points:
            # The same timestamp closes the previous level and opens the new one
            when = timestamps[point]
            pieces.append(f" until {when}, {covariate[point]:.1f} from {when}")
        pieces.append(f" until {timestamps.iloc[-1]}.")
        return "".join(pieces)
class ExplicitPressureFromSpeedTask(WindTunnelTask):
    """
    Forecast the pressure gap given a verbalized fan speed ("rpm_in").

    The background text spells out the affinity law relating pressure and speed.
    """

    _context_sources = WindTunnelTask._context_sources + ["c_causal", "c_i"]
    _skills = WindTunnelTask._skills + [
        "reasoning: causal",
        "reasoning: math",
        "instruction following",
    ]
    __version__ = "0.0.3"  # Modification will trigger re-caching

    def __init__(
        self,
        seed: int = None,
        fixed_config: dict = None,
        datadir: str = DATA_STORAGE_PATH,
    ):
        """
        Parameters
        ----------
        seed : int, optional
            Forwarded to the parent class.
        fixed_config : dict, optional
            Forwarded to the parent class.
        datadir : str, optional
            Root directory of the dataset, by default DATA_STORAGE_PATH.
        """
        # Candidate windows: (seed, history_start, future_start, time_end).
        # Must exist before the parent constructor runs.
        self.possible_windows = [
            Window(1, 0, 325, 500),
            Window(1, 200, 550, 650),
            Window(0, 100, 699, 829),
            Window(2, 0, 578, 884),
            Window(3, 700, 1364, 1587),
            Window(4, 600, 952, 1390),
            Window(5, 200, 671, 994),
        ]

        super().__init__(
            "pressure_gap", "rpm_in", seed, fixed_config, "wt_changepoints_v1", datadir
        )

        self.background = "The wind tunnel is a chamber with one controllable fan that pushes air through it. We can control the speed of the fan (rpm_in) and measure the gap between the internal pressure and the ambient pressure (in Pascals). The pressure gap can be estimated from the speed using the affinity laws, which state that the pressure over maximal pressure ratio is proportional to the square of the speed over maximal speed ratio. The task is to forecast the pressure."
        self.constraints = (
            "The maximal fan speed is 3000 rpm and the maximal pressure is 37.5 Pa."
        )
        self.metric_constraint = MaxConstraint(37.5)

    def _interval_descriptions(self, covariate, change_points, timestamps):
        """
        Describe the speed as a starting value followed by successive changes.

        Parameters
        ----------
        covariate : pd.Series
            Speed values over the window.
        change_points : list
            Index labels where the speed setting changes. May be empty.
        timestamps : pd.Series
            Time of day of each observation, sharing the index of ``covariate``.

        Returns
        -------
        str
            "The speed starts at A." followed by one
            " At T, it rapidly and smoothly changes to B." sentence per change
            point. With no change point only the first sentence is returned.
        """
        sentences = [f"The speed starts at {covariate.iloc[0]:.1f}."]

        # Without any change point there is nothing more to describe; indexing
        # change_points[-1] below would raise IndexError.
        if not change_points:
            return sentences[0]

        # The value reached after a change is read just before the next change
        # point, i.e. once the speed has settled.
        for current, following in zip(change_points, change_points[1:]):
            sentences.append(
                f" At {timestamps[current]}, it rapidly and smoothly changes to {covariate[following - 1]:.1f}."
            )

        # After the last change the speed settles on the final value of the window
        sentences.append(
            f" At {timestamps[change_points[-1]]}, it rapidly and smoothly changes to {covariate.iloc[-1]:.1f}."
        )
        return "".join(sentences)
class ImplicitPressureFromSpeedTask(ExplicitPressureFromSpeedTask):
    """
    Variant of ExplicitPressureFromSpeedTask with a shorter background text.

    The background only names the affinity laws without stating them; windows,
    constraints and covariate verbalization are inherited from the parent.
    """

    _context_sources = WindTunnelTask._context_sources + ["c_causal", "c_i"]
    _skills = WindTunnelTask._skills + [
        "reasoning: causal",
        "retrieval: memory",
        "reasoning: math",
        "instruction following",
    ]
    __version__ = "0.0.3"  # Modification will trigger re-caching

    def __init__(
        self,
        seed: int = None,
        fixed_config: dict = None,
        datadir: str = DATA_STORAGE_PATH,
    ):
        """
        Parameters
        ----------
        seed : int, optional
            Forwarded to the parent class.
        fixed_config : dict, optional
            Forwarded to the parent class.
        datadir : str, optional
            Root directory of the dataset, by default DATA_STORAGE_PATH.
        """
        super().__init__(seed=seed, fixed_config=fixed_config, datadir=datadir)

        # Replace the parent's background with the version that omits the formula
        self.background = "The wind tunnel is a chamber with one controllable fan that pushes air through it. We can control the speed of the fan (in revolutions per minute) and measure the gap between the internal pressure and the ambient pressure (in Pascals). The pressure gap can be estimated from the speed using the affinity laws. The task is to forecast the pressure."
# All task classes defined in this module
__TASKS__ = [
    SpeedFromLoadTask, ExplicitPressureFromSpeedTask, ImplicitPressureFromSpeedTask,
]
# Tasks grouped into clusters, each with weight 1: the speed-from-load task on
# its own, and the two pressure-from-speed variants together.
__CLUSTERS__ = [
    WeightCluster(weight=1, tasks=[SpeedFromLoadTask]),
    WeightCluster(
        weight=1,
        tasks=[ExplicitPressureFromSpeedTask, ImplicitPressureFromSpeedTask],
    ),
]