@@ -234,15 +234,25 @@ def _map_by_partition(
     spark_serialized_artifacts: _SparkSerializedArtifacts,
 ):
     feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
+
+    total_batches = 0
+    total_time = 0.0
+    min_time = float("inf")
+    max_time = float("-inf")
+
+    total_rows = 0
+    min_batch_size = float("inf")
+    max_batch_size = float("-inf")
+
     """Load pandas df to online store"""
     for pdf in iterator:
+        start_time = time.perf_counter()
         pdf_row_count = pdf.shape[0]
-        start_time = time.time()
-        # convert to pyarrow table
         if pdf_row_count == 0:
-            print("INFO!!! Dataframe has 0 records to process")
-            return
+            print("INFO: Dataframe has 0 records to process")
+            break
 
+        # convert to pyarrow table
         table = pyarrow.Table.from_pandas(pdf)
 
         if feature_view.batch_source.field_mapping is not None:
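Aside from the new stat accumulators, this hunk switches the per-batch timer from `time.time()` to `time.perf_counter()`, which is monotonic and intended for measuring elapsed intervals, so the measurement is not skewed by wall-clock adjustments. A minimal sketch of the timing pattern in isolation (the `write_batch` stub is a hypothetical stand-in for the real conversion and write path, not part of the PR):

```python
import time


def write_batch(n: int) -> int:
    # hypothetical stand-in for the pyarrow conversion + online store write
    return sum(range(n))


start_time = time.perf_counter()  # monotonic clock, suitable for intervals
write_batch(1_000_000)
batch_time = time.perf_counter() - start_time  # elapsed seconds as a float
print(f"batch took {batch_time:.6f} s")
```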
@@ -266,10 +276,89 @@ def _map_by_partition(
             rows_to_write,
             lambda x: None,
         )
-        end_time = time.time()
-        print(
-            f"INFO!!! Processed batch with size {pdf_row_count} in {int((end_time - start_time) * 1000)} milliseconds"
+
+        batch_time = time.perf_counter() - start_time
+
+        (
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+        ) = update_exec_stats(
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+            batch_time,
+            pdf_row_count,
         )
+
+    if total_batches > 0:
+        print_exec_stats(
+            total_batches,
+            total_time,
+            min_time,
+            max_time,
+            total_rows,
+            min_batch_size,
+            max_batch_size,
+        )
+
     yield pd.DataFrame(
         [pd.Series(range(1, 2))]
     )  # dummy result because mapInPandas needs to return something
+
+
+def update_exec_stats(
+    total_batches,
+    total_time,
+    min_time,
+    max_time,
+    total_rows,
+    min_batch_size,
+    max_batch_size,
+    batch_time,
+    current_batch_size,
+):
+    total_batches += 1
+    total_time += batch_time
+    min_time = min(min_time, batch_time)
+    max_time = max(max_time, batch_time)
+
+    total_rows += current_batch_size
+    min_batch_size = min(min_batch_size, current_batch_size)
+    max_batch_size = max(max_batch_size, current_batch_size)
+
+    return (
+        total_batches,
+        total_time,
+        min_time,
+        max_time,
+        total_rows,
+        min_batch_size,
+        max_batch_size,
+    )
+
+
+def print_exec_stats(
+    total_batches,
+    total_time,
+    min_time,
+    max_time,
+    total_rows,
+    min_batch_size,
+    max_batch_size,
+):
+    # TODO: Investigate why the logger is not working in Spark Executors
+    avg_time = total_time / total_batches
+    avg_batch_size = total_rows / total_batches
+    print(
+        f"Time - Total: {total_time:.6f} s, Avg: {avg_time:.6f} s, Min: {min_time:.6f} s, Max: {max_time:.6f} s | "
+        f"Batch Size - Total: {total_rows}, Avg: {avg_batch_size:.2f}, Min: {min_batch_size}, Max: {max_batch_size}"
+    )
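For reference, here is a small driver that exercises the two new helpers outside of Spark. It is only a sketch: the synthetic `batches` list is invented for illustration, and it assumes `update_exec_stats` / `print_exec_stats` from the diff are in scope (for example copied into a local script):

```python
# Synthetic (batch_time_seconds, row_count) pairs standing in for real batches.
batches = [(0.120, 5000), (0.095, 4800), (0.210, 9100)]

total_batches = 0
total_time = 0.0
min_time = float("inf")
max_time = float("-inf")
total_rows = 0
min_batch_size = float("inf")
max_batch_size = float("-inf")

for batch_time, row_count in batches:
    (
        total_batches,
        total_time,
        min_time,
        max_time,
        total_rows,
        min_batch_size,
        max_batch_size,
    ) = update_exec_stats(
        total_batches,
        total_time,
        min_time,
        max_time,
        total_rows,
        min_batch_size,
        max_batch_size,
        batch_time,
        row_count,
    )

if total_batches > 0:
    print_exec_stats(
        total_batches,
        total_time,
        min_time,
        max_time,
        total_rows,
        min_batch_size,
        max_batch_size,
    )
# With the numbers above this prints roughly:
# Time - Total: 0.425000 s, Avg: 0.141667 s, Min: 0.095000 s, Max: 0.210000 s | Batch Size - Total: 18900, Avg: 6300.00, Min: 4800, Max: 9100
```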