0 failing tests. Fixed tests, and with them all store-related bugs I know of; renamed some jobs.
theypsilon committed Feb 15, 2025
1 parent eeb5502 commit b1244b4
Showing 29 changed files with 585 additions and 592 deletions.
Binary file modified docs/jobs_diagram.png
Binary file modified docs/jobs_diagram_with_zip_feature.png
23 changes: 13 additions & 10 deletions docs/jobs_flow_diagram.py
@@ -18,8 +18,8 @@
dot.node('END', 'END', shape='ellipse', fillcolor='#77DD77')
dot.node('A', 'FetchFileJob [db]')
dot.node('C', 'OpenDbJob', fillcolor='#ffefd5')
-dot.node('D', 'ProcessDbJob', fillcolor='#B0E0E6')
-dot.node('E', 'ProcessIndexJob', fillcolor='#B0E0E6')
+dot.node('D', 'ProcessDbMainJob', fillcolor='#B0E0E6')
+dot.node('E', 'ProcessDbIndexJob', fillcolor='#B0E0E6')
dot.node('M', 'FetchFileJob [file]')
dot.node('N', 'ValidateFileJob [file]', fillcolor='#ffefd5')

@@ -49,15 +49,15 @@
c.attr(style='filled', color='#f5f5ff', penwidth='1.0')
c.node('3', 'Zip Feature', style='filled', fillcolor='#8f8fff', fontcolor='white', penwidth='0')

-c.node('O', 'ProcessDbZipsWaiterJob', fillcolor='#B0E0E6')
-c.node('F', 'ProcessZipJob', fillcolor='#B0E0E6')
-c.node('G', 'FetchFileJob [zip index]')
-c.node('H', 'ValidateFileJob [zip index]', fillcolor='#ffefd5')
-c.node('I', 'OpenZipIndexJob', fillcolor='#ffefd5')
+c.node('O', 'WaitDbZipsJob', fillcolor='#B0E0E6')
+c.node('F', 'ProcessZipIndexJob', fillcolor='#B0E0E6')
+c.node('G', 'FetchFileJob [zip summary]')
+c.node('H', 'ValidateFileJob [zip summary]', fillcolor='#ffefd5')
+c.node('I', 'OpenZipSummaryJob', fillcolor='#ffefd5')
c.node('J', 'FetchFileJob [zip contents]')
c.node('K', 'ValidateFileJob [zip contents]', fillcolor='#ffefd5')
c.node('L', 'OpenZipContentsJob', fillcolor='#ffefd5')

+c.node('Q', '', shape='circle', fixedsize='true', width='0.01', fillcolor='#ff8f8f')
#
# EDGES
#
@@ -71,7 +71,10 @@
dot.edge('H', 'I')
dot.edge('J', 'K')
dot.edge('K', 'L')
+dot.edge('L', 'E', label='inv[]', minlen='4', weight='2')
-dot.edge('L', 'M', label='1:N')
+dot.edge('F', 'Q', label='1:N', dir='none')
+dot.edge('Q', 'M')
+#dot.edge('L', 'M', label='1:N')

# “Wait” edges
dot.edge('F', 'O', style='dotted', constraint='true', label='wait')
@@ -92,7 +95,7 @@
dot.edge('G', 'F', label='backup if stored', style='dashed', constraint='false')
#dot.edge('H', 'I', label='b', style='dashed', constraint='false', dir='none')
#dot.edge('I', 'F', label='b', style='dashed', constraint='false')
-dot.edge('J', 'E', label='b', style='dashed', constraint='false')
+#dot.edge('J', 'E', label='b', style='dashed', constraint='false')
#dot.edge('K', 'L', label='b', style='dashed', constraint='false', dir='none')
#dot.edge('L', 'E', label='b', style='dashed', constraint='false')
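The new F → Q → M routing above replaces the direct L → M edge: a near-invisible junction node lets a single labeled 1:N edge fan out without an arrowhead mid-route. A minimal standalone sketch of that trick with the graphviz package (node names borrowed from the diagram for illustration):

import graphviz

dot = graphviz.Digraph()
dot.node('F', 'ProcessZipIndexJob')
dot.node('M', 'FetchFileJob [file]')
dot.node('Q', '', shape='circle', fixedsize='true', width='0.01')  # tiny junction node
dot.edge('F', 'Q', label='1:N', dir='none')  # no arrowhead into the junction
dot.edge('Q', 'M')                           # single arrowhead on the final hop
dot.render('zip_flow_sketch', format='png')  # writes zip_flow_sketch.png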

4 changes: 2 additions & 2 deletions docs/refactor-notes.md
@@ -1,7 +1,7 @@
# Refactor Notes

- The actual removal of files could happen right after index processing if the index processing ends up with a barrier that waits for files to be downloaded.
-- When we store the zip_index in the store, should it contain pext paths or not?
-This zip index, is gonna be the source of next runs if there_is_a_recent_store_index is False
+- When we store the zip_summary in the store, should it contain pext paths or not?
+This zip summary, is gonna be the source of next runs if there_is_a_recent_store_index is False
- By now, this information can be deduced from zip_description path and target_folder_path but this does not work if there are mixed "pext" and non-pext paths.
- Need to do a store change after the refactor: Cheats has currently zip_id cheats_folder_snes, it should not have zip_id at all, same with Games, which has zip_id nes_palettes and should not have zip_id either. Furthermore, Cheats or Games should be expected to be identical within the zip summary folders and within the db folders. So we might want to be checking db.folders when processing the zips. That should allow a logic simplification when processing pext parents & pext subfolders, since we now need to infere store information based on its children and that should no longer be necessary.
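A hypothetical sketch of the deduction discussed in these notes; the '|' pext-prefix convention and the function name below are assumptions for illustration, not the repo's actual code:

def zip_paths_look_pext(target_folder_path: str) -> bool:
    # One flag per zip: works only while every path in the zip is the same kind.
    return target_folder_path.startswith('|')

# A zip mixing pext ('|games/...') and non-pext ('games/...') paths cannot be
# described by this single flag, which is why the notes suggest storing the
# information explicitly in the zip summary instead of deducing it.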
15 changes: 0 additions & 15 deletions src/downloader/jobs/index.py
@@ -25,18 +25,3 @@ class Index:
files: Dict[str, Any]
folders: Dict[str, Any]
base_files_url: Optional[str] = None

-def merge_zip_index(self, zip_index: 'Index'):
-self.files.update(zip_index.files)
-self.folders.update(zip_index.folders)
-
-def subtract(self, other: 'Index'):
-for key in other.files:
-if key in self.files:
-del self.files[key]
-
-for key in other.folders:
-if key in self.folders:
-del self.folders[key]
-

24 changes: 12 additions & 12 deletions src/downloader/jobs/jobs_factory.py
@@ -30,9 +30,9 @@
from downloader.jobs.get_file_job import GetFileJob
from downloader.jobs.index import Index
from downloader.jobs.open_zip_contents_job import ZipKind
-from downloader.jobs.open_zip_index_job import OpenZipIndexJob
-from downloader.jobs.process_db_job import ProcessDbJob
-from downloader.jobs.process_zip_job import ProcessZipJob
+from downloader.jobs.open_zip_summary_job import OpenZipSummaryJob
+from downloader.jobs.process_db_main_job import ProcessDbMainJob
+from downloader.jobs.process_zip_index_job import ProcessZipIndexJob
from downloader.jobs.validate_file_job import ValidateFileJob
from downloader.local_store_wrapper import StoreWrapper, new_store_fragment_drive_paths
from downloader.logger import Logger
@@ -43,7 +43,7 @@ class ZipJobContext:
zip_id: str
zip_description: Dict[str, Any]
config: Config
-job: ProcessDbJob
+job: ProcessDbMainJob


def make_get_file_job(source: str, target: str, info: str, silent: bool, logger: Optional[Logger] = None) -> GetFileJob:
@@ -68,9 +68,9 @@ def make_get_zip_file_jobs(db: DbEntity, zip_id: str, description: Dict[str, Any
return get_file_job, validate_job


-def make_open_zip_index_job(z: ZipJobContext, file_description: Dict[str, Any], process_zip_backup: Optional[ProcessZipJob]) -> Job:
+def make_open_zip_summary_job(z: ZipJobContext, file_description: Dict[str, Any], process_zip_backup: Optional[ProcessZipIndexJob]) -> Job:
get_file_job, validate_job = make_get_zip_file_jobs(db=z.job.db, zip_id=z.zip_id, description=file_description, tag=make_zip_tag(z.job.db, z.zip_id))
-open_zip_index_job = OpenZipIndexJob(
+open_zip_summary_job = OpenZipSummaryJob(
zip_id=z.zip_id,
zip_description=z.zip_description,
db=z.job.db,
@@ -82,28 +82,28 @@ def make_open_zip_index_job(z: ZipJobContext, file_description: Dict[str, Any],
get_file_job=get_file_job,
process_zip_backup=process_zip_backup
)
-open_zip_index_job.add_tag(make_zip_tag(z.job.db, z.zip_id))
-validate_job.after_job = open_zip_index_job
+open_zip_summary_job.add_tag(make_zip_tag(z.job.db, z.zip_id))
+validate_job.after_job = open_zip_summary_job
if process_zip_backup is not None:
process_zip_backup.summary_download_failed = validate_job.info
return get_file_job


-def make_process_zip_job(zip_id: str, zip_description: Dict[str, Any], zip_index: Dict[str, Any], config: Config, db: DbEntity, ini_description: Dict[str, Any], store: StoreWrapper, full_resync: bool, has_new_zip_index: bool) -> ProcessZipJob:
+def make_process_zip_job(zip_id: str, zip_description: Dict[str, Any], zip_summary: Dict[str, Any], config: Config, db: DbEntity, ini_description: Dict[str, Any], store: StoreWrapper, full_resync: bool, has_new_zip_summary: bool) -> ProcessZipIndexJob:
base_files_url = db.base_files_url
if 'base_files_url' in zip_description:
base_files_url = zip_description['base_files_url']

-job = ProcessZipJob(
+job = ProcessZipIndexJob(
zip_id=zip_id,
zip_description=zip_description,
-zip_index=Index(files=zip_index['files'], folders=zip_index['folders'], base_files_url=base_files_url),
+zip_index=Index(files=zip_summary['files'], folders=zip_summary['folders'], base_files_url=base_files_url),
config=config,
db=db,
ini_description=ini_description,
store=store,
full_resync=full_resync,
-has_new_zip_index=has_new_zip_index,
+has_new_zip_summary=has_new_zip_summary,
result_zip_index = new_store_fragment_drive_paths()
)
job.add_tag(make_zip_tag(db, zip_id))
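For context, make_open_zip_summary_job above returns get_file_job, the head of a chain whose links are wired through after_job (validate_job.after_job = open_zip_summary_job). A self-contained sketch of that chaining pattern with stand-in classes, not the repo's actual Job API:

from dataclasses import dataclass
from typing import Optional

@dataclass
class SketchJob:
    name: str
    after_job: Optional['SketchJob'] = None  # dispatched once this job succeeds

open_job = SketchJob('OpenZipSummaryJob')
validate_job = SketchJob('ValidateFileJob', after_job=open_job)
fetch_job = SketchJob('FetchFileJob', after_job=validate_job)  # the head the factory returns

step: Optional[SketchJob] = fetch_job
while step is not None:
    print(step.name)       # FetchFileJob, ValidateFileJob, OpenZipSummaryJob
    step = step.after_job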
4 changes: 2 additions & 2 deletions src/downloader/jobs/open_db_worker.py
@@ -19,7 +19,7 @@
from downloader.db_entity import DbEntity, make_db_tag
from downloader.job_system import WorkerResult
from downloader.jobs.open_db_job import OpenDbJob
-from downloader.jobs.process_db_job import ProcessDbJob
+from downloader.jobs.process_db_main_job import ProcessDbMainJob
from downloader.jobs.worker_context import DownloaderWorkerBase


@@ -35,7 +35,7 @@ def operate_on(self, job: OpenDbJob) -> WorkerResult: # type: ignore[override]
return [], e

ini_description, store, full_resync = job.ini_description, job.store, job.full_resync
-return [ProcessDbJob(db=db, ini_description=ini_description, store=store, full_resync=full_resync).add_tag(make_db_tag(job.section))], None
+return [ProcessDbMainJob(db=db, ini_description=ini_description, store=store, full_resync=full_resync).add_tag(make_db_tag(job.section))], None

def _open_db(self, section: str, temp_path: str) -> DbEntity:
self._ctx.logger.bench('Loading database start: ', section)
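operate_on above follows the WorkerResult convention of returning a (follow_up_jobs, error) pair with exactly one side populated. A reduced sketch of that convention using plain stand-in types rather than the repo's classes:

from typing import List, Optional, Tuple

SketchResult = Tuple[List[str], Optional[Exception]]

def operate_on_sketch(db_ok: bool) -> SketchResult:
    if not db_ok:
        return [], ValueError('could not open db')  # failure: no follow-up jobs
    return ['ProcessDbMainJob'], None               # success: schedule the next job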
5 changes: 2 additions & 3 deletions src/downloader/jobs/open_zip_contents_job.py
@@ -24,7 +24,7 @@
from downloader.file_filter import FileFoldersHolder, Config
from downloader.job_system import Job, JobSystem
from downloader.jobs.get_file_job import GetFileJob
-from downloader.jobs.process_index_job import ProcessIndexJob
+from downloader.jobs.process_db_index_job import ProcessDbIndexJob
from downloader.path_package import PathPackage
from downloader.local_store_wrapper import StoreWrapper

@@ -47,6 +47,7 @@ class OpenZipContentsJob(Job):

zip_id: str
zip_kind: ZipKind
+zip_description: Dict[str, Any]
target_folder: Optional[PathPackage]
total_amount_of_files_in_zip: int
files_to_unzip: List[PathPackage]
@@ -56,10 +57,8 @@ class OpenZipContentsJob(Job):
zip_base_files_url: str
filtered_data: FileFoldersHolder
get_file_job: GetFileJob
-make_process_index_backup: Callable[[], Optional[ProcessIndexJob]]

def retry_job(self): return self.get_file_job
-def backup_job(self): return self.make_process_index_backup()

# Results
downloaded_files: List[PathPackage] = field(default_factory=list)
8 changes: 5 additions & 3 deletions src/downloader/jobs/open_zip_contents_worker.py
@@ -20,7 +20,8 @@

from downloader.job_system import WorkerResult
from downloader.jobs.index import Index
-from downloader.jobs.process_index_job import ProcessIndexJob
+from downloader.jobs.process_db_index_job import ProcessDbIndexJob
+from downloader.jobs.process_db_index_worker import create_fetch_jobs
from downloader.path_package import PathPackage
from downloader.jobs.worker_context import DownloaderWorkerBase, DownloaderWorkerContext
from downloader.jobs.open_zip_contents_job import OpenZipContentsJob, ZipKind
@@ -99,11 +100,12 @@ def operate_on(self, job: OpenZipContentsJob) -> WorkerResult: # type: ignore[o
else:
files_to_recover = {file.rel_path: file.description for file in invalid_files}

-self._ctx.installation_report.unmark_processed_files(job.failed_files, job.db.db_id)
+#self._ctx.installation_report.unmark_processed_files(job.failed_files, job.db.db_id)

logger.bench('OpenZipContentsWorker launching recovery process index...', job.db.db_id, job.zip_id)

-return [ProcessIndexJob(
+return create_fetch_jobs(self._ctx, job.db.db_id, invalid_files, job.zip_description.get('base_files_url', job.db.base_files_url))
+return [ProcessDbIndexJob(
db=job.db,
ini_description=job.ini_description,
config=job.config,
Expand Down
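The new create_fetch_jobs(...) call in the hunk above re-queues downloads for the files that failed validation, resolved against the zip's base_files_url (falling back to the db's). Its real signature is only partially visible in this diff; a hypothetical sketch of the idea:

from typing import Any, Dict, List

def create_fetch_jobs_sketch(invalid_files: Dict[str, Dict[str, Any]], base_files_url: str) -> List[str]:
    # one download URL per invalid file, addressed relative to the base url
    return [f"{base_files_url.rstrip('/')}/{rel_path}" for rel_path in invalid_files]

urls = create_fetch_jobs_sketch({'Cheats/SNES/sample.zip': {'hash': '...'}},
                                'https://example.com/base')
print(urls)  # ['https://example.com/base/Cheats/SNES/sample.zip']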
src/downloader/jobs/{open_zip_index_job.py → open_zip_summary_job.py}
@@ -23,12 +23,12 @@
from downloader.db_entity import DbEntity
from downloader.job_system import Job, JobSystem
from downloader.jobs.get_file_job import GetFileJob
-from downloader.jobs.process_zip_job import ProcessZipJob
+from downloader.jobs.process_zip_index_job import ProcessZipIndexJob
from downloader.local_store_wrapper import StoreWrapper


@dataclass(eq=False, order=False)
-class OpenZipIndexJob(Job):
+class OpenZipSummaryJob(Job):
type_id: int = field(init=False, default=JobSystem.get_job_type_id())

db: DbEntity
@@ -40,7 +40,7 @@ class OpenZipIndexJob(Job):
download_path: str
config: Config
get_file_job: GetFileJob
-process_zip_backup: ProcessZipJob
+process_zip_backup: ProcessZipIndexJob

def retry_job(self) -> Optional[Job]: return self.get_file_job
def backup_job(self) -> Optional[Job]: return self.process_zip_backup
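OpenZipSummaryJob exposes both failure hooks: retry_job re-runs the fetch, while backup_job falls back to process_zip_backup, the job built from the previously stored summary. A speculative sketch of how a scheduler might consult these hooks; the logic below is assumed, not taken from the repo's job system:

from typing import Optional

class HookedJob:
    def retry_job(self) -> Optional['HookedJob']: return None   # e.g. the GetFileJob
    def backup_job(self) -> Optional['HookedJob']: return None  # e.g. the stored-summary ProcessZipIndexJob

def on_failure(job: HookedJob) -> Optional[HookedJob]:
    retry = job.retry_job()
    if retry is not None:
        return retry            # prefer re-downloading the summary
    return job.backup_job()     # otherwise fall back to the stored copy, if any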
src/downloader/jobs/{open_zip_index_worker.py → open_zip_summary_worker.py}
@@ -17,30 +17,30 @@
# https://github.com/MiSTer-devel/Downloader_MiSTer

from downloader.jobs.worker_context import DownloaderWorkerBase
-from downloader.jobs.open_zip_index_job import OpenZipIndexJob
+from downloader.jobs.open_zip_summary_job import OpenZipSummaryJob
from downloader.jobs.jobs_factory import make_process_zip_job
from downloader.job_system import WorkerResult
from downloader.file_system import FsError

-class OpenZipIndexWorker(DownloaderWorkerBase):
-def job_type_id(self) -> int: return OpenZipIndexJob.type_id
+class OpenZipSummaryWorker(DownloaderWorkerBase):
+def job_type_id(self) -> int: return OpenZipSummaryJob.type_id
def reporter(self): return self._ctx.progress_reporter

-def operate_on(self, job: OpenZipIndexJob) -> WorkerResult: # type: ignore[override]
+def operate_on(self, job: OpenZipSummaryJob) -> WorkerResult: # type: ignore[override]
try:
-index = self._ctx.file_system.load_dict_from_file(job.download_path)
+summary = self._ctx.file_system.load_dict_from_file(job.download_path)
self._ctx.file_system.unlink(job.download_path)

return [make_process_zip_job(
zip_id=job.zip_id,
zip_description=job.zip_description,
-zip_index=index,
+zip_summary=summary,
config=job.config,
db=job.db,
ini_description=job.ini_description,
store=job.store,
full_resync=job.full_resync,
-has_new_zip_index=True
+has_new_zip_summary=True
)], None
except (FsError, OSError) as e:
return [], e
src/downloader/jobs/{process_index_job.py → process_db_index_job.py}
@@ -29,7 +29,7 @@


@dataclass(eq=False, order=False)
-class ProcessIndexJob(Job):
+class ProcessDbIndexJob(Job):
type_id: int = field(init=False, default=JobSystem.get_job_type_id())

db: DbEntity
@@ -46,7 +46,7 @@ def retry_job(self): return None
present_not_validated_files: List[PathPackage] = field(default_factory=list)
present_validated_files: List[PathPackage] = field(default_factory=list)
skipped_updated_files: List[PathPackage] = field(default_factory=list)
-removed_copies: List[RemovedCopy] = field(default_factory=list)
+removed_folders: List[RemovedCopy] = field(default_factory=list)
installed_folders: List[PathPackage] = field(default_factory=list)
directories_to_remove: List[PathPackage] = field(default_factory=list)
files_to_remove: List[PathPackage] = field(default_factory=list)
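All the result fields above use field(default_factory=list). A quick stand-in illustration (not the repo's job) of why dataclasses require this for mutable defaults:

from dataclasses import dataclass, field
from typing import List

@dataclass
class SketchResults:
    installed_folders: List[str] = field(default_factory=list)

a, b = SketchResults(), SketchResults()
a.installed_folders.append('Cheats')
assert b.installed_folders == []  # each instance owns its own list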