Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions megatron/core/datasets/data_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,9 @@ def run(
)
)
num_micro_batches = int(num_micro_batches)
graph_slots = getattr(config, '_cuda_graph_num_microbatches', None)
if graph_slots is not None and num_micro_batches > graph_slots:
raise ValueError(f"{num_micro_batches=} exceeds captured CUDA graph {graph_slots=}.")

# Step 8: Broadcast to TP group and create data_iterator
new_data_iterator = create_data_iterator(
Expand Down Expand Up @@ -431,6 +434,7 @@ def get_groups_and_subsamples(self, sample_id_seqlens):
self.total_hdp_gpus,
max_seq_len_per_rank=mslpr,
min_cp_size=min_cp,
max_num_seqs=self.max_num_seqs,
)
sample_id_groups.append(sample_ids)

Expand All @@ -455,8 +459,8 @@ def _get_scheduler_max_real_num_seqs(config) -> Optional[int]:
"""Return the scheduler cap for real THD sequences.

``thd_max_packed_sequences`` is the final static THD capacity, including the
optional dummy sequence appended for a padding tail. The dp_balanced
scheduler only packs real sequences, so reserve one slot when dummy-tail
optional dummy sequence appended for a padding tail. Packing schedulers
only place real sequences, so reserve one slot when dummy-tail
padding is enabled.
"""
max_num_seqs = getattr(config, 'thd_max_packed_sequences', None)
Expand Down Expand Up @@ -522,11 +526,7 @@ def wrap_data_iterator(
if scheduler_type == 'default_dynamic_cp':
scheduler_kwargs['min_cp_size'] = config.min_dynamic_context_parallel_size

scheduler_max_num_seqs = (
_get_scheduler_max_real_num_seqs(config)
if scheduler_type == 'dp_balanced'
else getattr(config, 'thd_max_packed_sequences', None)
)
scheduler_max_num_seqs = _get_scheduler_max_real_num_seqs(config)

scheduler = scheduler_map[scheduler_type](
config.max_seqlen_per_dp_cp_rank,
Expand Down Expand Up @@ -778,7 +778,7 @@ def get_batch_on_this_rank_for_sequence_packing(
max_seqlen_kv=max_seqlen,
local_cp_size=local_cp_size,
cp_group=cp_group,
pad_between_seqs=False,
pad_between_seqs=True,
)

# Pad the already-packed THD tensors at the end when requested. CUDA Graph
Expand Down
12 changes: 10 additions & 2 deletions megatron/core/datasets/data_schedule_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ def next_hdp_group_packing_aware(
total_gpus: int,
max_seq_len_per_rank: int,
min_cp_size: int = 1,
max_num_seqs: Optional[int] = None,
) -> Tuple[List[List[int]], List[Tuple[int, int]], List[float], List[List[int]]]:
"""Form one DCP microbatch with packing-aware CP group selection.

Expand All @@ -549,7 +550,7 @@ def next_hdp_group_packing_aware(
The scheduler keeps the legacy invariant that each returned microbatch has
no empty DPxCP rank after the fill step. For non-power-of-two DPxCP layouts,
it falls back to the full DPxCP group if power-of-two expansion cannot fill
every rank.
every rank. ``max_num_seqs`` optionally caps the real sequences per subgroup.
"""
if not sample_seqlens:
return (
Expand Down Expand Up @@ -610,6 +611,11 @@ def workload(seq_len: int, cp_size: int) -> float:
for group_id, size in list(group_size.items()):
if size != cp_size:
continue
if (
max_num_seqs is not None
and len(micro_batches[group_members[group_id][0]]) >= max_num_seqs
):
continue
if packing_sequence_len.get(group_id, 0) + seq_len / cp_size > max_seq_len_per_rank:
continue
members = group_members[group_id]
Expand Down Expand Up @@ -745,7 +751,9 @@ def fill_with_full_dpxcp_group() -> None:

for sample_id, seq_len in sample_seqlens:
per_rank_len = seq_len / total_gpus
if packed_sequence_len + per_rank_len <= max_seq_len_per_rank:
if (
max_num_seqs is None or len(selected) < max_num_seqs
) and packed_sequence_len + per_rank_len <= max_seq_len_per_rank:
selected.append((sample_id, seq_len))
packed_sequence_len += per_rank_len
else:
Expand Down
4 changes: 4 additions & 0 deletions megatron/core/model_parallel_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class ModelParallelConfig:
pad_packed_seq_alignment, but cu_seqlens sequence boundaries are not extended
for the padding tail. CUDA Graph static-input padding may still pad the
cu_seqlens tensors to thd_max_packed_sequences + 1 entries.

Fused RoPE is unsafe only when disabling this option creates a hidden-only
tail beyond the last padded cu_seqlens boundary; inputs without such a tail
are unaffected.
"""

expert_model_parallel_size: int = 1
Expand Down
54 changes: 33 additions & 21 deletions megatron/core/packed_seq_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,16 @@ def _pad_cu_seqlens(cu_seqlens: Optional[Tensor], target_entries: int) -> Option
return padded


def _append_dummy_seq(cu_seqlens: Optional[Tensor], dummy_end: int) -> Optional[Tensor]:
"""Append a dummy sequence boundary to a cu_seqlens tensor.

``dummy_end`` is the padded target length. Appending it to both
``cu_seqlens_*`` and ``cu_seqlens_*_padded`` represents the post-pack
alignment tail as an ordinary dummy sequence. That keeps every token row
covered by THD metadata without enabling TE's pad-between-sequences mode.
"""
def _append_dummy_seq(
cu_seqlens: Optional[Tensor], dummy_end: Optional[int] = None
) -> Optional[Tensor]:
"""Append a boundary, repeating the final offset by default for a zero-token dummy."""
if cu_seqlens is None:
return None

dummy = torch.full((1,), int(dummy_end), dtype=cu_seqlens.dtype, device=cu_seqlens.device)
if dummy_end is None:
dummy = cu_seqlens[-1:]
else:
dummy = cu_seqlens.new_full((1,), int(dummy_end))
return torch.cat((cu_seqlens, dummy), dim=0)


Expand Down Expand Up @@ -206,7 +204,7 @@ def _resolve_thd_padding_lengths(

Returns:
local_actual_T: Current rank's token-like tensor length.
global_actual_T: Global packed length represented by THD metadata.
global_actual_T: Global physical packed length represented by THD metadata.
local_target_len: Current rank's padded token-like tensor length.
global_target_len: Global padded endpoint represented by THD metadata.
mask_device: Device used to build the returned padding mask.
Expand All @@ -227,10 +225,15 @@ def _resolve_thd_padding_lengths(

# Prefer THD metadata for the global packed length when it is available.
has_local_tensor = local_tensor_T is not None
if packed_seq_params.cu_seqlens_q is not None:
global_actual_T = int(packed_seq_params.cu_seqlens_q[-1].item())
physical_cu_seqlens = (
packed_seq_params.cu_seqlens_q_padded
if packed_seq_params.cu_seqlens_q_padded is not None
else packed_seq_params.cu_seqlens_q
)
if physical_cu_seqlens is not None:
global_actual_T = int(physical_cu_seqlens[-1].item())
if mask_device is None:
mask_device = packed_seq_params.cu_seqlens_q.device
mask_device = physical_cu_seqlens.device
else:
assert has_local_tensor, (
"packed_seq_params.cu_seqlens_q must be available to derive padding_mask "
Expand Down Expand Up @@ -386,8 +389,8 @@ def pad_sequence_for_thd(
stages, Megatron asks TE which packed rows this CP rank would receive
and uses that row count as the local length instead of assuming equal
division by CP size.
- When ``pad_by_appending_dummy_seq`` is true, the padding tail is also
represented as an ordinary dummy sequence in cu_seqlens metadata.
- When ``pad_by_appending_dummy_seq`` is true, the padding tail is represented
by a zero-valid-token dummy sequence whose padded boundary spans the tail.
- ``max_num_seqs`` pads all four cu_seqlens tensors; this is required
by CUDA Graph replay because those tensors are graph inputs.

Expand Down Expand Up @@ -437,16 +440,25 @@ def pad_sequence_for_thd(
cu_seqlens_q_padded = packed_seq_params.cu_seqlens_q_padded
cu_seqlens_kv_padded = packed_seq_params.cu_seqlens_kv_padded

physical_q = cu_seqlens_q_padded if cu_seqlens_q_padded is not None else cu_seqlens_q
physical_kv = cu_seqlens_kv_padded if cu_seqlens_kv_padded is not None else cu_seqlens_kv

# Represent post-pack padding as a dummy sequence when requested.
target_cu_entries = None if max_num_seqs is None else max_num_seqs + 1
has_dummy_padding_seq = pad_by_appending_dummy_seq and global_target_len > global_actual_T
dummy_seq_len = global_target_len - global_actual_T if has_dummy_padding_seq else 0

if has_dummy_padding_seq:
cu_seqlens_q = _append_dummy_seq(cu_seqlens_q, global_target_len)
cu_seqlens_kv = _append_dummy_seq(cu_seqlens_kv, global_target_len)
cu_seqlens_q_padded = _append_dummy_seq(cu_seqlens_q_padded, global_target_len)
cu_seqlens_kv_padded = _append_dummy_seq(cu_seqlens_kv_padded, global_target_len)
if physical_q is not None and physical_kv is not None and physical_q is not physical_kv:
assert physical_q[-1].item() == physical_kv[-1].item(), (
"One appended THD tail dummy requires matching Q and KV physical endpoints."
)
# Keep the logical endpoint unchanged and span the physical tail only in the
# padded metadata so fused THD helpers initialize every tensor row.
cu_seqlens_q = _append_dummy_seq(cu_seqlens_q)
cu_seqlens_kv = _append_dummy_seq(cu_seqlens_kv)
cu_seqlens_q_padded = _append_dummy_seq(physical_q, global_target_len)
cu_seqlens_kv_padded = _append_dummy_seq(physical_kv, global_target_len)

# Pad cu_seqlens entry counts for static CUDA Graph inputs.
if target_cu_entries is not None:
Expand Down Expand Up @@ -475,7 +487,7 @@ def pad_sequence_for_thd(
local_cp_size=packed_seq_params.local_cp_size,
cp_group=packed_seq_params.cp_group,
total_tokens=local_target_len if target_cu_entries is None else None,
pad_between_seqs=False if has_dummy_padding_seq else packed_seq_params.pad_between_seqs,
pad_between_seqs=has_dummy_padding_seq or packed_seq_params.pad_between_seqs,
)

# True marks padded local token slots for routing/loss paths.
Expand Down
Loading
Loading