diff --git a/.gitignore b/.gitignore index f2b9457..ebc1d1f 100644 --- a/.gitignore +++ b/.gitignore @@ -131,3 +131,9 @@ dmypy.json pytest-tmp/ wandb/ + +# psiflow internal and its symlinks +psiflow_internal/ +psiflow_log +psiflow_submit_scripts +psiflow_task_logs diff --git a/psiflow/execution.py b/psiflow/execution.py index 44238ff..ba44be9 100644 --- a/psiflow/execution.py +++ b/psiflow/execution.py @@ -443,7 +443,6 @@ def new_file(self, prefix: str, suffix: str) -> File: @classmethod def from_config( cls, - path: Optional[Union[str, Path]] = None, parsl_log_level: str = "WARNING", usage_tracking: int = 3, retries: int = 2, @@ -457,10 +456,10 @@ def from_config( container_engine: str = "apptainer", container_addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs", container_entrypoint: str = "/opt/entry.sh", + make_symlinks: bool = True, **kwargs, ) -> ExecutionContext: - if path is None: - path = Path.cwd().resolve() / "psiflow_internal" + path = Path.cwd().resolve() / "psiflow_internal" psiflow.resolve_and_check(path) if path.exists(): shutil.rmtree(path) @@ -548,7 +547,17 @@ def from_config( internal_tasks_max_threads=internal_tasks_max_threads, # std_autopath=std_autopath, ) - return ExecutionContext(config, definitions, path / "context_dir") + context = ExecutionContext(config, definitions, path / "context_dir") + + if make_symlinks: + src, dest = Path.cwd() / f'psiflow_log', path / 'parsl.log' + _create_symlink(src, dest) + src, dest = Path.cwd() / f'psiflow_submit_scripts', path / '000' / 'submit_scripts' + _create_symlink(src, dest, is_dir=True) + src, dest = Path.cwd() / f'psiflow_task_logs', path / '000' / 'task_logs' + _create_symlink(src, dest, is_dir=True) + + return context class ExecutionContextLoader: @@ -573,10 +582,6 @@ def load( "use_threadpool": True, }, } - path = Path.cwd() / ".psiflow_internal" - if path.exists(): - shutil.rmtree(path) - psiflow_config["path"] = path else: assert len(sys.argv) == 2 path_config = psiflow.resolve_and_check(Path(sys.argv[1])) @@ -666,3 +671,16 @@ def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> s class MyWorkQueueExecutor(WorkQueueExecutor): def _get_launch_command(self, block_id): return self.worker_command + + +def _create_symlink(src: Path, dest: Path, is_dir: bool = False) -> None: + """Create or replace symbolic link""" + if src.is_symlink(): + src.unlink() + if is_dir: + dest.mkdir(parents=True, exist_ok=True) + else: + dest.touch(exist_ok=True) + src.symlink_to(dest, target_is_directory=is_dir) + +