From c8a5915f7c564101ebab44709450fd9cd5b69f62 Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Fri, 3 Jan 2025 14:55:35 -0500 Subject: [PATCH 1/5] Run pyflyte-fast-execute in the same process Signed-off-by: Thomas J. Fan --- flytekit/bin/entrypoint.py | 57 ++++++++++++++++++++++++++------------ flytekit/core/constants.py | 3 ++ 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 49103319d0..2fe40bcc97 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -30,7 +30,7 @@ from flytekit.core import utils from flytekit.core.base_task import IgnoreOutputs, PythonTask from flytekit.core.checkpointer import SyncCheckpoint -from flytekit.core.constants import FLYTE_FAIL_ON_ERROR +from flytekit.core.constants import FLYTE_FAIL_ON_ERROR, FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS from flytekit.core.context_manager import ( ExecutionParameters, ExecutionState, @@ -721,6 +721,37 @@ def execute_task_cmd( ) +def _run_cmd_in_new_process(cmd, dest_dir): + """Run cmd in a new process.""" + env = os.environ.copy() + if dest_dir is not None: + dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir)) + if "PYTHONPATH" in env: + env["PYTHONPATH"] += os.pathsep + dest_dir_resolved + else: + env["PYTHONPATH"] = dest_dir_resolved + p = subprocess.Popen(cmd, env=env) + + def handle_sigterm(signum, frame): + logger.info(f"passing signum {signum} [frame={frame}] to subprocess") + p.send_signal(signum) + + signal.signal(signal.SIGTERM, handle_sigterm) + returncode = p.wait() + exit(returncode) + + +def _run_cmd_in_current_process(command: click.Command, args: List[str], dest_dir: Optional[str]): + """Run command with args in the same process.""" + + if dest_dir is not None: + dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir)) + if all(os.path.realpath(path) != dest_dir_resolved for path in sys.path): + sys.path.append(dest_dir_resolved) + + command(args) + + @_pass_through.command("pyflyte-fast-execute") @click.option("--additional-distribution", required=False) @click.option("--dest-dir", required=False) @@ -742,24 +773,16 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec cmd.extend(["--dynamic-addl-distro", additional_distribution, "--dynamic-dest-dir", dest_dir]) cmd.append(arg) + commands_to_run_in_process = {cmd.name: cmd for cmd in [map_execute_task_cmd, execute_task_cmd]} + # Use the commandline to run the task execute command rather than calling it directly in python code # since the current runtime bytecode references the older user code, rather than the downloaded distribution. - env = os.environ.copy() - if dest_dir is not None: - dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir)) - if "PYTHONPATH" in env: - env["PYTHONPATH"] += os.pathsep + dest_dir_resolved - else: - env["PYTHONPATH"] = dest_dir_resolved - p = subprocess.Popen(cmd, env=env) - - def handle_sigterm(signum, frame): - logger.info(f"passing signum {signum} [frame={frame}] to subprocess") - p.send_signal(signum) - - signal.signal(signal.SIGTERM, handle_sigterm) - returncode = p.wait() - exit(returncode) + if os.getenv(FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS, "false") == "true" or cmd[0] not in commands_to_run_in_process: + logger.debug(f"Running {cmd[0]} in a new process") + _run_cmd_in_new_process(cmd, dest_dir) + else: + logger.debug(f"Running {cmd[0]} in the same process") + _run_cmd_in_current_process(commands_to_run_in_process[cmd[0]], cmd[1:], dest_dir) @_pass_through.command("pyflyte-map-execute") diff --git a/flytekit/core/constants.py b/flytekit/core/constants.py index 903e5d5ced..9622ce47a0 100644 --- a/flytekit/core/constants.py +++ b/flytekit/core/constants.py @@ -23,6 +23,9 @@ # Set this environment variable to true to force the task to return non-zero exit code on failure. FLYTE_FAIL_ON_ERROR = "FLYTE_FAIL_ON_ERROR" +# Set this environment variable to true to force pyflyte-fast-execute to run task-execute-cmd in a separate process +FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS = "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS" + # Executions launched by the current eager task will be tagged with this key:current_eager_exec_name EAGER_TAG_KEY = "eager-exec" From 00abba6c05abc3e61428c72e851bd5391b7d43fd Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Fri, 3 Jan 2025 15:43:43 -0500 Subject: [PATCH 2/5] Use str2bool Signed-off-by: Thomas J. Fan --- flytekit/bin/entrypoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 2fe40bcc97..17d87fb2f3 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -777,7 +777,7 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec # Use the commandline to run the task execute command rather than calling it directly in python code # since the current runtime bytecode references the older user code, rather than the downloaded distribution. - if os.getenv(FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS, "false") == "true" or cmd[0] not in commands_to_run_in_process: + if str2bool(os.getenv(FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS)) or cmd[0] not in commands_to_run_in_process: logger.debug(f"Running {cmd[0]} in a new process") _run_cmd_in_new_process(cmd, dest_dir) else: From 9e8672877f8b309787877cca2d252f7d82ff8b52 Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 9 Jan 2025 10:09:44 -0500 Subject: [PATCH 3/5] Add FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS to integration test Signed-off-by: Thomas J. Fan --- tests/flytekit/integration/remote/test_remote.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index a559ffe09f..c9b5ab914d 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -110,6 +110,14 @@ def test_remote_run(): run("default_lp.py", "my_wf") +def test_remote_run_in_new_process(): + # child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2. + run("child_workflow.py", "parent_wf", "--a", "3", "--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1") + + # run twice to make sure it will register a new version of the workflow. + run("default_lp.py", "my_wf", "--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1") + + def test_remote_eager_run(): # child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2. run("eager_example.py", "simple_eager_workflow", "--x", "3") From eda9c8ccc25c0315aeecd404dd2fd4f25ce1eb4d Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 9 Jan 2025 11:01:15 -0500 Subject: [PATCH 4/5] Add run_args Signed-off-by: Thomas J. Fan --- tests/flytekit/integration/remote/test_remote.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index c9b5ab914d..d0be1c6780 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -65,8 +65,10 @@ def register(): assert out.returncode == 0 -def run(file_name, wf_name, *args) -> str: +def run(file_name, wf_name, run_args=None, *wf_args) -> str: # Copy the environment and set the environment variable + if run_args is None: + run_args = [] out = subprocess.run( [ "pyflyte", @@ -74,6 +76,7 @@ def run(file_name, wf_name, *args) -> str: "-c", CONFIG, "run", + *run_args, "--remote", "--destination-dir", DEST_DIR, @@ -85,7 +88,7 @@ def run(file_name, wf_name, *args) -> str: DOMAIN, MODULE_PATH / file_name, wf_name, - *args, + *wf_args, ], capture_output=True, # Capture the output streams text=True, # Return outputs as strings (not bytes) @@ -112,10 +115,10 @@ def test_remote_run(): def test_remote_run_in_new_process(): # child_workflow.parent_wf asynchronously register a parent wf1 with child lp from another wf2. - run("child_workflow.py", "parent_wf", "--a", "3", "--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1") + run("child_workflow.py", "parent_wf", "--a", "3", run_args=["--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1"]) # run twice to make sure it will register a new version of the workflow. - run("default_lp.py", "my_wf", "--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1") + run("default_lp.py", "my_wf", run_args=["--env", "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS=1"]) def test_remote_eager_run(): From b3b54a9b01b4ff12b1d6e2492f9dad2a2bfdd19d Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 9 Jan 2025 12:41:20 -0500 Subject: [PATCH 5/5] Fix tests Signed-off-by: Thomas J. Fan --- tests/flytekit/integration/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index d0be1c6780..9c0e4ed2ea 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -65,7 +65,7 @@ def register(): assert out.returncode == 0 -def run(file_name, wf_name, run_args=None, *wf_args) -> str: +def run(file_name, wf_name, *wf_args, run_args=None) -> str: # Copy the environment and set the environment variable if run_args is None: run_args = []