diff --git a/ChildProject/pipelines/samplers.py b/ChildProject/pipelines/samplers.py index 707e616b1..7a1afc9e2 100644 --- a/ChildProject/pipelines/samplers.py +++ b/ChildProject/pipelines/samplers.py @@ -19,11 +19,11 @@ class Sampler(ABC): def __init__( - self, - project: ChildProject.projects.ChildProject, - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, - ignore_segments: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, + ignore_segments: Union[str, pd.DataFrame] = None, ): self.project = project @@ -43,20 +43,20 @@ def __init__( exclude = pd.read_csv(exclude) if not {"recording_filename", "segment_onset", "segment_offset"}.issubset( - set(exclude.columns) + set(exclude.columns) ): raise ValueError( "exclude dataframe is missing a 'recording_filename' column" ) self.excluded = exclude - + if ignore_segments is not None: if not isinstance(ignore_segments, pd.DataFrame): ignore_segments = pd.read_csv(ignore_segments) if not {"recording_filename", "segment_onset", "segment_offset"}.issubset( - set(ignore_segments.columns) + set(ignore_segments.columns) ): raise ValueError( "exclude dataframe is missing a 'recording_filename' column" @@ -94,34 +94,38 @@ def retrieve_segments(self, recording_filename=None): if recording_filename: annotations = annotations[ annotations["recording_filename"] == recording_filename - ] + ] if annotations.shape[0] == 0: return None try: segments = am.get_segments(annotations) - + if not self.ignore_segments.empty: - ignore_segs = self.ignore_segments[self.ignore_segments["recording_filename"] == recording_filename] if recording_filename else self.ignore_segments - ignore_segs = ignore_segs.rename(columns={"segment_onset": "range_onset","segment_offset":"range_offset"}, errors="raise") - ignore_segs = am.get_within_ranges(ignore_segs, sets=[self.annotation_set]) - ignore_segs = am.get_segments(ignore_segs, clip=False) - - suffix = '__left' - segments = segments.merge(ignore_segs, indicator=True, how='outer', on=['speaker_type','segment_onset','segment_offset'], suffixes=("", suffix)).query('_merge == "left_only"') - to_drop = ['_merge'] - for x in segments.columns: - if x.endswith(suffix): to_drop.append(x) - segments = segments.drop(to_drop, axis=1) - - except Exception as e: #will have to check this part, not sure why we should catch the exception + ignore_segs = self.ignore_segments[self.ignore_segments[ + "recording_filename"] == recording_filename] if recording_filename else self.ignore_segments + if not ignore_segs.empty: + ignore_segs = ignore_segs.rename( + columns={"segment_onset": "range_onset", "segment_offset": "range_offset"}, errors="raise") + ignore_segs = am.get_within_ranges(ignore_segs, sets=[self.annotation_set]) + ignore_segs = am.get_segments(ignore_segs, clip=False) + + suffix = '__left' + segments = segments.merge(ignore_segs, indicator=True, how='outer', + on=['speaker_type', 'segment_onset', 'segment_offset'], + suffixes=("", suffix)).query('_merge == "left_only"') + to_drop = ['_merge'] + for x in segments.columns: + if x.endswith(suffix): to_drop.append(x) + segments = segments.drop(to_drop, axis=1) + + except Exception as e: # will have to check this part, not sure why we should catch the exception raise e - return None - + if len(self.target_speaker_type) and len(segments): segments = segments[segments["speaker_type"].isin(self.target_speaker_type)] - + return segments def remove_excluded(self): @@ -143,7 +147,7 @@ def remove_excluded(self): excl_segments = self.excluded.loc[ self.excluded["recording_filename"] == recording - ] + ] excl = Timeline( segments=[ Segment(segment_onset, segment_offset) @@ -166,7 +170,7 @@ def remove_excluded(self): ) self.segments = pd.concat(segments) - + def restructure_overlaps(self): """takes the sampled segments and look for overlaps in them When an overlap is detected, all the segments involved in it are @@ -174,42 +178,43 @@ def restructure_overlaps(self): using existing timestamps (to avoid cutting vocalizations) """ self.segments = self.segments.sort_values(['segment_onset', 'segment_offset']) - + for i in self.segments.index: - #print('new seg') - #print(pd.DataFrame(row).transpose()) - - #select all segments that overlap with the current vocalization (including the vocalization in question) - ovl_index = self.segments.index[(self.segments['segment_onset'] < self.segments.loc[i,'segment_offset']) & - (self.segments['segment_offset'] > self.segments.loc[i,'segment_onset'])] - + # print('new seg') + # print(pd.DataFrame(row).transpose()) + + # select all segments that overlap with the current vocalization (including the vocalization in question) + ovl_index = self.segments.index[(self.segments['segment_onset'] < self.segments.loc[i, 'segment_offset']) & + (self.segments['segment_offset'] > self.segments.loc[i, 'segment_onset'])] + shape = ovl_index.shape[0] - 1 - - if shape < 1: continue #no overlap, skip the segment - - timestamps = sorted(self.segments.loc[ovl_index,'segment_onset'].to_list() + - self.segments.loc[ovl_index,'segment_offset'].to_list()) - + + if shape < 1: continue # no overlap, skip the segment + + timestamps = sorted(self.segments.loc[ovl_index, 'segment_onset'].to_list() + + self.segments.loc[ovl_index, 'segment_offset'].to_list()) + start = timestamps.pop(0) end = timestamps.pop(-1) - + selection = [] for k in range(shape): - ideal_ts = start + ( (k+1) * (end - start) / (shape + 1)) - - best_index = min(range(len(timestamps[:k+1-shape])), key=lambda x: abs(timestamps[:k+1-shape][x]-ideal_ts)) \ - if (shape - k) > 1 else min(range(len(timestamps)), key=lambda x: abs(timestamps[x]-ideal_ts)) - + ideal_ts = start + ((k + 1) * (end - start) / (shape + 1)) + + best_index = min(range(len(timestamps[:k + 1 - shape])), + key=lambda x: abs(timestamps[:k + 1 - shape][x] - ideal_ts)) \ + if (shape - k) > 1 else min(range(len(timestamps)), key=lambda x: abs(timestamps[x] - ideal_ts)) + selection.append(timestamps.pop(best_index)) - + k = 0 self.segments.loc[ovl_index[0], 'segment_onset'] = int(start) while k < shape: self.segments.loc[ovl_index[k], 'segment_offset'] = int(selection[k]) - self.segments.loc[ovl_index[k+1], 'segment_onset'] = int(selection[k]) - k+=1 + self.segments.loc[ovl_index[k + 1], 'segment_onset'] = int(selection[k]) + k += 1 self.segments.loc[ovl_index[-1], 'segment_offset'] = int(end) - + return self.segments def assert_valid(self): @@ -239,7 +244,7 @@ def export_audio(self, destination, profile=None, **kwargs): "wav", ) output_path = os.path.join(destination, output_name) - seg = audio[segment["segment_onset"] : segment["segment_offset"]] + seg = audio[segment["segment_onset"]: segment["segment_offset"]] os.makedirs(os.path.dirname(output_path), exist_ok=True) seg.export(output_path, **kwargs) @@ -249,13 +254,12 @@ class CustomSampler(Sampler): SUBCOMMAND = "custom" def __init__( - self, - project: ChildProject.projects.ChildProject, - segments_path: str, - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + segments_path: str, + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, ): - super().__init__(project, recordings, exclude) self.segments_path = segments_path @@ -287,16 +291,15 @@ class PeriodicSampler(Sampler): SUBCOMMAND = "periodic" def __init__( - self, - project: ChildProject.projects.ChildProject, - length: int, - period: int, - offset: int = 0, - profile: str = None, - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + length: int, + period: int, + offset: int = 0, + profile: str = None, + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, ): - super().__init__(project, recordings, exclude) self.length = int(length) self.period = int(period) @@ -391,15 +394,15 @@ class RandomVocalizationSampler(Sampler): SUBCOMMAND = "random-vocalizations" def __init__( - self, - project: ChildProject.projects.ChildProject, - annotation_set: str, - target_speaker_type: list, - sample_size: int, - threads: int = 1, - by: str = "recording_filename", - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + annotation_set: str, + target_speaker_type: list, + sample_size: int, + threads: int = 1, + by: str = "recording_filename", + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, ): super().__init__(project, recordings, exclude) @@ -440,7 +443,7 @@ def _sample(self): self.segments = map(self._sample_unit, recordings.groupby(self.by)) else: with mp.Pool( - processes=self.threads if self.threads >= 1 else mp.cpu_count() + processes=self.threads if self.threads >= 1 else mp.cpu_count() ) as pool: self.segments = pool.map(self._sample_unit, recordings.groupby(self.by)) @@ -507,20 +510,20 @@ class EnergyDetectionSampler(Sampler): SUBCOMMAND = "energy-detection" def __init__( - self, - project: ChildProject.projects.ChildProject, - windows_length: int, - windows_spacing: int, - windows_count: int, - windows_offset: int = 0, - threshold: float = 0.8, - low_freq: int = 0, - high_freq: int = 100000, - threads: int = 1, - profile: str = "", - by: str = "recording_filename", - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + windows_length: int, + windows_spacing: int, + windows_count: int, + windows_offset: int = 0, + threshold: float = 0.8, + low_freq: int = 0, + high_freq: int = 100000, + threads: int = 1, + profile: str = "", + by: str = "recording_filename", + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, ): super().__init__(project, recordings, exclude) @@ -587,7 +590,7 @@ def get_recording_windows(self, recording): ) for start in windows_starts: energy = 0 - chunk = audio[start : start + self.windows_length].get_array_of_samples() + chunk = audio[start: start + self.windows_length].get_array_of_samples() channel_energies = np.zeros(channels) for channel in range(channels): @@ -627,7 +630,7 @@ def _sample(self): ) else: with mp.Pool( - processes=self.threads if self.threads >= 1 else mp.cpu_count() + processes=self.threads if self.threads >= 1 else mp.cpu_count() ) as pool: windows = pd.concat( pool.map( @@ -746,18 +749,18 @@ class HighVolubilitySampler(Sampler): SUBCOMMAND = "high-volubility" def __init__( - self, - project: ChildProject.projects.ChildProject, - annotation_set: str, - metric: str, - windows_length: int, - windows_count: int, - speakers: List[str] = ["FEM", "MAL", "CHI"], - threads: int = 1, - by: str = "recording_filename", - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, - ignore_segments: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + annotation_set: str, + metric: str, + windows_length: int, + windows_count: int, + speakers: List[str] = ["FEM", "MAL", "CHI"], + threads: int = 1, + by: str = "recording_filename", + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, + ignore_segments: Union[str, pd.DataFrame] = None, ): super().__init__(project, recordings, exclude, ignore_segments) @@ -771,8 +774,8 @@ def __init__( def _segment_scores(self, recording): segments = self.retrieve_segments(recording["recording_filename"]) - - if segments is None: + + if segments.empty: print( "warning: no annotations from the set '{}' were found for the recording '{}'".format( self.annotation_set, recording["recording_filename"] @@ -786,16 +789,14 @@ def _segment_scores(self, recording): # code via a complicated string replacement , which imo was incorrect, but # there are edge cases that must be decided (even though they are quiet small) segments["chunk"] = ( - segments["segment_offset"] // self.windows_length + 1 + segments["segment_offset"] // self.windows_length + 1 ).astype("int") - # NOTE: This is shifting the chunks considered for sampling to include the segments that finish in the original chunk limit in their entirety and stop with the last segment. This can shift the chunk significantly, especially in the case of short samples and long speech segments. # here we remove the speech segments to reduce that problem (but that does not solve it completely) segments_no_speech = segments[~segments['speaker_type'].isnull()] segment_onsets = segments_no_speech.groupby("chunk")["segment_onset"].min() segment_offsets = segments_no_speech.groupby("chunk")["segment_offset"].max() - # this dataframe contains the segment onset and offsets for the chunks we calculated. windows = pd.merge( segment_onsets, segment_offsets, left_index=True, right_index=True @@ -818,23 +819,23 @@ def _segment_scores(self, recording): key_child_environment = set(self.speakers) - {"CHI"} segments["is_CT"] = (segments["iti"] < 1000) & ( - ( - (segments["speaker_type"] == "CHI") - & (segments["prev_speaker_type"].isin(key_child_environment)) - ) - | ( - (segments["speaker_type"].isin(key_child_environment)) - & (segments["prev_speaker_type"] == "CHI") - ) + ( + (segments["speaker_type"] == "CHI") + & (segments["prev_speaker_type"].isin(key_child_environment)) + ) + | ( + (segments["speaker_type"].isin(key_child_environment)) + & (segments["prev_speaker_type"] == "CHI") + ) ) - + segments = ( segments.groupby("chunk", as_index=False)[["is_CT"]] .sum() .rename(columns={"is_CT": self.metric}) .merge(windows) ) - + elif self.metric == "vocs": # NOTE: This is the equivalent of CVC (tab2) in rlena_extract.R @@ -892,10 +893,9 @@ def _sample(self): self.segments = map(self._sample_unit, recordings.groupby(self.by)) else: with mp.Pool( - processes=self.threads if self.threads >= 1 else mp.cpu_count() + processes=self.threads if self.threads >= 1 else mp.cpu_count() ) as pool: self.segments = pool.map(self._sample_unit, recordings.groupby(self.by)) - self.segments = pd.concat(self.segments) @staticmethod @@ -966,16 +966,16 @@ class ConversationSampler(Sampler): SUBCOMMAND = "conversations" def __init__( - self, - project: ChildProject.projects.ChildProject, - annotation_set: str, - count: int, - interval: int = 1000, - speakers: List[str] = ["FEM", "MAL", "CHI"], - threads: int = 1, - by: str = "recording_filename", - recordings: Union[str, List[str], pd.DataFrame] = None, - exclude: Union[str, pd.DataFrame] = None, + self, + project: ChildProject.projects.ChildProject, + annotation_set: str, + count: int, + interval: int = 1000, + speakers: List[str] = ["FEM", "MAL", "CHI"], + threads: int = 1, + by: str = "recording_filename", + recordings: Union[str, List[str], pd.DataFrame] = None, + exclude: Union[str, pd.DataFrame] = None, ): super().__init__(project, recordings, exclude) @@ -1015,12 +1015,12 @@ def _retrieve_conversations(self, recording): key_child_environment = set(self.speakers) - {"CHI"} segments["is_CT"] = ( - (segments["speaker_type"] == "CHI") - & (segments["prev_speaker_type"].isin(key_child_environment)) - ) | ( - (segments["speaker_type"].isin(key_child_environment)) - & (segments["prev_speaker_type"] == "CHI") - ) + (segments["speaker_type"] == "CHI") + & (segments["prev_speaker_type"].isin(key_child_environment)) + ) | ( + (segments["speaker_type"].isin(key_child_environment)) + & (segments["prev_speaker_type"] == "CHI") + ) conversations = segments.groupby(segments["breaks_chain"].cumsum()).agg( recording_filename=("recording_filename", "first"), @@ -1054,7 +1054,7 @@ def _sample(self): self.segments = map(self._sample_unit, recordings.groupby(self.by)) else: with mp.Pool( - processes=self.threads if self.threads >= 1 else mp.cpu_count() + processes=self.threads if self.threads >= 1 else mp.cpu_count() ) as pool: self.segments = pool.map(self._sample_unit, recordings.groupby(self.by)) @@ -1128,7 +1128,7 @@ def run(self, path, destination, sampler, func=None, **kwargs): self.segments[ set(self.segments.columns) & {"recording_filename", "segment_onset", "segment_offset"} - ].to_csv(segments_path, index=False) + ].to_csv(segments_path, index=False) print("exported sampled segments to {}".format(segments_path)) dump( {