diff --git a/benchmarks/decoders/benchmark_decoders.py b/benchmarks/decoders/benchmark_decoders.py index 81a8c0ea..23f45dab 100644 --- a/benchmarks/decoders/benchmark_decoders.py +++ b/benchmarks/decoders/benchmark_decoders.py @@ -13,6 +13,7 @@ from benchmark_decoders_library import ( AbstractDecoder, + BatchParameters, DecordAccurate, DecordAccurateBatch, plot_data, @@ -173,6 +174,7 @@ def main() -> None: num_sequential_frames_from_start=[1, 10, 100], min_runtime_seconds=args.bm_video_speed_min_run_seconds, benchmark_video_creation=args.bm_video_creation, + batch_parameters=BatchParameters(num_threads=8, batch_size=40), ) plot_data(df_data, args.plot_path) diff --git a/benchmarks/decoders/benchmark_decoders_library.py b/benchmarks/decoders/benchmark_decoders_library.py index 8d9d8a92..ae07727f 100644 --- a/benchmarks/decoders/benchmark_decoders_library.py +++ b/benchmarks/decoders/benchmark_decoders_library.py @@ -3,6 +3,7 @@ import subprocess import urllib.request from concurrent.futures import ThreadPoolExecutor, wait +from dataclasses import dataclass from itertools import product from pathlib import Path @@ -479,6 +480,43 @@ def get_metadata(video_file_path: str) -> VideoStreamMetadata: return VideoDecoder(video_file_path).metadata +@dataclass +class BatchParameters: + num_threads: int + batch_size: int + + +def run_batch_using_threads( + function, + *args, + batch_parameters: BatchParameters = BatchParameters(num_threads=8, batch_size=40), +): + executor = ThreadPoolExecutor(max_workers=batch_parameters.num_threads) + futures = [] + for _ in range(batch_parameters.batch_size): + futures.append(executor.submit(function, *args)) + for f in futures: + assert f.result() + executor.shutdown(wait=True) + + +def convert_result_to_df_item( + result, decoder_name, video_file_path, num_samples, decode_pattern +): + df_item = {} + df_item["decoder"] = decoder_name + df_item["video"] = str(video_file_path) + df_item["description"] = result.description + df_item["frame_count"] = num_samples + df_item["median"] = result.median + df_item["iqr"] = result.iqr + df_item["type"] = decode_pattern + df_item["fps_median"] = num_samples / result.median + df_item["fps_p75"] = num_samples / result._p75 + df_item["fps_p25"] = num_samples / result._p25 + return df_item + + def run_benchmarks( decoder_dict: dict[str, AbstractDecoder], video_files_paths: list[Path], @@ -486,6 +524,7 @@ def run_benchmarks( num_sequential_frames_from_start: list[int], min_runtime_seconds: float, benchmark_video_creation: bool, + batch_parameters: BatchParameters = None, ) -> list[dict[str, str | float | int]]: # Ensure that we have the same seed across benchmark runs. torch.manual_seed(0) @@ -532,18 +571,44 @@ def run_benchmarks( results.append( seeked_result.blocked_autorange(min_run_time=min_runtime_seconds) ) - df_item = {} - df_item["decoder"] = decoder_name - df_item["video"] = str(video_file_path) - df_item["description"] = results[-1].description - df_item["frame_count"] = num_samples - df_item["median"] = results[-1].median - df_item["iqr"] = results[-1].iqr - df_item["type"] = f"{kind}:seek()+next()" - df_item["fps_median"] = num_samples / results[-1].median - df_item["fps_p75"] = num_samples / results[-1]._p75 - df_item["fps_p25"] = num_samples / results[-1]._p25 - df_data.append(df_item) + df_data.append( + convert_result_to_df_item( + results[-1], + decoder_name, + video_file_path, + num_samples, + f"{kind} seek()+next()", + ) + ) + + if batch_parameters: + seeked_result = benchmark.Timer( + stmt="run_batch_using_threads(decoder.get_frames_from_video, video_file, pts_list, batch_parameters=batch_parameters)", + globals={ + "video_file": str(video_file_path), + "pts_list": pts_list, + "decoder": decoder, + "run_batch_using_threads": run_batch_using_threads, + "batch_parameters": batch_parameters, + }, + label=f"video={video_file_path} {metadata_label}", + sub_label=decoder_name, + description=f"batch {kind} {num_samples} seek()+next()", + ) + results.append( + seeked_result.blocked_autorange( + min_run_time=min_runtime_seconds + ) + ) + df_data.append( + convert_result_to_df_item( + results[-1], + decoder_name, + video_file_path, + num_samples * batch_parameters.batch_size, + f"batch {kind} seek()+next()", + ) + ) for num_consecutive_nexts in num_sequential_frames_from_start: consecutive_frames_result = benchmark.Timer( @@ -562,18 +627,44 @@ def run_benchmarks( min_run_time=min_runtime_seconds ) ) - df_item = {} - df_item["decoder"] = decoder_name - df_item["video"] = str(video_file_path) - df_item["description"] = results[-1].description - df_item["frame_count"] = num_consecutive_nexts - df_item["median"] = results[-1].median - df_item["iqr"] = results[-1].iqr - df_item["type"] = "next()" - df_item["fps_median"] = num_consecutive_nexts / results[-1].median - df_item["fps_p75"] = num_consecutive_nexts / results[-1]._p75 - df_item["fps_p25"] = num_consecutive_nexts / results[-1]._p25 - df_data.append(df_item) + df_data.append( + convert_result_to_df_item( + results[-1], + decoder_name, + video_file_path, + num_consecutive_nexts, + f"{num_consecutive_nexts} next()", + ) + ) + + if batch_parameters: + consecutive_frames_result = benchmark.Timer( + stmt="run_batch_using_threads(decoder.get_consecutive_frames_from_video, video_file, consecutive_frames_to_extract, batch_parameters=batch_parameters)", + globals={ + "video_file": str(video_file_path), + "consecutive_frames_to_extract": num_consecutive_nexts, + "decoder": decoder, + "run_batch_using_threads": run_batch_using_threads, + "batch_parameters": batch_parameters, + }, + label=f"video={video_file_path} {metadata_label}", + sub_label=decoder_name, + description=f"batch {num_consecutive_nexts} next()", + ) + results.append( + consecutive_frames_result.blocked_autorange( + min_run_time=min_runtime_seconds + ) + ) + df_data.append( + convert_result_to_df_item( + results[-1], + decoder_name, + video_file_path, + num_consecutive_nexts * batch_parameters.batch_size, + f"batch {num_consecutive_nexts} next()", + ) + ) first_video_file_path = video_files_paths[0] if benchmark_video_creation: