Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ vllm serve Qwen/Qwen3-235B-A22 \
--tensor-parallel-size 16 \
--enable-expert-parallel \
--additional-config '{
"expert_map_path": "/path/to/eplb.json"
"eplb_config": {"expert_map_path": "/path/to/eplb.json"}
}'
```

Expand Down
41 changes: 41 additions & 0 deletions vllm_ascend/eplb/core/eplb_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from multiprocessing import Process, Queue
from typing import Any

import numpy as np
import torch
import torch.distributed as dist
from vllm.logger import logger
Expand Down Expand Up @@ -60,6 +61,16 @@ def do_update(self):
old_placement = self.global2local(self.old_expert_maps, self.num_local_experts)
_, _, new_placement = self.calculate_rebalance_experts(load_info, old_placement)

if self.rank_id == 0:
hotness = self._calculate_hotness(old_placement, load_info)
current_mean, current_max = self._compute_imbalance(old_placement, hotness)
update_mean, update_max = self._compute_imbalance(new_placement, hotness)
logger.info(
"[Expert Hotness] Current: mean={:.3f}, max={:.3f}, Updated: mean={:.3f}, max={:.3f}".format(
current_mean, current_max, update_mean, update_max
)
)

if not torch.is_tensor(new_placement):
new_placement = torch.tensor(new_placement)
self.check_expert_placement(old_placement, new_placement)
Expand Down Expand Up @@ -251,6 +262,36 @@ def pack_update_info(self, update_info_generator):

return list(zip(send_all, recv_all, maps, log2phy_all, layer_ids))

@staticmethod
def _compute_imbalance(deployment_all_layer, hotness_all_layer: np.ndarray):
imbalance_list = []
deployment_all_layer = np.array(deployment_all_layer)
for deployment, hotness in zip(deployment_all_layer, hotness_all_layer):
counts = np.bincount(deployment.reshape(-1), minlength=hotness.shape[0])

unit_hotness = np.divide(hotness, counts, out=np.zeros_like(hotness, dtype=float), where=counts != 0)

stage_load = unit_hotness[deployment].sum(-1)
stage_par = stage_load.max() / stage_load.mean()
imbalance_list.append(stage_par)

max_val = max(imbalance_list)
mean_val = sum(imbalance_list) / len(imbalance_list)
return mean_val, max_val
Comment on lines +267 to +280
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This method has several critical issues that can lead to crashes or incorrect calculations:

  1. np.bincount will raise a ValueError if deployment contains negative values, which it can for unassigned experts.
  2. unit_hotness[deployment] will incorrectly index from the end of the array if deployment contains -1.
  3. stage_load.mean() can be zero, leading to a ZeroDivisionError when calculating stage_par.
  4. If deployment_all_layer is empty, imbalance_list will be empty, causing max() to raise a ValueError.

Please refactor this method to handle these edge cases gracefully.

        imbalance_list = []
        for deployment, hotness in zip(deployment_all_layer, hotness_all_layer):
            deployment_flat = deployment.ravel()
            valid_mask = deployment_flat >= 0
            if not np.any(valid_mask):
                imbalance_list.append(1.0)
                continue

            counts = np.bincount(deployment_flat[valid_mask], minlength=hotness.shape[0])
            unit_hotness = np.divide(hotness, counts, out=np.zeros_like(hotness, dtype=float), where=counts != 0)

            temp_deployment = np.where(deployment >= 0, deployment, 0)
            stage_load_per_expert = unit_hotness[temp_deployment]
            stage_load_per_expert[deployment < 0] = 0
            stage_load = stage_load_per_expert.sum(-1)

            mean_load = stage_load.mean()
            stage_par = stage_load.max() / mean_load if mean_load > 0 else 1.0
            imbalance_list.append(stage_par)

        if not imbalance_list:
            return 0.0, 0.0

        max_val = np.max(imbalance_list)
        mean_val = np.mean(imbalance_list)
        return mean_val, max_val


@staticmethod
def _calculate_hotness(deployment_all_layer, moe_load_all_layer):
hotnesses = []
num_of_expert = deployment_all_layer.shape[1] * deployment_all_layer.shape[2]
for deployment, rank_load in zip(deployment_all_layer, moe_load_all_layer.numpy()):
hotness = np.zeros(num_of_expert, dtype=rank_load.dtype)
deployment_flat = deployment.ravel()
rank_load_flat = rank_load.ravel()
np.add.at(hotness, deployment_flat, rank_load_flat)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The deployment_flat array can contain -1 for unassigned expert slots. Using np.add.at with negative indices will cause it to wrap around and incorrectly add to the hotness of the wrong expert. Please filter for non-negative indices before this operation.

Suggested change
np.add.at(hotness, deployment_flat, rank_load_flat)
valid_mask = deployment_flat >= 0
np.add.at(hotness, deployment_flat[valid_mask], rank_load_flat[valid_mask])

hotnesses.append(hotness)

return np.array(hotnesses)


class EplbProcess:
def __init__(self, shared_dict, policy_type: int = 0, enable_d2d: bool = True):
Expand Down
37 changes: 0 additions & 37 deletions vllm_ascend/eplb/eplb_updator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def __init__(self, eplb_config, loader: D2DExpertWeightLoader, eplb_process: Epl
self.eplb_loader = loader
self.eplb_process = eplb_process
self.shared_dict = self.eplb_process.shared_dict
self.moe_imbalance_dict: dict[int, float] = {}
self.comm_group = get_dynamic_eplb_group()

def set_adaptor(self, adaptor: VllmEplbAdaptor):
Expand Down Expand Up @@ -139,44 +138,8 @@ def compute_and_set_moe_load(self):
self.shared_dict["moe_load"] = moe_load.cpu()
logger.debug(f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}")

if dist.get_rank() == 0:
self.compute_moe_imbalance(moe_load)
self.summarize_moe_imbalance()

return moe_load

def compute_moe_imbalance(self, moe_load: torch.Tensor):
    """Recompute the per-layer peak-to-average load ratio cache.

    Sums the load over the last dimension to get per-card load for each
    layer, then stores max/(mean + eps) per layer in
    ``self.moe_imbalance_dict``.
    """
    self.moe_imbalance_dict.clear()

    per_card_load = moe_load.sum(dim=-1).cpu().float()

    for idx, card_load in enumerate(per_card_load):
        peak = card_load.max().item()
        average = card_load.mean().item()

        # Epsilon guards against an all-zero layer.
        ratio = peak / (average + 1e-6)

        logger.debug(f"[ModelRunner][MOE_load_stats][Layer {idx}] PAR={ratio:.4f}")

        self.moe_imbalance_dict[idx] = ratio

def summarize_moe_imbalance(self):
    """Log mean/max/min of the cached per-layer imbalance ratios."""
    ratios = list(self.moe_imbalance_dict.values())
    if not ratios:
        # Nothing cached yet (compute_moe_imbalance not run).
        logger.info("[MOE_load_stats] No data available.")
        return

    mean_ratio = sum(ratios) / len(ratios)

    logger.info(
        f"[ModelRunner][MOE_load_stats] Peak-to-Average-Ratio: "
        f"Mean={mean_ratio:.4f}, Max={max(ratios):.4f}, Min={min(ratios):.4f}"
    )

def warm_up_eplb(self):
self.shared_dict["expert_maps"] = self.adaptor.get_global_expert_map()
self.compute_and_set_moe_load()
Expand Down
Loading