-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetric_dreem.py
144 lines (109 loc) · 4.64 KB
/
metric_dreem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import numpy as np
# import torch
import pandas as pd
def jaccard_overlap(localizations_a, localizations_b):
    """Compute the pairwise Jaccard overlap (IoU) between two sets of segments.

    For two segments A and B the overlap is
    ``|A ∩ B| / (LENGTH_A + LENGTH_B - |A ∩ B|)``.

    Parameters
    ----------
    localizations_a : sequence of (start, end)
        First set of segments. Only items ``[0]`` and ``[1]`` of each
        localization are read.
    localizations_b : sequence of (start, end)
        Second set of segments, same layout.

    Returns
    -------
    ndarray
        Array of shape ``(len(localizations_b), len(localizations_a))``.
        Entry ``[j, i]`` is the overlap between ``localizations_b[j]`` and
        ``localizations_a[i]``. Pairs whose union has zero length (e.g. two
        zero-length segments) get an overlap of 0 rather than NaN.
    """
    a_start = np.asarray([localization[0] for localization in localizations_a])
    a_end = np.asarray([localization[1] for localization in localizations_a])
    b_start = np.asarray([localization[0] for localization in localizations_b])
    b_end = np.asarray([localization[1] for localization in localizations_b])

    # Lay A along the columns and B along the rows; broadcasting then builds
    # the full (len(B), len(A)) grid without materialising tiled copies.
    a_start = a_start[np.newaxis, :]
    a_end = a_end[np.newaxis, :]
    b_start = b_start[:, np.newaxis]
    b_end = b_end[:, np.newaxis]

    length_a = a_end - a_start
    length_b = b_end - b_start

    # Intersection: latest start to earliest end, clipped at 0 for disjoint pairs.
    latest_start = np.maximum(a_start, b_start)
    earliest_end = np.minimum(a_end, b_end)
    intersection = np.maximum(earliest_end - latest_start, 0)

    union = length_a + length_b - intersection

    # Array division never raises on 0/0 (it only warns and yields NaN), so
    # guard the division explicitly instead of wrapping it in try/except.
    overlaps = np.zeros(union.shape, dtype=float)
    np.divide(intersection, union, out=overlaps, where=union != 0)
    return overlaps
def extract_events_from_binary_mask(binary_mask, fs=1):
    """Turn a 0/1 mask into a list of ``(start, end)`` events.

    The mask is padded with a 0 on each side so that runs touching either
    edge still produce both a rising and a falling transition.

    Parameters
    ----------
    binary_mask : array-like exposing ``.tolist()``
        Sequence of 0/1 values.
    fs : number, optional
        Divisor applied to the sample positions (default 1).

    Returns
    -------
    list of tuple
        One ``(start, end)`` pair per run of ones. ``start`` is the position
        of the first one in the run and ``end`` is the position just after
        the last one, both divided by ``fs``.
    """
    padded = np.array([0, *binary_mask.tolist(), 0])
    transitions = np.diff(padded)

    # +1 marks a 0 -> 1 step (event begins), -1 marks a 1 -> 0 step (event ends).
    rising = np.where(transitions == 1)[0] / fs
    falling = np.where(transitions == -1)[0] / fs
    assert len(rising) == len(falling)

    return list(zip(rising, falling))
def compute_f1_score(event_pred, event_true):
    """Compute the F1 score from event counts pooled over all entries.

    True positives, false positives and false negatives are computed per
    entry with ``compute_tp_fp_fn_for_each_entry`` and summed before
    precision and recall are derived.

    Parameters
    ----------
    event_pred : sequence
        One collection of predicted ``[start, end]`` events per entry.
    event_true : sequence
        One collection of reference ``[start, end]`` events per entry,
        indexed in parallel with ``event_pred``.

    Returns
    -------
    number
        ``2 * precision * recall / (precision + recall)``, or 0 when there is
        no true positive at all.
    """
    total_tp = 0
    total_fp = 0
    total_fn = 0
    # len() rather than .shape[0] so plain lists are accepted as well as arrays.
    for idx in range(len(event_pred)):
        tp, fp, fn = compute_tp_fp_fn_for_each_entry(event_pred[idx], event_true[idx])
        total_tp += tp
        total_fp += fp
        total_fn += fn

    print(total_tp, total_fp, total_fn)

    # Without any true positive, precision and/or recall is 0 (or 0/0 when
    # there are no predictions or no references at all). Return 0 directly
    # instead of dividing by zero.
    if total_tp == 0:
        return 0

    precision = total_tp / (total_tp + total_fp)
    recall = total_tp / (total_tp + total_fn)
    return 2 * precision * recall / (precision + recall)
def compute_tp_fp_fn_for_each_entry(prediction, reference, min_iou=0.3):
    """Count true positives, false positives and false negatives for one entry.

    Takes two event scorings in the format
    ``[[start1, end1], [start2, end2], ...]``.

    Parameters
    ----------
    prediction : sequence of [start, end]
        Predicted events.
    reference : sequence of [start, end]
        True events.
    min_iou : float
        Minimum intersection-over-union with a true event for a predicted
        event to be considered a true positive.

    Returns
    -------
    tuple
        ``(true_positive, false_positive, false_negative)``.
    """
    n_predicted = len(prediction)
    n_reference = len(reference)

    # With one side empty there is nothing to match.
    if n_predicted == 0:
        return 0, 0, n_reference
    if n_reference == 0:
        return 0, n_predicted, 0

    # jaccard_overlap puts reference events on rows and predicted events on columns.
    is_match = jaccard_overlap(prediction, reference) >= min_iou

    # Reducing over rows: number of predicted events that match some true event.
    matched_predictions = np.sum(np.any(is_match, axis=0))
    # Reducing over columns: number of true events matched by some predicted event.
    matched_references = np.sum(np.any(is_match, axis=1))

    # Take the smaller count so that several events overlapping the same
    # counterpart are not counted more than once.
    true_positive = min(matched_predictions, matched_references)

    return true_positive, n_predicted - true_positive, n_reference - true_positive
def format_predictions_for_scoring(mask, window_length=100):
    """Extract the events of every window, expressed on a common axis.

    Each element of ``mask`` is converted to events with
    ``extract_events_from_binary_mask``; the events of window ``i`` are
    shifted by ``i * window_length``.

    Parameters
    ----------
    mask : iterable of array-like
        One binary mask per window.
    window_length : int, optional
        Offset added per window index (default 100).

    Returns
    -------
    ndarray
        1-D object array with one entry per window; each entry is a list of
        ``[start, end]`` pairs (empty when the window holds no event).
    """
    per_window = []
    for i, elt in enumerate(mask):
        offset = i * window_length
        per_window.append(
            [[offset + start, offset + end]
             for start, end in extract_events_from_binary_mask(elt)]
        )

    # Windows generally hold different numbers of events. np.array() on such
    # a ragged list raises ValueError on recent NumPy releases, and otherwise
    # yields a data-dependent shape, so fill an object array explicitly.
    result = np.empty(len(per_window), dtype=object)
    for i, events in enumerate(per_window):
        result[i] = events
    return result
def dreem_sleep_apnea_custom_metric(y_pred, y_true):
    """Event-level F1 score between two tables of binary masks.

    Both arguments are converted with ``np.array`` and turned into per-row
    event lists before being scored with ``compute_f1_score``.

    NOTE(review): the parameter names do not match their use. The argument
    named ``y_pred`` is scored as the reference events and the argument named
    ``y_true`` as the predicted events. Swapping the two roles only exchanges
    the false-positive and false-negative counts, so the returned F1 value is
    the same either way; the names are kept as-is for keyword callers.

    Parameters
    ----------
    y_pred : array-like
        Rows of binary masks, used as the reference events.
    y_true : array-like
        Rows of binary masks, used as the predicted events.

    Returns
    -------
    number
        The value returned by ``compute_f1_score``.
    """
    reference_events, predicted_events = (
        format_predictions_for_scoring(np.array(table))
        for table in (y_pred, y_true)
    )
    return compute_f1_score(predicted_events, reference_events)
if __name__ == '__main__':
    # Script entry point: score a prediction CSV against the training labels
    # and print the resulting metric.
    CSV_FILE_Y_TRUE = 'y_train.csv'

    # The prompted name selects which 'train/<name>_y.csv' file is scored.
    method_name = input('Methode :\t')
    CSV_FILE_Y_PRED = 'train/' + method_name + '_y.csv'

    # Both files use the first column as index; labels are read first.
    df_y_true, df_y_pred = (
        pd.read_csv(csv_path, index_col=0, sep=',')
        for csv_path in (CSV_FILE_Y_TRUE, CSV_FILE_Y_PRED)
    )

    print(dreem_sleep_apnea_custom_metric(df_y_true, df_y_pred))