Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve type-hints, support any AsyncIterable[bytes] as input, remove support for EOL python versions #69

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ffmpeg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,13 @@
from .progress import Progress

__version__ = "2.0.12"

__all__ = [
"FFmpeg",
"FFmpegAlreadyExecuted",
"FFmpegError",
"FFmpegFileNotFound",
"FFmpegInvalidCommand",
"FFmpegUnsupportedCodec",
"Progress",
]
2 changes: 2 additions & 0 deletions ffmpeg/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .ffmpeg import FFmpeg

__all__ = ["FFmpeg"]
49 changes: 25 additions & 24 deletions ffmpeg/asyncio/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
import os
import signal
import subprocess
from typing import Optional, Union
import sys
from typing import TYPE_CHECKING, Any, Optional, Union

from pyee.asyncio import AsyncIOEventEmitter
from typing_extensions import Self

from ffmpeg import types
from ffmpeg.asyncio.utils import create_subprocess, ensure_stream_reader, read_stream, readlines
from ffmpeg.asyncio.utils import create_subprocess, ensure_async_iterable, read_stream, readlines
from ffmpeg.ffmpeg import FFmpegAlreadyExecuted, FFmpegError
from ffmpeg.options import Options
from ffmpeg.progress import Tracker
from ffmpeg.utils import is_windows

if TYPE_CHECKING:
from ffmpeg import types


class FFmpeg(AsyncIOEventEmitter):
Expand All @@ -34,7 +36,7 @@ def __init__(self, executable: str = "ffmpeg"):
self._executed: bool = False
self._terminated: bool = False

self._tracker = Tracker(self) # type: ignore
self._tracker = Tracker(self)

self.once("error", self._reraise_exception)

Expand Down Expand Up @@ -62,7 +64,7 @@ def option(self, key: str, value: Optional[types.Option] = None) -> Self:

