Copied store files from noisepy repo (#2)
* Instantiated the repo using LINCC template

* Copied store files from noisepy repo
IshikaKhandelwal authored Dec 30, 2023
1 parent 1db77ae commit a0bc42e
Showing 11 changed files with 3,426 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .copier-answers.yml
@@ -9,7 +9,7 @@ include_benchmarks: false
 include_docs: true
 include_notebooks: true
 mypy_type_checking: none
-package_name: noisepy-seis-io
+package_name: noisepy_seis_io
 preferred_linter: black
 project_license: MIT
 project_name: noisepy-io
14 changes: 7 additions & 7 deletions .pre-commit-config.yaml
@@ -23,13 +23,13 @@ repos:
 - id: black
 
 
-- repo: https://github.com/PyCQA/flake8
-  rev: 6.0.0
-  hooks:
-  - id: flake8
-    additional_dependencies:
-    - flake8-black
-    exclude: .git,__pycache__,build,dist
+# - repo: https://github.com/PyCQA/flake8
+#   rev: 6.0.0
+#   hooks:
+#   - id: flake8
+#     additional_dependencies:
+#     - flake8-black
+#     exclude: .git,__pycache__,build,dist
 # repos:
 
 # # Compare the local template version to the latest remote template version
4 changes: 2 additions & 2 deletions docs/index.rst
@@ -1,8 +1,8 @@
-.. noisepy-seis-io documentation main file.
+.. noisepy_seis_io documentation main file.
    You can adapt this file completely to your liking, but it should at least
    contain the root `toctree` directive.
 
-Welcome to noisepy-seis-io's documentation!
+Welcome to noisepy_seis_io's documentation!
 ========================================================================================
 
 Dev Guide - Getting Started
259 changes: 259 additions & 0 deletions src/noisepy/seis/io/asdfstore.py
@@ -0,0 +1,259 @@
import glob
import logging
import os
from pathlib import Path
from typing import Callable, Dict, Generic, List, Optional, Set, Tuple, TypeVar

import numpy as np
import obspy
import pyasdf
from datetimerange import DateTimeRange

from . import noise_module
from .constants import PROGRESS_DATATYPE
from .datatypes import Channel, ChannelData, ChannelType, CrossCorrelation, Stack, Station
from .stores import (
CrossCorrelationDataStore,
RawDataStore,
StackStore,
parse_station_pair,
parse_timespan,
timespan_str,
)

logger = logging.getLogger(__name__)

T = TypeVar("T")


class ASDFDirectory(Generic[T]):
"""
    Utility class used by ASDFRawDataStore and ASDFCCStore to provide easy access
to a set of ASDF files in a directory that follow a specific naming convention.
The files are named after a generic type T (e.g. a timestamp or a pair of stations)
so the constructor takes two functions to map between the type T and the corresponding
file name.
"""

def __init__(
self, directory: str, mode: str, get_filename: Callable[[T], str], parse_filename: Callable[[str], T]
) -> None:
if mode not in ["a", "r"]:
raise ValueError(f"Invalid mode {mode}. Must be 'a' or 'r'")

self.directory = directory
self.mode = mode
self.get_filename = get_filename
self.parse_filename = parse_filename

def __getitem__(self, key: T) -> pyasdf.ASDFDataSet:
return self._get_dataset(key, self.mode)

def _get_dataset(self, key: T, mode: str) -> pyasdf.ASDFDataSet:
file_name = self.get_filename(key)
file_path = os.path.join(self.directory, file_name)
return _get_dataset(file_path, mode)

def get_keys(self) -> List[T]:
h5files = sorted(glob.glob(os.path.join(self.directory, "**/*.h5"), recursive=True))
return list(map(self.parse_filename, h5files))

    def contains(self, key: T, data_type: str, path: Optional[str] = None) -> bool:
# contains is always a read
ccf_ds = self._get_dataset(key, "r")

        if ccf_ds is None:
return False
with ccf_ds:
# source-receiver pair
exists = data_type in ccf_ds.auxiliary_data
if path is not None and exists:
return path in ccf_ds.auxiliary_data[data_type]
return exists

def add_aux_data(self, key: T, params: Dict, data_type: str, path: str, data: np.ndarray):
with self[key] as ccf_ds:
ccf_ds.add_auxiliary_data(data=data, data_type=data_type, path=path, parameters=params)


class ASDFRawDataStore(RawDataStore):
"""
A data store implementation to read from a directory of ASDF files. Each file is considered
a timespan with the naming convention: 2019_02_01_00_00_00T2019_02_02_00_00_00.h5
"""

def __init__(self, directory: str, mode: str = "r"):
super().__init__()
self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan)

def get_channels(self, timespan: DateTimeRange) -> List[Channel]:
with self.datasets[timespan] as ds:
            stations = [self._create_station(timespan, sta) for sta in ds.waveforms.list()]
            stations = [sta for sta in stations if sta is not None]  # skip stations missing StationXML
channels = [
Channel(ChannelType(tag), sta)
for sta in stations
for tag in ds.waveforms[str(sta)].get_waveform_tags()
]
return channels

def get_timespans(self) -> List[DateTimeRange]:
return self.datasets.get_keys()

def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
with self.datasets[timespan] as ds:
stream = ds.waveforms[str(chan.station)][str(chan.type)]
return ChannelData(stream)

def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
with self.datasets[timespan] as ds:
return ds.waveforms[str(station)]["StationXML"]

def _create_station(self, timespan: DateTimeRange, name: str) -> Optional[Station]:
# What should we do if there's no StationXML?
try:
with self.datasets[timespan] as ds:
inventory = ds.waveforms[name]["StationXML"]
sta, net, lon, lat, elv, loc = noise_module.sta_info_from_inv(inventory)
return Station(net, sta, lat, lon, elv, loc)
except Exception as e:
logger.warning(f"Missing StationXML for station {name}. {e}")
return None


class ASDFCCStore(CrossCorrelationDataStore):
def __init__(self, directory: str, mode: str = "a") -> None:
super().__init__()
Path(directory).mkdir(exist_ok=True)
self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan)

# CrossCorrelationDataStore implementation
def contains(self, src: Station, rec: Station, timespan: DateTimeRange) -> bool:
station_pair = self._get_station_pair(src, rec)
contains = self.datasets.contains(timespan, station_pair)
if contains:
logger.info(f"Cross-correlation {station_pair} already exists")
return contains

def append(
self,
timespan: DateTimeRange,
src: Station,
rec: Station,
ccs: List[CrossCorrelation],
):
for cc in ccs:
station_pair = self._get_station_pair(src, rec)
# source-receiver pair: e.g. CI.ARV_CI.BAK
# channels, e.g. bhn_bhn
channels = self._get_channel_pair(cc.src, cc.rec)
self.datasets.add_aux_data(timespan, cc.parameters, station_pair, channels, cc.data)

def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]:
timespans = {}
pair_key = self._get_station_pair(src, rec)

def visit(pairs, ts):
if pair_key in pairs:
timespans[str(ts)] = ts

self._visit_pairs(visit)
return sorted(timespans.values(), key=lambda t: str(t))

def get_station_pairs(self) -> List[Tuple[Station, Station]]:
pairs_all = set()
self._visit_pairs(lambda pairs, _: pairs_all.update((parse_station_pair(p) for p in pairs)))
return list(pairs_all)

def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[CrossCorrelation]:
with self.datasets[timespan] as ccf_ds:
dtype = self._get_station_pair(src_sta, rec_sta)
if dtype not in ccf_ds.auxiliary_data:
                logger.warning(f"No data available for {timespan}/{dtype}")
return []
ccs = []
ch_pair_paths = ccf_ds.auxiliary_data[dtype].list()
for ch_pair_path in ch_pair_paths:
src_ch, rec_ch = _parse_channel_path(ch_pair_path)
stream = ccf_ds.auxiliary_data[dtype][ch_pair_path]
ccs.append(CrossCorrelation(src_ch, rec_ch, stream.parameters, stream.data[:]))
return ccs

def _visit_pairs(self, visitor: Callable[[Set[Tuple[str, str]], DateTimeRange], None]):
all_timespans = self.datasets.get_keys()
for timespan in all_timespans:
with self.datasets[timespan] as ccf_ds:
data = ccf_ds.auxiliary_data.list()
pairs = {p for p in data if p != PROGRESS_DATATYPE}
visitor(pairs, timespan)

def _get_channel_pair(self, src_chan: ChannelType, rec_chan: ChannelType) -> str:
return f"{src_chan}_{rec_chan}"

def _get_station_pair(self, src_sta: Station, rec_sta: Station) -> str:
return f"{src_sta}_{rec_sta}"


class ASDFStackStore(StackStore):
def __init__(self, directory: str, mode: str = "a"):
super().__init__()
self.datasets = ASDFDirectory(directory, mode, _filename_from_stations, _parse_station_pair_h5file)

# TODO: Do we want to support storing stacks from different timespans in the same store?
def append(self, timespan: DateTimeRange, src: Station, rec: Station, stacks: List[Stack]):
for stack in stacks:
self.datasets.add_aux_data((src, rec), stack.parameters, stack.name, stack.component, stack.data)

def get_station_pairs(self) -> List[Tuple[Station, Station]]:
return self.datasets.get_keys()

def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]:
# TODO: Do we want to support storing stacks from different timespans in the same store?
return []

def read(self, timespan: DateTimeRange, src: Station, rec: Station) -> List[Stack]:
stacks = []
with self.datasets[(src, rec)] as ds:
for name in ds.auxiliary_data.list():
for component in ds.auxiliary_data[name].list():
stream = ds.auxiliary_data[name][component]
stacks.append(Stack(component, name, stream.parameters, stream.data[:]))
return stacks


def _get_dataset(filename: str, mode: str) -> pyasdf.ASDFDataSet:
logger.debug(f"Opening {filename}")
if os.path.exists(filename):
return pyasdf.ASDFDataSet(filename, mode=mode, mpi=False, compression=None)
elif mode == "r":
return None
else: # create new file
Path(filename).parent.mkdir(exist_ok=True, parents=True)
return pyasdf.ASDFDataSet(filename, mode=mode, mpi=False, compression=None)


def _filename_from_stations(pair: Tuple[Station, Station]) -> str:
return f"{pair[0]}/{pair[0]}_{pair[1]}.h5"


def _filename_from_timespan(timespan: DateTimeRange) -> str:
return f"{timespan_str(timespan)}.h5"


def _parse_station_pair_h5file(path: str) -> Tuple[Station, Station]:
pair = Path(path).stem
return parse_station_pair(pair)


def _parse_channel_path(path: str) -> Tuple[ChannelType, ChannelType]:
parts = path.split("_")
if len(parts) == 2: # e.g. bhn_bhn
return tuple(map(ChannelType, parts))
elif len(parts) == 3: # when we have one location code
if parts[1].isdigit(): # e.g. bhn_00_bhn
return tuple(map(ChannelType, ["_".join(parts[0:2]), parts[2]]))
else: # e.g. bhn_bhn_00
return tuple(map(ChannelType, [parts[0], "_".join(parts[1:3])]))
elif len(parts) == 4: # when we have two location codes: e.g. bhn_00_bhn_00
return tuple(map(ChannelType, ["_".join(parts[0:2]), "_".join(parts[2:4])]))
else:
raise ValueError(f"Invalid channel path {path}")
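
The file above defines three stores built on the ASDFDirectory helper, which maps a generic key (a DateTimeRange for the raw and cross-correlation stores, a station pair for the stack store) to an .h5 file name and back. For orientation, here is a minimal usage sketch; it is not part of the commit, the directory paths are placeholders, and it assumes the package is installed and the directories hold ASDF files following the naming conventions described in the docstrings above.

# Hypothetical usage sketch; directory paths are placeholders, not part of this commit.
from noisepy.seis.io.asdfstore import ASDFCCStore, ASDFRawDataStore

raw_store = ASDFRawDataStore("/data/raw")    # "r" mode by default
for ts in raw_store.get_timespans():         # one DateTimeRange per .h5 file
    for chan in raw_store.get_channels(ts):  # channels listed in that file
        channel_data = raw_store.read_data(ts, chan)

cc_store = ASDFCCStore("/data/ccf")          # "a" mode by default; creates the directory
for src, rec in cc_store.get_station_pairs():
    for ts in cc_store.get_timespans(src, rec):
        ccs = cc_store.read(ts, src, rec)    # List[CrossCorrelation]

Note the asymmetry in default modes: the raw store opens read-only, while the CC and stack stores default to append ("a") because they are written to during processing.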
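
The _parse_channel_path helper disambiguates optional location codes by token count and position: with three tokens, a numeric middle token is read as the source channel's location code, otherwise the trailing token belongs to the receiver. Illustrative inputs (not from the commit), assuming the module is importable:

# Expected mappings for _parse_channel_path; inputs are illustrative.
from noisepy.seis.io.asdfstore import _parse_channel_path

print(_parse_channel_path("bhn_bhn"))        # ("bhn", "bhn"): no location codes
print(_parse_channel_path("bhn_00_bhn"))     # ("bhn_00", "bhn"): source has one
print(_parse_channel_path("bhn_bhn_00"))     # ("bhn", "bhn_00"): receiver has one
print(_parse_channel_path("bhn_00_bhn_00"))  # ("bhn_00", "bhn_00"): both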
12 changes: 12 additions & 0 deletions src/noisepy/seis/io/constants.py
@@ -0,0 +1,12 @@
STATION_FILE = "station.csv"
DATE_FORMAT_HELP = "%%Y_%%m_%%d_%%H_%%M_%%S"
DATE_FORMAT = "%Y_%m_%d_%H_%M_%S"
DONE_PATH = "done"
PROGRESS_DATATYPE = "Progress"
CONFIG_FILE = "config.yaml"
AWS_BATCH_JOB_ARRAY_INDEX = "AWS_BATCH_JOB_ARRAY_INDEX"
AWS_BATCH_JOB_ID = "AWS_BATCH_JOB_ID"
AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV"
NO_DATA_MSG = "Abort! no available seismic files for FFT"
NO_CCF_DATA_MSG = "Abort! no available CCF data for stacking"
WILD_CARD = "*"
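
DATE_FORMAT is the strftime pattern behind the timespan file names used by asdfstore.py; DATE_FORMAT_HELP is the same pattern with percent signs doubled for argparse help text. A small sketch of how a file stem maps back to datetimes, assuming the "T" separator shown in the ASDFRawDataStore docstring (the actual round-tripping is done by parse_timespan and timespan_str in stores.py, which are not part of this diff):

# Sketch only: parse a timespan file stem with DATE_FORMAT; the stem is illustrative.
from datetime import datetime

from noisepy.seis.io.constants import DATE_FORMAT

stem = "2019_02_01_00_00_00T2019_02_02_00_00_00"
start_str, end_str = stem.split("T")
start = datetime.strptime(start_str, DATE_FORMAT)
end = datetime.strptime(end_str, DATE_FORMAT)
print(start, end)  # 2019-02-01 00:00:00 2019-02-02 00:00:00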