Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MonitoredQueue: fail fast when subprocess exits #99

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

d4l3k
Copy link
Member

@d4l3k d4l3k commented Feb 5, 2025

This is a wrapper around mp.Queue that fails fast when the subprocess exits. This means if subprocess BabyNCCL crashes we will report the error quickly rather than waiting for the timeout to elapse.

Test plan:

pytest -o log_cli=1 torchft/process_group_test.py torchft/multiprocessing_test.py -s -v -x

@d4l3k d4l3k requested review from fegin and H-Huang February 5, 2025 19:46
@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Meta Open Source bot. label Feb 5, 2025
@d4l3k d4l3k force-pushed the d4l3k/monitor_queue branch from 9bc6269 to 9abc936 Compare February 5, 2025 20:05
Copy link
Member

@H-Huang H-Huang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@@ -573,8 +553,8 @@ class _BabyWork(Work):
def __init__(
self,
pg: "ProcessGroupBaby",
tx: mp.Queue,
rx: mp.Queue,
tx: _MonitoredQueue,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are tx and rx meant to represent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the raw communication channels with the subprocess, might be good to refactor these so all queue comms go through the PG

@@ -739,7 +739,7 @@ def _worker(
try:
pg = cls._create_pg(store, rank, world_size)
except Exception as e:
logger.exception(f"got exception in worker: {e}")
print(f"got exception in worker: {e}", file=sys.stderr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any particular reason for the logger -> print change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not initializing the logger in the subprocess so switched to stderr to make sure we actually log these

Maybe I should just instantiate a second logger as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Meta Open Source bot.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants