From 64e38a786b8ac7886276e540d5c45ab5c63eb2ba Mon Sep 17 00:00:00 2001 From: Alexander Zinov Date: Sun, 16 Feb 2025 18:10:01 +0200 Subject: [PATCH 1/2] fix PathLike type-hints --- ffmpeg/asyncio/ffmpeg.py | 4 ++-- ffmpeg/ffmpeg.py | 4 ++-- ffmpeg/options.py | 4 ++-- ffmpeg/protocol.py | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ffmpeg/asyncio/ffmpeg.py b/ffmpeg/asyncio/ffmpeg.py index 6bedec9..bd194fb 100644 --- a/ffmpeg/asyncio/ffmpeg.py +++ b/ffmpeg/asyncio/ffmpeg.py @@ -62,7 +62,7 @@ def option(self, key: str, value: Optional[types.Option] = None) -> Self: def input( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ) -> Self: @@ -105,7 +105,7 @@ def input( def output( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ) -> Self: diff --git a/ffmpeg/ffmpeg.py b/ffmpeg/ffmpeg.py index 36a8c73..9b47927 100644 --- a/ffmpeg/ffmpeg.py +++ b/ffmpeg/ffmpeg.py @@ -59,7 +59,7 @@ def option(self, key: str, value: Optional[types.Option] = None) -> Self: def input( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ) -> Self: @@ -102,7 +102,7 @@ def input( def output( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ) -> Self: diff --git a/ffmpeg/options.py b/ffmpeg/options.py index a867382..775cbb0 100644 --- a/ffmpeg/options.py +++ b/ffmpeg/options.py @@ -40,7 +40,7 @@ def option(self, key: str, value: Optional[types.Option] = None): def input( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ): @@ -53,7 +53,7 @@ def input( def output( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ): diff --git a/ffmpeg/protocol.py b/ffmpeg/protocol.py index 48ea214..0059d86 100644 --- a/ffmpeg/protocol.py +++ b/ffmpeg/protocol.py @@ -18,14 +18,14 @@ def option(self, key: str, value: Optional[types.Option] = None) -> Self: ... def input( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ) -> Self: ... def output( self, - url: Union[str, os.PathLike], + url: Union[str, os.PathLike[str]], options: Optional[dict[str, Optional[types.Option]]] = None, **kwargs: Optional[types.Option], ) -> Self: ... From 0a568ad43676ed807e3f56a27f402f2d7fa933d5 Mon Sep 17 00:00:00 2001 From: Alexander Zinov Date: Tue, 25 Feb 2025 20:23:25 +0200 Subject: [PATCH 2/2] improve type-hints, support any AsyncIterable[bytes] as input, remove support for EOL python versions --- ffmpeg/__init__.py | 10 ++++++ ffmpeg/asyncio/__init__.py | 2 ++ ffmpeg/asyncio/ffmpeg.py | 45 +++++++++++++------------- ffmpeg/asyncio/utils.py | 38 +++++++++++++--------- ffmpeg/ffmpeg.py | 34 ++++++++++---------- ffmpeg/options.py | 16 +++++----- ffmpeg/progress.py | 7 +++-- ffmpeg/protocol.py | 22 +++++-------- ffmpeg/statistics.py | 36 ++++++++++++++------- ffmpeg/types.py | 4 +-- ffmpeg/utils.py | 25 +++++++-------- setup.cfg | 2 +- tests/test_asyncio_async_iterable.py | 47 ++++++++++++++++++++++++++++ 13 files changed, 186 insertions(+), 102 deletions(-) create mode 100644 tests/test_asyncio_async_iterable.py diff --git a/ffmpeg/__init__.py b/ffmpeg/__init__.py index 2c64d8a..20d3a5d 100644 --- a/ffmpeg/__init__.py +++ b/ffmpeg/__init__.py @@ -3,3 +3,13 @@ from .progress import Progress __version__ = "2.0.12" + +__all__ = [ + "FFmpeg", + "FFmpegAlreadyExecuted", + "FFmpegError", + "FFmpegFileNotFound", + "FFmpegInvalidCommand", + "FFmpegUnsupportedCodec", + "Progress", +] diff --git a/ffmpeg/asyncio/__init__.py b/ffmpeg/asyncio/__init__.py index 723d47f..c8c37b2 100644 --- a/ffmpeg/asyncio/__init__.py +++ b/ffmpeg/asyncio/__init__.py @@ -1 +1,3 @@ from .ffmpeg import FFmpeg + +__all__ = ["FFmpeg"] diff --git a/ffmpeg/asyncio/ffmpeg.py b/ffmpeg/asyncio/ffmpeg.py index bd194fb..f894a21 100644 --- a/ffmpeg/asyncio/ffmpeg.py +++ b/ffmpeg/asyncio/ffmpeg.py @@ -5,17 +5,19 @@ import os import signal import subprocess -from typing import Optional, Union +import sys +from typing import TYPE_CHECKING, Any, Optional, Union from pyee.asyncio import AsyncIOEventEmitter from typing_extensions import Self -from ffmpeg import types -from ffmpeg.asyncio.utils import create_subprocess, ensure_stream_reader, read_stream, readlines +from ffmpeg.asyncio.utils import create_subprocess, ensure_async_iterable, read_stream, readlines from ffmpeg.ffmpeg import FFmpegAlreadyExecuted, FFmpegError from ffmpeg.options import Options from ffmpeg.progress import Tracker -from ffmpeg.utils import is_windows + +if TYPE_CHECKING: + from ffmpeg import types class FFmpeg(AsyncIOEventEmitter): @@ -34,7 +36,7 @@ def __init__(self, executable: str = "ffmpeg"): self._executed: bool = False self._terminated: bool = False - self._tracker = Tracker(self) # type: ignore + self._tracker = Tracker(self) self.once("error", self._reraise_exception) @@ -146,9 +148,7 @@ def output( self._options.output(url, options, **kwargs) return self - async def execute( - self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None - ) -> bytes: + async def execute(self, stream: Optional[types.AsyncStream] = None, timeout: Optional[float] = None) -> bytes: """Execute FFmpeg using specified global options and files. Args: @@ -170,24 +170,25 @@ async def execute( self._terminated = False if stream is not None: - stream = ensure_stream_reader(stream) + _stream = ensure_async_iterable(stream) + else: + _stream = None self.emit("start", self.arguments) self._process = await create_subprocess( *self.arguments, - stdin=subprocess.PIPE if stream is not None else None, + stdin=subprocess.PIPE if _stream is not None else None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) self._executed = True - tasks = [ - asyncio.create_task(self._write_stdin(stream)), - asyncio.create_task(self._read_stdout()), - asyncio.create_task(self._handle_stderr()), - asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)), - ] + stdin_task = asyncio.create_task(self._write_stdin(_stream)) + stdout_task = asyncio.create_task(self._read_stdout()) + stderr_task = asyncio.create_task(self._handle_stderr()) + process_task = asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)) + tasks: list[asyncio.Task[Any]] = [stdin_task, stdout_task, stderr_task, process_task] done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) self._executed = False @@ -205,9 +206,9 @@ async def execute( elif self._terminated: self.emit("terminated") else: - raise FFmpegError.create(message=tasks[2].result(), arguments=self.arguments) + raise FFmpegError.create(message=stderr_task.result(), arguments=self.arguments) - return tasks[1].result() + return stdout_task.result() def terminate(self): """Gracefully terminate the running FFmpeg process. @@ -219,24 +220,24 @@ def terminate(self): raise FFmpegError("FFmpeg is not executed", arguments=self.arguments) sigterm = signal.SIGTERM - if is_windows(): + if sys.platform == "win32": # On Windows, SIGTERM is an alias for TerminateProcess(). # To gracefully terminate the FFmpeg process, we should use CTRL_BREAK_EVENT signal. # References: # - https://docs.python.org/3.10/library/subprocess.html#subprocess.Popen.send_signal # - https://github.com/FFmpeg/FFmpeg/blob/release/5.1/fftools/ffmpeg.c#L371 - sigterm = signal.CTRL_BREAK_EVENT # type: ignore + sigterm = signal.CTRL_BREAK_EVENT self._terminated = True self._process.send_signal(sigterm) - async def _write_stdin(self, stream: Optional[asyncio.StreamReader]): + async def _write_stdin(self, stream: Optional[types.AsyncIterable[bytes]]): if stream is None: return assert self._process.stdin is not None - async for chunk in read_stream(stream, size=io.DEFAULT_BUFFER_SIZE): + async for chunk in stream: self._process.stdin.write(chunk) await self._process.stdin.drain() diff --git a/ffmpeg/asyncio/utils.py b/ffmpeg/asyncio/utils.py index 959ea3f..379798a 100644 --- a/ffmpeg/asyncio/utils.py +++ b/ffmpeg/asyncio/utils.py @@ -2,31 +2,31 @@ import io import re import subprocess -from typing import Any, AsyncIterable, Awaitable +import sys +from functools import wraps +from typing import Any, AsyncIterable from ffmpeg import types -from ffmpeg.utils import is_windows -def create_subprocess(*args: Any, **kwargs: Any) -> Awaitable[asyncio.subprocess.Process]: +@wraps(asyncio.create_subprocess_exec) +def create_subprocess(*args: Any, creationflags: int = 0, **kwargs: Any): # On Windows, CREATE_NEW_PROCESS_GROUP flag is required to use CTRL_BREAK_EVENT signal, # which is required to gracefully terminate the FFmpeg process. # Reference: https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.subprocess.Process.send_signal - if is_windows(): - kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore + if sys.platform == "win32": + creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP - return asyncio.create_subprocess_exec(*args, **kwargs) + return asyncio.create_subprocess_exec(*args, creationflags=creationflags, **kwargs) -def ensure_stream_reader(stream: types.AsyncStream) -> asyncio.StreamReader: - if isinstance(stream, asyncio.StreamReader): - return stream - - reader = asyncio.StreamReader() - reader.feed_data(stream) - reader.feed_eof() +async def read_bytes(stream: bytes, size: int = -1) -> AsyncIterable[bytes]: + if size == -1: + yield stream + return - return reader + for i in range(0, len(stream), size): + yield stream[i : i + size] async def read_stream(stream: asyncio.StreamReader, size: int = -1) -> AsyncIterable[bytes]: @@ -38,6 +38,16 @@ async def read_stream(stream: asyncio.StreamReader, size: int = -1) -> AsyncIter yield chunk +def ensure_async_iterable(stream: types.AsyncStream) -> AsyncIterable[bytes]: + if isinstance(stream, bytes): + return read_bytes(stream, io.DEFAULT_BUFFER_SIZE) + + if isinstance(stream, asyncio.StreamReader): + return read_stream(stream, io.DEFAULT_BUFFER_SIZE) + + return stream + + async def readlines(stream: asyncio.StreamReader) -> AsyncIterable[bytes]: pattern = re.compile(rb"[\r\n]+") diff --git a/ffmpeg/ffmpeg.py b/ffmpeg/ffmpeg.py index 9b47927..c683e35 100644 --- a/ffmpeg/ffmpeg.py +++ b/ffmpeg/ffmpeg.py @@ -5,16 +5,19 @@ import os import signal import subprocess -from typing import IO, Optional, Union +import sys +from typing import IO, TYPE_CHECKING, Any, Optional, Union from pyee import EventEmitter from typing_extensions import Self -from ffmpeg import types from ffmpeg.errors import FFmpegAlreadyExecuted, FFmpegError from ffmpeg.options import Options from ffmpeg.progress import Tracker -from ffmpeg.utils import create_subprocess, ensure_io, is_windows, read_stream, readlines +from ffmpeg.utils import create_subprocess, ensure_io, read_stream, readlines + +if TYPE_CHECKING: + from ffmpeg import types class FFmpeg(EventEmitter): @@ -33,14 +36,14 @@ def __init__(self, executable: str = "ffmpeg"): self._executed: bool = False self._terminated: bool = False - self._tracker = Tracker(self) # type: ignore + self._tracker = Tracker(self) @property def arguments(self) -> list[str]: """Return a list of arguments to be used when executing FFmpeg. Returns: - A lit of arguments to be used when executing FFmpeg. + A list of arguments to be used when executing FFmpeg. """ return [self._executable, *self._options.build()] @@ -143,7 +146,7 @@ def output( self._options.output(url, options, **kwargs) return self - def execute(self, stream: Optional[Union[bytes, IO[bytes]]] = None, timeout: Optional[float] = None) -> bytes: + def execute(self, stream: Optional[types.Stream] = None, timeout: Optional[float] = None) -> bytes: """Execute FFmpeg using specified global options and files. Args: @@ -179,12 +182,11 @@ def execute(self, stream: Optional[Union[bytes, IO[bytes]]] = None, timeout: Opt with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: self._executed = True - futures = [ - executor.submit(self._write_stdin, stream), - executor.submit(self._read_stdout), - executor.submit(self._handle_stderr), - executor.submit(self._process.wait, timeout), - ] + stdin_task = executor.submit(self._write_stdin, stream) + stdout_task = executor.submit(self._read_stdout) + stderr_task = executor.submit(self._handle_stderr) + process_task = executor.submit(self._process.wait, timeout) + futures: list[concurrent.futures.Future[Any]] = [stdin_task, stdout_task, stderr_task, process_task] done, pending = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION) self._executed = False @@ -201,9 +203,9 @@ def execute(self, stream: Optional[Union[bytes, IO[bytes]]] = None, timeout: Opt elif self._terminated: self.emit("terminated") else: - raise FFmpegError.create(message=futures[2].result(), arguments=self.arguments) + raise FFmpegError.create(message=stderr_task.result(), arguments=self.arguments) - return futures[1].result() + return stdout_task.result() def terminate(self): """Gracefully terminate the running FFmpeg process. @@ -215,13 +217,13 @@ def terminate(self): raise FFmpegError("FFmpeg is not executed", arguments=self.arguments) sigterm = signal.SIGTERM - if is_windows(): + if sys.platform == "win32": # On Windows, SIGTERM is an alias for TerminateProcess(). # To gracefully terminate the FFmpeg process, we should use CTRL_BREAK_EVENT signal. # References: # - https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal # - https://github.com/FFmpeg/FFmpeg/blob/release/5.1/fftools/ffmpeg.c#L371 - sigterm = signal.CTRL_BREAK_EVENT # type: ignore + sigterm = signal.CTRL_BREAK_EVENT self._terminated = True self._process.send_signal(sigterm) diff --git a/ffmpeg/options.py b/ffmpeg/options.py index 775cbb0..16dbcd0 100644 --- a/ffmpeg/options.py +++ b/ffmpeg/options.py @@ -2,19 +2,21 @@ import os from dataclasses import dataclass -from typing import Iterable, Optional, Union +from typing import TYPE_CHECKING, Iterable, Optional, Union -from ffmpeg import types from ffmpeg.file import InputFile, OutputFile +if TYPE_CHECKING: + from ffmpeg import types + def _unpack_options(options: dict[str, Optional[types.Option]]) -> Iterable[Option]: for key, values in options.items(): - if not isinstance(values, (list, set, tuple)): - values = [values] - - for value in values: - yield Option(key, value) + if isinstance(values, (list, set, tuple)): + for value in values: + yield Option(key, value) + else: + yield Option(key, values) @dataclass(frozen=True) diff --git a/ffmpeg/progress.py b/ffmpeg/progress.py index 0234f95..b384114 100644 --- a/ffmpeg/progress.py +++ b/ffmpeg/progress.py @@ -2,10 +2,13 @@ from dataclasses import asdict, dataclass from datetime import timedelta +from typing import TYPE_CHECKING -from ffmpeg.protocol import FFmpegProtocol from ffmpeg.statistics import Statistics +if TYPE_CHECKING: + from ffmpeg.protocol import ExecuteType_co, FFmpegProtocol + @dataclass(frozen=True) class Progress: @@ -29,7 +32,7 @@ class Progress: class Tracker: - def __init__(self, ffmpeg: FFmpegProtocol): + def __init__(self, ffmpeg: FFmpegProtocol[ExecuteType_co]): self._ffmpeg = ffmpeg self._ffmpeg.on("stderr", self._on_stderr) diff --git a/ffmpeg/protocol.py b/ffmpeg/protocol.py index 0059d86..535cb58 100644 --- a/ffmpeg/protocol.py +++ b/ffmpeg/protocol.py @@ -1,14 +1,18 @@ from __future__ import annotations import os -from typing import Any, Callable, Optional, Union +from typing import Any, Awaitable, Callable, Final, Optional, Protocol, TypeVar, Union -from typing_extensions import Protocol, Self, overload +from typing_extensions import Self, TypeAlias from ffmpeg import types +SyncExecute: TypeAlias = Callable[[Optional[types.Stream], Optional[float]], bytes] +AsyncExecute: TypeAlias = Callable[[Optional[types.AsyncStream], Optional[float]], Awaitable[bytes]] +ExecuteType_co = TypeVar("ExecuteType_co", SyncExecute, AsyncExecute, covariant=True) -class FFmpegProtocol(Protocol): + +class FFmpegProtocol(Protocol[ExecuteType_co]): def __init__(self, executable: str = "ffmpeg"): ... @property @@ -30,17 +34,7 @@ def output( **kwargs: Optional[types.Option], ) -> Self: ... - @overload - def execute( - self, - stream: Optional[types.Stream] = None, - ) -> bytes: ... - - @overload - async def execute( - self, - stream: Optional[types.AsyncStream] = None, - ) -> bytes: ... + execute: Final[ExecuteType_co] def terminate(self): ... diff --git a/ffmpeg/statistics.py b/ffmpeg/statistics.py index f8c7d44..c80b69d 100644 --- a/ffmpeg/statistics.py +++ b/ffmpeg/statistics.py @@ -3,24 +3,34 @@ import re from dataclasses import dataclass, field from datetime import timedelta -from typing import Optional +from typing import Callable, Optional, TypedDict from typing_extensions import Self -from ffmpeg.utils import parse_time, parse_size +from ffmpeg.utils import parse_size, parse_time # Reference: https://github.com/FFmpeg/FFmpeg/blob/release/6.1/fftools/ffmpeg.c#L496 _pattern = re.compile(r"(frame|fps|size|time|bitrate|speed)\s*\=\s*(\S+)") -_field_factory = { - "frame": int, - "fps": float, - "size": parse_size, - "time": parse_time, - "bitrate": lambda item: float(item.replace("kbits/s", "")), - "speed": lambda item: float(item.replace("x", "")), -} + +class FieldFactory(TypedDict): + frame: Callable[[str], int] + fps: Callable[[str], float] + size: Callable[[str], int] + time: Callable[[str], timedelta] + bitrate: Callable[[str], float] + speed: Callable[[str], float] + + +_field_factory = FieldFactory( + frame=int, + fps=float, + size=parse_size, + time=parse_time, + bitrate=lambda item: float(item.replace("kbits/s", "")), + speed=lambda item: float(item.replace("x", "")), +) @dataclass(frozen=True) @@ -42,5 +52,9 @@ def from_line(cls, line: str) -> Optional[Self]: # - frame, fps, size, time, bitrate, speed return None - fields = {key: _field_factory[key](value) for key, value in statistics.items() if value != "N/A"} + fields = { + key: _field_factory[key](value) + for key, value in statistics.items() + if value != "N/A" and key in _field_factory + } return cls(**fields) diff --git a/ffmpeg/types.py b/ffmpeg/types.py index f7ce624..66918b8 100644 --- a/ffmpeg/types.py +++ b/ffmpeg/types.py @@ -1,7 +1,7 @@ from __future__ import annotations import asyncio -from typing import IO, Callable, Iterable, TypeVar, Union +from typing import IO, AsyncIterable, Callable, Iterable, TypeVar, Union Numeric = Union[int, float] @@ -9,6 +9,6 @@ Option = Union[Iterable[T], T] Stream = Union[bytes, IO[bytes]] -AsyncStream = Union[bytes, asyncio.StreamReader] +AsyncStream = Union[bytes, asyncio.StreamReader, AsyncIterable[bytes]] Handler = TypeVar("Handler", bound=Callable[..., None]) diff --git a/ffmpeg/utils.py b/ffmpeg/utils.py index ce70169..9d3c65b 100644 --- a/ffmpeg/utils.py +++ b/ffmpeg/utils.py @@ -5,9 +5,10 @@ import subprocess import sys from datetime import timedelta -from typing import IO, Any, Iterable +from typing import IO, TYPE_CHECKING, Any, AnyStr, Iterable -from ffmpeg import types +if TYPE_CHECKING: + from ffmpeg import types def parse_time(time: str) -> timedelta: @@ -21,6 +22,7 @@ def parse_time(time: str) -> timedelta: milliseconds=int(match.group(4)) * 10, ) + # https://github.com/FFmpeg/FFmpeg/blob/d38bf5e08e768722096723b5c8781cd2eb18d070/fftools/ffmpeg.c#L618C53-L618C56 def parse_size(item: str) -> int: if "kB" in item: @@ -30,18 +32,15 @@ def parse_size(item: str) -> int: else: raise ValueError(f"Unknown size format: {item}") -def is_windows() -> bool: - return sys.platform == "win32" - - -def create_subprocess(*args: Any, **kwargs: Any) -> subprocess.Popen: - # On Windows, CREATE_NEW_PROCESS_GROUP flag is required to use CTRL_BREAK_EVENT signal, - # which is required to gracefully terminate the FFmpeg process. - # Reference: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal - if is_windows(): - kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore - return subprocess.Popen(*args, **kwargs) +class create_subprocess(subprocess.Popen[AnyStr]): + def __init__(self, *args: Any, creationflags: int = 0, **kwargs: Any): + # On Windows, CREATE_NEW_PROCESS_GROUP flag is required to use CTRL_BREAK_EVENT signal, + # which is required to gracefully terminate the FFmpeg process. + # Reference: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal + if sys.platform == "win32": + creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP + super().__init__(*args, creationflags=creationflags, **kwargs) def ensure_io(stream: types.Stream) -> IO[bytes]: diff --git a/setup.cfg b/setup.cfg index 337c2cc..095d255 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,5 +22,5 @@ classifiers = install_requires = pyee typing_extensions -python_requires = >=3.7 +python_requires = >=3.9 packages = find: \ No newline at end of file diff --git a/tests/test_asyncio_async_iterable.py b/tests/test_asyncio_async_iterable.py new file mode 100644 index 0000000..9c9082f --- /dev/null +++ b/tests/test_asyncio_async_iterable.py @@ -0,0 +1,47 @@ +import asyncio +from pathlib import Path + +import pytest +from helpers import probe + +from ffmpeg.asyncio import FFmpeg + +epsilon = 0.25 + + +async def yield_async_chunks(source_path: Path, sleep: float = 0.001, sleep_every: int = 1000): + with open(source_path, "rb") as source_file: + for i, source_bytes_chunk in enumerate(source_file): + yield source_bytes_chunk + if i % sleep_every == 0: + await asyncio.sleep(sleep) + + +@pytest.mark.asyncio +async def test_async_iterable_input_via_stdin( + assets_path: Path, + tmp_path: Path, +): + source_path = assets_path / "pier-39.ts" + target_path = tmp_path / "pier-39.mp4" + + ffmpeg = ( + FFmpeg() + .option("y") + .input("pipe:0") + .output( + str(target_path), + codec="copy", + ) + ) + + await ffmpeg.execute(yield_async_chunks(source_path)) + + source = probe(source_path) + target = probe(target_path) + + assert abs(float(source["format"]["duration"]) - float(target["format"]["duration"])) <= epsilon + assert "mp4" in target["format"]["format_name"] + + assert source["streams"][0]["codec_name"] == target["streams"][0]["codec_name"] + assert source["streams"][1]["codec_name"] == target["streams"][1]["codec_name"]