fix: enable joblib.Parallel memory mapping (#262)
eonu authored Dec 30, 2024
1 parent 284f51b commit 6366e97
Showing 6 changed files with 11 additions and 15 deletions.
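The change is the same in every file under sequentia/models: each joblib.Parallel call drops max_nbytes=None, which disables joblib's automatic memory mapping of large array arguments, and instead passes mmap_mode="r+", which keeps the default memmapping threshold and opens the shared arrays read-write. Below is a minimal sketch of the resulting behaviour under joblib's default loky backend; the row_sum worker is a hypothetical stand-in, not a function from this repository:

    import joblib
    import numpy as np

    def row_sum(X: np.ndarray, i: int) -> float:
        # For arrays above joblib's size threshold (1 MB by default), X arrives
        # in the worker as a numpy.memmap backed by a shared file rather than a
        # pickled per-process copy; mmap_mode="r+" opens that memmap read-write.
        return float(X[i].sum())

    X = np.random.rand(2_000, 1_000)  # ~16 MB, large enough to be memory-mapped
    results = joblib.Parallel(n_jobs=2, mmap_mode="r+")(
        joblib.delayed(row_sum)(X, i) for i in range(10)
    )

The benchmark script changes appear to be a separate tidy-up in the same commit: the multivariate function is renamed to run and the timeit statement is updated to match.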
6 changes: 2 additions & 4 deletions benchmarks/test_pyts.py
@@ -34,9 +34,7 @@ def prepare(data: SequentialDataset, length: int) -> DataSplit:
     return X_pad[:, 0], data.y


-def multivariate(
-    *, train_data: DataSplit, test_data: DataSplit, n_jobs: int
-) -> None:
+def run(*, train_data: DataSplit, test_data: DataSplit, n_jobs: int) -> None:
     """Fit and predict the classifier."""
     # initialize model
     clf = KNeighborsClassifier(
@@ -70,7 +68,7 @@ def multivariate(
 )

 benchmark = timeit.timeit(
-    "func(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
+    "run(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
     globals=locals(),
     number=args.number,
 )
4 changes: 2 additions & 2 deletions benchmarks/test_sequentia.py
@@ -21,7 +21,7 @@
 random_state: np.random.RandomState = np.random.RandomState(0)


-def multivariate(
+def run(
     *, train_data: SequentialDataset, test_data: SequentialDataset, n_jobs: int
 ) -> None:
     """Fit and predict the classifier."""
@@ -52,7 +52,7 @@ def multivariate(
 train_data, test_data = load_dataset(multivariate=False)

 benchmark = timeit.timeit(
-    "func(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
+    "run(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
     globals=locals(),
     number=args.number,
 )
6 changes: 2 additions & 4 deletions benchmarks/test_sktime.py
@@ -56,9 +56,7 @@ def prepare(data: SequentialDataset) -> DataSplit:
     return X_pd, data.y


-def multivariate(
-    *, train_data: DataSplit, test_data: DataSplit, n_jobs: int
-) -> None:
+def run(*, train_data: DataSplit, test_data: DataSplit, n_jobs: int) -> None:
     """Fit and predict the classifier."""
     # initialize model
     clf = KNeighborsTimeSeriesClassifier(
@@ -89,7 +87,7 @@ def multivariate(
 train_data, test_data = prepare(train_data), prepare(test_data)

 benchmark = timeit.timeit(
-    "func(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
+    "run(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
     globals=locals(),
     number=args.number,
 )
4 changes: 2 additions & 2 deletions sequentia/models/hmm/classifier.py
@@ -366,7 +366,7 @@ def fit(
         self.models = dict(
             zip(
                 self.classes_,
-                joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
+                joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
                     joblib.delayed(self.models[c].fit)(
                         X_c, lengths=lengths_c
                     )
@@ -537,7 +537,7 @@ def predict_scores(
         n_jobs = _multiprocessing.effective_n_jobs(self.n_jobs, x=lengths)
         chunk_idxs = np.array_split(_data.get_idxs(lengths), n_jobs)
         return np.concatenate(
-            joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
+            joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
                 joblib.delayed(self._compute_scores_chunk)(X, idxs=idxs)
                 for idxs in chunk_idxs
             )
4 changes: 2 additions & 2 deletions sequentia/models/knn/base.py
@@ -143,7 +143,7 @@ def compute_distance_matrix(

         # multiprocessed DTW calculation
         return np.vstack(
-            joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
+            joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
                 joblib.delayed(self._distance_matrix_row_chunk)(
                     row_idxs, col_chunk_idxs, X, n_jobs, dtw
                 )
@@ -245,7 +245,7 @@ def _distance_matrix_row_chunk(
         columns.
         """
         return np.hstack(
-            joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
+            joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
                 joblib.delayed(self._distance_matrix_row_col_chunk)(
                     col_idxs, row_idxs, X, dtw
                 )
2 changes: 1 addition & 1 deletion sequentia/models/knn/classifier.py
@@ -398,7 +398,7 @@ def _find_max_labels(
         n_jobs = _multiprocessing.effective_n_jobs(self.n_jobs, x=scores)
         score_chunks = np.array_split(scores, n_jobs)
         return np.concatenate(
-            joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
+            joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
                 joblib.delayed(self._find_max_labels_chunk)(score_chunk)
                 for score_chunk in score_chunks
             )
