Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enable joblib.Parallel memory mapping #262

Merged
merged 2 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions benchmarks/test_pyts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ def prepare(data: SequentialDataset, length: int) -> DataSplit:
return X_pad[:, 0], data.y


def multivariate(
*, train_data: DataSplit, test_data: DataSplit, n_jobs: int
) -> None:
def run(*, train_data: DataSplit, test_data: DataSplit, n_jobs: int) -> None:
"""Fit and predict the classifier."""
# initialize model
clf = KNeighborsClassifier(
Expand Down Expand Up @@ -70,7 +68,7 @@ def multivariate(
)

benchmark = timeit.timeit(
"func(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
"run(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
globals=locals(),
number=args.number,
)
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/test_sequentia.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
random_state: np.random.RandomState = np.random.RandomState(0)


def multivariate(
def run(
*, train_data: SequentialDataset, test_data: SequentialDataset, n_jobs: int
) -> None:
"""Fit and predict the classifier."""
Expand Down Expand Up @@ -52,7 +52,7 @@ def multivariate(
train_data, test_data = load_dataset(multivariate=False)

benchmark = timeit.timeit(
"func(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
"run(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
globals=locals(),
number=args.number,
)
Expand Down
6 changes: 2 additions & 4 deletions benchmarks/test_sktime.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ def prepare(data: SequentialDataset) -> DataSplit:
return X_pd, data.y


def multivariate(
*, train_data: DataSplit, test_data: DataSplit, n_jobs: int
) -> None:
def run(*, train_data: DataSplit, test_data: DataSplit, n_jobs: int) -> None:
"""Fit and predict the classifier."""
# initialize model
clf = KNeighborsTimeSeriesClassifier(
Expand Down Expand Up @@ -89,7 +87,7 @@ def multivariate(
train_data, test_data = prepare(train_data), prepare(test_data)

benchmark = timeit.timeit(
"func(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
"run(train_data=train_data, test_data=test_data, n_jobs=args.n_jobs)",
globals=locals(),
number=args.number,
)
Expand Down
4 changes: 2 additions & 2 deletions sequentia/models/hmm/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def fit(
self.models = dict(
zip(
self.classes_,
joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
joblib.delayed(self.models[c].fit)(
X_c, lengths=lengths_c
)
Expand Down Expand Up @@ -537,7 +537,7 @@ def predict_scores(
n_jobs = _multiprocessing.effective_n_jobs(self.n_jobs, x=lengths)
chunk_idxs = np.array_split(_data.get_idxs(lengths), n_jobs)
return np.concatenate(
joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
joblib.delayed(self._compute_scores_chunk)(X, idxs=idxs)
for idxs in chunk_idxs
)
Expand Down
4 changes: 2 additions & 2 deletions sequentia/models/knn/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def compute_distance_matrix(

# multiprocessed DTW calculation
return np.vstack(
joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
joblib.delayed(self._distance_matrix_row_chunk)(
row_idxs, col_chunk_idxs, X, n_jobs, dtw
)
Expand Down Expand Up @@ -245,7 +245,7 @@ def _distance_matrix_row_chunk(
columns.
"""
return np.hstack(
joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
joblib.delayed(self._distance_matrix_row_col_chunk)(
col_idxs, row_idxs, X, dtw
)
Expand Down
2 changes: 1 addition & 1 deletion sequentia/models/knn/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def _find_max_labels(
n_jobs = _multiprocessing.effective_n_jobs(self.n_jobs, x=scores)
score_chunks = np.array_split(scores, n_jobs)
return np.concatenate(
joblib.Parallel(n_jobs=n_jobs, max_nbytes=None)(
joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+")(
joblib.delayed(self._find_max_labels_chunk)(score_chunk)
for score_chunk in score_chunks
)
Expand Down
Loading