integrated into main files for execution in internal and external environments.
"""
import concurrent.futures
+ import copy
from itertools import chain
import multiprocessing
+ from multiprocessing import shared_memory  # type: ignore
import time
from typing import Callable, Dict, List, Optional, Set, Tuple
@@ -169,29 +171,101 @@ def _merge_results(
  return scoredNotes, helpfulnessScores, auxiliaryNoteInfo


+ def _load_data_with_data_loader_parallelizable(
+   dataLoader: CommunityNotesDataLoader, scoringArgs: ScoringArgs
+ ) -> ScoringArgs:
+   """
+   Load data from the dataLoader into the scoringArgs object. This function is designed to be run
+   in a multiprocessing pool.
+
+   Deprecated: prefer _load_data_from_shared_memory_parallelizable.
+   """
+   _, ratings, noteStatusHistory, userEnrollment = dataLoader.get_data()
+
+   scoringArgs.ratings = ratings
+   scoringArgs.noteStatusHistory = noteStatusHistory
+   scoringArgs.userEnrollment = userEnrollment
+   if type(scoringArgs) == FinalScoringArgs:
+     prescoringNoteModelOutput, prescoringRaterParams = dataLoader.get_prescoring_model_output()
+     scoringArgs.prescoringNoteModelOutput = prescoringNoteModelOutput
+     scoringArgs.prescoringRaterModelOutput = prescoringRaterParams
+   return scoringArgs
+
+
+ def _load_data_from_shared_memory_parallelizable(
+   scoringArgsSharedMemory: c.ScoringArgsSharedMemory, scoringArgs: ScoringArgs
+ ) -> ScoringArgs:
+   """
+   Load data from shared memory into the scoringArgs object. This function is designed to be run
+   in a multiprocessing pool.
+   """
+   scoringArgs.noteTopics = get_df_from_shared_memory(scoringArgsSharedMemory.noteTopics)
+   scoringArgs.ratings = get_df_from_shared_memory(scoringArgsSharedMemory.ratings)
+   scoringArgs.noteStatusHistory = get_df_from_shared_memory(
+     scoringArgsSharedMemory.noteStatusHistory
+   )
+   scoringArgs.userEnrollment = get_df_from_shared_memory(scoringArgsSharedMemory.userEnrollment)
+
+   if type(scoringArgs) == FinalScoringArgs:
+     assert type(scoringArgsSharedMemory) == c.FinalScoringArgsSharedMemory
+     scoringArgs.prescoringNoteModelOutput = get_df_from_shared_memory(
+       scoringArgsSharedMemory.prescoringNoteModelOutput
+     )
+     scoringArgs.prescoringRaterModelOutput = get_df_from_shared_memory(
+       scoringArgsSharedMemory.prescoringRaterModelOutput
+     )
+   return scoringArgs
+
+
def _run_scorer_parallelizable(
  scorer: Scorer,
  runParallel: bool,
  scoringArgs: ScoringArgs,
  dataLoader: Optional[CommunityNotesDataLoader] = None,
+   scoringArgsSharedMemory=None,
) -> Tuple[ModelResult, float]:
+   """
+   Run scoring (either prescoring or final scoring) for a single scorer.
+   This function is designed to be run in a multiprocessing pool, so you can run this function
+   for each scorer in parallel.
+
+   We determine whether to run prescoring or final scoring based on the type of scoringArgs
+   (PrescoringArgs or FinalScoringArgs).
+
+   If runParallel is False, then we read input dataframes from scoringArgs.
+
+   If runParallel is True, then we ignore the dataframe attributes of scoringArgs, and read
+   the input dataframes from shared memory if scoringArgsSharedMemory is not None (preferred),
+   or from the dataLoader if scoringArgsSharedMemory is None. However, using the dataLoader to
+   re-read the dataframes from disk is much slower than using shared memory and is deprecated.
+   """
+   scorerStartTime = time.perf_counter()
+
+   # Load data if multiprocessing
  if runParallel:
-     assert dataLoader is not None, "must provide a dataLoader to run parallel"
-     print(f"Since parallel, loading data in run_scoring process for {scorer.get_name()}")
-     ## TODO: also load prescoringNoteModelOutput, raterParamsUnfiltered from data loader.
-     _, ratings, noteStatusHistory, userEnrollment = dataLoader.get_data()
-
-     scoringArgs.ratings = ratings
-     scoringArgs.noteStatusHistory = noteStatusHistory
-     scoringArgs.userEnrollment = userEnrollment
-     if type(scoringArgs) == FinalScoringArgs:
-       print(
-         f"Loading prescoring model output for final scoring, in parallel for scorer {scorer.get_name()}."
-       )
-       prescoringNoteModelOutput, prescoringRaterParams = dataLoader.get_prescoring_model_output()
-       scoringArgs.prescoringNoteModelOutput = prescoringNoteModelOutput
-       scoringArgs.prescoringRaterModelOutput = prescoringRaterParams
+     with c.time_block(f"{scorer.get_name()} run_scorer_parallelizable: Loading data"):
+       scoringArgs.remove_large_args_for_multiprocessing()  # Should be redundant
+       scoringArgs = copy.deepcopy(scoringArgs)
+
+       if scoringArgsSharedMemory is not None:
+         print(
+           f"{scorer.get_name()} run_scorer_parallelizable just started in parallel: loading data from shared memory."
+         )
+         scoringArgs = _load_data_from_shared_memory_parallelizable(
+           scoringArgsSharedMemory, scoringArgs
+         )
+         print(f"{scorer.get_name()} run_scorer_parallelizable just finished loading data from shared memory.")
+       elif dataLoader is not None:
+         print(
+           f"{scorer.get_name()} run_scorer_parallelizable just started in parallel: loading data with dataLoader."
+         )
+         scoringArgs = _load_data_with_data_loader_parallelizable(dataLoader, scoringArgs)
+       else:
+         raise ValueError(
+           "Must provide either scoringArgsSharedMemory or dataLoader to run parallel"
+         )

+   # Run scoring
  scorerStartTime = time.perf_counter()
  if type(scoringArgs) == PrescoringArgs:
    scoringResults = scorer.prescore(scoringArgs)
@@ -204,6 +278,75 @@ def _run_scorer_parallelizable(
  return scoringResults, (scorerEndTime - scorerStartTime)


+ def save_df_to_shared_memory(df: pd.DataFrame, shms: List) -> c.SharedMemoryDataframeInfo:
+   """
+   Intended to be called before beginning multiprocessing: saves the df to shared memory,
+   returns the info needed to access it, and appends the shared memory object to the given list
+   so it isn't garbage collected and can be closed later.
+   """
+   cols = df.columns
+   data = df.to_numpy()
+   df_dtypes_dict = dict(list(zip(df.columns, df.dtypes)))
+   shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
+   np_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
+   np_array[:] = data[:]
+   shms.append(shm)  # save the shared memory object so we can close it later
+   return c.SharedMemoryDataframeInfo(
+     sharedMemoryName=shm.name,
+     columns=cols,
+     dataShape=data.shape,
+     dtypesDict=df_dtypes_dict,
+     npDtype=np_array.dtype,
+   )
+
+
+ def get_df_from_shared_memory(sharedMemoryDfInfo: c.SharedMemoryDataframeInfo) -> pd.DataFrame:
+   """
+   Intended to be called from a process within a multiprocessing pool in parallel.
+   Read a dataframe from shared memory and return it.
+   """
+   existing_shm = shared_memory.SharedMemory(name=sharedMemoryDfInfo.sharedMemoryName)
+   np_array = np.ndarray(
+     sharedMemoryDfInfo.dataShape, buffer=existing_shm.buf, dtype=sharedMemoryDfInfo.npDtype
+   )
+   df = pd.DataFrame(np_array, columns=sharedMemoryDfInfo.columns)
+   df = df.astype(sharedMemoryDfInfo.dtypesDict)
+   return df
+
+
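Aside (not part of this diff): the two helpers above implement a DataFrame round trip through multiprocessing.shared_memory. Below is a minimal sketch of the same pattern, with a hypothetical frame and illustrative names only.

import numpy as np
import pandas as pd
from multiprocessing import shared_memory

# Hypothetical frame; any DataFrame whose to_numpy() form fits a single dtype works the same way.
df = pd.DataFrame({"noteId": [1, 2, 3], "score": [0.1, 0.5, 0.9]})
data = df.to_numpy()

# Writer side (parent): copy the numpy buffer into a named shared memory segment.
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
buf = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
buf[:] = data[:]

# Reader side (normally another process): attach by name and rebuild the DataFrame.
existing = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(data.shape, dtype=data.dtype, buffer=existing.buf)
# The explicit copy decouples the frame from the shared buffer before the segment is closed.
restored = pd.DataFrame(view.copy(), columns=df.columns).astype(dict(zip(df.columns, df.dtypes)))

# Cleanup: drop the numpy views first (otherwise close() complains about exported buffers),
# then have every process close its handle and only the creator unlink the segment.
del view
existing.close()
del buf
shm.close()
shm.unlink()

The lifecycle is the important part: each attaching process closes its own handle, and only the creating process calls unlink(), exactly once, after all readers are finished.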
+ def _save_dfs_to_shared_memory(
+   scoringArgs: ScoringArgs,
+ ) -> Tuple[List[shared_memory.SharedMemory], c.ScoringArgsSharedMemory]:
+   """
+   Save large dfs to shared memory. Called before beginning multiprocessing.
+   """
+   shms: List[shared_memory.SharedMemory] = []
+   noteTopics = save_df_to_shared_memory(scoringArgs.noteTopics, shms)
+   ratings = save_df_to_shared_memory(scoringArgs.ratings, shms)
+   noteStatusHistory = save_df_to_shared_memory(scoringArgs.noteStatusHistory, shms)
+   userEnrollment = save_df_to_shared_memory(scoringArgs.userEnrollment, shms)
+
+   if type(scoringArgs) == FinalScoringArgs:
+     prescoringNoteModelOutput = save_df_to_shared_memory(
+       scoringArgs.prescoringNoteModelOutput, shms
+     )
+     prescoringRaterModelOutput = save_df_to_shared_memory(
+       scoringArgs.prescoringRaterModelOutput, shms
+     )
+     return shms, c.FinalScoringArgsSharedMemory(
+       noteTopics,
+       ratings,
+       noteStatusHistory,
+       userEnrollment,
+       prescoringNoteModelOutput,
+       prescoringRaterModelOutput,
+     )
+   else:
+     return shms, c.PrescoringArgsSharedMemory(
+       noteTopics, ratings, noteStatusHistory, userEnrollment
+     )
+
+
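Aside (not part of this diff): a sketch of how helpers like these compose around a fork-context process pool, which is what _run_scorers does below. worker, the toy frame, and the info dict are hypothetical stand-ins for _run_scorer_parallelizable and c.SharedMemoryDataframeInfo; the sketch assumes a Unix-like platform where the "fork" start method is available.

import concurrent.futures
import multiprocessing
from multiprocessing import shared_memory

import numpy as np
import pandas as pd


def worker(info):
  # Attach by name in the child process (the same idea as get_df_from_shared_memory above).
  shm = shared_memory.SharedMemory(name=info["name"])
  arr = np.ndarray(info["shape"], dtype=info["dtype"], buffer=shm.buf)
  total = float(arr.sum())
  del arr      # drop the view so close() does not complain about exported buffers
  shm.close()  # close this process's handle; only the creator should unlink
  return total


if __name__ == "__main__":
  df = pd.DataFrame({"x": [1.0, 2.0, 3.0]})
  data = df.to_numpy()
  shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
  staging = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
  staging[:] = data[:]
  info = {"name": shm.name, "shape": data.shape, "dtype": data.dtype}

  with concurrent.futures.ProcessPoolExecutor(
    mp_context=multiprocessing.get_context("fork"), max_workers=2
  ) as executor:
    futures = [executor.submit(worker, info) for _ in range(2)]
    print([f.result() for f in futures])  # [6.0, 6.0]

  del staging
  shm.close()
  shm.unlink()  # the creator frees the segment exactly once, after all workers are done

Only the small descriptor (segment name, shape, dtype) is pickled and sent to each task; the large buffer itself is never copied through the executor, which is the point of the shared-memory path in this change.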
def _run_scorers(
  scorers: List[Scorer],
  scoringArgs: ScoringArgs,
@@ -231,10 +374,12 @@ def _run_scorers(
  # Apply scoring algorithms
  overallStartTime = time.perf_counter()
  if runParallel:
+     shms, scoringArgsSharedMemory = _save_dfs_to_shared_memory(scoringArgs)
+
    with concurrent.futures.ProcessPoolExecutor(
-       mp_context=multiprocessing.get_context("spawn"), max_workers=maxWorkers
+       mp_context=multiprocessing.get_context("fork"),
+       max_workers=maxWorkers,
    ) as executor:
-       assert dataLoader is not None
      print(f"Starting parallel scorer execution with {len(scorers)} scorers.")
      # Pass mostly-empty scoringArgs: the data is too large to be copied in-memory to
      # each process, so must be re-loaded from disk by every scorer's dataLoader.
@@ -244,12 +389,17 @@ def _run_scorers(
          _run_scorer_parallelizable,
          scorer=scorer,
          runParallel=True,
+           scoringArgs=copy.deepcopy(scoringArgs),
          dataLoader=dataLoader,
-           scoringArgs=scoringArgs,
+           scoringArgsSharedMemory=copy.deepcopy(scoringArgsSharedMemory),
        )
        for scorer in scorers
      ]
      modelResultsAndTimes = [f.result() for f in futures]
+
+       for shm in shms:
+         shm.close()
+         shm.unlink()  # free the shared memory
  else:
    modelResultsAndTimes = [
      _run_scorer_parallelizable(