-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathpv_site_datamodule.py
68 lines (58 loc) · 2.37 KB
/
pv_site_datamodule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
""" Data module for pytorch lightning """
import glob
from ocf_datapipes.batch import BatchKey, batch_to_tensor, stack_np_examples_into_batch
from ocf_datapipes.training.pvnet_site import (
pvnet_site_datapipe,
pvnet_site_netcdf_datapipe,
split_dataset_dict_dp,
uncombine_from_single_dataset,
)
from pvnet.data.base import BaseDataModule
class PVSiteDataModule(BaseDataModule):
"""Datamodule for training pvnet site and using pvnet site pipeline in `ocf_datapipes`."""
def _get_datapipe(self, start_time, end_time):
data_pipeline = pvnet_site_datapipe(
self.configuration,
start_time=start_time,
end_time=end_time,
)
data_pipeline = data_pipeline.map(uncombine_from_single_dataset).map(split_dataset_dict_dp)
data_pipeline = data_pipeline.pvnet_site_convert_to_numpy_batch()
data_pipeline = (
data_pipeline.batch(self.batch_size)
.map(stack_np_examples_into_batch)
.map(batch_to_tensor)
)
return data_pipeline
def _get_premade_batches_datapipe(self, subdir, shuffle=False):
filenames = list(glob.glob(f"{self.batch_dir}/{subdir}/*.nc"))
data_pipeline = pvnet_site_netcdf_datapipe(
keys=["pv", "nwp"], # add other keys e.g. sat if used as input in site model
filenames=filenames,
)
data_pipeline = (
data_pipeline.batch(self.batch_size)
.map(stack_np_examples_into_batch)
.map(batch_to_tensor)
)
if shuffle:
data_pipeline = (
data_pipeline.shuffle(buffer_size=100)
.sharding_filter()
# Split the batches and reshuffle them to be combined into new batches
.split_batches(splitting_key=BatchKey.pv)
.shuffle(buffer_size=self.shuffle_factor * self.batch_size)
)
else:
data_pipeline = (
data_pipeline.sharding_filter()
# Split the batches so we can use any batch-size
.split_batches(splitting_key=BatchKey.pv)
)
data_pipeline = (
data_pipeline.batch(self.batch_size)
.map(stack_np_examples_into_batch)
.map(batch_to_tensor)
.set_length(int(len(filenames) / self.batch_size))
)
return data_pipeline