Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions megatron/core/inference/data_parallel_inference_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,16 @@ def start(self):
if header == Headers.STOP:
self.state = self.CoordinatorState.RUNNING

elif header in (Headers.START_CUDA_PROFILER, Headers.STOP_CUDA_PROFILER):
# Profiler control: broadcast to every connected DP engine. Not a
# state transition, so no CoordinatorState checks — just forward.
if sender_identity not in known_clients:
logging.warning("Coordinator: ignoring profiler signal from unknown client.")
continue
broadcast_payload = msgpack.packb(deserialized_payload, use_bin_type=True)
for data_parallel_rank_id in list(self.identities_of_data_parallel_ranks):
self._send_to_engine(data_parallel_rank_id, broadcast_payload)

elif header == Headers.ENGINE_REPLY:
# This is the output of a single engine step on some data parallel rank.
assert sender_identity in self.identities_of_data_parallel_ranks
Expand Down
6 changes: 6 additions & 0 deletions megatron/core/inference/engines/dynamic_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2349,6 +2349,12 @@ def schedule_requests(self) -> int:
nvtx_range_pop("add_request")
elif header == Headers.SET_GENERATION_EPOCH:
new_generation_epoch = data[1]
elif header == Headers.START_CUDA_PROFILER:
# Side-effect, not a state transition: apply immediately on every
# rank so an outer nsys --capture-range=cudaProfilerApi starts here.
torch.cuda.cudart().cudaProfilerStart()
elif header == Headers.STOP_CUDA_PROFILER:
torch.cuda.cudart().cudaProfilerStop()
else:
# Control signal: queue for second pass.
self._pending_signals.append(message)
Expand Down
2 changes: 2 additions & 0 deletions megatron/core/inference/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class Headers(Enum):
DISCONNECT = auto()
SHUTDOWN = auto()
TP_BROADCAST = auto()
START_CUDA_PROFILER = auto()
STOP_CUDA_PROFILER = auto()


class UnknownHeaderError(Exception):
Expand Down
13 changes: 13 additions & 0 deletions megatron/core/inference/inference_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ def unpause_engines(self) -> None:
"""Sends UNPAUSE to all engines. No synchronization needed."""
self._send_signal_to_engines(Headers.UNPAUSE)

def start_cuda_profiler(self) -> None:
"""Sends START_CUDA_PROFILER to all engines via coordinator.

Each engine calls ``torch.cuda.profiler.start()`` (cudaProfilerStart) on
its next loop iteration, so an outer ``nsys profile --capture-range=
cudaProfilerApi`` begins recording. No synchronization needed.
"""
self._send_signal_to_engines(Headers.START_CUDA_PROFILER)

def stop_cuda_profiler(self) -> None:
"""Sends STOP_CUDA_PROFILER to all engines (cudaProfilerStop)."""
self._send_signal_to_engines(Headers.STOP_CUDA_PROFILER)

def set_generation_epoch(self, generation_epoch: int):
"""Sends a signal to stamp all in-flight requests with the given generation epoch.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from .chat_completions import bp as ChatCompletions
from .completions import bp as Completions
from .health import bp as Health
from .profile import bp as Profile

__all__ = [Completions, ChatCompletions, Health]
__all__ = [Completions, ChatCompletions, Health, Profile]
except ImportError:
__all__ = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.

"""CUDA profiler control endpoints.

POST /start_profile and /stop_profile relay a control signal through the
InferenceClient -> data-parallel coordinator -> every connected EP/DP engine,
which calls cudaProfilerStart()/cudaProfilerStop(). Pair with an outer
`nsys profile --capture-range=cudaProfilerApi` to bracket a capture window.
"""

import logging

logger = logging.getLogger(__name__)

try:
from quart import Blueprint, current_app, jsonify

bp = Blueprint('profile_api', __name__)

@bp.route('/start_profile', methods=['POST'])
@bp.route('/v1/start_profile', methods=['POST'])
async def start_profile():
"""Broadcast cudaProfilerStart to all engines."""
client = current_app.config.get('client')
if client is None:
return jsonify({"status": "error", "details": "client not initialized"}), 503
client.start_cuda_profiler()
return jsonify({"status": "ok", "action": "start_profile"}), 200

@bp.route('/stop_profile', methods=['POST'])
@bp.route('/v1/stop_profile', methods=['POST'])
async def stop_profile():
"""Broadcast cudaProfilerStop to all engines."""
client = current_app.config.get('client')
if client is None:
return jsonify({"status": "error", "details": "client not initialized"}), 503
client.stop_cuda_profiler()
return jsonify({"status": "ok", "action": "stop_profile"}), 200

except ImportError as e:
logger.warning(f"Could not import quart: {e}")
Loading