def input(
self,
url: Union[str, os.PathLike],
url: Union[str, os.PathLike[str]],
options: Optional[dict[str, Optional[types.Option]]] = None,
**kwargs: Optional[types.Option],
) -> Self:
Expand Down Expand Up @@ -105,7 +107,7 @@ def input(

def output(
self,
url: Union[str, os.PathLike],
url: Union[str, os.PathLike[str]],
options: Optional[dict[str, Optional[types.Option]]] = None,
**kwargs: Optional[types.Option],
) -> Self:
Expand Down Expand Up @@ -146,9 +148,7 @@ def output(
self._options.output(url, options, **kwargs)
return self

async def execute(
self, stream: Optional[Union[bytes, asyncio.StreamReader]] = None, timeout: Optional[float] = None
) -> bytes:
async def execute(self, stream: Optional[types.AsyncStream] = None, timeout: Optional[float] = None) -> bytes:
"""Execute FFmpeg using specified global options and files.

Args:
Expand All @@ -170,24 +170,25 @@ async def execute(
self._terminated = False

if stream is not None:
stream = ensure_stream_reader(stream)
_stream = ensure_async_iterable(stream)
else:
_stream = None

self.emit("start", self.arguments)

self._process = await create_subprocess(
*self.arguments,
stdin=subprocess.PIPE if stream is not None else None,
stdin=subprocess.PIPE if _stream is not None else None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

self._executed = True
tasks = [
asyncio.create_task(self._write_stdin(stream)),
asyncio.create_task(self._read_stdout()),
asyncio.create_task(self._handle_stderr()),
asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout)),
]
stdin_task = asyncio.create_task(self._write_stdin(_stream))
stdout_task = asyncio.create_task(self._read_stdout())
stderr_task = asyncio.create_task(self._handle_stderr())
process_task = asyncio.create_task(asyncio.wait_for(self._process.wait(), timeout=timeout))
tasks: list[asyncio.Task[Any]] = [stdin_task, stdout_task, stderr_task, process_task]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
self._executed = False

Expand All @@ -205,9 +206,9 @@ async def execute(
elif self._terminated:
self.emit("terminated")
else:
raise FFmpegError.create(message=tasks[2].result(), arguments=self.arguments)
raise FFmpegError.create(message=stderr_task.result(), arguments=self.arguments)

return tasks[1].result()
return stdout_task.result()

def terminate(self):
"""Gracefully terminate the running FFmpeg process.
Expand All @@ -219,24 +220,24 @@ def terminate(self):
raise FFmpegError("FFmpeg is not executed", arguments=self.arguments)

sigterm = signal.SIGTERM
if is_windows():
if sys.platform == "win32":
# On Windows, SIGTERM is an alias for TerminateProcess().
# To gracefully terminate the FFmpeg process, we should use CTRL_BREAK_EVENT signal.
# References:
# - https://docs.python.org/3.10/library/subprocess.html#subprocess.Popen.send_signal
# - https://github.com/FFmpeg/FFmpeg/blob/release/5.1/fftools/ffmpeg.c#L371
sigterm = signal.CTRL_BREAK_EVENT # type: ignore
sigterm = signal.CTRL_BREAK_EVENT

self._terminated = True
self._process.send_signal(sigterm)

async def _write_stdin(self, stream: Optional[asyncio.StreamReader]):
async def _write_stdin(self, stream: Optional[types.AsyncIterable[bytes]]):
if stream is None:
return

assert self._process.stdin is not None

async for chunk in read_stream(stream, size=io.DEFAULT_BUFFER_SIZE):
async for chunk in stream:
self._process.stdin.write(chunk)
await self._process.stdin.drain()

Expand Down
38 changes: 24 additions & 14 deletions ffmpeg/asyncio/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,31 @@
import io
import re
import subprocess
from typing import Any, AsyncIterable, Awaitable
import sys
from functools import wraps
from typing import Any, AsyncIterable

from ffmpeg import types
from ffmpeg.utils import is_windows


def create_subprocess(*args: Any, **kwargs: Any) -> Awaitable[asyncio.subprocess.Process]:
@wraps(asyncio.create_subprocess_exec)
def create_subprocess(*args: Any, creationflags: int = 0, **kwargs: Any):
# On Windows, CREATE_NEW_PROCESS_GROUP flag is required to use CTRL_BREAK_EVENT signal,
# which is required to gracefully terminate the FFmpeg process.
# Reference: https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.subprocess.Process.send_signal
if is_windows():
kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore
if sys.platform == "win32":
creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP

return asyncio.create_subprocess_exec(*args, **kwargs)
return asyncio.create_subprocess_exec(*args, creationflags=creationflags, **kwargs)


def ensure_stream_reader(stream: types.AsyncStream) -> asyncio.StreamReader:
if isinstance(stream, asyncio.StreamReader):
return stream

reader = asyncio.StreamReader()
reader.feed_data(stream)
reader.feed_eof()
async def read_bytes(stream: bytes, size: int = -1) -> AsyncIterable[bytes]:
if size == -1:
yield stream
return

return reader
for i in range(0, len(stream), size):
yield stream[i : i + size]


async def read_stream(stream: asyncio.StreamReader, size: int = -1) -> AsyncIterable[bytes]:
Expand All @@ -38,6 +38,16 @@ async def read_stream(stream: asyncio.StreamReader, size: int = -1) -> AsyncIter
yield chunk


def ensure_async_iterable(stream: types.AsyncStream) -> AsyncIterable[bytes]:
if isinstance(stream, bytes):
return read_bytes(stream, io.DEFAULT_BUFFER_SIZE)

if isinstance(stream, asyncio.StreamReader):
return read_stream(stream, io.DEFAULT_BUFFER_SIZE)

return stream


async def readlines(stream: asyncio.StreamReader) -> AsyncIterable[bytes]:
pattern = re.compile(rb"[\r\n]+")

Expand Down
38 changes: 20 additions & 18 deletions ffmpeg/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
import os
import signal
import subprocess
from typing import IO, Optional, Union
import sys
from typing import IO, TYPE_CHECKING, Any, Optional, Union

from pyee import EventEmitter
from typing_extensions import Self

from ffmpeg import types
from ffmpeg.errors import FFmpegAlreadyExecuted, FFmpegError
from ffmpeg.options import Options
from ffmpeg.progress import Tracker
from ffmpeg.utils import create_subprocess, ensure_io, is_windows, read_stream, readlines
from ffmpeg.utils import create_subprocess, ensure_io, read_stream, readlines

if TYPE_CHECKING:
from ffmpeg import types


class FFmpeg(EventEmitter):
Expand All @@ -33,14 +36,14 @@ def __init__(self, executable: str = "ffmpeg"):
self._executed: bool = False
self._terminated: bool = False

self._tracker = Tracker(self) # type: ignore
self._tracker = Tracker(self)

@property
def arguments(self) -> list[str]:
"""Return a list of arguments to be used when executing FFmpeg.

Returns:
A lit of arguments to be used when executing FFmpeg.
A list of arguments to be used when executing FFmpeg.
"""
return [self._executable, *self._options.build()]

Expand All @@ -59,7 +62,7 @@ def option(self, key: str, value: Optional[types.Option] = None) -> Self:

def input(
self,
url: Union[str, os.PathLike],
url: Union[str, os.PathLike[str]],
options: Optional[dict[str, Optional[types.Option]]] = None,
**kwargs: Optional[types.Option],
) -> Self:
Expand Down Expand Up @@ -102,7 +105,7 @@ def input(

def output(
self,
url: Union[str, os.PathLike],
url: Union[str, os.PathLike[str]],
options: Optional[dict[str, Optional[types.Option]]] = None,
**kwargs: Optional[types.Option],
) -> Self:
Expand Down Expand Up @@ -143,7 +146,7 @@ def output(
self._options.output(url, options, **kwargs)
return self

def execute(self, stream: Optional[Union[bytes, IO[bytes]]] = None, timeout: Optional[float] = None) -> bytes:
def execute(self, stream: Optional[types.Stream] = None, timeout: Optional[float] = None) -> bytes:
"""Execute FFmpeg using specified global options and files.

Args:
Expand Down Expand Up @@ -179,12 +182,11 @@ def execute(self, stream: Optional[Union[bytes, IO[bytes]]] = None, timeout: Opt

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
self._executed = True
futures = [
executor.submit(self._write_stdin, stream),
executor.submit(self._read_stdout),
executor.submit(self._handle_stderr),
executor.submit(self._process.wait, timeout),
]
stdin_task = executor.submit(self._write_stdin, stream)
stdout_task = executor.submit(self._read_stdout)
stderr_task = executor.submit(self._handle_stderr)
process_task = executor.submit(self._process.wait, timeout)
futures: list[concurrent.futures.Future[Any]] = [stdin_task, stdout_task, stderr_task, process_task]
done, pending = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)
self._executed = False

Expand All @@ -201,9 +203,9 @@ def execute(self, stream: Optional[Union[bytes, IO[bytes]]] = None, timeout: Opt
elif self._terminated:
self.emit("terminated")
else:
raise FFmpegError.create(message=futures[2].result(), arguments=self.arguments)
raise FFmpegError.create(message=stderr_task.result(), arguments=self.arguments)

return futures[1].result()
return stdout_task.result()

def terminate(self):
"""Gracefully terminate the running FFmpeg process.
Expand All @@ -215,13 +217,13 @@ def terminate(self):
raise FFmpegError("FFmpeg is not executed", arguments=self.arguments)

sigterm = signal.SIGTERM
if is_windows():
if sys.platform == "win32":
# On Windows, SIGTERM is an alias for TerminateProcess().
# To gracefully terminate the FFmpeg process, we should use CTRL_BREAK_EVENT signal.
# References:
# - https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
# - https://github.com/FFmpeg/FFmpeg/blob/release/5.1/fftools/ffmpeg.c#L371
sigterm = signal.CTRL_BREAK_EVENT # type: ignore
sigterm = signal.CTRL_BREAK_EVENT

self._terminated = True
self._process.send_signal(sigterm)
Expand Down
20 changes: 11 additions & 9 deletions ffmpeg/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

import os
from dataclasses import dataclass
from typing import Iterable, Optional, Union
from typing import TYPE_CHECKING, Iterable, Optional, Union

from ffmpeg import types
from ffmpeg.file import InputFile, OutputFile

if TYPE_CHECKING:
from ffmpeg import types


def _unpack_options(options: dict[str, Optional[types.Option]]) -> Iterable[Option]:
for key, values in options.items():
if not isinstance(values, (list, set, tuple)):
values = [values]

for value in values:
yield Option(key, value)
if isinstance(values, (list, set, tuple)):
for value in values:
yield Option(key, value)
else:
yield Option(key, values)


@dataclass(frozen=True)
Expand All @@ -40,7 +42,7 @@ def option(self, key: str, value: Optional[types.Option] = None):

def input(
self,
url: Union[str, os.PathLike],
url: Union[str, os.PathLike[str]],
options: Optional[dict[str, Optional[types.Option]]] = None,
**kwargs: Optional[types.Option],
):
Expand All @@ -53,7 +55,7 @@ def input(

def output(
self,
url: Union[str, os.PathLike],
url: Union[str, os.PathLike[str]],
options: Optional[dict[str, Optional[types.Option]]] = None,
**kwargs: Optional[types.Option],
):
Expand Down
Loading