|
50 | 50 | except ImportError:
|
51 | 51 | from typing_extensions import Literal
|
52 | 52 |
|
53 |
| - |
54 | 53 | try:
|
55 | 54 | import ipyparallel
|
56 | 55 | from ipyparallel.client.asyncresult import AsyncResult
|
|
101 | 100 | _default_executor = loky.get_reusable_executor
|
102 | 101 |
|
103 | 102 |
|
104 |
| -# -- Internal executor-related, things |
105 |
| - |
106 |
| - |
107 |
| -def _ensure_executor( |
108 |
| - executor: ExecutorTypes | None, |
109 |
| -) -> concurrent.Executor: |
110 |
| - if executor is None: |
111 |
| - executor = concurrent.ProcessPoolExecutor() |
112 |
| - |
113 |
| - if isinstance(executor, concurrent.Executor): |
114 |
| - return executor |
115 |
| - elif with_ipyparallel and isinstance(executor, ipyparallel.Client): |
116 |
| - return executor.executor() |
117 |
| - elif with_distributed and isinstance(executor, distributed.Client): |
118 |
| - return executor.get_executor() |
119 |
| - else: |
120 |
| - raise TypeError( |
121 |
| - # TODO: check if this is correct. Isn't MPI,loky supported? |
122 |
| - "Only a concurrent.futures.Executor, distributed.Client," |
123 |
| - " or ipyparallel.Client can be used." |
124 |
| - ) |
125 |
| - |
126 |
| - |
127 |
| -def _get_ncores( |
128 |
| - ex: (ExecutorTypes), |
129 |
| -) -> int: |
130 |
| - """Return the maximum number of cores that an executor can use.""" |
131 |
| - if with_ipyparallel and isinstance(ex, ipyparallel.client.view.ViewExecutor): |
132 |
| - return len(ex.view) |
133 |
| - elif isinstance( |
134 |
| - ex, (concurrent.ProcessPoolExecutor, concurrent.ThreadPoolExecutor) |
135 |
| - ): |
136 |
| - return ex._max_workers # not public API! |
137 |
| - elif isinstance(ex, loky.reusable_executor._ReusablePoolExecutor): |
138 |
| - return ex._max_workers # not public API! |
139 |
| - elif isinstance(ex, SequentialExecutor): |
140 |
| - return 1 |
141 |
| - elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor): |
142 |
| - return sum(n for n in ex._client.ncores().values()) |
143 |
| - elif with_mpi4py and isinstance(ex, mpi4py.futures.MPIPoolExecutor): |
144 |
| - ex.bootup() # wait until all workers are up and running |
145 |
| - return ex._pool.size # not public API! |
146 |
| - else: |
147 |
| - raise TypeError(f"Cannot get number of cores for {ex.__class__}") |
148 |
| - |
149 |
| - |
150 | 103 | class BaseRunner(metaclass=abc.ABCMeta):
|
151 | 104 | r"""Base class for runners that use `concurrent.futures.Executor`\'s.
|
152 | 105 |
|
@@ -979,6 +932,52 @@ def replay_log(
|
979 | 932 | getattr(learner, method)(*args)
|
980 | 933 |
|
981 | 934 |
|
| 935 | +# -- Internal executor-related things |
| 936 | + |
| 937 | + |
| 938 | +def _ensure_executor( |
| 939 | + executor: ExecutorTypes | None, |
| 940 | +) -> concurrent.Executor: |
| 941 | + if executor is None: |
| 942 | + executor = concurrent.ProcessPoolExecutor() |
| 943 | + |
| 944 | + if isinstance(executor, concurrent.Executor): |
| 945 | + return executor |
| 946 | + elif with_ipyparallel and isinstance(executor, ipyparallel.Client): |
| 947 | + return executor.executor() |
| 948 | + elif with_distributed and isinstance(executor, distributed.Client): |
| 949 | + return executor.get_executor() |
| 950 | + else: |
| 951 | + raise TypeError( |
| 952 | + # TODO: check if this is correct. Isn't MPI,loky supported? |
| 953 | + "Only a concurrent.futures.Executor, distributed.Client," |
| 954 | + " or ipyparallel.Client can be used." |
| 955 | + ) |
| 956 | + |
| 957 | + |
def _get_ncores(
    ex: ExecutorTypes,
) -> int:
    """Return the maximum number of cores that an executor can use.

    Parameters
    ----------
    ex
        The executor (or executor-like adapter) to inspect.

    Returns
    -------
    int
        The number of workers available to ``ex``.

    Raises
    ------
    TypeError
        If ``ex`` is not one of the supported executor types.
    """
    if with_ipyparallel and isinstance(ex, ipyparallel.client.view.ViewExecutor):
        return len(ex.view)
    elif isinstance(
        ex, (concurrent.ProcessPoolExecutor, concurrent.ThreadPoolExecutor)
    ):
        return ex._max_workers  # not public API!
    elif isinstance(ex, loky.reusable_executor._ReusablePoolExecutor):
        return ex._max_workers  # not public API!
    elif isinstance(ex, SequentialExecutor):
        return 1
    elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
        # ncores() maps each worker address to its core count; total them.
        return sum(ex._client.ncores().values())
    elif with_mpi4py and isinstance(ex, mpi4py.futures.MPIPoolExecutor):
        ex.bootup()  # wait until all workers are up and running
        return ex._pool.size  # not public API!
    else:
        raise TypeError(f"Cannot get number of cores for {ex.__class__}")
| 979 | + |
| 980 | + |
982 | 981 | # --- Useful runner goals
|
983 | 982 |
|
984 | 983 | # TODO: deprecate
|
@@ -1016,9 +1015,6 @@ def stop_after(*, seconds=0, minutes=0, hours=0) -> Callable[[BaseLearner], bool
|
1016 | 1015 | return lambda _: time.time() > stop_time
|
1017 | 1016 |
|
1018 | 1017 |
|
1019 |
| -# -- Internal executor-related, things |
1020 |
| - |
1021 |
| - |
1022 | 1018 | class _TimeGoal:
|
1023 | 1019 | def __init__(self, dt: timedelta | datetime | int | float):
|
1024 | 1020 | self.dt = dt if isinstance(dt, (timedelta, datetime)) else timedelta(seconds=dt)
|
|
0 commit comments