From 435864cb5013bdb49be4be670c2ebd8f46ca630f Mon Sep 17 00:00:00 2001
From: Sebastian Mohr
Date: Sat, 1 Feb 2025 13:16:04 +0100
Subject: [PATCH 01/28] Removed import state functions in favor of an import
 state dataclass. This makes the code more readable, in my opinion, and we
 now have type hints for the import state.

---
 beets/importer.py | 341 +++++++++++++++++++++++-----------------------
 1 file changed, 169 insertions(+), 172 deletions(-)

diff --git a/beets/importer.py b/beets/importer.py
index b30e6399b3..fcf6835cab 100644
--- a/beets/importer.py
+++ b/beets/importer.py
@@ -1,18 +1,3 @@
-# This file is part of beets.
-# Copyright 2016, Adrian Sampson.
-#
-# Permission is hereby granted, free of charge, to any person obtaining
-# a copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish,
-# distribute, sublicense, and/or sell copies of the Software, and to
-# permit persons to whom the Software is furnished to do so, subject to
-# the following conditions:
-#
-# The above copyright notice and this permission notice shall be
-# included in all copies or substantial portions of the Software.
-
-
 """Provides the basic, interface-agnostic workflow for importing and
 autotagging music files.
 """
@@ -23,17 +8,20 @@
 import re
 import shutil
 import time
+from abc import ABC, abstractmethod
 from bisect import bisect_left, insort
 from collections import defaultdict
-from contextlib import contextmanager
+from dataclasses import dataclass
 from enum import Enum
 from tempfile import mkdtemp
+from typing import Iterable, Sequence
 
 import mediafile
 
 from beets import autotag, config, dbcore, library, logging, plugins, util
 from beets.util import (
     MoveOperation,
+    PathLike,
     ancestry,
     displayable_path,
     normpath,
@@ -49,8 +37,7 @@
 QUEUE_SIZE = 128
 SINGLE_ARTIST_THRESH = 0.25
 
-PROGRESS_KEY = "tagprogress"
-HISTORY_KEY = "taghistory"
+
 # Usually flexible attributes are preserved (i.e., not updated) during
 # reimports. The following two lists (globally) change this behaviour for
 # certain fields. To alter these lists only when a specific plugin is in use,
@@ -80,142 +67,163 @@ class ImportAbortError(Exception):
     pass
 
 
-# Utilities.
-
-
-def _open_state():
-    """Reads the state file, returning a dictionary."""
-    try:
-        with open(config["statefile"].as_filename(), "rb") as f:
-            return pickle.load(f)
-    except Exception as exc:
-        # The `pickle` module can emit all sorts of exceptions during
-        # unpickling, including ImportError. We use a catch-all
-        # exception to avoid enumerating them all (the docs don't even have a
-        # full list!).
-        log.debug("state file could not be read: {0}", exc)
-        return {}
+@dataclass
+class ImportState:
+    """Represents the persistent state of the importer.
 
+    The state file is read when the class is instantiated. To ensure
+    the state is written back to disk, you should use the context
+    manager protocol.
 
-def _save_state(state):
-    """Writes the state dictionary out to disk."""
-    try:
-        with open(config["statefile"].as_filename(), "wb") as f:
-            pickle.dump(state, f)
-    except OSError as exc:
-        log.error("state file could not be written: {0}", exc)
+    ``tagprogress`` allows long tagging tasks to be resumed when they pause.
+    ``taghistory`` is a utility for manipulating the "incremental" import
+    log. It keeps track of all directories that were ever imported, which
+    allows the importer to only import new material.
-# Utilities for reading and writing the beets progress file, which
-# allows long tagging tasks to be resumed when they pause (or crash).
+    Usage
+    -----
+    ```
+    # Readonly
+    progress = ImportState().tagprogress
 
-
-def progress_read():
-    state = _open_state()
-    return state.setdefault(PROGRESS_KEY, {})
-
-
-@contextmanager
-def progress_write():
-    state = _open_state()
-    progress = state.setdefault(PROGRESS_KEY, {})
-    yield progress
-    _save_state(state)
-
-
-def progress_add(toppath, *paths):
-    """Record that the files under all of the `paths` have been imported
-    under `toppath`.
+    # Read and write
+    with ImportState() as state:
+        state.tagprogress[toppath] = paths
+    ```
     """
-    with progress_write() as state:
-        imported = state.setdefault(toppath, [])
-        for path in paths:
-            # Normally `progress_add` will be called with the path
-            # argument increasing. This is because of the ordering in
-            # `albums_in_dir`. We take advantage of that to make the
-            # code faster
-            if imported and imported[len(imported) - 1] <= path:
-                imported.append(path)
-            else:
-                insort(imported, path)
+
+    tagprogress: dict
+    taghistory: set
+    path: PathLike
 
-
-def progress_element(toppath, path):
-    """Return whether `path` has been imported in `toppath`."""
-    state = progress_read()
-    if toppath not in state:
-        return False
-    imported = state[toppath]
-    i = bisect_left(imported, path)
-    return i != len(imported) and imported[i] == path
-
-
-def has_progress(toppath):
-    """Return `True` if there exist paths that have already been
-    imported under `toppath`.
-    """
-    state = progress_read()
-    return toppath in state
+    def __init__(self, readonly=False, path: PathLike | None = None):
+        self.path = path or config["statefile"].as_filename()
+        self._open()
 
+    def __enter__(self):
+        return self
 
-def progress_reset(toppath):
-    with progress_write() as state:
-        if toppath in state:
-            del state[toppath]
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self._save()
 
+    def _open(
+        self,
+    ):
+        try:
+            with open(self.path, "rb") as f:
+                state = pickle.load(f)
+                # Read the states
+                self.tagprogress = state.get("tagprogress", {})
+                self.taghistory = state.get("taghistory", set())
+        except Exception as exc:
+            # The `pickle` module can emit all sorts of exceptions during
+            # unpickling, including ImportError. We use a catch-all
+            # exception to avoid enumerating them all (the docs don't even have a
+            # full list!).
+            log.debug("state file could not be read: {0}", exc)
 
-# Similarly, utilities for manipulating the "incremental" import log.
-# This keeps track of all directories that were ever imported, which
-# allows the importer to only import new stuff.
-
+    def _save(self):
+        try:
+            with open(self.path, "wb") as f:
+                pickle.dump(
+                    {
+                        "tagprogress": self.tagprogress,
+                        "taghistory": self.taghistory,
+                    },
+                    f,
+                )
+        except OSError as exc:
+            log.error("state file could not be written: {0}", exc)
 
-def history_add(paths):
-    """Indicate that the import of the album in `paths` is completed and
-    should not be repeated in incremental imports.
-    """
-    state = _open_state()
-    if HISTORY_KEY not in state:
-        state[HISTORY_KEY] = set()
+    # -------------------------------- Tagprogress ------------------------------- #
 
-    state[HISTORY_KEY].add(tuple(paths))
+    def progress_add(self, toppath: PathLike, *paths: list[PathLike]):
+        """Record that the files under all of the `paths` have been imported
+        under `toppath`.
+ """ + with self as state: + imported = state.tagprogress.setdefault(toppath, []) + for path in paths: + if imported and imported[-1] <= path: + imported.append(path) + else: + insort(imported, path) - _save_state(state) + def progress_has_element(self, toppath: PathLike, path: PathLike) -> bool: + """Return whether `path` has been imported in `toppath`.""" + imported = self.tagprogress.get(toppath, []) + i = bisect_left(imported, path) + return i != len(imported) and imported[i] == path + def progress_has(self, toppath: PathLike) -> bool: + """Return `True` if there exist paths that have already been + imported under `toppath`. + """ + return toppath in self.tagprogress -def history_get(): - """Get the set of completed path tuples in incremental imports.""" - state = _open_state() - if HISTORY_KEY not in state: - return set() - return state[HISTORY_KEY] + def progress_reset(self, toppath: PathLike): + """Reset the progress for `toppath`.""" + with self as state: + if toppath in state.tagprogress: + del state.tagprogress[toppath] + # -------------------------------- Taghistory -------------------------------- # -# Abstract session class. + def history_add(self, paths: list[PathLike]): + """Add the paths to the history.""" + with self as state: + state.taghistory.add(tuple(paths)) -class ImportSession: +class ImportSession(ABC): """Controls an import action. Subclasses should implement methods to communicate with the user or otherwise make decisions. """ - def __init__(self, lib, loghandler, paths, query): - """Create a session. `lib` is a Library object. `loghandler` is a - logging.Handler. Either `paths` or `query` is non-null and indicates - the source of files to be imported. + logger: logging.Logger + paths: list[bytes] | None + lib: library.Library + + _is_resuming: dict[bytes, bool] + _merged_items: set + _merged_dirs: set + + def __init__( + self, + lib: library.Library, + loghandler: logging.Handler | None, + paths: Iterable[PathLike] | None, + query: dbcore.Query | None, + ): + """Create a session. + + Parameters + ---------- + lib : library.Library + The library instance to which items will be imported. + loghandler : logging.Handler or None + A logging handler to use for the session's logger. If None, a + NullHandler will be used. + paths : os.PathLike or None + The paths to be imported. If None, no paths are specified. + query : dbcore.Query or None + A query to filter items for import. If None, no query is applied. """ self.lib = lib self.logger = self._setup_logging(loghandler) - self.paths = paths self.query = query self._is_resuming = {} self._merged_items = set() self._merged_dirs = set() # Normalize the paths. - if self.paths: - self.paths = list(map(normpath, self.paths)) + if paths is not None: + self.paths = list(map(normpath, paths)) + else: + self.paths = None - def _setup_logging(self, loghandler): + def _setup_logging(self, loghandler: logging.Handler | None): logger = logging.getLogger(__name__) logger.propagate = False if not loghandler: @@ -243,9 +251,7 @@ def set_config(self, config): iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice( - ["auto", True, False] - ) + iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) # Copy, move, reflink, link, and hardlink are mutually exclusive. 
if iconfig["move"]: @@ -302,17 +308,21 @@ def log_choice(self, task, duplicate=False): elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) + @abstractmethod def should_resume(self, path): - raise NotImplementedError + raise NotImplementedError("Inheriting class must implement `should_resume`") + @abstractmethod def choose_match(self, task): - raise NotImplementedError + raise NotImplementedError("Inheriting class must implement `choose_match`") + @abstractmethod def resolve_duplicate(self, task, found_duplicates): - raise NotImplementedError + raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") + @abstractmethod def choose_item(self, task): - raise NotImplementedError + raise NotImplementedError("Inheriting class must implement `choose_item`") def run(self): """Run the import task.""" @@ -366,12 +376,13 @@ def run(self): # Incremental and resumed imports - def already_imported(self, toppath, paths): + def already_imported(self, toppath: PathLike, paths: Sequence[PathLike]): """Returns true if the files belonging to this task have already been imported in a previous session. """ + state = ImportState() if self.is_resuming(toppath) and all( - [progress_element(toppath, p) for p in paths] + [state.progress_has_element(toppath, p) for p in paths] ): return True if self.config["incremental"] and tuple(paths) in self.history_dirs: @@ -379,13 +390,15 @@ def already_imported(self, toppath, paths): return False + _history_dirs = None + @property def history_dirs(self): - if not hasattr(self, "_history_dirs"): - self._history_dirs = history_get() + if self._history_dirs is None: + self._history_dirs = ImportState().taghistory return self._history_dirs - def already_merged(self, paths): + def already_merged(self, paths: Sequence[PathLike]): """Returns true if all the paths being imported were part of a merge during previous tasks. """ @@ -394,7 +407,7 @@ def already_merged(self, paths): return False return True - def mark_merged(self, paths): + def mark_merged(self, paths: Sequence[PathLike]): """Mark paths and directories as merged for future reimport tasks.""" self._merged_items.update(paths) dirs = { @@ -403,30 +416,31 @@ def mark_merged(self, paths): } self._merged_dirs.update(dirs) - def is_resuming(self, toppath): + def is_resuming(self, toppath: PathLike): """Return `True` if user wants to resume import of this path. You have to call `ask_resume` first to determine the return value. """ - return self._is_resuming.get(toppath, False) + return self._is_resuming.get(normpath(toppath), False) - def ask_resume(self, toppath): + def ask_resume(self, toppath: PathLike): """If import of `toppath` was aborted in an earlier session, ask user if they want to resume the import. Determines the return value of `is_resuming(toppath)`. """ - if self.want_resume and has_progress(toppath): + state = ImportState() + if self.want_resume and state.progress_has(toppath): # Either accept immediately or prompt for input to decide. if self.want_resume is True or self.should_resume(toppath): log.warning( "Resuming interrupted import of {0}", - util.displayable_path(toppath), + util.displayable_path(normpath(toppath)), ) - self._is_resuming[toppath] = True + self._is_resuming[normpath(toppath)] = True else: # Clear progress; we're starting from the top. - progress_reset(toppath) + state.progress_reset(toppath) # The importer task class. @@ -528,12 +542,12 @@ def save_progress(self): finished. 
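With the four decision hooks now declared abstract (a later commit in this series relaxes this again, because the class is sometimes used directly), a concrete session only needs to implement those hooks. A minimal non-interactive sketch; the class name and its policy are illustrative, not taken from the patch:

```python
from beets import importer

class AsIsSession(importer.ImportSession):
    """Hypothetical session: never resume, keep tags as-is, skip duplicates."""

    def should_resume(self, path):
        return False

    def choose_match(self, task):
        return importer.action.ASIS

    def resolve_duplicate(self, task, found_duplicates):
        task.set_choice(importer.action.SKIP)

    def choose_item(self, task):
        return importer.action.ASIS
```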
""" if self.toppath: - progress_add(self.toppath, *self.paths) + ImportState().progress_add(self.toppath, *self.paths) def save_history(self): """Save the directory in the history for incremental imports.""" if self.paths: - history_add(self.paths) + ImportState().history_add(self.paths) # Logical decisions. @@ -593,9 +607,7 @@ def remove_duplicates(self, lib): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug( - "deleting duplicate {0}", util.displayable_path(item.path) - ) + log.debug("deleting duplicate {0}", util.displayable_path(item.path)) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -627,7 +639,8 @@ def finalize(self, session): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip and session.config["incremental_skip_later"] + self.skip + and session.config["incremental_skip_later"] ): self.save_history() @@ -684,9 +697,7 @@ def lookup_candidates(self): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album( - self.items, search_ids=self.search_ids - ) + artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -737,8 +748,7 @@ def align_album_level_fields(self): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 - and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. changes["albumartist"] = plur_albumartist @@ -832,15 +842,10 @@ def record_replaced(self, lib): self.replaced_albums = defaultdict(list) replaced_album_ids = set() for item in self.imported_items(): - dup_items = list( - lib.items(dbcore.query.BytesQuery("path", item.path)) - ) + dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) self.replaced_items[item] = dup_items for dup_item in dup_items: - if ( - not dup_item.album_id - or dup_item.album_id in replaced_album_ids - ): + if not dup_item.album_id or dup_item.album_id in replaced_album_ids: continue replaced_album = dup_item._cached_album if replaced_album: @@ -893,8 +898,7 @@ def _reduce_and_log(new_obj, existing_fields, overwrite_keys): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " - "Path: {}", + "Reimported album {}. Preserving attribute ['added']. " "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -1094,10 +1098,10 @@ def save_history(self): def save_progress(self): if self.paths is None: # "Done" sentinel. 
- progress_reset(self.toppath) + ImportState().progress_reset(self.toppath) else: # "Directory progress" sentinel for singletons - progress_add(self.toppath, *self.paths) + ImportState().progress_add(self.toppath, *self.paths) def skip(self): return True @@ -1308,9 +1312,7 @@ def paths(self): def singleton(self, path): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(path) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(path)) self.skipped += 1 return None @@ -1333,9 +1335,7 @@ def album(self, paths, dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(dirs) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) self.skipped += 1 return None @@ -1364,8 +1364,7 @@ def unarchive(self): if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " - "'copy' or 'move' to be enabled." + "Archive importing requires either " "'copy' or 'move' to be enabled." ) return @@ -1578,9 +1577,7 @@ def resolve_duplicates(session, task): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug( - "found duplicates: {}".format([o.id for o in found_duplicates]) - ) + log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( From c81a2c9b1809c7c615b1f421a73ade63ac8b97e1 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 13:25:25 +0100 Subject: [PATCH 02/28] Using 3.9 unions instead of new 3.10 style unions for typehints --- beets/importer.py | 16 ++++++++-------- test/plugins/test_lyrics.py | 19 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index fcf6835cab..af61afda26 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -14,7 +14,7 @@ from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Iterable, Sequence +from typing import Dict, Iterable, Optional, Sequence, Union import mediafile @@ -97,7 +97,7 @@ class ImportState: taghistory: set path: PathLike - def __init__(self, readonly=False, path: PathLike | None = None): + def __init__(self, readonly=False, path: Union[PathLike, None] = None): self.path = path or config["statefile"].as_filename() self._open() @@ -182,19 +182,19 @@ class ImportSession(ABC): """ logger: logging.Logger - paths: list[bytes] | None + paths: Union[list[bytes], None] lib: library.Library - _is_resuming: dict[bytes, bool] + _is_resuming: Dict[bytes, bool] _merged_items: set _merged_dirs: set def __init__( self, lib: library.Library, - loghandler: logging.Handler | None, - paths: Iterable[PathLike] | None, - query: dbcore.Query | None, + loghandler: Optional[logging.Handler], + paths: Optional[Sequence[PathLike]], + query: Optional[dbcore.Query], ): """Create a session. 
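The motivation for this commit, sketched: PEP 604 `X | Y` annotations are evaluated at function/class definition time and raise a `TypeError` on interpreters older than 3.10, whereas the `typing.Union`/`Optional` spellings work everywhere (the `from __future__ import annotations` added later in this series also defers that evaluation).

```python
from typing import Optional, Union

def load(path: Union[bytes, None]) -> Optional[str]:  # fine on Python 3.9
    ...

# def load(path: bytes | None) -> str | None:  # TypeError at import time
#     ...                                      # on Python < 3.10
```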
@@ -223,7 +223,7 @@ def __init__( else: self.paths = None - def _setup_logging(self, loghandler: logging.Handler | None): + def _setup_logging(self, loghandler: Optional[logging.Handler]): logger = logging.getLogger(__name__) logger.propagate = False if not loghandler: diff --git a/test/plugins/test_lyrics.py b/test/plugins/test_lyrics.py index c6d48c3bdb..1305a21d3b 100644 --- a/test/plugins/test_lyrics.py +++ b/test/plugins/test_lyrics.py @@ -22,7 +22,12 @@ from beets.library import Item from beets.test.helper import PluginMixin -from beetsplug import lyrics + +try: + from beetsplug import lyrics +except Exception: + pytest.skip("lyrics plugin couldn't be loaded", allow_module_level=True) + from .lyrics_pages import LyricsPage, lyrics_pages @@ -69,9 +74,7 @@ def test_search_empty(self, artist, title): ("横山克", "Masaru Yokoyama", ["Masaru Yokoyama"]), ], ) - def test_search_pairs_artists( - self, artist, artist_sort, expected_extra_artists - ): + def test_search_pairs_artists(self, artist, artist_sort, expected_extra_artists): item = Item(artist=artist, artist_sort=artist_sort, title="song") actual_artists = [a for a, _ in lyrics.search_pairs(item)] @@ -96,9 +99,7 @@ def test_search_pairs_artists( def test_search_pairs_titles(self, title, expected_extra_titles): item = Item(title=title, artist="A") - actual_titles = { - t: None for _, tit in lyrics.search_pairs(item) for t in tit - } + actual_titles = {t: None for _, tit in lyrics.search_pairs(item) for t in tit} assert list(actual_titles) == [title, *expected_extra_titles] @@ -240,9 +241,7 @@ def backend(self, lyrics_plugin): @pytest.fixture def lyrics_html(self, lyrics_root_dir, file_name): - return (lyrics_root_dir / f"{file_name}.txt").read_text( - encoding="utf-8" - ) + return (lyrics_root_dir / f"{file_name}.txt").read_text(encoding="utf-8") @pytest.mark.on_lyrics_update From a7ea60356b02526967bc2d83a05c920d60bd2356 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 13:26:23 +0100 Subject: [PATCH 03/28] Init taghistory before loading --- beets/importer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beets/importer.py b/beets/importer.py index af61afda26..dfc802a387 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -99,6 +99,8 @@ class ImportState: def __init__(self, readonly=False, path: Union[PathLike, None] = None): self.path = path or config["statefile"].as_filename() + self.tagprogress = {} + self.taghistory = set() self._open() def __enter__(self): From 6f2ee5c61451fde9602b695dfdef47d2eda42424 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 13:37:24 +0100 Subject: [PATCH 04/28] Removed abc as the importer class is used directly sometimes... --- beets/importer.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index dfc802a387..e72aac514a 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -178,7 +178,7 @@ def history_add(self, paths: list[PathLike]): state.taghistory.add(tuple(paths)) -class ImportSession(ABC): +class ImportSession: """Controls an import action. Subclasses should implement methods to communicate with the user or otherwise make decisions. 
""" @@ -310,19 +310,15 @@ def log_choice(self, task, duplicate=False): elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) - @abstractmethod def should_resume(self, path): raise NotImplementedError("Inheriting class must implement `should_resume`") - @abstractmethod def choose_match(self, task): raise NotImplementedError("Inheriting class must implement `choose_match`") - @abstractmethod def resolve_duplicate(self, task, found_duplicates): raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") - @abstractmethod def choose_item(self, task): raise NotImplementedError("Inheriting class must implement `choose_item`") From c83f2e4e7127ae746b4f3d8107e10a4e9170f44b Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 13:45:18 +0100 Subject: [PATCH 05/28] Recreating importstate class to imitate previous code, otherwise we have slightly different behavior. --- beets/importer.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index e72aac514a..3586445fdc 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -378,9 +378,8 @@ def already_imported(self, toppath: PathLike, paths: Sequence[PathLike]): """Returns true if the files belonging to this task have already been imported in a previous session. """ - state = ImportState() if self.is_resuming(toppath) and all( - [state.progress_has_element(toppath, p) for p in paths] + [ImportState().progress_has_element(toppath, p) for p in paths] ): return True if self.config["incremental"] and tuple(paths) in self.history_dirs: @@ -427,8 +426,7 @@ def ask_resume(self, toppath: PathLike): Determines the return value of `is_resuming(toppath)`. """ - state = ImportState() - if self.want_resume and state.progress_has(toppath): + if self.want_resume and ImportState().progress_has(toppath): # Either accept immediately or prompt for input to decide. if self.want_resume is True or self.should_resume(toppath): log.warning( @@ -438,7 +436,7 @@ def ask_resume(self, toppath: PathLike): self._is_resuming[normpath(toppath)] = True else: # Clear progress; we're starting from the top. - state.progress_reset(toppath) + ImportState().progress_reset(toppath) # The importer task class. From 09b15aaf524250acd9906e91d521d1e5fefe7755 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 15:08:41 +0100 Subject: [PATCH 06/28] Added type hints for pipeline stage decorators --- beets/util/pipeline.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index d23b1bd10c..e79325fe2f 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -31,9 +31,13 @@ in place of any single coroutine. """ + import queue import sys from threading import Lock, Thread +from typing import Callable, Generator, Optional, Union + +from typing_extensions import TypeVar, TypeVarTuple, Unpack BUBBLE = "__PIPELINE_BUBBLE__" POISON = "__PIPELINE_POISON__" @@ -149,7 +153,17 @@ def multiple(messages): return MultiMessage(messages) -def stage(func): +A = TypeVarTuple("A") +T = TypeVar("T") +R = TypeVar("R") + + +def stage( + func: Callable[ + [Unpack[A], T], + R, + ], +): """Decorate a function to become a simple stage. 
>>> @stage @@ -163,7 +177,7 @@ def stage(func): [3, 4, 5] """ - def coro(*args): + def coro(*args: Unpack[A]) -> Generator[Union[R, T, None], T, None]: task = None while True: task = yield task @@ -172,7 +186,7 @@ def coro(*args): return coro -def mutator_stage(func): +def mutator_stage(func: Callable[[Unpack[A], T], None]): """Decorate a function that manipulates items in a coroutine to become a simple stage. @@ -187,7 +201,7 @@ def mutator_stage(func): [{'x': True}, {'a': False, 'x': True}] """ - def coro(*args): + def coro(*args: Unpack[A]) -> Generator[Optional[T], T, None]: task = None while True: task = yield task @@ -406,9 +420,7 @@ def run_parallel(self, queue_size=DEFAULT_QUEUE_SIZE): for i in range(1, queue_count): for coro in self.stages[i]: threads.append( - MiddlePipelineThread( - coro, queues[i - 1], queues[i], threads - ) + MiddlePipelineThread(coro, queues[i - 1], queues[i], threads) ) # Last stage. From 04aa1b8ddbfbc980039a4d820f7e7d3a2011dbf9 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 15:52:11 +0100 Subject: [PATCH 07/28] Added last missing typehints in importer and fixed some typehint issues in util. Also run poe format --- beets/importer.py | 191 +++++++++++++++++++++++++----------- beets/util/__init__.py | 13 +-- beets/util/pipeline.py | 5 +- test/plugins/test_lyrics.py | 12 ++- 4 files changed, 153 insertions(+), 68 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 3586445fdc..a81c97df05 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -2,19 +2,20 @@ autotagging music files. """ +from __future__ import annotations + import itertools import os import pickle import re import shutil import time -from abc import ABC, abstractmethod from bisect import bisect_left, insort from collections import defaultdict from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Dict, Iterable, Optional, Sequence, Union +from typing import Callable, Dict, Iterable, Optional, Sequence, Union, cast import mediafile @@ -98,7 +99,7 @@ class ImportState: path: PathLike def __init__(self, readonly=False, path: Union[PathLike, None] = None): - self.path = path or config["statefile"].as_filename() + self.path = path or cast(PathLike, config["statefile"].as_filename()) self.tagprogress = {} self.taghistory = set() self._open() @@ -140,7 +141,7 @@ def _save(self): # -------------------------------- Tagprogress ------------------------------- # - def progress_add(self, toppath: PathLike, *paths: list[PathLike]): + def progress_add(self, toppath: PathLike, *paths: PathLike): """Record that the files under all of the `paths` have been imported under `toppath`. """ @@ -253,7 +254,9 @@ def set_config(self, config): iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) + iconfig["reflink"] = iconfig["reflink"].as_choice( + ["auto", True, False] + ) # Copy, move, reflink, link, and hardlink are mutually exclusive. 
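Circling back to the pipeline typing from PATCH 06: the doctest above already pins the runtime behaviour. With the new `TypeVarTuple`/`Unpack` signature, a checker can also infer the bound-argument and task types; the same doctest spelled out with explicit annotations, as a sketch:

```python
from beets.util import pipeline

@pipeline.stage
def add(n: int, i: int) -> int:
    # Bound argument `n` is supplied up front; each task `i` flows through.
    return i + n

pipe = pipeline.Pipeline([iter([1, 2, 3]), add(2)])
assert list(pipe.pull()) == [3, 4, 5]
```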
if iconfig["move"]: @@ -311,16 +314,24 @@ def log_choice(self, task, duplicate=False): self.tag_log("skip", paths) def should_resume(self, path): - raise NotImplementedError("Inheriting class must implement `should_resume`") + raise NotImplementedError( + "Inheriting class must implement `should_resume`" + ) def choose_match(self, task): - raise NotImplementedError("Inheriting class must implement `choose_match`") + raise NotImplementedError( + "Inheriting class must implement `choose_match`" + ) def resolve_duplicate(self, task, found_duplicates): - raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") + raise NotImplementedError( + "Inheriting class must implement `resolve_duplicate`" + ) def choose_item(self, task): - raise NotImplementedError("Inheriting class must implement `choose_item`") + raise NotImplementedError( + "Inheriting class must implement `choose_item`" + ) def run(self): """Run the import task.""" @@ -448,7 +459,16 @@ class BaseImportTask: Tasks flow through the importer pipeline. Each stage can update them.""" - def __init__(self, toppath, paths, items): + toppath: Optional[PathLike] + paths: list[PathLike] + items: list[library.Item] + + def __init__( + self, + toppath: Optional[PathLike], + paths: Optional[Iterable[PathLike]], + items: Optional[Iterable[library.Item]], + ): """Create a task. The primary fields that define a task are: * `toppath`: The user-specified base directory that contains the @@ -466,8 +486,8 @@ def __init__(self, toppath, paths, items): These fields should not change after initialization. """ self.toppath = toppath - self.paths = paths - self.items = items + self.paths = list(paths) if paths is not None else [] + self.items = list(items) if items is not None else [] class ImportTask(BaseImportTask): @@ -514,9 +534,14 @@ def __init__(self, toppath, paths, items): self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice(self, choice): + def set_choice( + self, choice: Union[action, autotag.AlbumMatch, autotag.TrackMatch] + ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. + + Album and trackmatch are implemented as tuples, so we can't + use isinstance to check for them. """ # Not part of the task structure: assert choice != action.APPLY # Only used internally. 
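To make the two accepted argument shapes of `set_choice` concrete, a hypothetical caller's view (`album_match` stands in for whatever `lookup_candidates` produced):

```python
task.set_choice(importer.action.ASIS)  # action constant: flags ASIS, no match kept
task.set_choice(album_match)           # AlbumMatch/TrackMatch: flags APPLY, match kept
```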
@@ -566,7 +591,7 @@ def chosen_info(self): if self.choice_flag in (action.ASIS, action.RETAG): likelies, consensus = autotag.current_metadata(self.items) return likelies - elif self.choice_flag is action.APPLY: + elif self.choice_flag is action.APPLY and self.match: return self.match.info.copy() assert False @@ -578,13 +603,19 @@ def imported_items(self): """ if self.choice_flag in (action.ASIS, action.RETAG): return list(self.items) - elif self.choice_flag == action.APPLY: + elif self.choice_flag == action.APPLY and isinstance( + self.match, autotag.AlbumMatch + ): return list(self.match.mapping.keys()) else: assert False def apply_metadata(self): """Copy metadata from match info to the items.""" + assert isinstance( + self.match, autotag.AlbumMatch + ), "apply_metadata() only works for albums" + if config["import"]["from_scratch"]: for item in self.match.mapping: item.clear() @@ -603,7 +634,9 @@ def remove_duplicates(self, lib): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug("deleting duplicate {0}", util.displayable_path(item.path)) + log.debug( + "deleting duplicate {0}", util.displayable_path(item.path) + ) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -635,8 +668,7 @@ def finalize(self, session): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip - and session.config["incremental_skip_later"] + self.skip and session.config["incremental_skip_later"] ): self.save_history() @@ -663,7 +695,7 @@ def cleanup(self, copy=False, delete=False, move=False): for old_path in self.old_paths: # Only delete files that were actually copied. if old_path not in new_paths: - util.remove(syspath(old_path), False) + util.remove(old_path, False) self.prune(old_path) # When moving, prune empty directories containing the original files. @@ -671,7 +703,7 @@ def cleanup(self, copy=False, delete=False, move=False): for old_path in self.old_paths: self.prune(old_path) - def _emit_imported(self, lib): + def _emit_imported(self, lib: library.Library): plugins.send("album_imported", lib=lib, album=self.album) def handle_created(self, session): @@ -693,7 +725,9 @@ def lookup_candidates(self): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) + artist, album, prop = autotag.tag_album( + self.items, search_ids=self.search_ids + ) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -713,7 +747,9 @@ def find_duplicates(self, lib): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. tmp_album = library.Album(lib, **info) - keys = config["import"]["duplicate_keys"]["album"].as_str_seq() + keys = cast( + list[str], config["import"]["duplicate_keys"]["album"].as_str_seq() + ) dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. @@ -744,7 +780,8 @@ def align_album_level_fields(self): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 + and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. 
changes["albumartist"] = plur_albumartist @@ -770,7 +807,12 @@ def align_album_level_fields(self): for item in self.items: item.update(changes) - def manipulate_files(self, operation=None, write=False, session=None): + def manipulate_files( + self, + operation: Optional[MoveOperation] = None, + write=False, + session: Optional[ImportSession] = None, + ): """Copy, move, link, hardlink or reflink (depending on `operation`) the files as well as write metadata. @@ -778,11 +820,12 @@ def manipulate_files(self, operation=None, write=False, session=None): If `write` is `True` metadata is written to the files. """ + assert session is not None, "session must be provided" items = self.imported_items() # Save the original paths of all items for deletion and pruning # in the next step (finalization). - self.old_paths = [item.path for item in items] + self.old_paths: list[PathLike] = [item.path for item in items] for item in items: if operation is not None: # In copy and link modes, treat re-imports specially: @@ -820,7 +863,9 @@ def add(self, lib): self.remove_replaced(lib) self.album = lib.add_album(self.imported_items()) - if self.choice_flag == action.APPLY: + if self.choice_flag == action.APPLY and isinstance( + self.match, autotag.AlbumMatch + ): # Copy album flexible fields to the DB # TODO: change the flow so we create the `Album` object earlier, # and we can move this into `self.apply_metadata`, just like @@ -830,25 +875,30 @@ def add(self, lib): self.reimport_metadata(lib) - def record_replaced(self, lib): + def record_replaced(self, lib: library.Library): """Records the replaced items and albums in the `replaced_items` and `replaced_albums` dictionaries. """ self.replaced_items = defaultdict(list) - self.replaced_albums = defaultdict(list) + self.replaced_albums: dict[PathLike, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): - dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) + dup_items = list( + lib.items(dbcore.query.BytesQuery("path", item.path)) + ) self.replaced_items[item] = dup_items for dup_item in dup_items: - if not dup_item.album_id or dup_item.album_id in replaced_album_ids: + if ( + not dup_item.album_id + or dup_item.album_id in replaced_album_ids + ): continue replaced_album = dup_item._cached_album if replaced_album: replaced_album_ids.add(dup_item.album_id) self.replaced_albums[replaced_album.path] = replaced_album - def reimport_metadata(self, lib): + def reimport_metadata(self, lib: library.Library): """For reimports, preserves metadata for reimported items and albums. """ @@ -894,7 +944,8 @@ def _reduce_and_log(new_obj, existing_fields, overwrite_keys): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " "Path: {}", + "Reimported album {}. Preserving attribute ['added']. 
" + "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -973,14 +1024,14 @@ def prune(self, filename): util.prune_dirs( os.path.dirname(filename), self.toppath, - clutter=config["clutter"].as_str_seq(), + clutter=cast(list[str], config["clutter"].as_str_seq()), ) class SingletonImportTask(ImportTask): """ImportTask for a single track that is not associated to an album.""" - def __init__(self, toppath, item): + def __init__(self, toppath: Optional[PathLike], item: library.Item): super().__init__(toppath, [item.path], [item]) self.item = item self.is_album = False @@ -995,13 +1046,17 @@ def chosen_info(self): assert self.choice_flag in (action.ASIS, action.RETAG, action.APPLY) if self.choice_flag in (action.ASIS, action.RETAG): return dict(self.item) - elif self.choice_flag is action.APPLY: + elif self.choice_flag is action.APPLY and self.match: return self.match.info.copy() + assert False def imported_items(self): return [self.item] def apply_metadata(self): + assert isinstance( + self.match, autotag.TrackMatch + ), "apply_metadata() only works for tracks" autotag.apply_item_metadata(self.item, self.match.info) def _emit_imported(self, lib): @@ -1022,7 +1077,9 @@ def find_duplicates(self, lib): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. tmp_item = library.Item(lib, **info) - keys = config["import"]["duplicate_keys"]["item"].as_str_seq() + keys = cast( + list[str], config["import"]["duplicate_keys"]["item"].as_str_seq() + ) dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1092,23 +1149,24 @@ def save_history(self): pass def save_progress(self): - if self.paths is None: + if self.paths is None and self.toppath: # "Done" sentinel. ImportState().progress_reset(self.toppath) - else: + elif self.toppath: # "Directory progress" sentinel for singletons ImportState().progress_add(self.toppath, *self.paths) - def skip(self): + @property + def skip(self) -> bool: return True def set_choice(self, choice): raise NotImplementedError - def cleanup(self, **kwargs): + def cleanup(self, copy=False, delete=False, move=False): pass - def _emit_imported(self, session): + def _emit_imported(self, lib): pass @@ -1152,7 +1210,7 @@ def handlers(cls): implements the same interface as `tarfile.TarFile`. """ if not hasattr(cls, "_handlers"): - cls._handlers = [] + cls._handlers: list[tuple[Callable, ...]] = [] from zipfile import ZipFile, is_zipfile cls._handlers.append((is_zipfile, ZipFile)) @@ -1174,9 +1232,9 @@ def handlers(cls): return cls._handlers - def cleanup(self, **kwargs): + def cleanup(self, copy=False, delete=False, move=False): """Removes the temporary directory the archive was extracted to.""" - if self.extracted: + if self.extracted and self.toppath: log.debug( "Removing extracted directory: {0}", displayable_path(self.toppath), @@ -1187,10 +1245,18 @@ def extract(self): """Extracts the archive to a temporary directory and sets `toppath` to that directory. """ + assert self.toppath is not None, "toppath must be set" + + handler_class = None for path_test, handler_class in self.handlers(): if path_test(os.fsdecode(self.toppath)): break + if handler_class is None: + raise ValueError( + "No handler found for archive: {0}".format(self.toppath) + ) + extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") try: @@ -1268,7 +1334,7 @@ def tasks(self): # for archive imports, send the archive task instead (to remove # the extracted directory). 
if self.is_archive: - yield archive_task + yield archive_task # type: ignore else: yield self.sentinel() @@ -1308,7 +1374,9 @@ def paths(self): def singleton(self, path): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug("Skipping previously-imported path: {0}", displayable_path(path)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(path) + ) self.skipped += 1 return None @@ -1331,7 +1399,9 @@ def album(self, paths, dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(dirs) + ) self.skipped += 1 return None @@ -1360,7 +1430,8 @@ def unarchive(self): if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " "'copy' or 'move' to be enabled." + "Archive importing requires either " + "'copy' or 'move' to be enabled." ) return @@ -1473,7 +1544,7 @@ def query_tasks(session): @pipeline.mutator_stage -def lookup_candidates(session, task): +def lookup_candidates(session: ImportSession, task: ImportTask): """A coroutine for performing the initial MusicBrainz lookup for an album. It accepts lists of Items and yields (items, cur_artist, cur_album, candidates, rec) tuples. If no match @@ -1495,7 +1566,7 @@ def lookup_candidates(session, task): @pipeline.stage -def user_query(session, task): +def user_query(session: ImportSession, task: ImportTask): """A coroutine for interfacing with the user about the tagging process. @@ -1528,7 +1599,9 @@ def emitter(task): yield SentinelImportTask(task.toppath, task.paths) return _extend_pipeline( - emitter(task), lookup_candidates(session), user_query(session) + emitter(task), + lookup_candidates(session), + user_query(session), # type: ignore # Recursion in decorators ) # As albums: group items by albums and create task for each album @@ -1537,7 +1610,7 @@ def emitter(task): [task], group_albums(session), lookup_candidates(session), - user_query(session), + user_query(session), # type: ignore # Recursion in decorators ) resolve_duplicates(session, task) @@ -1559,7 +1632,9 @@ def emitter(task): ) return _extend_pipeline( - [merged_task], lookup_candidates(session), user_query(session) + [merged_task], + lookup_candidates(session), + user_query(session), # type: ignore # Recursion in decorators ) apply_choice(session, task) @@ -1573,7 +1648,9 @@ def resolve_duplicates(session, task): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) + log.debug( + "found duplicates: {}".format([o.id for o in found_duplicates]) + ) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( @@ -1697,7 +1774,7 @@ def manipulate_files(session, task): @pipeline.stage -def log_files(session, task): +def log_files(session: ImportSession, task: ImportTask): """A coroutine (pipeline stage) to log each file to be imported.""" if isinstance(task, SingletonImportTask): log.info("Singleton: {0}", displayable_path(task.item["path"])) @@ -1753,8 +1830,8 @@ def albums_in_dir(path): containing any media files is an album. 
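An aside on the archive handling refactored above: `extract()` now fails loudly when no handler matches. The lookup it performs amounts to the following sketch. Only the zipfile pair is visible in the hunk; the tarfile pair is an assumption based on upstream beets, and optional formats are appended the same way when their modules import.

```python
from tarfile import TarFile, is_tarfile
from zipfile import ZipFile, is_zipfile

# (predicate, opener) pairs, tried in order.
handlers = [(is_zipfile, ZipFile), (is_tarfile, TarFile)]

def opener_for(path: str):
    for path_test, handler_class in handlers:
        if path_test(path):
            return handler_class
    raise ValueError(f"No handler found for archive: {path}")
```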
""" collapse_pat = collapse_paths = collapse_items = None - ignore = config["ignore"].as_str_seq() - ignore_hidden = config["ignore_hidden"].get(bool) + ignore = cast(list[str], config["ignore"].as_str_seq()) + ignore_hidden = cast(bool, config["ignore_hidden"].get(bool)) for root, dirs, files in sorted_walk( path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 32a63b216d..e13b676049 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -198,8 +198,8 @@ def ancestry(path: AnyStr) -> list[AnyStr]: def sorted_walk( - path: AnyStr, - ignore: Sequence[bytes] = (), + path: PathLike, + ignore: Sequence[PathLike] = (), ignore_hidden: bool = False, logger: Logger | None = None, ) -> Iterator[tuple[bytes, Sequence[bytes], Sequence[bytes]]]: @@ -297,8 +297,8 @@ def fnmatch_all(names: Sequence[bytes], patterns: Sequence[bytes]) -> bool: def prune_dirs( - path: bytes, - root: bytes | None = None, + path: PathLike, + root: PathLike | None = None, clutter: Sequence[str] = (".DS_Store", "Thumbs.db"), ): """If path is an empty directory, then remove it. Recursively remove @@ -419,12 +419,13 @@ def bytestring_path(path: PathLike) -> bytes: def displayable_path( - path: BytesOrStr | tuple[BytesOrStr, ...], separator: str = "; " + path: PathLike | Iterable[PathLike], separator: str = "; " ) -> str: """Attempts to decode a bytestring path to a unicode object for the purpose of displaying it to the user. If the `path` argument is a list or a tuple, the elements are joined with `separator`. """ + if isinstance(path, (list, tuple)): return separator.join(displayable_path(p) for p in path) elif isinstance(path, str): @@ -472,7 +473,7 @@ def samefile(p1: bytes, p2: bytes) -> bool: return False -def remove(path: bytes, soft: bool = True): +def remove(path: PathLike, soft: bool = True): """Remove the file. If `soft`, then no error will be raised if the file does not exist. """ diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index e79325fe2f..5387186b46 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -31,7 +31,6 @@ in place of any single coroutine. """ - import queue import sys from threading import Lock, Thread @@ -420,7 +419,9 @@ def run_parallel(self, queue_size=DEFAULT_QUEUE_SIZE): for i in range(1, queue_count): for coro in self.stages[i]: threads.append( - MiddlePipelineThread(coro, queues[i - 1], queues[i], threads) + MiddlePipelineThread( + coro, queues[i - 1], queues[i], threads + ) ) # Last stage. 
diff --git a/test/plugins/test_lyrics.py b/test/plugins/test_lyrics.py index 1305a21d3b..fd6112d7c2 100644 --- a/test/plugins/test_lyrics.py +++ b/test/plugins/test_lyrics.py @@ -74,7 +74,9 @@ def test_search_empty(self, artist, title): ("横山克", "Masaru Yokoyama", ["Masaru Yokoyama"]), ], ) - def test_search_pairs_artists(self, artist, artist_sort, expected_extra_artists): + def test_search_pairs_artists( + self, artist, artist_sort, expected_extra_artists + ): item = Item(artist=artist, artist_sort=artist_sort, title="song") actual_artists = [a for a, _ in lyrics.search_pairs(item)] @@ -99,7 +101,9 @@ def test_search_pairs_artists(self, artist, artist_sort, expected_extra_artists) def test_search_pairs_titles(self, title, expected_extra_titles): item = Item(title=title, artist="A") - actual_titles = {t: None for _, tit in lyrics.search_pairs(item) for t in tit} + actual_titles = { + t: None for _, tit in lyrics.search_pairs(item) for t in tit + } assert list(actual_titles) == [title, *expected_extra_titles] @@ -241,7 +245,9 @@ def backend(self, lyrics_plugin): @pytest.fixture def lyrics_html(self, lyrics_root_dir, file_name): - return (lyrics_root_dir / f"{file_name}.txt").read_text(encoding="utf-8") + return (lyrics_root_dir / f"{file_name}.txt").read_text( + encoding="utf-8" + ) @pytest.mark.on_lyrics_update From ed92f9b997af93b5d4bb0fca54d61d9ee4642dad Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 3 Feb 2025 11:37:33 +0100 Subject: [PATCH 08/28] Do not skip tests in ci. The return type of the stage decorator should in theory be `T|None` but the return of task types is not consistent in its usage. Would need some bigger changes for which I'm not ready at the moment. --- beets/util/pipeline.py | 8 ++++---- test/plugins/test_lyrics.py | 12 +++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index 5387186b46..d797dea132 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -160,7 +160,7 @@ def multiple(messages): def stage( func: Callable[ [Unpack[A], T], - R, + Optional[R], ], ): """Decorate a function to become a simple stage. @@ -176,7 +176,7 @@ def stage( [3, 4, 5] """ - def coro(*args: Unpack[A]) -> Generator[Union[R, T, None], T, None]: + def coro(*args: Unpack[A]) -> Generator[Union[R, T, None], T, R]: task = None while True: task = yield task @@ -185,7 +185,7 @@ def coro(*args: Unpack[A]) -> Generator[Union[R, T, None], T, None]: return coro -def mutator_stage(func: Callable[[Unpack[A], T], None]): +def mutator_stage(func: Callable[[Unpack[A], T], R]): """Decorate a function that manipulates items in a coroutine to become a simple stage. 
@@ -200,7 +200,7 @@ def mutator_stage(func: Callable[[Unpack[A], T], None]): [{'x': True}, {'a': False, 'x': True}] """ - def coro(*args: Unpack[A]) -> Generator[Optional[T], T, None]: + def coro(*args: Unpack[A]) -> Generator[Union[T, None], T, None]: task = None while True: task = yield task diff --git a/test/plugins/test_lyrics.py b/test/plugins/test_lyrics.py index fd6112d7c2..a3c640109b 100644 --- a/test/plugins/test_lyrics.py +++ b/test/plugins/test_lyrics.py @@ -14,6 +14,8 @@ """Tests for the 'lyrics' plugin.""" +import importlib.util +import os import re from functools import partial from http import HTTPStatus @@ -22,14 +24,14 @@ from beets.library import Item from beets.test.helper import PluginMixin +from beetsplug import lyrics -try: - from beetsplug import lyrics -except Exception: - pytest.skip("lyrics plugin couldn't be loaded", allow_module_level=True) +from .lyrics_pages import LyricsPage, lyrics_pages +github_ci = os.environ.get("GITHUB_ACTIONS") == "true" +if not github_ci and not importlib.util.find_spec("langdetect"): + pytest.skip("langdetect isn't available", allow_module_level=True) -from .lyrics_pages import LyricsPage, lyrics_pages PHRASE_BY_TITLE = { "Lady Madonna": "friday night arrives without a suitcase", From 23f4f8261c157a00abd2a401fc8b6c483c8582bb Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Tue, 4 Feb 2025 16:38:24 +0100 Subject: [PATCH 09/28] Added some more typehints --- beets/importer.py | 82 ++++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index a81c97df05..8f64e25dff 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -185,7 +185,7 @@ class ImportSession: """ logger: logging.Logger - paths: Union[list[bytes], None] + paths: Union[list[bytes], None] = None lib: library.Library _is_resuming: Dict[bytes, bool] @@ -223,8 +223,6 @@ def __init__( # Normalize the paths. if paths is not None: self.paths = list(map(normpath, paths)) - else: - self.paths = None def _setup_logging(self, loghandler: Optional[logging.Handler]): logger = logging.getLogger(__name__) @@ -286,13 +284,13 @@ def set_config(self, config): self.want_resume = config["resume"].as_choice([True, False, "ask"]) - def tag_log(self, status, paths): + def tag_log(self, status, paths: Sequence[PathLike]): """Log a message about a given album to the importer log. The status should reflect the reason the album couldn't be tagged. """ self.logger.info("{0} {1}", status, displayable_path(paths)) - def log_choice(self, task, duplicate=False): + def log_choice(self, task: ImportTask, duplicate=False): """Logs the task's current choice if it should be logged. If ``duplicate``, then this is a secondary choice after a duplicate was detected and a decision was made. 
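Back in PATCH 08's pipeline changes: `mutator_stage` keeps its contract (mutate the task in place, ignore the return value, re-yield the task). Its doctest with explicit annotations, as a sketch; the dict payloads mirror the doctest's data:

```python
from beets.util import pipeline

@pipeline.mutator_stage
def setkey(key: str, item: dict) -> None:
    # Mutates the task in place; the decorator re-yields the task itself.
    item[key] = True

pipe = pipeline.Pipeline([iter([{"x": False}, {"a": False}]), setkey("x")])
assert list(pipe.pull()) == [{"x": True}, {"a": False, "x": True}]
```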
@@ -313,22 +311,22 @@ def log_choice(self, task, duplicate=False): elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) - def should_resume(self, path): + def should_resume(self, path: PathLike): raise NotImplementedError( "Inheriting class must implement `should_resume`" ) - def choose_match(self, task): + def choose_match(self, task: ImportTask): raise NotImplementedError( "Inheriting class must implement `choose_match`" ) - def resolve_duplicate(self, task, found_duplicates): + def resolve_duplicate(self, task: ImportTask, found_duplicates): raise NotImplementedError( "Inheriting class must implement `resolve_duplicate`" ) - def choose_item(self, task): + def choose_item(self, task: ImportTask): raise NotImplementedError( "Inheriting class must implement `choose_item`" ) @@ -522,12 +520,16 @@ class ImportTask(BaseImportTask): system. """ + choice_flag: Optional[action] = None + match: Union[autotag.AlbumMatch, autotag.TrackMatch, None] = None + + # Keep track of the current task item + cur_album: Optional[str] = None + cur_artist: Optional[str] = None + candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] + def __init__(self, toppath, paths, items): super().__init__(toppath, paths, items) - self.choice_flag = None - self.cur_album = None - self.cur_artist = None - self.candidates = [] self.rec = None self.should_remove_duplicates = False self.should_merge_duplicates = False @@ -622,13 +624,13 @@ def apply_metadata(self): autotag.apply_metadata(self.match.info, self.match.mapping) - def duplicate_items(self, lib): + def duplicate_items(self, lib: library.Library): duplicate_items = [] for album in self.find_duplicates(lib): duplicate_items += album.items() return duplicate_items - def remove_duplicates(self, lib): + def remove_duplicates(self, lib: library.Library): duplicate_items = self.duplicate_items(lib) log.debug("removing {0} old duplicated items", len(duplicate_items)) for item in duplicate_items: @@ -640,7 +642,7 @@ def remove_duplicates(self, lib): util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) - def set_fields(self, lib): + def set_fields(self, lib: library.Library): """Sets the fields given at CLI or configuration to the specified values, for both the album and all its items. """ @@ -661,7 +663,7 @@ def set_fields(self, lib): item.store() self.album.store() - def finalize(self, session): + def finalize(self, session: ImportSession): """Save progress, clean up files, and emit plugin event.""" # Update progress. if session.want_resume: @@ -706,7 +708,7 @@ def cleanup(self, copy=False, delete=False, move=False): def _emit_imported(self, lib: library.Library): plugins.send("album_imported", lib=lib, album=self.album) - def handle_created(self, session): + def handle_created(self, session: ImportSession): """Send the `import_task_created` event for this task. Return a list of tasks that should continue through the pipeline. By default, this is a list containing only the task itself, but plugins can replace the task @@ -733,7 +735,7 @@ def lookup_candidates(self): self.candidates = prop.candidates self.rec = prop.recommendation - def find_duplicates(self, lib): + def find_duplicates(self, lib: library.Library): """Return a list of albums from `lib` with the same artist and album name as the task. 
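One subtlety in the class-level defaults introduced above (for example `candidates: ... = []`): class attributes are shared across instances until shadowed by assignment. That is safe here because `lookup_candidates()` assigns `self.candidates` rather than mutating it, as this generic sketch demonstrates:

```python
class Task:
    candidates: list = []  # class attribute, shared by all instances

a, b = Task(), Task()
a.candidates.append("match")      # mutates the one shared list...
assert b.candidates == ["match"]  # ...which every instance observes

a.candidates = ["own"]            # assignment shadows per instance (the safe pattern)
assert b.candidates == ["match"]
```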
""" @@ -855,7 +857,7 @@ def manipulate_files( plugins.send("import_task_files", session=session, task=self) - def add(self, lib): + def add(self, lib: library.Library): """Add the items as an album to the library and remove replaced items.""" self.align_album_level_fields() with lib.transaction(): @@ -1101,7 +1103,7 @@ def add(self, lib): def infer_album_fields(self): raise NotImplementedError - def choose_match(self, session): + def choose_match(self, session: ImportSession): """Ask the session which match should apply and apply it.""" choice = session.choose_item(self) self.set_choice(choice) @@ -1285,7 +1287,7 @@ class ImportTaskFactory: indicated by a path. """ - def __init__(self, toppath, session): + def __init__(self, toppath: PathLike, session: ImportSession): """Create a new task factory. `toppath` is the user-specified path to search for music to @@ -1338,7 +1340,7 @@ def tasks(self): else: yield self.sentinel() - def _create(self, task): + def _create(self, task: Optional[ImportTask]): """Handle a new task to be emitted by the factory. Emit the `import_task_created` event and increment the @@ -1371,7 +1373,7 @@ def paths(self): for dirs, paths in albums_in_dir(self.toppath): yield dirs, paths - def singleton(self, path): + def singleton(self, path: PathLike): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): log.debug( @@ -1386,7 +1388,7 @@ def singleton(self, path): else: return None - def album(self, paths, dirs=None): + def album(self, paths: Iterable[PathLike], dirs=None): """Return a `ImportTask` with all media files from paths. `dirs` is a list of parent directories used to record already @@ -1413,7 +1415,7 @@ def album(self, paths, dirs=None): else: return None - def sentinel(self, paths=None): + def sentinel(self, paths: Optional[Iterable[PathLike]] = None): """Return a `SentinelImportTask` indicating the end of a top-level directory import. """ @@ -1448,7 +1450,7 @@ def unarchive(self): log.debug("Archive extracted to: {0}", self.toppath) return archive_task - def read_item(self, path): + def read_item(self, path: PathLike): """Return an `Item` read from the path. If an item cannot be read, return `None` instead and log an @@ -1491,12 +1493,16 @@ def _extend_pipeline(tasks, *stages): # Full-album pipeline stages. -def read_tasks(session): +def read_tasks(session: ImportSession): """A generator yielding all the albums (as ImportTask objects) found in the user-specified list of paths. In the case of a singleton import, yields single-item tasks instead. """ skipped = 0 + if session.paths is None: + log.warning("No path specified in session.") + return + for toppath in session.paths: # Check whether we need to resume the import. session.ask_resume(toppath) @@ -1514,7 +1520,7 @@ def read_tasks(session): log.info("Skipped {0} paths.", skipped) -def query_tasks(session): +def query_tasks(session: ImportSession): """A generator that works as a drop-in-replacement for read_tasks. Instead of finding files from the filesystem, a query is used to match items from the library. @@ -1641,7 +1647,7 @@ def emitter(task): return task -def resolve_duplicates(session, task): +def resolve_duplicates(session: ImportSession, task: ImportTask): """Check if a task conflicts with items or albums already imported and ask the session to resolve this. 
""" @@ -1684,7 +1690,7 @@ def resolve_duplicates(session, task): @pipeline.mutator_stage -def import_asis(session, task): +def import_asis(session: ImportSession, task: ImportTask): """Select the `action.ASIS` choice for all tasks. This stage replaces the initial_lookup and user_query stages @@ -1698,7 +1704,7 @@ def import_asis(session, task): apply_choice(session, task) -def apply_choice(session, task): +def apply_choice(session: ImportSession, task: ImportTask): """Apply the task's choice to the Album or Item it contains and add it to the library. """ @@ -1722,7 +1728,11 @@ def apply_choice(session, task): @pipeline.mutator_stage -def plugin_stage(session, func, task): +def plugin_stage( + session: ImportSession, + func: Callable[[ImportSession, ImportTask]], + task: ImportTask, +): """A coroutine (pipeline stage) that calls the given function with each non-skipped import task. These stages occur between applying metadata changes and moving/copying/writing files. @@ -1739,7 +1749,7 @@ def plugin_stage(session, func, task): @pipeline.stage -def manipulate_files(session, task): +def manipulate_files(session: ImportSession, task: ImportTask): """A coroutine (pipeline stage) that performs necessary file manipulations *after* items have been added to the library and finalizes each task. @@ -1784,7 +1794,7 @@ def log_files(session: ImportSession, task: ImportTask): log.info(" {0}", displayable_path(item["path"])) -def group_albums(session): +def group_albums(session: ImportSession): """A pipeline stage that groups the items of each task into albums using their metadata. @@ -1823,7 +1833,7 @@ def is_subdir_of_any_in_list(path, dirs): return any(d in ancestors for d in dirs) -def albums_in_dir(path): +def albums_in_dir(path: PathLike): """Recursively searches the given directory and returns an iterable of (paths, items) where paths is a list of directories and items is a list of Items that is probably an album. Specifically, any folder From 12b21c48e9fc02979167bf14e414d94509319b3f Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Fri, 7 Feb 2025 16:55:26 +0100 Subject: [PATCH 10/28] Changelog addition --- docs/changelog.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 62cd0c4cc1..33a4710e64 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -83,6 +83,8 @@ Other changes: wrong (outdated) commit. Now the tag is created in the same workflow step right after committing the version update. :bug:`5539` +* Added some typehints: ImportSession and Pipeline have typehints now. Should + improve useability for new developers. 
 2.2.0 (December 02, 2024)
 -------------------------

From 10c0aa3f6ab442bf8144bec2dec85ca908617749 Mon Sep 17 00:00:00 2001
From: Sebastian Mohr
Date: Sat, 8 Feb 2025 22:03:03 +0100
Subject: [PATCH 11/28] Replaced PathLike with PathBytes and removed
 unnecessary type ignores

---
 beets/importer.py | 118 ++++++++++++++++++++++++----------------------
 1 file changed, 61 insertions(+), 57 deletions(-)

diff --git a/beets/importer.py b/beets/importer.py
index 8f64e25dff..202e8f1eff 100644
--- a/beets/importer.py
+++ b/beets/importer.py
@@ -15,14 +15,13 @@
 from dataclasses import dataclass
 from enum import Enum
 from tempfile import mkdtemp
-from typing import Callable, Dict, Iterable, Optional, Sequence, Union, cast
+from typing import Callable, Iterable, Optional, Sequence, cast
 
 import mediafile
 
 from beets import autotag, config, dbcore, library, logging, plugins, util
 from beets.util import (
     MoveOperation,
-    PathLike,
     ancestry,
     displayable_path,
     normpath,
@@ -61,6 +60,10 @@
 # Global logger.
 log = logging.getLogger("beets")
 
+# Here for now to allow for an easy replace later on
+# once we can move to a PathLike
+PathBytes = bytes
+
 
 class ImportAbortError(Exception):
     """Raised when the user aborts the tagging operation."""
@@ -94,12 +97,12 @@ class ImportState:
     ```
     """
 
-    tagprogress: dict
-    taghistory: set
-    path: PathLike
+    tagprogress: dict[PathBytes, list[PathBytes]]
+    taghistory: set[tuple[PathBytes, ...]]
+    path: PathBytes
 
-    def __init__(self, readonly=False, path: Union[PathLike, None] = None):
-        self.path = path or cast(PathLike, config["statefile"].as_filename())
+    def __init__(self, readonly=False, path: Optional[PathBytes] = None):
+        self.path = path or cast(PathBytes, config["statefile"].as_filename())
         self.tagprogress = {}
         self.taghistory = set()
         self._open()
@@ -141,7 +144,7 @@ def _save(self):
 
     # -------------------------------- Tagprogress ------------------------------- #
 
-    def progress_add(self, toppath: PathLike, *paths: PathLike):
+    def progress_add(self, toppath: PathBytes, *paths: PathBytes):
         """Record that the files under all of the `paths` have been imported
         under `toppath`.
         """
@@ -153,19 +156,19 @@ def progress_add(self, toppath: PathLike, *paths: PathLike):
         else:
             insort(imported, path)
 
-    def progress_has_element(self, toppath: PathLike, path: PathLike) -> bool:
+    def progress_has_element(self, toppath: PathBytes, path: PathBytes) -> bool:
         """Return whether `path` has been imported in `toppath`."""
         imported = self.tagprogress.get(toppath, [])
         i = bisect_left(imported, path)
         return i != len(imported) and imported[i] == path
 
-    def progress_has(self, toppath: PathLike) -> bool:
+    def progress_has(self, toppath: PathBytes) -> bool:
         """Return `True` if there exist paths that have already been
         imported under `toppath`.
""" return toppath in self.tagprogress - def progress_reset(self, toppath: PathLike): + def progress_reset(self, toppath: PathBytes): """Reset the progress for `toppath`.""" with self as state: if toppath in state.tagprogress: @@ -173,7 +176,7 @@ def progress_reset(self, toppath: PathLike): # -------------------------------- Taghistory -------------------------------- # - def history_add(self, paths: list[PathLike]): + def history_add(self, paths: list[PathBytes]): """Add the paths to the history.""" with self as state: state.taghistory.add(tuple(paths)) @@ -185,18 +188,18 @@ class ImportSession: """ logger: logging.Logger - paths: Union[list[bytes], None] = None + paths: Optional[list[bytes]] = None lib: library.Library - _is_resuming: Dict[bytes, bool] - _merged_items: set - _merged_dirs: set + _is_resuming: dict[bytes, bool] + _merged_items: set[PathBytes] + _merged_dirs: set[PathBytes] def __init__( self, lib: library.Library, loghandler: Optional[logging.Handler], - paths: Optional[Sequence[PathLike]], + paths: Optional[Sequence[PathBytes]], query: Optional[dbcore.Query], ): """Create a session. @@ -284,7 +287,7 @@ def set_config(self, config): self.want_resume = config["resume"].as_choice([True, False, "ask"]) - def tag_log(self, status, paths: Sequence[PathLike]): + def tag_log(self, status, paths: Sequence[PathBytes]): """Log a message about a given album to the importer log. The status should reflect the reason the album couldn't be tagged. """ @@ -311,7 +314,7 @@ def log_choice(self, task: ImportTask, duplicate=False): elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) - def should_resume(self, path: PathLike): + def should_resume(self, path: PathBytes): raise NotImplementedError( "Inheriting class must implement `should_resume`" ) @@ -383,7 +386,7 @@ def run(self): # Incremental and resumed imports - def already_imported(self, toppath: PathLike, paths: Sequence[PathLike]): + def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]): """Returns true if the files belonging to this task have already been imported in a previous session. """ @@ -404,7 +407,7 @@ def history_dirs(self): self._history_dirs = ImportState().taghistory return self._history_dirs - def already_merged(self, paths: Sequence[PathLike]): + def already_merged(self, paths: Sequence[PathBytes]): """Returns true if all the paths being imported were part of a merge during previous tasks. """ @@ -413,7 +416,7 @@ def already_merged(self, paths: Sequence[PathLike]): return False return True - def mark_merged(self, paths: Sequence[PathLike]): + def mark_merged(self, paths: Sequence[PathBytes]): """Mark paths and directories as merged for future reimport tasks.""" self._merged_items.update(paths) dirs = { @@ -422,14 +425,14 @@ def mark_merged(self, paths: Sequence[PathLike]): } self._merged_dirs.update(dirs) - def is_resuming(self, toppath: PathLike): + def is_resuming(self, toppath: PathBytes): """Return `True` if user wants to resume import of this path. You have to call `ask_resume` first to determine the return value. """ return self._is_resuming.get(normpath(toppath), False) - def ask_resume(self, toppath: PathLike): + def ask_resume(self, toppath: PathBytes): """If import of `toppath` was aborted in an earlier session, ask user if they want to resume the import. @@ -457,14 +460,14 @@ class BaseImportTask: Tasks flow through the importer pipeline. 
Each stage can update them.""" - toppath: Optional[PathLike] - paths: list[PathLike] + toppath: Optional[PathBytes] + paths: list[PathBytes] items: list[library.Item] def __init__( self, - toppath: Optional[PathLike], - paths: Optional[Iterable[PathLike]], + toppath: Optional[PathBytes], + paths: Optional[Iterable[PathBytes]], items: Optional[Iterable[library.Item]], ): """Create a task. The primary fields that define a task are: @@ -521,7 +524,7 @@ class ImportTask(BaseImportTask): """ choice_flag: Optional[action] = None - match: Union[autotag.AlbumMatch, autotag.TrackMatch, None] = None + match: autotag.AlbumMatch | autotag.TrackMatch | None = None # Keep track of the current task item cur_album: Optional[str] = None @@ -537,7 +540,7 @@ def __init__(self, toppath, paths, items): self.search_ids = [] # user-supplied candidate IDs. def set_choice( - self, choice: Union[action, autotag.AlbumMatch, autotag.TrackMatch] + self, choice: action | autotag.AlbumMatch | autotag.TrackMatch ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. @@ -547,6 +550,7 @@ def set_choice( """ # Not part of the task structure: assert choice != action.APPLY # Only used internally. + if choice in ( action.SKIP, action.ASIS, @@ -554,11 +558,11 @@ def set_choice( action.ALBUMS, action.RETAG, ): - self.choice_flag = choice + self.choice_flag = cast(action, choice) self.match = None else: self.choice_flag = action.APPLY # Implicit choice. - self.match = choice + self.match = cast(autotag.AlbumMatch | autotag.TrackMatch, choice) def save_progress(self): """Updates the progress state to indicate that this album has @@ -827,7 +831,7 @@ def manipulate_files( items = self.imported_items() # Save the original paths of all items for deletion and pruning # in the next step (finalization). - self.old_paths: list[PathLike] = [item.path for item in items] + self.old_paths: list[PathBytes] = [item.path for item in items] for item in items: if operation is not None: # In copy and link modes, treat re-imports specially: @@ -882,7 +886,7 @@ def record_replaced(self, lib: library.Library): and `replaced_albums` dictionaries. """ self.replaced_items = defaultdict(list) - self.replaced_albums: dict[PathLike, library.Album] = defaultdict() + self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): dup_items = list( @@ -1033,7 +1037,7 @@ def prune(self, filename): class SingletonImportTask(ImportTask): """ImportTask for a single track that is not associated to an album.""" - def __init__(self, toppath: Optional[PathLike], item: library.Item): + def __init__(self, toppath: Optional[PathBytes], item: library.Item): super().__init__(toppath, [item.path], [item]) self.item = item self.is_album = False @@ -1287,7 +1291,7 @@ class ImportTaskFactory: indicated by a path. """ - def __init__(self, toppath: PathLike, session: ImportSession): + def __init__(self, toppath: PathBytes, session: ImportSession): """Create a new task factory. `toppath` is the user-specified path to search for music to @@ -1314,6 +1318,7 @@ def tasks(self): extracted data. """ # Check whether this is an archive. + archive_task: ArchiveImportTask | None = None if self.is_archive: archive_task = self.unarchive() if not archive_task: @@ -1335,8 +1340,8 @@ def tasks(self): # it is finished. This is usually just a SentinelImportTask, but # for archive imports, send the archive task instead (to remove # the extracted directory). 
-        if self.is_archive:
-            yield archive_task  # type: ignore
+        if archive_task:
+            yield archive_task
         else:
             yield self.sentinel()
 
@@ -1373,7 +1378,7 @@ def paths(self):
         for dirs, paths in albums_in_dir(self.toppath):
             yield dirs, paths
 
-    def singleton(self, path: PathLike):
+    def singleton(self, path: PathBytes):
         """Return a `SingletonImportTask` for the music file."""
         if self.session.already_imported(self.toppath, [path]):
             log.debug(
@@ -1388,7 +1393,7 @@ def singleton(self, path: PathLike):
         else:
             return None
 
-    def album(self, paths: Iterable[PathLike], dirs=None):
+    def album(self, paths: Iterable[PathBytes], dirs=None):
         """Return a `ImportTask` with all media files from paths.
 
         `dirs` is a list of parent directories used to record already
@@ -1407,15 +1412,16 @@ def album(self, paths: Iterable[PathLike], dirs=None):
             self.skipped += 1
             return None
 
-        items = map(self.read_item, paths)
-        items = [item for item in items if item]
+        items: list[library.Item] = [
+            item for item in map(self.read_item, paths) if item
+        ]
 
         if items:
             return ImportTask(self.toppath, dirs, items)
         else:
             return None
 
-    def sentinel(self, paths: Optional[Iterable[PathLike]] = None):
+    def sentinel(self, paths: Optional[Iterable[PathBytes]] = None):
         """Return a `SentinelImportTask` indicating the end of a top-level
         directory import.
         """
@@ -1450,7 +1456,7 @@ def unarchive(self):
             log.debug("Archive extracted to: {0}", self.toppath)
         return archive_task
 
-    def read_item(self, path: PathLike):
+    def read_item(self, path: PathBytes):
         """Return an `Item` read from the path.
 
         If an item cannot be read, return `None` instead and log an
@@ -1528,9 +1534,9 @@ def query_tasks(session: ImportSession):
     if session.config["singletons"]:
         # Search for items.
        for item in session.lib.items(session.query):
-            task = SingletonImportTask(None, item)
-            for task in task.handle_created(session):
-                yield task
+            singleton_task = SingletonImportTask(None, item)
+            for task in singleton_task.handle_created(session):
+                yield task
     else:
         # Search for albums.
@@ -1607,7 +1613,7 @@ def emitter(task): return _extend_pipeline( emitter(task), lookup_candidates(session), - user_query(session), # type: ignore # Recursion in decorators + user_query(session), ) # As albums: group items by albums and create task for each album @@ -1616,7 +1622,7 @@ def emitter(task): [task], group_albums(session), lookup_candidates(session), - user_query(session), # type: ignore # Recursion in decorators + user_query(session), ) resolve_duplicates(session, task) @@ -1638,9 +1644,7 @@ def emitter(task): ) return _extend_pipeline( - [merged_task], - lookup_candidates(session), - user_query(session), # type: ignore # Recursion in decorators + [merged_task], lookup_candidates(session), user_query(session) ) apply_choice(session, task) @@ -1730,7 +1734,7 @@ def apply_choice(session: ImportSession, task: ImportTask): @pipeline.mutator_stage def plugin_stage( session: ImportSession, - func: Callable[[ImportSession, ImportTask]], + func: Callable[[ImportSession, ImportTask], None], task: ImportTask, ): """A coroutine (pipeline stage) that calls the given function with @@ -1811,10 +1815,10 @@ def group(item): if task.skip: continue tasks = [] - sorted_items = sorted(task.items, key=group) + sorted_items: list[library.Item] = sorted(task.items, key=group) for _, items in itertools.groupby(sorted_items, group): - items = list(items) - task = ImportTask(task.toppath, [i.path for i in items], items) + l_items = list(items) + task = ImportTask(task.toppath, [i.path for i in l_items], l_items) tasks += task.handle_created(session) tasks.append(SentinelImportTask(task.toppath, task.paths)) @@ -1833,7 +1837,7 @@ def is_subdir_of_any_in_list(path, dirs): return any(d in ancestors for d in dirs) -def albums_in_dir(path: PathLike): +def albums_in_dir(path: PathBytes): """Recursively searches the given directory and returns an iterable of (paths, items) where paths is a list of directories and items is a list of Items that is probably an album. Specifically, any folder From bbe4fb454b21a32869ffd807d1a6ea8e31ba8a48 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 8 Feb 2025 22:12:55 +0100 Subject: [PATCH 12/28] Renamed ignore to _ignore to prevent mypy error --- beets/util/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/beets/util/__init__.py b/beets/util/__init__.py index e13b676049..411202075b 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -210,7 +210,9 @@ def sorted_walk( """ # Make sure the paths aren't Unicode strings. bytes_path = bytestring_path(path) - ignore = [bytestring_path(i) for i in ignore] + _ignore = [ # rename prevents mypy variable shadowing issue + bytestring_path(i) for i in ignore + ] # Get all the directories and files at this level. try: @@ -230,7 +232,7 @@ def sorted_walk( # Skip ignored filenames. skip = False - for pat in ignore: + for pat in _ignore: if fnmatch.fnmatch(base, pat): if logger: logger.debug( @@ -257,7 +259,7 @@ def sorted_walk( # Recurse into directories. for base in dirs: cur = os.path.join(bytes_path, base) - yield from sorted_walk(cur, ignore, ignore_hidden, logger) + yield from sorted_walk(cur, _ignore, ignore_hidden, logger) def path_as_posix(path: bytes) -> bytes: From 44074d746449eb7dc157eeb39e53722f67476a39 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 8 Feb 2025 22:19:37 +0100 Subject: [PATCH 13/28] Readded copyright and union. 
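
The explicit ``Union`` spelling matters because ``cast`` evaluates its
first argument at runtime: ``from __future__ import annotations`` defers
annotations only, not ordinary expressions. A minimal sketch of the
failure mode follows; the two stand-in classes are hypothetical,
standing in for ``autotag.AlbumMatch`` and ``autotag.TrackMatch``.

```
# Sketch, not beets code: why Union comes back for Python 3.9.
from typing import Union, cast


class AlbumMatch:  # stand-in for autotag.AlbumMatch
    pass


class TrackMatch:  # stand-in for autotag.TrackMatch
    pass


choice = AlbumMatch()

# PEP 604 syntax between plain classes is a runtime expression here, and
# type.__or__ only exists on Python 3.10+, so on 3.9 this line raises
# TypeError:
#     match = cast(AlbumMatch | TrackMatch, choice)

# The Union spelling works on every version beets supports:
match = cast(Union[AlbumMatch, TrackMatch], choice)
print(type(match).__name__)  # -> AlbumMatch
```
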
--- beets/importer.py | 96 +++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 58 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 202e8f1eff..22f6273df3 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -1,3 +1,17 @@ +# This file is part of beets. +# Copyright 2016, Adrian Sampson. +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + """Provides the basic, interface-agnostic workflow for importing and autotagging music files. """ @@ -15,7 +29,7 @@ from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Callable, Iterable, Optional, Sequence, cast +from typing import Callable, Iterable, Optional, Sequence, Union, cast import mediafile @@ -255,9 +269,7 @@ def set_config(self, config): iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice( - ["auto", True, False] - ) + iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) # Copy, move, reflink, link, and hardlink are mutually exclusive. if iconfig["move"]: @@ -315,24 +327,16 @@ def log_choice(self, task: ImportTask, duplicate=False): self.tag_log("skip", paths) def should_resume(self, path: PathBytes): - raise NotImplementedError( - "Inheriting class must implement `should_resume`" - ) + raise NotImplementedError("Inheriting class must implement `should_resume`") def choose_match(self, task: ImportTask): - raise NotImplementedError( - "Inheriting class must implement `choose_match`" - ) + raise NotImplementedError("Inheriting class must implement `choose_match`") def resolve_duplicate(self, task: ImportTask, found_duplicates): - raise NotImplementedError( - "Inheriting class must implement `resolve_duplicate`" - ) + raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") def choose_item(self, task: ImportTask): - raise NotImplementedError( - "Inheriting class must implement `choose_item`" - ) + raise NotImplementedError("Inheriting class must implement `choose_item`") def run(self): """Run the import task.""" @@ -539,9 +543,7 @@ def __init__(self, toppath, paths, items): self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice( - self, choice: action | autotag.AlbumMatch | autotag.TrackMatch - ): + def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. @@ -562,7 +564,8 @@ def set_choice( self.match = None else: self.choice_flag = action.APPLY # Implicit choice. - self.match = cast(autotag.AlbumMatch | autotag.TrackMatch, choice) + # Union is needed here for python 3.9 compatibility! 
+ self.match = cast(Union[autotag.AlbumMatch, autotag.TrackMatch], choice) def save_progress(self): """Updates the progress state to indicate that this album has @@ -640,9 +643,7 @@ def remove_duplicates(self, lib: library.Library): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug( - "deleting duplicate {0}", util.displayable_path(item.path) - ) + log.debug("deleting duplicate {0}", util.displayable_path(item.path)) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -674,7 +675,8 @@ def finalize(self, session: ImportSession): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip and session.config["incremental_skip_later"] + self.skip + and session.config["incremental_skip_later"] ): self.save_history() @@ -731,9 +733,7 @@ def lookup_candidates(self): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album( - self.items, search_ids=self.search_ids - ) + artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -753,9 +753,7 @@ def find_duplicates(self, lib: library.Library): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. tmp_album = library.Album(lib, **info) - keys = cast( - list[str], config["import"]["duplicate_keys"]["album"].as_str_seq() - ) + keys = cast(list[str], config["import"]["duplicate_keys"]["album"].as_str_seq()) dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. @@ -786,8 +784,7 @@ def align_album_level_fields(self): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 - and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. changes["albumartist"] = plur_albumartist @@ -889,15 +886,10 @@ def record_replaced(self, lib: library.Library): self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): - dup_items = list( - lib.items(dbcore.query.BytesQuery("path", item.path)) - ) + dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) self.replaced_items[item] = dup_items for dup_item in dup_items: - if ( - not dup_item.album_id - or dup_item.album_id in replaced_album_ids - ): + if not dup_item.album_id or dup_item.album_id in replaced_album_ids: continue replaced_album = dup_item._cached_album if replaced_album: @@ -950,8 +942,7 @@ def _reduce_and_log(new_obj, existing_fields, overwrite_keys): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " - "Path: {}", + "Reimported album {}. Preserving attribute ['added']. " "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -1083,9 +1074,7 @@ def find_duplicates(self, lib): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. 
tmp_item = library.Item(lib, **info) - keys = cast( - list[str], config["import"]["duplicate_keys"]["item"].as_str_seq() - ) + keys = cast(list[str], config["import"]["duplicate_keys"]["item"].as_str_seq()) dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1259,9 +1248,7 @@ def extract(self): break if handler_class is None: - raise ValueError( - "No handler found for archive: {0}".format(self.toppath) - ) + raise ValueError("No handler found for archive: {0}".format(self.toppath)) extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") @@ -1381,9 +1368,7 @@ def paths(self): def singleton(self, path: PathBytes): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(path) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(path)) self.skipped += 1 return None @@ -1406,9 +1391,7 @@ def album(self, paths: Iterable[PathBytes], dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(dirs) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) self.skipped += 1 return None @@ -1438,8 +1421,7 @@ def unarchive(self): if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " - "'copy' or 'move' to be enabled." + "Archive importing requires either " "'copy' or 'move' to be enabled." ) return @@ -1658,9 +1640,7 @@ def resolve_duplicates(session: ImportSession, task: ImportTask): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug( - "found duplicates: {}".format([o.id for o in found_duplicates]) - ) + log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( From 1a24e3f1d0977d69843a3db365f59773dff30940 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 8 Feb 2025 22:20:40 +0100 Subject: [PATCH 14/28] Formatting --- beets/importer.py | 81 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 22f6273df3..0fd83b561e 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -269,7 +269,9 @@ def set_config(self, config): iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) + iconfig["reflink"] = iconfig["reflink"].as_choice( + ["auto", True, False] + ) # Copy, move, reflink, link, and hardlink are mutually exclusive. 
if iconfig["move"]: @@ -327,16 +329,24 @@ def log_choice(self, task: ImportTask, duplicate=False): self.tag_log("skip", paths) def should_resume(self, path: PathBytes): - raise NotImplementedError("Inheriting class must implement `should_resume`") + raise NotImplementedError( + "Inheriting class must implement `should_resume`" + ) def choose_match(self, task: ImportTask): - raise NotImplementedError("Inheriting class must implement `choose_match`") + raise NotImplementedError( + "Inheriting class must implement `choose_match`" + ) def resolve_duplicate(self, task: ImportTask, found_duplicates): - raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") + raise NotImplementedError( + "Inheriting class must implement `resolve_duplicate`" + ) def choose_item(self, task: ImportTask): - raise NotImplementedError("Inheriting class must implement `choose_item`") + raise NotImplementedError( + "Inheriting class must implement `choose_item`" + ) def run(self): """Run the import task.""" @@ -543,7 +553,9 @@ def __init__(self, toppath, paths, items): self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch): + def set_choice( + self, choice: action | autotag.AlbumMatch | autotag.TrackMatch + ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. @@ -565,7 +577,9 @@ def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch): else: self.choice_flag = action.APPLY # Implicit choice. # Union is needed here for python 3.9 compatibility! - self.match = cast(Union[autotag.AlbumMatch, autotag.TrackMatch], choice) + self.match = cast( + Union[autotag.AlbumMatch, autotag.TrackMatch], choice + ) def save_progress(self): """Updates the progress state to indicate that this album has @@ -643,7 +657,9 @@ def remove_duplicates(self, lib: library.Library): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug("deleting duplicate {0}", util.displayable_path(item.path)) + log.debug( + "deleting duplicate {0}", util.displayable_path(item.path) + ) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -675,8 +691,7 @@ def finalize(self, session: ImportSession): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip - and session.config["incremental_skip_later"] + self.skip and session.config["incremental_skip_later"] ): self.save_history() @@ -733,7 +748,9 @@ def lookup_candidates(self): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) + artist, album, prop = autotag.tag_album( + self.items, search_ids=self.search_ids + ) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -753,7 +770,9 @@ def find_duplicates(self, lib: library.Library): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. tmp_album = library.Album(lib, **info) - keys = cast(list[str], config["import"]["duplicate_keys"]["album"].as_str_seq()) + keys = cast( + list[str], config["import"]["duplicate_keys"]["album"].as_str_seq() + ) dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. 
@@ -784,7 +803,8 @@ def align_album_level_fields(self): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 + and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. changes["albumartist"] = plur_albumartist @@ -886,10 +906,15 @@ def record_replaced(self, lib: library.Library): self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): - dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) + dup_items = list( + lib.items(dbcore.query.BytesQuery("path", item.path)) + ) self.replaced_items[item] = dup_items for dup_item in dup_items: - if not dup_item.album_id or dup_item.album_id in replaced_album_ids: + if ( + not dup_item.album_id + or dup_item.album_id in replaced_album_ids + ): continue replaced_album = dup_item._cached_album if replaced_album: @@ -942,7 +967,8 @@ def _reduce_and_log(new_obj, existing_fields, overwrite_keys): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " "Path: {}", + "Reimported album {}. Preserving attribute ['added']. " + "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -1074,7 +1100,9 @@ def find_duplicates(self, lib): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. tmp_item = library.Item(lib, **info) - keys = cast(list[str], config["import"]["duplicate_keys"]["item"].as_str_seq()) + keys = cast( + list[str], config["import"]["duplicate_keys"]["item"].as_str_seq() + ) dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1248,7 +1276,9 @@ def extract(self): break if handler_class is None: - raise ValueError("No handler found for archive: {0}".format(self.toppath)) + raise ValueError( + "No handler found for archive: {0}".format(self.toppath) + ) extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") @@ -1368,7 +1398,9 @@ def paths(self): def singleton(self, path: PathBytes): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug("Skipping previously-imported path: {0}", displayable_path(path)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(path) + ) self.skipped += 1 return None @@ -1391,7 +1423,9 @@ def album(self, paths: Iterable[PathBytes], dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(dirs) + ) self.skipped += 1 return None @@ -1421,7 +1455,8 @@ def unarchive(self): if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " "'copy' or 'move' to be enabled." + "Archive importing requires either " + "'copy' or 'move' to be enabled." 
) return @@ -1640,7 +1675,9 @@ def resolve_duplicates(session: ImportSession, task: ImportTask): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) + log.debug( + "found duplicates: {}".format([o.id for o in found_duplicates]) + ) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( From bbd92d97ab7cbaa3dfd0735560f7b980c28c6d6e Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sun, 9 Feb 2025 11:05:09 +0100 Subject: [PATCH 15/28] Removed optional type hints for pipe based optional with None --- beets/importer.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 0fd83b561e..2c9e89f741 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -115,8 +115,10 @@ class ImportState: taghistory: set[tuple[PathBytes, ...]] path: PathBytes - def __init__(self, readonly=False, path: Optional[PathBytes] = None): - self.path = path or cast(PathBytes, config["statefile"].as_filename()) + def __init__(self, readonly=False, path: PathBytes | None = None): + self.path = path or os.fsencode( + cast(str, config["statefile"].as_filename()) + ) self.tagprogress = {} self.taghistory = set() self._open() @@ -202,7 +204,7 @@ class ImportSession: """ logger: logging.Logger - paths: Optional[list[bytes]] = None + paths: list[bytes] | None = None lib: library.Library _is_resuming: dict[bytes, bool] @@ -212,9 +214,9 @@ class ImportSession: def __init__( self, lib: library.Library, - loghandler: Optional[logging.Handler], - paths: Optional[Sequence[PathBytes]], - query: Optional[dbcore.Query], + loghandler: logging.Handler | None, + paths: Sequence[PathBytes] | None, + query: dbcore.Query | None, ): """Create a session. @@ -241,7 +243,7 @@ def __init__( if paths is not None: self.paths = list(map(normpath, paths)) - def _setup_logging(self, loghandler: Optional[logging.Handler]): + def _setup_logging(self, loghandler: logging.Handler | None): logger = logging.getLogger(__name__) logger.propagate = False if not loghandler: @@ -474,15 +476,15 @@ class BaseImportTask: Tasks flow through the importer pipeline. Each stage can update them.""" - toppath: Optional[PathBytes] + toppath: PathBytes | None paths: list[PathBytes] items: list[library.Item] def __init__( self, - toppath: Optional[PathBytes], - paths: Optional[Iterable[PathBytes]], - items: Optional[Iterable[library.Item]], + toppath: PathBytes | None, + paths: Iterable[PathBytes] | None, + items: Iterable[library.Item] | None, ): """Create a task. The primary fields that define a task are: @@ -537,12 +539,12 @@ class ImportTask(BaseImportTask): system. 
""" - choice_flag: Optional[action] = None + choice_flag: action | None = None match: autotag.AlbumMatch | autotag.TrackMatch | None = None # Keep track of the current task item - cur_album: Optional[str] = None - cur_artist: Optional[str] = None + cur_album: str | None = None + cur_artist: str | None = None candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] def __init__(self, toppath, paths, items): @@ -834,7 +836,7 @@ def manipulate_files( self, operation: Optional[MoveOperation] = None, write=False, - session: Optional[ImportSession] = None, + session: ImportSession | None = None, ): """Copy, move, link, hardlink or reflink (depending on `operation`) the files as well as write metadata. @@ -1054,7 +1056,7 @@ def prune(self, filename): class SingletonImportTask(ImportTask): """ImportTask for a single track that is not associated to an album.""" - def __init__(self, toppath: Optional[PathBytes], item: library.Item): + def __init__(self, toppath: PathBytes | None, item: library.Item): super().__init__(toppath, [item.path], [item]) self.item = item self.is_album = False @@ -1362,7 +1364,7 @@ def tasks(self): else: yield self.sentinel() - def _create(self, task: Optional[ImportTask]): + def _create(self, task: ImportTask | None): """Handle a new task to be emitted by the factory. Emit the `import_task_created` event and increment the @@ -1438,7 +1440,7 @@ def album(self, paths: Iterable[PathBytes], dirs=None): else: return None - def sentinel(self, paths: Optional[Iterable[PathBytes]] = None): + def sentinel(self, paths: Iterable[PathBytes] | None = None): """Return a `SentinelImportTask` indicating the end of a top-level directory import. """ From 716720d2a5d123594a31230e126df82fa00ffca0 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sun, 9 Feb 2025 11:23:01 +0100 Subject: [PATCH 16/28] Removed unnecessary cast even tho it now produces issues locally. --- beets/importer.py | 97 +++++++++++++++-------------------------------- 1 file changed, 30 insertions(+), 67 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 2c9e89f741..a0094f497c 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -29,7 +29,7 @@ from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Callable, Iterable, Optional, Sequence, Union, cast +from typing import Callable, Iterable, Sequence, Union, cast import mediafile @@ -116,9 +116,7 @@ class ImportState: path: PathBytes def __init__(self, readonly=False, path: PathBytes | None = None): - self.path = path or os.fsencode( - cast(str, config["statefile"].as_filename()) - ) + self.path = path or os.fsencode(config["statefile"].as_filename()) self.tagprogress = {} self.taghistory = set() self._open() @@ -271,9 +269,7 @@ def set_config(self, config): iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice( - ["auto", True, False] - ) + iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) # Copy, move, reflink, link, and hardlink are mutually exclusive. 
if iconfig["move"]: @@ -331,24 +327,16 @@ def log_choice(self, task: ImportTask, duplicate=False): self.tag_log("skip", paths) def should_resume(self, path: PathBytes): - raise NotImplementedError( - "Inheriting class must implement `should_resume`" - ) + raise NotImplementedError("Inheriting class must implement `should_resume`") def choose_match(self, task: ImportTask): - raise NotImplementedError( - "Inheriting class must implement `choose_match`" - ) + raise NotImplementedError("Inheriting class must implement `choose_match`") def resolve_duplicate(self, task: ImportTask, found_duplicates): - raise NotImplementedError( - "Inheriting class must implement `resolve_duplicate`" - ) + raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") def choose_item(self, task: ImportTask): - raise NotImplementedError( - "Inheriting class must implement `choose_item`" - ) + raise NotImplementedError("Inheriting class must implement `choose_item`") def run(self): """Run the import task.""" @@ -555,9 +543,7 @@ def __init__(self, toppath, paths, items): self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice( - self, choice: action | autotag.AlbumMatch | autotag.TrackMatch - ): + def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. @@ -574,14 +560,14 @@ def set_choice( action.ALBUMS, action.RETAG, ): + # Cast needed as mypy can't infer the type self.choice_flag = cast(action, choice) self.match = None else: self.choice_flag = action.APPLY # Implicit choice. # Union is needed here for python 3.9 compatibility! - self.match = cast( - Union[autotag.AlbumMatch, autotag.TrackMatch], choice - ) + # Cast needed as mypy can't infer the type + self.match = cast(Union[autotag.AlbumMatch, autotag.TrackMatch], choice) def save_progress(self): """Updates the progress state to indicate that this album has @@ -659,9 +645,7 @@ def remove_duplicates(self, lib: library.Library): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug( - "deleting duplicate {0}", util.displayable_path(item.path) - ) + log.debug("deleting duplicate {0}", util.displayable_path(item.path)) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -693,7 +677,8 @@ def finalize(self, session: ImportSession): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip and session.config["incremental_skip_later"] + self.skip + and session.config["incremental_skip_later"] ): self.save_history() @@ -750,9 +735,7 @@ def lookup_candidates(self): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album( - self.items, search_ids=self.search_ids - ) + artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -772,9 +755,7 @@ def find_duplicates(self, lib: library.Library): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. 
tmp_album = library.Album(lib, **info) - keys = cast( - list[str], config["import"]["duplicate_keys"]["album"].as_str_seq() - ) + keys: list[str] = config["import"]["duplicate_keys"]["album"].as_str_seq() dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. @@ -805,8 +786,7 @@ def align_album_level_fields(self): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 - and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. changes["albumartist"] = plur_albumartist @@ -834,7 +814,7 @@ def align_album_level_fields(self): def manipulate_files( self, - operation: Optional[MoveOperation] = None, + operation: MoveOperation | None = None, write=False, session: ImportSession | None = None, ): @@ -908,15 +888,10 @@ def record_replaced(self, lib: library.Library): self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): - dup_items = list( - lib.items(dbcore.query.BytesQuery("path", item.path)) - ) + dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) self.replaced_items[item] = dup_items for dup_item in dup_items: - if ( - not dup_item.album_id - or dup_item.album_id in replaced_album_ids - ): + if not dup_item.album_id or dup_item.album_id in replaced_album_ids: continue replaced_album = dup_item._cached_album if replaced_album: @@ -969,8 +944,7 @@ def _reduce_and_log(new_obj, existing_fields, overwrite_keys): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " - "Path: {}", + "Reimported album {}. Preserving attribute ['added']. " "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -1049,7 +1023,7 @@ def prune(self, filename): util.prune_dirs( os.path.dirname(filename), self.toppath, - clutter=cast(list[str], config["clutter"].as_str_seq()), + clutter=config["clutter"].as_str_seq(), ) @@ -1102,9 +1076,7 @@ def find_duplicates(self, lib): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. 
tmp_item = library.Item(lib, **info) - keys = cast( - list[str], config["import"]["duplicate_keys"]["item"].as_str_seq() - ) + keys: list[str] = config["import"]["duplicate_keys"]["item"].as_str_seq() dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1278,9 +1250,7 @@ def extract(self): break if handler_class is None: - raise ValueError( - "No handler found for archive: {0}".format(self.toppath) - ) + raise ValueError("No handler found for archive: {0}".format(self.toppath)) extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") @@ -1400,9 +1370,7 @@ def paths(self): def singleton(self, path: PathBytes): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(path) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(path)) self.skipped += 1 return None @@ -1425,9 +1393,7 @@ def album(self, paths: Iterable[PathBytes], dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(dirs) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) self.skipped += 1 return None @@ -1457,8 +1423,7 @@ def unarchive(self): if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " - "'copy' or 'move' to be enabled." + "Archive importing requires either " "'copy' or 'move' to be enabled." ) return @@ -1677,9 +1642,7 @@ def resolve_duplicates(session: ImportSession, task: ImportTask): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug( - "found duplicates: {}".format([o.id for o in found_duplicates]) - ) + log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( @@ -1863,8 +1826,8 @@ def albums_in_dir(path: PathBytes): containing any media files is an album. """ collapse_pat = collapse_paths = collapse_items = None - ignore = cast(list[str], config["ignore"].as_str_seq()) - ignore_hidden = cast(bool, config["ignore_hidden"].get(bool)) + ignore: list[str] = config["ignore"].as_str_seq() + ignore_hidden: bool = config["ignore_hidden"].get(bool) for root, dirs, files in sorted_walk( path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log From fdf7afbfe3c0b2258cd1e32e5a9aa4dca0d6a46d Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sun, 9 Feb 2025 11:23:22 +0100 Subject: [PATCH 17/28] Formatting --- beets/importer.py | 81 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index a0094f497c..6992adbbbd 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -269,7 +269,9 @@ def set_config(self, config): iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) + iconfig["reflink"] = iconfig["reflink"].as_choice( + ["auto", True, False] + ) # Copy, move, reflink, link, and hardlink are mutually exclusive. 
if iconfig["move"]: @@ -327,16 +329,24 @@ def log_choice(self, task: ImportTask, duplicate=False): self.tag_log("skip", paths) def should_resume(self, path: PathBytes): - raise NotImplementedError("Inheriting class must implement `should_resume`") + raise NotImplementedError( + "Inheriting class must implement `should_resume`" + ) def choose_match(self, task: ImportTask): - raise NotImplementedError("Inheriting class must implement `choose_match`") + raise NotImplementedError( + "Inheriting class must implement `choose_match`" + ) def resolve_duplicate(self, task: ImportTask, found_duplicates): - raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") + raise NotImplementedError( + "Inheriting class must implement `resolve_duplicate`" + ) def choose_item(self, task: ImportTask): - raise NotImplementedError("Inheriting class must implement `choose_item`") + raise NotImplementedError( + "Inheriting class must implement `choose_item`" + ) def run(self): """Run the import task.""" @@ -543,7 +553,9 @@ def __init__(self, toppath, paths, items): self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch): + def set_choice( + self, choice: action | autotag.AlbumMatch | autotag.TrackMatch + ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. @@ -567,7 +579,9 @@ def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch): self.choice_flag = action.APPLY # Implicit choice. # Union is needed here for python 3.9 compatibility! # Cast needed as mypy can't infer the type - self.match = cast(Union[autotag.AlbumMatch, autotag.TrackMatch], choice) + self.match = cast( + Union[autotag.AlbumMatch, autotag.TrackMatch], choice + ) def save_progress(self): """Updates the progress state to indicate that this album has @@ -645,7 +659,9 @@ def remove_duplicates(self, lib: library.Library): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug("deleting duplicate {0}", util.displayable_path(item.path)) + log.debug( + "deleting duplicate {0}", util.displayable_path(item.path) + ) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -677,8 +693,7 @@ def finalize(self, session: ImportSession): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip - and session.config["incremental_skip_later"] + self.skip and session.config["incremental_skip_later"] ): self.save_history() @@ -735,7 +750,9 @@ def lookup_candidates(self): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) + artist, album, prop = autotag.tag_album( + self.items, search_ids=self.search_ids + ) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -755,7 +772,9 @@ def find_duplicates(self, lib: library.Library): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. tmp_album = library.Album(lib, **info) - keys: list[str] = config["import"]["duplicate_keys"]["album"].as_str_seq() + keys: list[str] = config["import"]["duplicate_keys"][ + "album" + ].as_str_seq() dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. 
@@ -786,7 +805,8 @@ def align_album_level_fields(self): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 + and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. changes["albumartist"] = plur_albumartist @@ -888,10 +908,15 @@ def record_replaced(self, lib: library.Library): self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): - dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) + dup_items = list( + lib.items(dbcore.query.BytesQuery("path", item.path)) + ) self.replaced_items[item] = dup_items for dup_item in dup_items: - if not dup_item.album_id or dup_item.album_id in replaced_album_ids: + if ( + not dup_item.album_id + or dup_item.album_id in replaced_album_ids + ): continue replaced_album = dup_item._cached_album if replaced_album: @@ -944,7 +969,8 @@ def _reduce_and_log(new_obj, existing_fields, overwrite_keys): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " "Path: {}", + "Reimported album {}. Preserving attribute ['added']. " + "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -1076,7 +1102,9 @@ def find_duplicates(self, lib): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. tmp_item = library.Item(lib, **info) - keys: list[str] = config["import"]["duplicate_keys"]["item"].as_str_seq() + keys: list[str] = config["import"]["duplicate_keys"][ + "item" + ].as_str_seq() dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1250,7 +1278,9 @@ def extract(self): break if handler_class is None: - raise ValueError("No handler found for archive: {0}".format(self.toppath)) + raise ValueError( + "No handler found for archive: {0}".format(self.toppath) + ) extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") @@ -1370,7 +1400,9 @@ def paths(self): def singleton(self, path: PathBytes): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug("Skipping previously-imported path: {0}", displayable_path(path)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(path) + ) self.skipped += 1 return None @@ -1393,7 +1425,9 @@ def album(self, paths: Iterable[PathBytes], dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(dirs) + ) self.skipped += 1 return None @@ -1423,7 +1457,8 @@ def unarchive(self): if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " "'copy' or 'move' to be enabled." + "Archive importing requires either " + "'copy' or 'move' to be enabled." 
) return @@ -1642,7 +1677,9 @@ def resolve_duplicates(session: ImportSession, task: ImportTask): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) + log.debug( + "found duplicates: {}".format([o.id for o in found_duplicates]) + ) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( From c17a774dd6aa484f877d256a1edecd9ce319c442 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sun, 9 Feb 2025 20:23:19 +0100 Subject: [PATCH 18/28] Removed Optional and Union and resolved a minor mypy shadowing issue. --- beets/util/pipeline.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index d797dea132..3a31175b69 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -30,11 +30,12 @@ To do so, pass an iterable of coroutines to the Pipeline constructor in place of any single coroutine. """ +from __future__ import annotations import queue import sys from threading import Lock, Thread -from typing import Callable, Generator, Optional, Union +from typing import Callable, Generator from typing_extensions import TypeVar, TypeVarTuple, Unpack @@ -152,15 +153,20 @@ def multiple(messages): return MultiMessage(messages) -A = TypeVarTuple("A") -T = TypeVar("T") +# Arguments of the function (omitting the task) +Args = TypeVarTuple("Args") +# Task as an additional argument to the function +Task = TypeVar("Task") +# Return type of the function (should normally be task but sadly +# we cant enforce this with the current stage functions without +# a refactor) R = TypeVar("R") def stage( func: Callable[ - [Unpack[A], T], - Optional[R], + [Unpack[Args], Task], + R | None, ], ): """Decorate a function to become a simple stage. @@ -176,8 +182,8 @@ def stage( [3, 4, 5] """ - def coro(*args: Unpack[A]) -> Generator[Union[R, T, None], T, R]: - task = None + def coro(*args: Unpack[Args]) -> Generator[R | Task | None, Task, None]: + task: R | Task | None = None while True: task = yield task task = func(*(args + (task,))) @@ -185,7 +191,7 @@ def coro(*args: Unpack[A]) -> Generator[Union[R, T, None], T, R]: return coro -def mutator_stage(func: Callable[[Unpack[A], T], R]): +def mutator_stage(func: Callable[[Unpack[Args], Task], R]): """Decorate a function that manipulates items in a coroutine to become a simple stage. @@ -200,7 +206,7 @@ def mutator_stage(func: Callable[[Unpack[A], T], R]): [{'x': True}, {'a': False, 'x': True}] """ - def coro(*args: Unpack[A]) -> Generator[Union[T, None], T, None]: + def coro(*args: Unpack[Args]) -> Generator[Task | None, Task, None]: task = None while True: task = yield task @@ -419,9 +425,7 @@ def run_parallel(self, queue_size=DEFAULT_QUEUE_SIZE): for i in range(1, queue_count): for coro in self.stages[i]: threads.append( - MiddlePipelineThread( - coro, queues[i - 1], queues[i], threads - ) + MiddlePipelineThread(coro, queues[i - 1], queues[i], threads) ) # Last stage. 
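
Seen end to end, the two decorators this patch annotates behave like so.
A short runnable sketch that mirrors the module's own doctests; ``add``
and ``setkey`` are illustrative functions, not beets API:

```
from beets.util.pipeline import Pipeline, mutator_stage, stage


@stage
def add(n, i):
    # n is the bound argument; i is the task flowing through the stage.
    # The return value replaces the task.
    return i + n


@mutator_stage
def setkey(key, item):
    # Mutates the task in place; the decorator re-emits the same task.
    item[key] = True


print(list(Pipeline([iter([1, 2, 3]), add(2)]).pull()))
# -> [3, 4, 5]

print(list(Pipeline([iter([{"a": False}, {}]), setkey("x")]).pull()))
# -> [{'a': False, 'x': True}, {'x': True}]
```

In terms of the type variables above, ``add`` binds ``A`` to ``(int,)``,
``T`` to ``int``, and ``R`` to ``int``, so ``add(2)`` produces a stage
coroutine whose yielded values are ints.
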
From ebe88885c58cdc80be3283f537359ba8702b8cf1 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 10 Feb 2025 11:46:50 +0100 Subject: [PATCH 19/28] Renamed to ignore_bytes and reverted typevar rename --- beets/util/__init__.py | 6 +++--- beets/util/pipeline.py | 23 +++++++++++++---------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/beets/util/__init__.py b/beets/util/__init__.py index a0053c260f..b882ed626d 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -210,7 +210,7 @@ def sorted_walk( """ # Make sure the paths aren't Unicode strings. bytes_path = bytestring_path(path) - _ignore = [ # rename prevents mypy variable shadowing issue + ignore_bytes = [ # rename prevents mypy variable shadowing issue bytestring_path(i) for i in ignore ] @@ -232,7 +232,7 @@ def sorted_walk( # Skip ignored filenames. skip = False - for pat in _ignore: + for pat in ignore_bytes: if fnmatch.fnmatch(base, pat): if logger: logger.debug( @@ -259,7 +259,7 @@ def sorted_walk( # Recurse into directories. for base in dirs: cur = os.path.join(bytes_path, base) - yield from sorted_walk(cur, _ignore, ignore_hidden, logger) + yield from sorted_walk(cur, ignore_bytes, ignore_hidden, logger) def path_as_posix(path: bytes) -> bytes: diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index 3a31175b69..8f7be81949 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -30,6 +30,7 @@ To do so, pass an iterable of coroutines to the Pipeline constructor in place of any single coroutine. """ + from __future__ import annotations import queue @@ -153,10 +154,10 @@ def multiple(messages): return MultiMessage(messages) -# Arguments of the function (omitting the task) -Args = TypeVarTuple("Args") -# Task as an additional argument to the function -Task = TypeVar("Task") +A = TypeVarTuple("A") # Arguments of a function (omitting the task) +T = TypeVar("T") # Type of the task +# Normally these are concatenated i.e. (*args, task) + # Return type of the function (should normally be task but sadly # we cant enforce this with the current stage functions without # a refactor) @@ -165,7 +166,7 @@ def multiple(messages): def stage( func: Callable[ - [Unpack[Args], Task], + [Unpack[A], T], R | None, ], ): @@ -182,8 +183,8 @@ def stage( [3, 4, 5] """ - def coro(*args: Unpack[Args]) -> Generator[R | Task | None, Task, None]: - task: R | Task | None = None + def coro(*args: Unpack[A]) -> Generator[R | T | None, T, None]: + task: R | T | None = None while True: task = yield task task = func(*(args + (task,))) @@ -191,7 +192,7 @@ def coro(*args: Unpack[Args]) -> Generator[R | Task | None, Task, None]: return coro -def mutator_stage(func: Callable[[Unpack[Args], Task], R]): +def mutator_stage(func: Callable[[Unpack[A], T], R]): """Decorate a function that manipulates items in a coroutine to become a simple stage. @@ -206,7 +207,7 @@ def mutator_stage(func: Callable[[Unpack[Args], Task], R]): [{'x': True}, {'a': False, 'x': True}] """ - def coro(*args: Unpack[Args]) -> Generator[Task | None, Task, None]: + def coro(*args: Unpack[A]) -> Generator[T | None, T, None]: task = None while True: task = yield task @@ -425,7 +426,9 @@ def run_parallel(self, queue_size=DEFAULT_QUEUE_SIZE): for i in range(1, queue_count): for coro in self.stages[i]: threads.append( - MiddlePipelineThread(coro, queues[i - 1], queues[i], threads) + MiddlePipelineThread( + coro, queues[i - 1], queues[i], threads + ) ) # Last stage. 
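The `ignore_bytes` rename in PATCH 19 works around a pattern mypy rejects: rebinding a parameter to a value of a different type. A small sketch of the failure mode and the fix, using plain `str.encode` as a stand-in for `bytestring_path` (the function bodies are placeholders, not beets code):

```
from typing import Iterable


def walk_shadowed(path: str, ignore: Iterable[str]) -> None:
    # mypy: Incompatible types in assignment (expression has type
    # "list[bytes]", variable has type "Iterable[str]")
    ignore = [pat.encode() for pat in ignore]  # type: ignore[assignment]


def walk_renamed(path: str, ignore: Iterable[str]) -> None:
    # The converted patterns get their own name, so the parameter and
    # the bytes list each keep an accurate type.
    ignore_bytes = [pat.encode() for pat in ignore]
```

In `sorted_walk` the payoff is the recursive call at the end of the hunk: `sorted_walk(cur, ignore_bytes, ignore_hidden, logger)` re-passes the already-converted patterns under a name whose type says so.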
From 803a0d1fcb456e97ddd4a9b6521ae2c348377e24 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr <39738318+semohr@users.noreply.github.com> Date: Sat, 15 Feb 2025 13:17:00 +0100 Subject: [PATCH 20/28] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Šarūnas Nejus --- beets/importer.py | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 6992adbbbd..ebffe14d85 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -202,7 +202,7 @@ class ImportSession: """ logger: logging.Logger - paths: list[bytes] | None = None + paths: list[PathBytes] lib: library.Library _is_resuming: dict[bytes, bool] @@ -226,7 +226,7 @@ def __init__( A logging handler to use for the session's logger. If None, a NullHandler will be used. paths : os.PathLike or None - The paths to be imported. If None, no paths are specified. + The paths to be imported. query : dbcore.Query or None A query to filter items for import. If None, no query is applied. """ @@ -238,8 +238,7 @@ def __init__( self._merged_dirs = set() # Normalize the paths. - if paths is not None: - self.paths = list(map(normpath, paths)) + self.paths = list(map(normpath, paths or [])) def _setup_logging(self, loghandler: logging.Handler | None): logger = logging.getLogger(__name__) @@ -329,9 +328,7 @@ def log_choice(self, task: ImportTask, duplicate=False): self.tag_log("skip", paths) def should_resume(self, path: PathBytes): - raise NotImplementedError( - "Inheriting class must implement `should_resume`" - ) + raise NotImplementedError def choose_match(self, task: ImportTask): raise NotImplementedError( @@ -637,10 +634,6 @@ def imported_items(self): def apply_metadata(self): """Copy metadata from match info to the items.""" - assert isinstance( - self.match, autotag.AlbumMatch - ), "apply_metadata() only works for albums" - if config["import"]["from_scratch"]: for item in self.match.mapping: item.clear() @@ -1073,7 +1066,6 @@ def chosen_info(self): return dict(self.item) elif self.choice_flag is action.APPLY and self.match: return self.match.info.copy() - assert False def imported_items(self): return [self.item] @@ -1179,7 +1171,7 @@ def save_progress(self): ImportState().progress_reset(self.toppath) elif self.toppath: # "Directory progress" sentinel for singletons - ImportState().progress_add(self.toppath, *self.paths) + super().save_progress() @property def skip(self) -> bool: @@ -1272,16 +1264,11 @@ def extract(self): """ assert self.toppath is not None, "toppath must be set" - handler_class = None - for path_test, handler_class in self.handlers(): + for path_test, handler_class in self.handlers(): if path_test(os.fsdecode(self.toppath)): break - - if handler_class is None: - raise ValueError( - "No handler found for archive: {0}".format(self.toppath) - ) - + else: + raise ValueError(f"No handler found for archive: {self.toppath}") extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") try: From d29ef500b95b2fc383c9833b36a380e6ad019bca Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 15 Feb 2025 13:13:55 +0100 Subject: [PATCH 21/28] Session path can't be None anymore. 
Empty list if none --- beets/importer.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index ebffe14d85..46fe8977a5 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -1405,16 +1405,12 @@ def album(self, paths: Iterable[PathBytes], dirs=None): `dirs` is a list of parent directories used to record already imported albums. """ - if not paths: - return None if dirs is None: dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug( - "Skipping previously-imported path: {0}", displayable_path(dirs) - ) + log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) self.skipped += 1 return None @@ -1422,7 +1418,7 @@ def album(self, paths: Iterable[PathBytes], dirs=None): item for item in map(self.read_item, paths) if item ] - if items: + if len(items) > 0: return ImportTask(self.toppath, dirs, items) else: return None @@ -1511,9 +1507,6 @@ def read_tasks(session: ImportSession): import, yields single-item tasks instead. """ skipped = 0 - if session.paths is None: - log.warning("No path specified in session.") - return for toppath in session.paths: # Check whether we need to resume the import. From 0ee17c0b06eb2aa46ff9be576dacd575634e16e1 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 15 Feb 2025 13:16:26 +0100 Subject: [PATCH 22/28] Removed unnecessary comments & raise same not implemented error for all not implemented methods --- beets/importer.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 46fe8977a5..63939698ab 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -228,7 +228,7 @@ def __init__( paths : os.PathLike or None The paths to be imported. query : dbcore.Query or None - A query to filter items for import. If None, no query is applied. + A query to filter items for import. """ self.lib = lib self.logger = self._setup_logging(loghandler) @@ -331,19 +331,13 @@ def should_resume(self, path: PathBytes): raise NotImplementedError def choose_match(self, task: ImportTask): - raise NotImplementedError( - "Inheriting class must implement `choose_match`" - ) + raise NotImplementedError def resolve_duplicate(self, task: ImportTask, found_duplicates): - raise NotImplementedError( - "Inheriting class must implement `resolve_duplicate`" - ) - + raise NotImplementedError + def choose_item(self, task: ImportTask): - raise NotImplementedError( - "Inheriting class must implement `choose_item`" - ) + raise NotImplementedError def run(self): """Run the import task.""" From 9acb2b4ca3f1747bcafe6e0a793aec5e4fde4553 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 15 Feb 2025 13:20:36 +0100 Subject: [PATCH 23/28] Added typehints to ImportTask init and reverted some initial changes. 
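The `set_choice` hunk below swaps a `cast` for a targeted `type: ignore[assignment]`, since the tuple-membership test on `choice` does not reliably narrow the union for mypy. As a point of comparison, an `isinstance` check does narrow; the sketch below uses hypothetical `Match` and `Task` stand-ins rather than beets' real classes, and it only fits if every `action` member were a valid direct choice, which is the sort of redesign the TODO in the hunk hints at:

```
from __future__ import annotations

from enum import Enum


class action(Enum):  # illustrative members only
    SKIP = 0
    ASIS = 1
    APPLY = 2


class Match:  # hypothetical stand-in for autotag.AlbumMatch/TrackMatch
    pass


class Task:
    choice_flag: action | None = None
    match: Match | None = None

    def set_choice(self, choice: action | Match) -> None:
        if isinstance(choice, action):
            self.choice_flag = choice  # narrowed to `action`: no cast
            self.match = None
        else:
            self.choice_flag = action.APPLY
            self.match = choice  # narrowed to `Match` here
```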
--- beets/importer.py | 51 ++++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 63939698ab..a3d98466cf 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -29,7 +29,7 @@ from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Callable, Iterable, Sequence, Union, cast +from typing import Callable, Iterable, Sequence import mediafile @@ -335,7 +335,7 @@ def choose_match(self, task: ImportTask): def resolve_duplicate(self, task: ImportTask, found_duplicates): raise NotImplementedError - + def choose_item(self, task: ImportTask): raise NotImplementedError @@ -407,7 +407,8 @@ def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]): _history_dirs = None @property - def history_dirs(self): + def history_dirs(self) -> set[tuple[PathBytes, ...]]: + # FIXME: This could be simplified to a cached property if self._history_dirs is None: self._history_dirs = ImportState().taghistory return self._history_dirs @@ -536,7 +537,12 @@ class ImportTask(BaseImportTask): cur_artist: str | None = None candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] - def __init__(self, toppath, paths, items): + def __init__( + self, + toppath: PathBytes | None, + paths: Iterable[PathBytes] | None, + items: Iterable[library.Item] | None, + ): super().__init__(toppath, paths, items) self.rec = None self.should_remove_duplicates = False @@ -563,16 +569,12 @@ def set_choice( action.ALBUMS, action.RETAG, ): - # Cast needed as mypy can't infer the type - self.choice_flag = cast(action, choice) + # TODO: redesign to stricten the type + self.choice_flag = choice # type: ignore[assignment] self.match = None else: self.choice_flag = action.APPLY # Implicit choice. - # Union is needed here for python 3.9 compatibility! - # Cast needed as mypy can't infer the type - self.match = cast( - Union[autotag.AlbumMatch, autotag.TrackMatch], choice - ) + self.match = choice # type: ignore[assignment] def save_progress(self): """Updates the progress state to indicate that this album has @@ -583,8 +585,7 @@ def save_progress(self): def save_history(self): """Save the directory in the history for incremental imports.""" - if self.paths: - ImportState().history_add(self.paths) + ImportState().history_add(self.paths) # Logical decisions. @@ -821,9 +822,9 @@ def align_album_level_fields(self): def manipulate_files( self, + session: ImportSession, operation: MoveOperation | None = None, write=False, - session: ImportSession | None = None, ): """Copy, move, link, hardlink or reflink (depending on `operation`) the files as well as write metadata. @@ -831,8 +832,8 @@ def manipulate_files( `operation` should be an instance of `util.MoveOperation`. If `write` is `True` metadata is written to the files. 
+ # TODO: Introduce a MoveOperation.NONE or SKIP """ - assert session is not None, "session must be provided" items = self.imported_items() # Save the original paths of all items for deletion and pruning @@ -1058,7 +1059,7 @@ def chosen_info(self): assert self.choice_flag in (action.ASIS, action.RETAG, action.APPLY) if self.choice_flag in (action.ASIS, action.RETAG): return dict(self.item) - elif self.choice_flag is action.APPLY and self.match: + elif self.choice_flag is action.APPLY: return self.match.info.copy() def imported_items(self): @@ -1258,10 +1259,10 @@ def extract(self): """ assert self.toppath is not None, "toppath must be set" - for path_test, handler_class in self.handlers(): + for path_test, handler_class in self.handlers(): if path_test(os.fsdecode(self.toppath)): break - else: + else: raise ValueError(f"No handler found for archive: {self.toppath}") extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") @@ -1404,7 +1405,9 @@ def album(self, paths: Iterable[PathBytes], dirs=None): dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(dirs) + ) self.skipped += 1 return None @@ -1527,9 +1530,9 @@ def query_tasks(session: ImportSession): if session.config["singletons"]: # Search for items. for item in session.lib.items(session.query): - singleton_task = SingletonImportTask(None, item) - for task in singleton_task.handle_created(session): - yield singleton_task + task = SingletonImportTask(None, item) + for task in task.handle_created(session): + yield task else: # Search for albums. @@ -1604,9 +1607,7 @@ def emitter(task): yield SentinelImportTask(task.toppath, task.paths) return _extend_pipeline( - emitter(task), - lookup_candidates(session), - user_query(session), + emitter(task), lookup_candidates(session), user_query(session) ) # As albums: group items by albums and create task for each album From 0447df65108a2ba7245454033698c971ed9a7d66 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 15 Feb 2025 13:27:45 +0100 Subject: [PATCH 24/28] Session args to kwargs in manipulate files --- beets/importer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index a3d98466cf..79935e2152 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -1772,9 +1772,9 @@ def manipulate_files(session: ImportSession, task: ImportTask): operation = None task.manipulate_files( - operation, - write=session.config["write"], session=session, + operation=operation, + write=session.config["write"], ) # Progress, cleanup, and event. From bdabafeefdee6c488694d780100d3e8756573b24 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 17 Feb 2025 15:48:11 +0100 Subject: [PATCH 25/28] See https://github.com/beetbox/beets/pull/5611#discussion_r1957443316 --- beets/importer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 79935e2152..758e1d30bd 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -1341,10 +1341,7 @@ def tasks(self): # it is finished. This is usually just a SentinelImportTask, but # for archive imports, send the archive task instead (to remove # the extracted directory). 
- if archive_task: - yield archive_task - else: - yield self.sentinel() + yield archive_task or self.sentinel() def _create(self, task: ImportTask | None): """Handle a new task to be emitted by the factory. From 87b022e48cd530e26287a6b8d0a810ca5ae5ac28 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 17 Feb 2025 16:09:18 +0100 Subject: [PATCH 26/28] See https://github.com/beetbox/beets/pull/5611#discussion_r1957443692 --- beets/importer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 758e1d30bd..4d796982f1 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -182,7 +182,7 @@ def progress_has(self, toppath: PathBytes) -> bool: """ return toppath in self.tagprogress - def progress_reset(self, toppath: PathBytes): + def progress_reset(self, toppath: PathBytes | None): """Reset the progress for `toppath`.""" with self as state: if toppath in state.tagprogress: @@ -1161,7 +1161,7 @@ def save_history(self): pass def save_progress(self): - if self.paths is None and self.toppath: + if not self.paths: # "Done" sentinel. ImportState().progress_reset(self.toppath) elif self.toppath: From cf1b4d8913d256cf2e43e17730109516fd10b7b6 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 17 Feb 2025 16:25:41 +0100 Subject: [PATCH 27/28] See https://github.com/beetbox/beets/pull/5611#discussion_r1958356845. Revert normpath additions --- beets/importer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 4d796982f1..3553e1ea05 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -436,7 +436,7 @@ def is_resuming(self, toppath: PathBytes): You have to call `ask_resume` first to determine the return value. """ - return self._is_resuming.get(normpath(toppath), False) + return self._is_resuming.get(toppath, False) def ask_resume(self, toppath: PathBytes): """If import of `toppath` was aborted in an earlier session, ask @@ -449,9 +449,9 @@ def ask_resume(self, toppath: PathBytes): if self.want_resume is True or self.should_resume(toppath): log.warning( "Resuming interrupted import of {0}", - util.displayable_path(normpath(toppath)), + util.displayable_path(toppath), ) - self._is_resuming[normpath(toppath)] = True + self._is_resuming[toppath] = True else: # Clear progress; we're starting from the top. ImportState().progress_reset(toppath) From 9683a5b9562ab18cfe73e3ec9e834f23ed578574 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Mon, 17 Feb 2025 16:30:42 +0100 Subject: [PATCH 28/28] See https://github.com/beetbox/beets/pull/5611#discussion_r1957433785 --- beets/importer.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 3553e1ea05..2bdb166697 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -1066,9 +1066,6 @@ def imported_items(self): return [self.item] def apply_metadata(self): - assert isinstance( - self.match, autotag.TrackMatch - ), "apply_metadata() only works for tracks" autotag.apply_item_metadata(self.item, self.match.info) def _emit_imported(self, lib):
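One closing note on the `yield archive_task or self.sentinel()` collapse in PATCH 25: `or` returns its left operand unless that operand is falsy, so the one-liner matches the removed if/else only while task objects are unconditionally truthy. A small sketch with hypothetical stand-ins (not beets' classes):

```
from __future__ import annotations


class ArchiveTask:  # hypothetical stand-in
    pass


class SentinelTask:  # hypothetical stand-in
    pass


def emit(archive_task: ArchiveTask | None) -> ArchiveTask | SentinelTask:
    # For a `None | object` union, `x or fallback()` mirrors the
    # explicit if/else and still evaluates the fallback lazily.
    return archive_task or SentinelTask()


assert isinstance(emit(None), SentinelTask)
task = ArchiveTask()
assert emit(task) is task
```

The trap to remember is that if a task class ever defined a `__bool__` or `__len__` that can be falsy, the `or` form would silently substitute the sentinel where the old if/else would not.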