From 6e64232d7102dd673ec4435d6473f23807a4e007 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 1 Jun 2017 09:51:27 -0400 Subject: [PATCH] RF: PEP8 --- cili/cleanup.py | 51 ++++++---- cili/extract.py | 35 ++++--- cili/models.py | 21 ++-- cili/util.py | 230 ++++++++++++++++++++++++------------------ setup.py | 14 +-- tests/config.py | 2 +- tests/test_extract.py | 60 ++++++++--- tests/test_util.py | 61 ++++++++--- 8 files changed, 307 insertions(+), 167 deletions(-) diff --git a/cili/cleanup.py b/cili/cleanup.py index b89ba07..5cbcc3a 100644 --- a/cili/cleanup.py +++ b/cili/cleanup.py @@ -4,14 +4,15 @@ #------------------------------------------------------------- # Masking + def find_nested_events(samples, outer, inner): """ Returns indices of events in outer that contain events in inner - + This is helpful for dealing with EyeLink blink events. Each is embedded within a saccade event, and the EyeLink documentation states that data within saccades that contain blinks is unreliable. So we use this method to find those saccade events. - + Parameters ---------- samples (cili Samples) @@ -30,18 +31,21 @@ def find_nested_events(samples, outer, inner): post_onsets = onsets + inner.duration # convert to list of positional indices max_onset = samples.index[-1] - last_idxs = post_onsets.apply(lambda x: max(0, samples.index.searchsorted(x, side="right")-1)) + last_idxs = post_onsets.apply(lambda x: max( + 0, samples.index.searchsorted(x, side="right") - 1)) # step back by one positional index to get pos. index of last samples of our events. # stupid fix - don't nudge the index back for events whose duration went beyond the samples end_safe_evs = post_onsets <= max_onset last_idxs[end_safe_evs] = last_idxs[end_safe_evs] - 1 # get the time indices of the last samples of our events last_onsets = last_idxs.apply(lambda x: samples.index[x]) - idxs = outer.apply(has_overlapping_events, axis=1, args=[onsets, last_onsets]) + idxs = outer.apply(has_overlapping_events, axis=1, + args=[onsets, last_onsets]) if len(idxs) == 0: return pd.DataFrame() return outer[idxs] + def has_overlapping_events(event, onsets, last_onsets): """ Searches for onset/last_onset pairs overlapping with the event in 'event.' @@ -57,9 +61,11 @@ def has_overlapping_events(event, onsets, last_onsets): last_onsets (numpy array like) Last indices of the potentially intersecting events. """ - matches = last_onsets[(onsets <= event.name+event.duration) & (last_onsets >= event.name)] + matches = last_onsets[(onsets <= event.name + + event.duration) & (last_onsets >= event.name)] return len(matches) > 0 + def get_eyelink_mask_events(samples, events, find_recovery=True): """ Finds events from EyeLink data that contain untrustworthy data. @@ -79,11 +85,13 @@ def get_eyelink_mask_events(samples, events, find_recovery=True): the proper ends for blink events. """ be = events.EBLINK.duration.to_frame() - be = pd.concat([be, find_nested_events(samples, events.ESACC.duration.to_frame(), be)]) + be = pd.concat([be, find_nested_events( + samples, events.ESACC.duration.to_frame(), be)]) if find_recovery: adjust_eyelink_recov_idxs(samples, be) return be + def get_eyelink_mask_idxs(samples, events, find_recovery=True): """ Calls get_eyelink_mask_events, finds indices from 'samples' within the returned events. 
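For orientation, a minimal sketch of how the masking helpers above fit together; 'run.asc' is a hypothetical file name, and pup_l assumes a left-eye recording.

from cili.util import load_eyelink_dataset
from cili.cleanup import get_eyelink_mask_events, mask_eyelink_blinks

samples, events = load_eyelink_dataset('run.asc')  # hypothetical recording

# blinks, plus any saccades that contain them (the unreliable spans described above)
bad_events = get_eyelink_mask_events(samples, events)

# set the left pupil trace to NaN over those spans
masked = mask_eyelink_blinks(samples, events, mask_fields=['pup_l'])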
@@ -93,6 +101,7 @@ def get_eyelink_mask_idxs(samples, events, find_recovery=True): bi = ev_row_idxs(samples, be) return bi + def mask_eyelink_blinks(samples, events, mask_fields=["pup_l"], find_recovery=True): """ Sets the value of all untrustworthy data points to NaN. @@ -118,6 +127,7 @@ def mask_eyelink_blinks(samples, events, mask_fields=["pup_l"], find_recovery=Tr samps.loc[indices, mask_fields] = float('nan') return samps + def mask_zeros(samples, mask_fields=["pup_l"]): """ Sets any 0 values in columns in mask_fields to NaN @@ -133,6 +143,7 @@ def mask_zeros(samples, mask_fields=["pup_l"]): samps[samps[f] == 0] = float("nan") return samps + def interp_zeros(samples, interp_fields=["pup_l"]): """ Replace 0s in 'samples' with linearly interpolated data. @@ -151,6 +162,7 @@ def interp_zeros(samples, interp_fields=["pup_l"]): samps.fillna(method="ffill", inplace=True) return samps + def interp_eyelink_blinks(samples, events, find_recovery=True, interp_fields=["pup_l"]): """ Replaces the value of all untrustworthy data points with linearly interpolated data. @@ -171,12 +183,14 @@ def interp_eyelink_blinks(samples, events, find_recovery=True, interp_fields=["p interp_fields (list of strings) The columns in which we should interpolate data. """ - samps = mask_eyelink_blinks(samples, events, mask_fields=interp_fields, find_recovery=find_recovery) + samps = mask_eyelink_blinks( + samples, events, mask_fields=interp_fields, find_recovery=find_recovery) # inplace=True causes a crash, so for now... # fixed by #6284 ; will be in 0.14 release of pandas samps = samps.interpolate(method="linear", axis=0, inplace=False) return samps + def ev_row_idxs(samples, events): """ Returns the indices in 'samples' contained in events from 'events.' @@ -190,11 +204,12 @@ def ev_row_idxs(samples, events): import numpy as np idxs = [] for idx, dur in events.duration.items(): - idxs.extend(list(range(idx, int(idx+dur)))) + idxs.extend(list(range(idx, int(idx + dur)))) idxs = np.unique(idxs) idxs = np.intersect1d(idxs, samples.index.tolist()) return idxs + def adjust_eyelink_recov_idxs(samples, events, z_thresh=.1, window=1000, kernel_size=100): """ Extends event endpoint until the z-scored derivative of 'field's timecourse drops below thresh @@ -225,35 +240,37 @@ def adjust_eyelink_recov_idxs(samples, events, z_thresh=.1, window=1000, kernel_ # find a pupil size field to use p_fields = [f for f in samples.columns if f in PUP_FIELDS] if len(p_fields) == 0: - return # if we can't find a pupil field, we won't make any adjustments + return  # if we can't find a pupil field, we won't make any adjustments field = p_fields[0] # use pandas to take rolling mean. pandas' kernel looks backwards, so we need to pull a reverse...
dfs = np.gradient(samples[field].values) reversed_dfs = dfs[::-1] - reversed_dfs_ravg = np.array(pd.rolling_mean(pd.Series(reversed_dfs),window=kernel_size, min_periods=1)) + reversed_dfs_ravg = np.array(pd.rolling_mean( + pd.Series(reversed_dfs), window=kernel_size, min_periods=1)) dfs_ravg = reversed_dfs_ravg[::-1] - dfs_ravg = np.abs((dfs_ravg-np.mean(dfs_ravg))/np.std(dfs_ravg)) + dfs_ravg = np.abs((dfs_ravg - np.mean(dfs_ravg)) / np.std(dfs_ravg)) samp_count = len(samples) # search for drop beneath z_thresh after end index new_durs = [] for idx, dur in events.duration.items(): try: - s_pos = samples.index.get_loc(idx + dur) - 1 - e_pos = samples.index[min(s_pos+window, samp_count-1)] + s_pos = samples.index.get_loc(idx + dur) - 1 + e_pos = samples.index[min(s_pos + window, samp_count - 1)] except Exception as e: # can't do much about that s_pos = e_pos = 0 if s_pos == e_pos: new_durs.append(dur) continue - e_dpos = np.argmax(dfs_ravg[s_pos:e_pos] < z_thresh) # 0 if not found - new_end = samples.index[min(s_pos + e_dpos, samp_count-1)] + e_dpos = np.argmax(dfs_ravg[s_pos:e_pos] < z_thresh) # 0 if not found + new_end = samples.index[min(s_pos + e_dpos, samp_count - 1)] new_durs.append(new_end - idx) events.duration = new_durs #------------------------------------------------------------- # Filters + def butterworth_series(samples, fields=["pup_l"], filt_order=5, cutoff_freq=.01, inplace=False): """ Applies a butterworth filter to the given fields @@ -267,6 +284,6 @@ def butterworth_series(samples, fields=["pup_l"], filt_order=5, cutoff_freq=.01, from numpy import array samps = samples if inplace else samples.copy(deep=True) B, A = signal.butter(filt_order, cutoff_freq, output="BA") - samps[fields] = samps[fields].apply(lambda x: signal.filtfilt(B,A,x), axis=0) + samps[fields] = samps[fields].apply( + lambda x: signal.filtfilt(B, A, x), axis=0) return samps - diff --git a/cili/extract.py b/cili/extract.py index a8dbfdc..230ba71 100644 --- a/cili/extract.py +++ b/cili/extract.py @@ -6,6 +6,7 @@ TIME_UNITS = 'time' SAMP_UNITS = 'samples' + def extract_event_ranges(samples, events_dataframe, start_offset=0, end_offset=0, round_indices=True, borrow_attributes=[]): """ Extracts ranges from samples based on event timing. @@ -52,7 +53,8 @@ def extract_event_ranges(samples, events_dataframe, start_offset=0, r_times.columns = ['last_onset'] # sanity check - make sure no events start before the data, or end afterwards if any(r_times.index < samples.index[0]): - raise ValueError("at least one event range starts before the first sample") + raise ValueError( + "at least one event range starts before the first sample") if any(r_times.index > samples.index[-1]): raise ValueError("at least one event range ends after the last sample") @@ -65,14 +67,14 @@ def extract_event_ranges(samples, events_dataframe, start_offset=0, # we're going to make a df with a hierarchical index. samples['orig_idx'] = samples.index midx = pd.MultiIndex.from_product([list(range(len(e_starts))), list(range(r_len))], - names=['event', 'onset']) + names=['event', 'onset']) # get all of the samples! # idxs = [] df = pd.DataFrame() idx = 0 for stime, etime in r_times.itertuples(): # get the start time... add the number of indices that you want... 
- s_idx = np.where(samples.index > stime)[0][0]-1 + s_idx = np.where(samples.index > stime)[0][0] - 1 e_idx = s_idx + r_len - 1 stime = samples.index[s_idx] etime = samples.index[e_idx] @@ -85,6 +87,7 @@ def extract_event_ranges(samples, events_dataframe, start_offset=0, df.index = midx return df + def extract_events(samples, events, offset=0, duration=0, units='samples', borrow_attributes=[]): """ Extracts ranges from samples based on event timing and sample count. @@ -136,39 +139,45 @@ def extract_events(samples, events, offset=0, duration=0, if units == TIME_UNITS: # get the indices for the first event (minus the first index), then use # the length of the first event as a template for all events - r_times = e_starts+offset + r_times = e_starts + offset ev_idxs = np.logical_and(samples.index <= r_times.iloc[0] + duration, samples.index > r_times.iloc[0]) r_dur = len(np.where(ev_idxs)[0]) + 1 - r_idxs = [np.where(samples.index > rt)[0][0]-1 for rt in r_times] + r_idxs = [np.where(samples.index > rt)[0][0] - 1 for rt in r_times] # sanity check - make sure no events start before the data, or end afterwards if any(r_times < samples.index[0]): - raise ValueError("at least one event range starts before the first sample") + raise ValueError( + "at least one event range starts before the first sample") if any(r_times > samples.index[-1]): - raise ValueError("at least one event range ends after the last sample") + raise ValueError( + "at least one event range ends after the last sample") elif units == SAMP_UNITS: # just find the indexes of the event starts, and offset by sample count - r_idxs = np.array([np.where(samples.index > et)[0][0]-1+offset for et in e_starts]) + r_idxs = np.array([np.where(samples.index > et)[0] + [0] - 1 + offset for et in e_starts]) r_dur = duration if any(r_idxs < 0): - raise ValueError("at least one event range starts before the first sample") + raise ValueError( + "at least one event range starts before the first sample") if any(r_idxs >= len(samples)): - raise ValueError("at least one event range ends after the last sample") + raise ValueError( + "at least one event range ends after the last sample") else: raise ValueError("Not a valid unit!") # make a hierarchical index samples['orig_idx'] = samples.index midx = pd.MultiIndex.from_product([list(range(len(e_starts))), list(range(r_dur))], - names=['event', 'onset']) + names=['event', 'onset']) # get the samples df = pd.DataFrame() idx = 0 for s_idx in r_idxs: # get the start time... add the number of indices that you want... - e_idx = s_idx + r_dur-1 # pandas.loc indexing is inclusive + e_idx = s_idx + r_dur - 1 # pandas.loc indexing is inclusive # this deepcopy is heavy handed... but gets around some early pandas bugs - new_df = deepcopy(samples.loc[samples.index[s_idx] : samples.index[e_idx]]) + new_df = deepcopy( + samples.loc[samples.index[s_idx]: samples.index[e_idx]]) for ba in borrow_attributes: new_df[ba] = events.iloc[idx].get(ba, float('nan')) df = pd.concat([df, new_df]) diff --git a/cili/models.py b/cili/models.py index fd1f445..ae38ecf 100644 --- a/cili/models.py +++ b/cili/models.py @@ -2,12 +2,14 @@ import pandas.io.pytables as pt from pandas.compat import u_safe as u, string_types, isidentifier + class SaveMixin(object): """ Bakes in some save settings for NDFrame subclasses - + You can still use the pandas methods, but for quick saving and loading this mixin provides some settings you might want to reuse.
""" + def __init__(self, *args, **kwargs): super(SaveMixin, self).__init__(*args, **kwargs) @@ -28,14 +30,17 @@ def load_saved(cls, save_path): def from_pd_obj(cls, pd_obj): return cls(pd_obj._data.copy()).__finalize__(pd_obj) + class Samples(SaveMixin, pd.DataFrame): """Pandas DataFrame subclas for representing eye tracking timeseries data. Indexes may be hierarchical. """ + def __init__(self, *args, **kwargs): super(Samples, self).__init__(*args, **kwargs) - + + class Events(object): """Pandas Panel-like object that gives you access to DataFrames via standard accessors. @@ -48,6 +53,7 @@ class Events(object): Right now, the best way way to make one of these is to use Events.from_dict(). """ + def __init__(self, *args, **kwargs): super(Events, self).__init__(*args, **kwargs) self.dframes = {} @@ -62,14 +68,14 @@ def save(self, save_path): def load_saved(cls, save_path): obj = cls() s = pt.HDFStore(save_path) - obj.dframes = dict([(k[1:],s[k]) for k in list(s.keys())]) + obj.dframes = dict([(k[1:], s[k]) for k in list(s.keys())]) s.close() return obj @classmethod def from_dict(cls, the_d): """ Returns an Events instance containing the given DataFrames - + Parameters ---------- the_d (dict) @@ -115,7 +121,7 @@ def from_list_of_dicts(cls, events_list): def _local_dir(self): """ add the string-like attributes from the info_axis """ return [c for c in list(self.dframes.keys()) - if isinstance(c, string_types) and isidentifier(c)] + if isinstance(c, string_types) and isidentifier(c)] def __dir__(self): """ @@ -137,6 +143,7 @@ def __getattr__(self, name): raise AttributeError("'%s' object has no attribute '%s'" % (type(self).__name__, name)) + def initialize_hdf5(): - pt._TYPE_MAP.update({Events:u('wide'), Samples:u('frame'),}) - pt._AXES_MAP.update({Events:[1, 2], Samples:[0],}) + pt._TYPE_MAP.update({Events: u('wide'), Samples: u('frame'), }) + pt._AXES_MAP.update({Events: [1, 2], Samples: [0], }) diff --git a/cili/util.py b/cili/util.py index 5508db1..400319a 100755 --- a/cili/util.py +++ b/cili/util.py @@ -9,30 +9,30 @@ from .models import * ASC_SFIELDS_EYE = { - 'l':[('onset', np.int64), - ('x_l', np.float64), - ('y_l', np.float64), - ('pup_l', np.float64),], - 'r':[('onset', np.int64), - ('x_r', np.float64), - ('y_r', np.float64), - ('pup_r', np.float64)], - 'b':[('onset', np.int64), - ('x_l', np.float64), - ('y_l', np.float64), - ('pup_l', np.float64), - ('x_r', np.float64), - ('y_r', np.float64), - ('pup_r', np.float64)],} + 'l': [('onset', np.int64), + ('x_l', np.float64), + ('y_l', np.float64), + ('pup_l', np.float64), ], + 'r': [('onset', np.int64), + ('x_r', np.float64), + ('y_r', np.float64), + ('pup_r', np.float64)], + 'b': [('onset', np.int64), + ('x_l', np.float64), + ('y_l', np.float64), + ('pup_l', np.float64), + ('x_r', np.float64), + ('y_r', np.float64), + ('pup_r', np.float64)], } ASC_SFIELDS_VEL = { - 'l':[('vel_x_l', np.float64), - ('vel_y_l', np.float64),], - 'r':[('vel_x_r', np.float64), - ('vel_y_r', np.float64),], - 'b':[('vel_x_l', np.float64), - ('vel_y_l', np.float64), - ('vel_x_r', np.float64), - ('vel_y_r', np.float64)],} + 'l': [('vel_x_l', np.float64), + ('vel_y_l', np.float64), ], + 'r': [('vel_x_r', np.float64), + ('vel_y_r', np.float64), ], + 'b': [('vel_x_l', np.float64), + ('vel_y_l', np.float64), + ('vel_x_r', np.float64), + ('vel_y_r', np.float64)], } ASC_SFIELDS_REZ = [ ('res_x', np.float64), ('res_y', np.float64)] @@ -64,7 +64,7 @@ 'RIGHT_PUPIL_SIZE': np.float64, 'RIGHT_VELOCITY_X': np.float64, 'RIGHT_VELOCITY_Y': np.float64, - 'TIMESTAMP': np.int64,} + 
'TIMESTAMP': np.int64, } TXT_INT_TYPES = [np.int64] TXT_NAME_MAP = [np.int64] ASC_EV_LINE_STARTS = [ @@ -75,70 +75,71 @@ 'ESACC', 'EFIX', 'BUTTON', - 'SAMPLES',] + 'SAMPLES', ] ASC_EFIELDS_EVENT = { - 'MSG':[('name', object), - ('onset', np.int64), - ('label', object), - ('content', object)], - 'START':[('name', object), - ('onset', np.int64), - ('eye', object), - ('types', object)], - 'END':[('name', object), - ('onset', np.int64), - ('types', object), - ('x_res', np.float64), - ('y_res', np.float64)], - 'EBLINK':[('name', object), + 'MSG': [('name', object), + ('onset', np.int64), + ('label', object), + ('content', object)], + 'START': [('name', object), + ('onset', np.int64), + ('eye', object), + ('types', object)], + 'END': [('name', object), + ('onset', np.int64), + ('types', object), + ('x_res', np.float64), + ('y_res', np.float64)], + 'EBLINK': [('name', object), + ('eye', object), + ('onset', np.int64), + ('last_onset', np.int64), + ('duration', np.int64)], + 'ESACC': [('name', object), ('eye', object), ('onset', np.int64), ('last_onset', np.int64), - ('duration', np.int64)], - 'ESACC':[('name', object), + ('duration', np.int64), + ('x_start', np.float64), + ('y_start', np.float64), + ('x_end', np.float64), + ('y_end', np.float64), + ('vis_angle', np.float64), + ('peak_velocity', np.int64)], + 'EFIX': [('name', object), ('eye', object), ('onset', np.int64), ('last_onset', np.int64), ('duration', np.int64), - ('x_start', np.float64), - ('y_start', np.float64), - ('x_end', np.float64), - ('y_end', np.float64), - ('vis_angle', np.float64), - ('peak_velocity', np.int64)], - 'EFIX':[('name', object), - ('eye', object), - ('onset', np.int64), - ('last_onset', np.int64), - ('duration', np.int64), - ('x_pos', np.float64), - ('y_pos', np.float64), - ('p_size', np.int64)], - 'BUTTON':[('name', object), - ('onset', np.int64), - ('b_num', np.int64), - ('state', np.int64),],} + ('x_pos', np.float64), + ('y_pos', np.float64), + ('p_size', np.int64)], + 'BUTTON': [('name', object), + ('onset', np.int64), + ('b_num', np.int64), + ('state', np.int64), ], } ASC_EFIELDS_RES = { - 'MSG':[], - 'START':[], - 'END':[], - 'EBLINK':[], - 'ESACC':[('x_res', np.float64), - ('y_res', np.float64)], - 'EFIX':[('x_res', np.float64), + 'MSG': [], + 'START': [], + 'END': [], + 'EBLINK': [], + 'ESACC': [('x_res', np.float64), + ('y_res', np.float64)], + 'EFIX': [('x_res', np.float64), ('y_res', np.float64)], - 'BUTTON':[],} + 'BUTTON': [], } ASC_EV_IGNORE_COLUMNS = { - 'MSG':[], - 'START':[], - 'END':[], - 'EBLINK':[], - 'ESACC':[], - 'EFIX':[], - 'BUTTON':[],} -ASC_IRREG_EVENTS = ['MSG','START','END'] + 'MSG': [], + 'START': [], + 'END': [], + 'EBLINK': [], + 'ESACC': [], + 'EFIX': [], + 'BUTTON': [], } +ASC_IRREG_EVENTS = ['MSG', 'START', 'END'] ASC_INT_TYPES = [np.int64] -PUP_FIELDS = ['pup_r','pup_l','RIGHT_PUPIL_SIZE','LEFT_PUPIL_SIZE'] +PUP_FIELDS = ['pup_r', 'pup_l', 'RIGHT_PUPIL_SIZE', 'LEFT_PUPIL_SIZE'] + def load_eyelink_dataset(file_name): """ Parses eyelink data to return samples and events. @@ -165,6 +166,7 @@ def load_eyelink_dataset(file_name): raise ValueError("only .asc and .txt files supported at the moment...") return s, e + def pandas_df_from_txt(file_path): """ Parses samples out of an EyeLink .txt file """ import pandas as pd @@ -173,9 +175,11 @@ def pandas_df_from_txt(file_path): # then we'll get the fields, figure out the dtypes, and do conversions # accordingly. It would be nice if the dtypes would work in read_csv, but # so far no luck... 
- df = pd.read_csv(file_path, sep="\t", index_col="TIMESTAMP", low_memory=False, na_values=["."],) + df = pd.read_csv(file_path, sep="\t", index_col="TIMESTAMP", + low_memory=False, na_values=["."],) fields = [str(x) for x in list(df.dtypes.keys())] - dtypes = dict([(d, object) for d in fields if not d in list(TXT_FIELDS.keys())]) + dtypes = dict([(d, object) + for d in fields if not d in list(TXT_FIELDS.keys())]) dtypes.update(dict([(k, v) for k, v in TXT_FIELDS.items() if k in fields])) nums = [k for k, v in dtypes.items() if v not in [object]] ints = [k for k in nums if dtypes[k] in TXT_INT_TYPES] @@ -187,10 +191,11 @@ def pandas_df_from_txt(file_path): df.index.name = "onset" return Samples.from_pd_obj(df) + def pandas_dfs_from_asc(file_path): """ Parses samples and events out of an EyeLink .asc file """ # collect lines for each event type (including samples) - e_lines = dict([(k,[]) for k in ASC_EV_LINE_STARTS]) + e_lines = dict([(k, []) for k in ASC_EV_LINE_STARTS]) s_lines = [] with open(file_path, "r") as f: for line in f: @@ -206,10 +211,13 @@ def pandas_dfs_from_asc(file_path): # determine column names, dtypes if not len(e_lines["SAMPLES"]) > 0: raise ValueError("Could not find samples line in .asc file.") - side, has_vel, has_res, has_htarg, has_input = info_from_asc_samples_line(e_lines["SAMPLES"][0]) - samp_dtypes = build_asc_samp_dtypes(side, has_vel, has_res, has_htarg, has_input) + side, has_vel, has_res, has_htarg, has_input = info_from_asc_samples_line( + e_lines["SAMPLES"][0]) + samp_dtypes = build_asc_samp_dtypes( + side, has_vel, has_res, has_htarg, has_input) ev_names = [k for k in ASC_EV_LINE_STARTS if not k in ["SAMPLES"]] - ev_dtypes = dict([(ev_name, build_asc_ev_dtypes(ev_name, side, has_vel, has_res)) for ev_name in ev_names]) + ev_dtypes = dict([(ev_name, build_asc_ev_dtypes( + ev_name, side, has_vel, has_res)) for ev_name in ev_names]) # get a df for the samples samp_df = pandas_df_from_lines(s_lines, samp_dtypes, ASC_SFIELDS_IGNORE) samps = Samples.from_pd_obj(samp_df) @@ -217,13 +225,14 @@ def pandas_dfs_from_asc(file_path): for ev_name in ASC_IRREG_EVENTS: if not ev_name in e_lines: continue - e_lines[ev_name] = prep_irreg_asc_event_lines(e_lines[ev_name], ev_name) + e_lines[ev_name] = prep_irreg_asc_event_lines( + e_lines[ev_name], ev_name) # get a df for each event type ev_dfs = dict([(ev_name, pandas_df_from_lines(e_lines[ev_name], ev_dtypes[ev_name], ASC_EV_IGNORE_COLUMNS[ev_name])) - for ev_name in ev_names if len(e_lines[ev_name]) > 0]) + for ev_name in ev_names if len(e_lines[ev_name]) > 0]) evs = Events.from_dict(ev_dfs) # adjust events that start before or end after the sample range @@ -231,6 +240,7 @@ def pandas_dfs_from_asc(file_path): # TODO add omitting ASC_EV_IGNORE_COLUMNS[ev_name] return samps, evs + def pandas_df_from_lines(csv_lines, dtypes, ignore): import pandas as pd import io @@ -247,14 +257,17 @@ def pandas_df_from_lines(csv_lines, dtypes, ignore): error_bad_lines=False, # usecols=use_names, warn_bad_lines=False,) - nums = [d[0] for d in dtypes if d[1] not in [object] and d[0] not in ['onset']] - ints = [d[0] for d in dtypes if d[1] in ASC_INT_TYPES and d[0] not in ['onset']] + nums = [d[0] for d in dtypes if d[1] not in [ + object] and d[0] not in ['onset']] + ints = [d[0] + for d in dtypes if d[1] in ASC_INT_TYPES and d[0] not in ['onset']] df[nums] = df[nums].convert_objects(convert_numeric=True) df[ints] = df[ints].astype(np.int64) for ig in ignore: del df[ig] return df + def prep_irreg_asc_event_lines(lines, ev_name): """ uses 
quotes to force annoying events into usable chunks use sparingly - not super fast right now @@ -273,7 +286,8 @@ def prep_irreg_asc_event_lines(lines, ev_name): # name, onset, eye, then one or two types for line in lines: l = line.split() - new_lines.append('%s\t%s\t%s\t"%s"\n' % (l[0],l[1],l[2],', '.join(l[3:]))) + new_lines.append('%s\t%s\t%s\t"%s"\n' % + (l[0], l[1], l[2], ', '.join(l[3:]))) elif ev_name == 'END': # name, onset, maybe a list of types, 'RES', x_res, y_res # we'll take out the "RES" here @@ -282,11 +296,13 @@ def prep_irreg_asc_event_lines(lines, ev_name): types = ' '.join(l[2:-3]) if len(l) > 5 else '.' x_res = l[-2] y_res = l[-1] - new_lines.append('%s\t%s\t"%s"\t%s\t%s\n' % (l[0], l[1], types, l[-2], l[-1])) + new_lines.append('%s\t%s\t"%s"\t%s\t%s\n' % + (l[0], l[1], types, l[-2], l[-1])) else: new_lines = lines return new_lines + def build_asc_samp_dtypes(side, has_vel, has_res, has_htarg, has_input): dtypes = [] dtypes = list(ASC_SFIELDS_EYE[side]) @@ -301,12 +317,14 @@ def build_asc_samp_dtypes(side, has_vel, has_res, has_htarg, has_input): dtypes.extend(ASC_SFIELDS_REMOTE) return dtypes + def build_asc_ev_dtypes(ev_name, side, has_vel, has_res): - dtypes = list(ASC_EFIELDS_EVENT.get(ev_name,[])) + dtypes = list(ASC_EFIELDS_EVENT.get(ev_name, [])) if has_res: - dtypes.extend(ASC_EFIELDS_RES.get(ev_name,[])) + dtypes.extend(ASC_EFIELDS_RES.get(ev_name, [])) return dtypes if dtypes else None + def info_from_asc_samples_line(line_txt): """ gets sample info from asc SAMPLE lines @@ -341,6 +359,7 @@ def info_from_asc_samples_line(line_txt): sample_side = 'r' return sample_side, has_velocity, has_resolution, has_htarg, has_input + def percentile_bucket(vals, bucket_size=10, scale=1.0, shift=0.0): """ returns percentile scores for each value Parameters @@ -357,11 +376,16 @@ def percentile_bucket(vals, bucket_size=10, scale=1.0, shift=0.0): from scipy.stats import scoreatpercentile as sp import numpy as np from bisect import bisect_left - percs = np.concatenate([np.arange(bucket_size,100,bucket_size), [100]]) # arange to get the percentiles - cuts = [sp(vals, p) for p in percs] # to get the cutoff score for each percentile - new_list = np.array([bisect_left(cuts, val)+1 for val in vals]) * scale + shift # turn values into bucket numbers... +1 since we want 1-indexed buckets + # arange to get the percentiles + percs = np.concatenate([np.arange(bucket_size, 100, bucket_size), [100]]) + # to get the cutoff score for each percentile + cuts = [sp(vals, p) for p in percs] + # turn values into bucket numbers... +1 since we want 1-indexed buckets + new_list = np.array( + [bisect_left(cuts, val) + 1 for val in vals]) * scale + shift return new_list - + + def ensure_dir(dir_path, overwrite=False): from shutil import rmtree from os.path import isdir, exists @@ -374,6 +398,7 @@ def ensure_dir(dir_path, overwrite=False): if not exists(dir_path): makedirs(dir_path) + def get_0_percentage(asc_path): from cili.util import pandas_dfs_from_asc import pandas as pd @@ -381,8 +406,9 @@ def get_0_percentage(asc_path): ds, _ = pandas_dfs_from_asc(asc_path) p_fields = [f for f in ds.columns if f in PUP_FIELDS] if len(p_fields) == 0: - return 1. # if you can't find a pupil field, we'll call that "bad" - return (float(len(ds[ds[p_fields[0]] == 0]))/float(len(ds))) + return 1. 
# if you can't find a pupil field, we'll call that "bad" + return (float(len(ds[ds[p_fields[0]] == 0])) / float(len(ds))) + def list_run_corruption(asc_dir): # for now, just make a histogram of the % of all ascs that are made up of 0's @@ -407,6 +433,7 @@ def list_run_corruption(asc_dir): print("\nDropout by File:") pprint(vals) + def constrain_events(samples, events): """ adjusts start times of any events that overflow sample bounds""" lowtime = samples.index[0] @@ -419,14 +446,17 @@ def constrain_events(samples, events): new_idxs[idxs] = lowtime df.index = new_idxs + help_message = """ No help at this time. Check the code. """ + class Usage(Exception): def __init__(self, msg=help_message): self.msg = msg + def main(argv=None): import os import getopt @@ -436,7 +466,7 @@ def main(argv=None): try: opts, args = getopt.getopt(argv[1:], "d:", ["dir", "dropout"]) except getopt.error as msg: - raise Usage(msg="\n"+str(msg)) + raise Usage(msg="\n" + str(msg)) # option processing drop_check = False asc_dir = None @@ -455,10 +485,12 @@ def main(argv=None): except Usage as err: f_str = sys.argv[0].split("/")[-1] + ":" lfs = len(f_str) - f_str = "%s\n%s\n%s\n" % ("-"*lfs, f_str, "-"*lfs) + f_str = "%s\n%s\n%s\n" % ("-" * lfs, f_str, "-" * lfs) print(f_str + str(err.msg), file=sys.stderr) - print("-------------------\nfor help use --help\n-------------------", file=sys.stderr) + print( + "-------------------\nfor help use --help\n-------------------", file=sys.stderr) return 2 + if __name__ == '__main__': sys.exit(main()) diff --git a/setup.py b/setup.py index 76f842e..318ebb7 100755 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ import os from setuptools import setup + def get_readme(): md_path = os.path.join(os.path.dirname(__file__), "README.md") txt_path = os.path.join(os.path.dirname(__file__), "README.txt") @@ -15,6 +16,7 @@ def get_readme(): d = "" return d + setup(name='cili', version='0.5.3', author='Ben Acland', @@ -23,13 +25,13 @@ def get_readme(): license='BSD', keywords='eyetracking pupillometry eyelink', url='https://github.com/beOn/cili', - install_requires=['scipy','numexpr','tables','pandas'], + install_requires=['scipy', 'numexpr', 'tables', 'pandas'], packages=['cili'], long_description=get_readme(), classifiers=[ - 'Development Status :: 2 - Pre-Alpha', - 'Topic :: Scientific/Engineering :: Information Analysis', - 'License :: OSI Approved :: BSD License', - 'Intended Audience :: Science/Research', + 'Development Status :: 2 - Pre-Alpha', + 'Topic :: Scientific/Engineering :: Information Analysis', + 'License :: OSI Approved :: BSD License', + 'Intended Audience :: Science/Research', ], -) + ) diff --git a/tests/config.py b/tests/config.py index d4b6517..5b61dca 100644 --- a/tests/config.py +++ b/tests/config.py @@ -5,4 +5,4 @@ paths = ['bino250', 'bino500', 'bino1000', 'binoRemote250', 'binoRemote500', 'mono250', 'mono500', 'mono1000', 'mono2000', 'monoRemote250', 'monoRemote500'] -paths = dict([(p, os.path.join(DATA_DIR,p+'.asc')) for p in paths]) \ No newline at end of file +paths = dict([(p, os.path.join(DATA_DIR, p + '.asc')) for p in paths]) diff --git a/tests/test_extract.py b/tests/test_extract.py index 38e4636..c177b01 100644 --- a/tests/test_extract.py +++ b/tests/test_extract.py @@ -16,131 +16,161 @@ """ shape of extracted data """ + def test_timeunit_extract_samplecount_250(): ds, es = pandas_dfs_from_asc(paths['mono250']) fixations = es.EFIX[:-1] sc_time_test(ds, fixations, 250, 500) sc_time_test(ds, fixations, 250, 1000) + def test_timeunit_extract_samplecount_500(): 
ds, es = pandas_dfs_from_asc(paths['mono500']) fixations = es.EFIX[:-1] sc_time_test(ds, fixations, 500, 500) sc_time_test(ds, fixations, 500, 1000) + def test_timeunit_extract_samplecount_1000(): ds, es = pandas_dfs_from_asc(paths['mono1000']) fixations = es.EFIX[:-1] sc_time_test(ds, fixations, 1000, 500) sc_time_test(ds, fixations, 1000, 1000) + def test_timeunit_extract_samplecount_2000(): ds, es = pandas_dfs_from_asc(paths['mono2000']) fixations = es.EFIX[:-1] sc_time_test(ds, fixations, 2000, 500) sc_time_test(ds, fixations, 2000, 1000) + def test_sampleunit_extract_samplecount_250(): ds, es = pandas_dfs_from_asc(paths['mono250']) fixations = es.EFIX[:-1] sc_samp_test(ds, fixations, 10) sc_samp_test(ds, fixations, 130) + def test_sampleunit_extract_samplecount_500(): ds, es = pandas_dfs_from_asc(paths['mono500']) fixations = es.EFIX[:-1] sc_samp_test(ds, fixations, 50) sc_samp_test(ds, fixations, 350) + def test_sampleunit_extract_samplecount_1000(): ds, es = pandas_dfs_from_asc(paths['mono1000']) fixations = es.EFIX[:-1] sc_samp_test(ds, fixations, 500) sc_samp_test(ds, fixations, 750) + def test_sampleunit_extract_samplecount_2000(): ds, es = pandas_dfs_from_asc(paths['mono2000']) fixations = es.EFIX[:-1] sc_samp_test(ds, fixations, 350) sc_samp_test(ds, fixations, 700) + def sc_samp_test(ds, events, duration, offset=0): ext = extract_events(ds, events, duration=duration, units=SAMP_UNITS) ev_count = np.shape(events)[0] assert_equal(ev_count * duration, np.shape(ext)[0]) + def sc_time_test(ds, events, sampfreq, duration, offset=0): ext = extract_events(ds, events, duration=duration, units=TIME_UNITS) ev_count = np.shape(events)[0] - assert_equal(int(sampfreq * (duration/1000.) * ev_count), np.shape(ext)[0]) + assert_equal(int(sampfreq * (duration / 1000.) 
+ * ev_count), np.shape(ext)[0]) + """ exceptions """ + @raises(IndexError) def test_timeunit_extract_underbounds(): ds, es = pandas_dfs_from_asc(paths['mono1000']) - ext = extract_events(ds, es.EFIX, offset=-10000, duration=3000, units=TIME_UNITS) + ext = extract_events(ds, es.EFIX, offset=-10000, + duration=3000, units=TIME_UNITS) + @raises(IndexError) def test_timeunit_extract_overbounds(): ds, es = pandas_dfs_from_asc(paths['mono1000']) ext = extract_events(ds, es.EFIX, duration=3000, units=TIME_UNITS) + """ bounds """ + @raises(ValueError) def test_negative_duration(): ds, es = pandas_dfs_from_asc(paths['mono250']) fixations = es.EFIX[:-1] ext = extract_events(ds, fixations, duration=-1000, units=TIME_UNITS) + def test_posneg_offset(): # make sure that you can use a positive or a negative offset ds, es = pandas_dfs_from_asc(paths['mono250']) fixations = es.EFIX[3:-1] - ext = extract_events(ds, fixations, duration=500, offset=-250, units=TIME_UNITS) - ext = extract_events(ds, fixations, duration=500, offset=250, units=TIME_UNITS) + ext = extract_events(ds, fixations, duration=500, + offset=-250, units=TIME_UNITS) + ext = extract_events(ds, fixations, duration=500, + offset=250, units=TIME_UNITS) assert(True) + """ fields """ + def test_extracted__bino250_cols(): ds, es = pandas_dfs_from_asc(paths['bino250']) rs = es.ESACC[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) - test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] + test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', + 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_bino500_cols(): ds, es = pandas_dfs_from_asc(paths['bino500']) rs = es.ESACC[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) - test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] + test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', + 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_bino1000_cols(): ds, es = pandas_dfs_from_asc(paths['bino1000']) rs = es.ESACC[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) - test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] + test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', + 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_binoRemote250_cols(): ds, es = pandas_dfs_from_asc(paths['binoRemote250']) rs = es.EFIX[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns', 'targ_x', - 'targ_y', 'targ_dist', 'remote_warns', 'orig_idx'] + 'targ_y', 'targ_dist', 'remote_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_binoRemote500_cols(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) rs = es.ESACC[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns', 'targ_x', - 'targ_y', 'targ_dist', 'remote_warns', 'orig_idx'] + 'targ_y', 'targ_dist', 'remote_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_mono250_cols(): ds, es = pandas_dfs_from_asc(paths['mono250']) rs = es.ESACC[:-3] @@ -148,6 +178,7 @@ def test_extracted_mono250_cols(): test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns', 'orig_idx'] 
assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_mono500_cols(): ds, es = pandas_dfs_from_asc(paths['mono500']) rs = es.ESACC[:-3] @@ -155,6 +186,7 @@ def test_extracted_mono500_cols(): test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_mono1000_cols(): ds, es = pandas_dfs_from_asc(paths['mono1000']) rs = es.ESACC[:-3] @@ -162,6 +194,7 @@ def test_extracted_mono1000_cols(): test_cols = ['x_r', 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_mono2000_cols(): ds, es = pandas_dfs_from_asc(paths['mono2000']) rs = es.ESACC[:-3] @@ -169,25 +202,28 @@ def test_extracted_mono2000_cols(): test_cols = ['x_r', 'y_r', 'pup_r', 'samp_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_monoRemote250_cols(): ds, es = pandas_dfs_from_asc(paths['monoRemote250']) rs = es.EFIX[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns', 'targ_x', 'targ_y', 'targ_dist', - 'remote_warns', 'orig_idx'] + 'remote_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_extracted_monoRemote500_cols(): ds, es = pandas_dfs_from_asc(paths['monoRemote500']) rs = es.ESACC[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS) test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns', 'targ_x', 'targ_y', 'targ_dist', - 'remote_warns', 'orig_idx'] + 'remote_warns', 'orig_idx'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_borrowed_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) rs = es.ESACC[:-3] ext = extract_events(ds, rs, duration=500, offset=0, units=TIME_UNITS, - borrow_attributes=['peak_velocity']) + borrow_attributes=['peak_velocity']) assert_true('peak_velocity' in ext.columns) diff --git a/tests/test_util.py b/tests/test_util.py index 4e78f4b..0391321 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -15,168 +15,204 @@ """ load_eyelink_dataset """ + def test_load_asc_convenience(): # TODO: make sure you can load an asc without an exception load_eyelink_dataset(paths['bino250']) + def test_load_txt_convenience(): # TODO: make sure you can load a text file raise NotImplementedError() + """ pandas_df_from_txt """ # TODO: we need test data for this... 
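Until test data for pandas_df_from_txt exists, the eventual test will likely mirror the .asc loaders below. A sketch, assuming a hypothetical tab-delimited report at DATA_DIR/sample.txt and the test suite's existing config import:

import os
from config import DATA_DIR
from cili.util import pandas_df_from_txt

def test_load_txt():
    # 'sample.txt' is a hypothetical fixture, not yet in the repo
    ds = pandas_df_from_txt(os.path.join(DATA_DIR, 'sample.txt'))
    # the loader indexes on TIMESTAMP and renames the index to 'onset'
    assert ds.index.name == 'onset'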
""" pandas_dfs_from_asc """ # test that the function can be run on each data type + + def test_load_asc_bino250(): pandas_dfs_from_asc(paths['bino250']) + def test_load_asc_bino500(): pandas_dfs_from_asc(paths['bino500']) + def test_load_asc_bino1000(): pandas_dfs_from_asc(paths['bino1000']) + def test_load_asc_binoRemote250(): pandas_dfs_from_asc(paths['binoRemote250']) + def test_load_asc_binoRemote500(): pandas_dfs_from_asc(paths['binoRemote500']) + def test_load_asc_mono250(): pandas_dfs_from_asc(paths['mono250']) + def test_load_asc_mono500(): pandas_dfs_from_asc(paths['mono500']) + def test_load_asc_mono1000(): pandas_dfs_from_asc(paths['mono1000']) + def test_load_asc_mono2000(): pandas_dfs_from_asc(paths['mono2000']) + def test_load_asc_monoRemote250(): pandas_dfs_from_asc(paths['monoRemote250']) + def test_load_asc_monoRemote500(): pandas_dfs_from_asc(paths['monoRemote500']) # test that for each type of data, you get the expected headers + def test_bino250_cols(): ds, es = pandas_dfs_from_asc(paths['bino250']) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_bino500_cols(): ds, es = pandas_dfs_from_asc(paths['bino500']) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_bino1000_cols(): ds, es = pandas_dfs_from_asc(paths['bino1000']) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_binoRemote250_cols(): ds, es = pandas_dfs_from_asc(paths['binoRemote250']) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', - 'samp_warns', 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] + 'samp_warns', 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_binoRemote500_cols(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_cols = ['x_l', 'y_l', 'pup_l', 'x_r', 'y_r', 'pup_r', - 'samp_warns', 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] + 'samp_warns', 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_mono250_cols(): ds, es = pandas_dfs_from_asc(paths['mono250']) test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_mono500_cols(): ds, es = pandas_dfs_from_asc(paths['mono500']) test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_mono1000_cols(): ds, es = pandas_dfs_from_asc(paths['mono1000']) test_cols = ['x_r', 'y_r', 'pup_r', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_mono2000_cols(): ds, es = pandas_dfs_from_asc(paths['mono2000']) test_cols = ['x_r', 'y_r', 'pup_r', 'samp_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_monoRemote250_cols(): ds, es = pandas_dfs_from_asc(paths['monoRemote250']) test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns', - 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] + 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] assert_array_equal(test_cols, ds.columns.tolist()) + def test_monoRemote500_cols(): ds, es = pandas_dfs_from_asc(paths['monoRemote500']) test_cols = ['x_l', 'y_l', 'pup_l', 'samp_warns', - 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] + 'targ_x', 'targ_y', 'targ_dist', 'remote_warns'] assert_array_equal(test_cols, ds.columns.tolist()) # test that for each frequency, the sample rate is what you 
expect + def test_mono250_idx(): ds, es = pandas_dfs_from_asc(paths['mono250']) diffs = np.diff(ds.index) - diffs = np.unique(diffs[diffs<100]) - assert_equal(diffs,4) + diffs = np.unique(diffs[diffs < 100]) + assert_equal(diffs, 4) + def test_mono500_idx(): ds, es = pandas_dfs_from_asc(paths['mono500']) diffs = np.diff(ds.index) - diffs = np.unique(diffs[diffs<100]) - assert_equal(diffs,2) + diffs = np.unique(diffs[diffs < 100]) + assert_equal(diffs, 2) + def test_mono1000_idx(): ds, es = pandas_dfs_from_asc(paths['mono1000']) diffs = np.diff(ds.index) - diffs = np.unique(diffs[diffs<100]) - assert_equal(diffs,1) + diffs = np.unique(diffs[diffs < 100]) + assert_equal(diffs, 1) + def test_mono2000_idx(): ds, es = pandas_dfs_from_asc(paths['mono2000']) diffs = np.diff(ds.index) - diffs = np.unique(diffs[diffs<100]) - assert_equal(diffs,4) + diffs = np.unique(diffs[diffs < 100]) + assert_equal(diffs, 4) # check extracted event types and fields against the expected values + def test_event_types(): # make sure the full list of events is there ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_evs = ['END', 'EFIX', 'EBLINK', 'START', 'ESACC', 'MSG'] assert_array_equal(list(es.dframes.keys()), test_evs) + def test_end_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_fields = ['name', 'types', 'x_res', 'y_res'] assert_array_equal(es.END.columns.values, test_fields) + def test_efix_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_fields = ['name', 'eye', 'last_onset', 'duration', 'x_pos', 'y_pos', 'p_size'] assert_array_equal(es.EFIX.columns.values, test_fields) + def test_eblink_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_fields = ['name', 'eye', 'last_onset', 'duration'] assert_array_equal(es.EBLINK.columns.values, test_fields) + def test_start_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_fields = ['name', 'eye', 'types'] assert_array_equal(es.START.columns.values, test_fields) + def test_esacc_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_fields = ['name', 'eye', 'last_onset', 'duration', @@ -184,6 +220,7 @@ def test_esacc_fields(): 'vis_angle', 'peak_velocity'] assert_array_equal(es.ESACC.columns.values, test_fields) + def test_msg_fields(): ds, es = pandas_dfs_from_asc(paths['binoRemote500']) test_fields = ['name', 'label', 'content']
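Taken together, the modules touched by this patch form one pipeline. A minimal end-to-end sketch ('run.asc' is a hypothetical recording; pup_l assumes a left-eye trace):

from cili.util import load_eyelink_dataset
from cili.cleanup import interp_eyelink_blinks, interp_zeros, butterworth_series
from cili.extract import extract_events, TIME_UNITS

samples, events = load_eyelink_dataset('run.asc')

# interpolate across blinks and zero-dropout, then low-pass the pupil trace
samples = interp_eyelink_blinks(samples, events, interp_fields=['pup_l'])
samples = interp_zeros(samples, interp_fields=['pup_l'])
samples = butterworth_series(samples, fields=['pup_l'])

# pull a 500 ms window from each fixation, carrying its p_size along;
# the last event is dropped so no range overruns the samples
ranges = extract_events(samples, events.EFIX[:-1], duration=500,
                        units=TIME_UNITS, borrow_attributes=['p_size'])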