Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run command in pyflyte-fast-execute in the same process #3029

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from flytekit.core import utils
from flytekit.core.base_task import IgnoreOutputs, PythonTask
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.core.constants import FLYTE_FAIL_ON_ERROR
from flytekit.core.constants import FLYTE_FAIL_ON_ERROR, FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS
from flytekit.core.context_manager import (
ExecutionParameters,
ExecutionState,
Expand Down Expand Up @@ -721,6 +721,37 @@ def execute_task_cmd(
)


def _run_cmd_in_new_process(cmd, dest_dir):
"""Run cmd in a new process."""
env = os.environ.copy()
if dest_dir is not None:
dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir))
if "PYTHONPATH" in env:
env["PYTHONPATH"] += os.pathsep + dest_dir_resolved
else:
env["PYTHONPATH"] = dest_dir_resolved
p = subprocess.Popen(cmd, env=env)

def handle_sigterm(signum, frame):
logger.info(f"passing signum {signum} [frame={frame}] to subprocess")
p.send_signal(signum)

signal.signal(signal.SIGTERM, handle_sigterm)
returncode = p.wait()
exit(returncode)


def _run_cmd_in_current_process(command: click.Command, args: List[str], dest_dir: Optional[str]):
"""Run command with args in the same process."""

if dest_dir is not None:
dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir))
if all(os.path.realpath(path) != dest_dir_resolved for path in sys.path):
sys.path.append(dest_dir_resolved)

command(args)


@_pass_through.command("pyflyte-fast-execute")
@click.option("--additional-distribution", required=False)
@click.option("--dest-dir", required=False)
Expand All @@ -742,24 +773,16 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec
cmd.extend(["--dynamic-addl-distro", additional_distribution, "--dynamic-dest-dir", dest_dir])
cmd.append(arg)

commands_to_run_in_process = {cmd.name: cmd for cmd in [map_execute_task_cmd, execute_task_cmd]}

# Use the commandline to run the task execute command rather than calling it directly in python code
# since the current runtime bytecode references the older user code, rather than the downloaded distribution.
env = os.environ.copy()
if dest_dir is not None:
dest_dir_resolved = os.path.realpath(os.path.expanduser(dest_dir))
if "PYTHONPATH" in env:
env["PYTHONPATH"] += os.pathsep + dest_dir_resolved
else:
env["PYTHONPATH"] = dest_dir_resolved
p = subprocess.Popen(cmd, env=env)

def handle_sigterm(signum, frame):
logger.info(f"passing signum {signum} [frame={frame}] to subprocess")
p.send_signal(signum)

signal.signal(signal.SIGTERM, handle_sigterm)
returncode = p.wait()
exit(returncode)
if os.getenv(FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS, "false") == "true" or cmd[0] not in commands_to_run_in_process:
logger.debug(f"Running {cmd[0]} in a new process")
_run_cmd_in_new_process(cmd, dest_dir)
else:
logger.debug(f"Running {cmd[0]} in the same process")
_run_cmd_in_current_process(commands_to_run_in_process[cmd[0]], cmd[1:], dest_dir)


@_pass_through.command("pyflyte-map-execute")
Expand Down
3 changes: 3 additions & 0 deletions flytekit/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
# Set this environment variable to true to force the task to return non-zero exit code on failure.
FLYTE_FAIL_ON_ERROR = "FLYTE_FAIL_ON_ERROR"

# Set this environment variable to true to force pyflyte-fast-execute to run task-execute-cmd in a separate process
FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS = "FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we run a workflow in the integration tests with FLYTE_FAST_EXECUTE_CMD_IN_NEW_PROCESS enabled? Just want to make sure the old behavior is still working.

# Executions launched by the current eager task will be tagged with this key:current_eager_exec_name
EAGER_TAG_KEY = "eager-exec"

Expand Down
Loading