Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill in some type annotations #237

Merged
merged 7 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions src/evdev/device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
import os
from typing import NamedTuple, Tuple, Union
from typing import Dict, Iterator, List, Literal, NamedTuple, Tuple, Union, overload

from . import _input, ecodes, util

Expand Down Expand Up @@ -95,7 +95,7 @@ class DeviceInfo(NamedTuple):
product: int
version: int

def __str__(self):
def __str__(self) -> str:
msg = "bus: {:04x}, vendor {:04x}, product {:04x}, version {:04x}"
return msg.format(*self) # pylint: disable=not-an-iterable

Expand Down Expand Up @@ -151,7 +151,7 @@ def __init__(self, dev: Union[str, bytes, os.PathLike]):
#: The number of force feedback effects the device can keep in its memory.
self.ff_effects_count = _input.ioctl_EVIOCGEFFECTS(self.fd)

def __del__(self):
def __del__(self) -> None:
if hasattr(self, "fd") and self.fd is not None:
try:
self.close()
Expand All @@ -176,7 +176,13 @@ def _capabilities(self, absinfo: bool = True):

return res

def capabilities(self, verbose: bool = False, absinfo: bool = True):
@overload
def capabilities(self, verbose: Literal[False] = ..., absinfo: bool = ...) -> Dict[int, List[int]]:
...
@overload
def capabilities(self, verbose: Literal[True], absinfo: bool = ...) -> Dict[Tuple[str, int], List[Tuple[str, int]]]:
...
def capabilities(self, verbose: bool = False, absinfo: bool = True) -> Union[Dict[int, List[int]], Dict[Tuple[str, int], List[Tuple[str, int]]]]:
"""
Return the event types that this device supports as a mapping of
supported event types to lists of handled event codes.
Expand Down Expand Up @@ -263,7 +269,7 @@ def leds(self, verbose: bool = False):

return leds

def set_led(self, led_num: int, value: int):
def set_led(self, led_num: int, value: int) -> None:
"""
Set the state of the selected LED.

Expand All @@ -279,26 +285,26 @@ def __eq__(self, other):
"""
return isinstance(other, self.__class__) and self.info == other.info and self.path == other.path

def __str__(self):
def __str__(self) -> str:
msg = 'device {}, name "{}", phys "{}", uniq "{}"'
return msg.format(self.path, self.name, self.phys, self.uniq or "")

def __repr__(self):
def __repr__(self) -> str:
msg = (self.__class__.__name__, self.path)
return "{}({!r})".format(*msg)

def __fspath__(self):
return self.path

def close(self):
def close(self) -> None:
if self.fd > -1:
try:
super().close()
os.close(self.fd)
finally:
self.fd = -1

def grab(self):
def grab(self) -> None:
"""
Grab input device using ``EVIOCGRAB`` - other applications will
be unable to receive events until the device is released. Only
Expand All @@ -311,7 +317,7 @@ def grab(self):

_input.ioctl_EVIOCGRAB(self.fd, 1)

def ungrab(self):
def ungrab(self) -> None:
"""
Release device if it has been already grabbed (uses `EVIOCGRAB`).

Expand All @@ -324,7 +330,7 @@ def ungrab(self):
_input.ioctl_EVIOCGRAB(self.fd, 0)

@contextlib.contextmanager
def grab_context(self):
def grab_context(self) -> Iterator[None]:
"""
A context manager for the duration of which only the current
process will be able to receive events from the device.
Expand All @@ -342,7 +348,7 @@ def upload_effect(self, effect: "ff.Effect"):
ff_id = _input.upload_effect(self.fd, data)
return ff_id

def erase_effect(self, ff_id):
def erase_effect(self, ff_id) -> None:
"""
Erase a force effect from a force feedback device. This also
stops the effect.
Expand Down Expand Up @@ -402,7 +408,7 @@ def absinfo(self, axis_num: int):
"""
return AbsInfo(*_input.ioctl_EVIOCGABS(self.fd, axis_num))

def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None):
def set_absinfo(self, axis_num: int, value=None, min=None, max=None, fuzz=None, flat=None, resolution=None) -> None:
"""
Update :class:`AbsInfo` values. Only specified values will be overwritten.

Expand Down
87 changes: 47 additions & 40 deletions src/evdev/eventio_async.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,57 @@
import asyncio
import select
import sys

from . import eventio
from .events import InputEvent

# needed for compatibility
from .eventio import EvdevError

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing import Any as Self


class ReadIterator:
def __init__(self, device):
self.current_batch = iter(())
self.device = device

# Standard iterator protocol.
def __iter__(self) -> Self:
return self

def __next__(self) -> InputEvent:
try:
# Read from the previous batch of events.
return next(self.current_batch)
except StopIteration:
r, w, x = select.select([self.device.fd], [], [])
self.current_batch = self.device.read()
return next(self.current_batch)

def __aiter__(self) -> Self:
return self

def __anext__(self) -> "asyncio.Future[InputEvent]":
future = asyncio.Future()
try:
# Read from the previous batch of events.
future.set_result(next(self.current_batch))
except StopIteration:

def next_batch_ready(batch):
try:
self.current_batch = batch.result()
future.set_result(next(self.current_batch))
except Exception as e:
future.set_exception(e)

self.device.async_read().add_done_callback(next_batch_ready)
return future


class EventIO(eventio.EventIO):
def _do_when_readable(self, callback):
Expand Down Expand Up @@ -42,7 +88,7 @@ def async_read(self):
self._do_when_readable(lambda: self._set_result(future, self.read))
return future

def async_read_loop(self):
def async_read_loop(self) -> ReadIterator:
"""
Return an iterator that yields input events. This iterator is
compatible with the ``async for`` syntax.
Expand All @@ -58,42 +104,3 @@ def close(self):
# no event loop present, so there is nothing to
# remove the reader from. Ignore
pass


class ReadIterator:
def __init__(self, device):
self.current_batch = iter(())
self.device = device

# Standard iterator protocol.
def __iter__(self):
return self

def __next__(self):
try:
# Read from the previous batch of events.
return next(self.current_batch)
except StopIteration:
r, w, x = select.select([self.device.fd], [], [])
self.current_batch = self.device.read()
return next(self.current_batch)

def __aiter__(self):
return self

def __anext__(self):
future = asyncio.Future()
try:
# Read from the previous batch of events.
future.set_result(next(self.current_batch))
except StopIteration:

def next_batch_ready(batch):
try:
self.current_batch = batch.result()
future.set_result(next(self.current_batch))
except Exception as e:
future.set_exception(e)

self.device.async_read().add_done_callback(next_batch_ready)
return future
4 changes: 2 additions & 2 deletions src/evdev/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Union, List

from . import ecodes
from .events import event_factory
from .events import InputEvent, event_factory


def list_devices(input_device_dir: Union[str, bytes, os.PathLike] = "/dev/input") -> List[str]:
Expand All @@ -32,7 +32,7 @@ def is_device(fn: Union[str, bytes, os.PathLike]) -> bool:
return True


def categorize(event):
def categorize(event: InputEvent) -> InputEvent:
"""
Categorize an event according to its type.

Expand Down