Shared helper functions used across multiple phases. Provides Kubernetes helpers, endpoint detection, cloud upload, capacity validation, HuggingFace access checks, workload profile rendering, and OS-level utilities.
utilities/
├── __init__.py -- Empty package marker
├── cluster.py -- Cluster connectivity and platform detection
├── capacity_validator.py -- GPU memory / KV cache validation
├── endpoint.py -- Endpoint discovery and model verification
├── kube_helpers.py -- Pod lifecycle helpers
├── cloud_upload.py -- GCS/S3 upload
├── huggingface.py -- HuggingFace Hub access checks
├── profile_renderer.py -- Workload profile template renderer
└── os/
├── __init__.py -- Empty package marker
├── filesystem.py -- Filesystem utilities
└── platform.py -- Platform detection
Main entry point for cluster setup during step 00 of every phase.
Connects to the cluster, detects the platform type, stores a self-contained `context.ctx` kubeconfig in the workspace, and resolves cluster metadata (server URL, hostname, kube context, username). Runs fully even in dry-run mode so that subsequent dry-run commands use the canonical kubeconfig path.
Internally calls:
- `_kube_api_connect()` -- Connect via the Kubernetes Python client; set `context.is_openshift`.
- `_detect_local_platform()` -- Detect Kind or Minikube by inspecting kube-system pods.
- `_store_kubeconfig()` -- Create a self-contained `context.ctx` from a provided kubeconfig, a named config, current-context extraction, or OpenShift `oc login`.
- `_resolve_cluster_metadata()` -- Populate `cluster_server`, `cluster_name`, `context_name`, `username`.
Establish a Kubernetes API connection. Resolution order:
- Explicit kubeconfig file
- Cluster URL + token (direct API config, SSL verification disabled)
- Default kubeconfig (`~/.kube/config`)
- In-cluster config
- `is_openshift(api_client) -> bool` -- Check for OpenShift API groups.
- `get_service_endpoint(api_client, namespace, service_name) -> str | None` -- Return `ip:port` for a Service.
- `get_gateway_address(api_client, namespace, gateway_name) -> str | None` -- Return the first address from a Gateway CR's status.
- `load_stacks_info(context) -> list[dict]` -- Read per-stack config (name, namespace, model, method) from rendered stacks.
- `print_phase_banner(context, extra_fields=None)` -- Print a bordered phase summary banner with cluster, stack, and model info.
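A minimal usage sketch, assuming these helpers live in a `utilities.cluster` module and that a default kubeconfig can be loaded (the real entry point wires this up from `context`); the namespace and service name are placeholders:

```python
# Hedged usage sketch: module path and service name are assumptions, not project facts.
from kubernetes import client, config

from utilities.cluster import get_service_endpoint, is_openshift

config.load_kube_config()            # falls back to ~/.kube/config
api_client = client.ApiClient()

print("OpenShift:", is_openshift(api_client))

endpoint = get_service_endpoint(api_client, "llm-d", "my-inference-service")
if endpoint is not None:
    print(f"Service reachable at {endpoint}")   # "ip:port"
```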
Validates vLLM deployment parameters against model and GPU hardware constraints using `planner.capacity_planner` from llm-d-planner.
Run capacity validation for all active deployment methods (standalone, decode, prefill). Returns diagnostic messages. When `ignore_failures=False`, errors indicate that the deployment will fail.
Checks performed:
- GPU count: `TP x PP x DP` must not exceed the requested accelerators per pod.
- Valid tensor parallelism values for the model architecture.
- `maxModelLen` does not exceed the model's maximum context length.
- GPU memory sufficient to load model weights + activation memory.
- KV cache memory sufficient for at least one request at `maxModelLen`.
- Maximum concurrent request estimation.
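For illustration, a minimal sketch of the first check; the real logic lives in `planner.capacity_planner`, not here:

```python
# Illustrative only: the parallelism product must fit within the GPUs requested per pod.
def gpu_count_ok(tp: int, pp: int, dp: int, accelerator_nr: int) -> bool:
    """TP x PP x DP GPUs are needed per replica and must not exceed the pod's request."""
    return tp * pp * dp <= accelerator_nr

assert gpu_count_ok(tp=4, pp=1, dp=1, accelerator_nr=4)        # passes: 4 <= 4
assert not gpu_count_ok(tp=8, pp=2, dp=1, accelerator_nr=8)    # fails: 16 > 8
```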
@dataclass
class ValidationParams:
models: list[str] # HuggingFace model IDs
hf_token: str | None # For gated model access
replicas: int
gpu_memory: int # Per-GPU memory in GB (0 = skip GPU checks)
tp: int # Tensor parallelism
pp: int # Pipeline parallelism
dp: int # Data parallelism
accelerator_nr: int # GPUs requested per pod
gpu_memory_util: float # 0.0 to 1.0
max_model_len: int
ignore_failures: bool
label: str # e.g. "standalone", "decode", "prefill"
- `find_standalone_endpoint(cmd, namespace, inference_port=80) -> (ip, service_name, port)` -- Find standalone services labelled `stood-up-from=llm-d-benchmark`.
- `find_gateway_endpoint(cmd, namespace, release) -> (ip_or_hostname, gateway_name, port)` -- Find the gateway IP from the Gateway resource status. Detects HTTPS (port 443) from managed fields. Falls back to the service ClusterIP.
- `find_custom_endpoint(cmd, namespace, method_pattern) -> (ip, name, port)` -- Multi-level fallback for non-standard deployments: service name match (checking port names `default`, `http`, `https`), then pod name match (probe ports, metrics port, pod IP).
- `discover_hf_token_secret(cmd, namespace) -> str | None` -- Find secrets matching the `llm-d-hf.*token` pattern.
- `extract_hf_token_from_secret(cmd, namespace, secret_name, key="HF_TOKEN") -> str | None` -- Decode the HF token from a Kubernetes Secret. Falls back to scanning all data values for an `hf_` prefix.
- `test_model_serving(cmd, namespace, host, port, expected_model, ...) -> str | None` -- Query `/v1/models` via an ephemeral curl pod. Retries up to `max_retries` (default 12) with `retry_interval` (default 15s) on transient failures (503, "not ready", "still loading"). Returns `None` on success.
- `validate_model_response(stdout, expected_model, host, port) -> str | None` -- Check that the `/v1/models` response JSON contains the expected model ID.
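A self-contained sketch of the kind of check `validate_model_response` performs on the `/v1/models` payload; the function body below is illustrative, not the project's implementation:

```python
import json

def response_contains_model(stdout: str, expected_model: str) -> bool:
    """Parse an OpenAI-style /v1/models payload and look for the expected model ID."""
    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError:
        return False
    return any(item.get("id") == expected_model for item in payload.get("data", []))

# Example payload shaped like a vLLM /v1/models response.
sample = '{"object": "list", "data": [{"id": "meta-llama/Llama-3.1-8B-Instruct", "object": "model"}]}'
assert response_contains_model(sample, "meta-llama/Llama-3.1-8B-Instruct")
```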
Shared kubectl patterns for the run phase.
- `CRASH_STATES` -- Terminal container states: `CrashLoopBackOff`, `Error`, `OOMKilled`, `CreateContainerConfigError`, `ImagePullBackOff`, `ErrImagePull`, `InvalidImageName`.
- `DATA_ACCESS_LABEL = "role=llm-d-benchmark-data-access"`
- `find_data_access_pod(cmd, namespace) -> str | None` -- Find the data-access pod by its well-known label.
- `wait_for_pods_by_label(cmd, label, namespace, timeout, context) -> list[str]` -- Two-phase wait: (1) `condition=Ready=True` (pods running), (2) `condition=ready=False` (pods finished). Checks for crash states. Returns an error list (empty on success).
- `wait_for_pod(cmd, pod_name, namespace, timeout, context, poll_interval=15) -> str` -- Per-pod polling until a terminal phase. Returns `"Succeeded"`, `"Failed"`, or an error description. Detects crash states.
- `collect_pod_results(cmd, data_pod, namespace, remote_prefix, experiment_id, parallel_idx, local_results_dir, context) -> (local_path, success, error_msg)` -- Copy results for a single parallel pod instance from the PVC via `kubectl cp`.
- `sync_analysis_dir(local_path, analysis_dir, experiment_suffix)` -- Move the `analysis/` subdirectory from results to a dedicated directory, then remove it from results.
- `delete_pods_by_names(cmd, pod_names, namespace, context)` -- Delete pods by individual name.
- `delete_pods_by_label(cmd, label, namespace, context)` -- Delete all pods matching `app=<label>`.
- `capture_pod_logs(cmd, pod_names, namespace, log_dir, context)` -- Capture logs from individual harness pods to files.
- `capture_label_logs(cmd, namespace, label, dest, label_name, context)` -- Capture aggregated logs for all pods matching a label selector.
- `capture_infrastructure_logs(cmd, namespace, log_dir, model_label, results_dir, context)` -- Capture a pod status snapshot (`kubectl get pods -o wide`) and logs from model-serving pods, EPP (inference scheduler) pods, and IGW (inference gateway) pods. Runs `process_epp_logs.py` on the captured EPP logs if the script is available; `results_dir` is the per-experiment results directory passed to `process_epp_logs.py`.
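An illustrative polling loop in the spirit of `wait_for_pod`; it shells out to `kubectl` directly instead of using the project's command runner, and the crash-state handling is simplified:

```python
import subprocess
import time

CRASH_STATES = {"CrashLoopBackOff", "Error", "OOMKilled", "ImagePullBackOff"}

def wait_for_pod_phase(pod: str, namespace: str, timeout: int, poll_interval: int = 15) -> str:
    """Poll a pod until it reaches a terminal phase or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        out = subprocess.run(
            ["kubectl", "get", "pod", pod, "-n", namespace,
             "-o", "jsonpath={.status.phase}"],
            capture_output=True, text=True,
        )
        phase = out.stdout.strip()
        if phase in ("Succeeded", "Failed"):
            return phase
        time.sleep(poll_interval)
    return f"timed out after {timeout}s waiting for pod {pod}"
```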
- `upload_results_dir(cmd, local_path, output, context, relative_path=None) -> str | None` -- Upload a single directory to GCS (`gcloud storage cp`) or S3 (`aws s3 cp`). Returns `None` on success, an error string on failure. No-op when `output == "local"`. Logs the would-be command in dry-run mode.
- `upload_all_results(cmd, results_dir, output, context) -> str | None` -- Bulk upload of the entire results directory (safety net in step 09).
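A sketch of how the upload command could be chosen from the output target, mirroring the `gcloud`/`aws` CLIs named above; the bucket and paths are examples:

```python
import subprocess

def build_upload_cmd(local_path: str, output: str) -> list[str] | None:
    """Return the CLI invocation for the given target, or None for local output."""
    if output == "local":
        return None                                    # results stay on disk
    if output.startswith("gs://"):
        return ["gcloud", "storage", "cp", "-r", local_path, output]
    if output.startswith("s3://"):
        return ["aws", "s3", "cp", "--recursive", local_path, output]
    raise ValueError(f"unsupported output target: {output}")

cmd = build_upload_cmd("results/experiment-01", "gs://my-bench-bucket/runs")
if cmd is not None:
    subprocess.run(cmd, check=True)    # in dry-run mode the project only logs this
```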
Gated-model detection and token access verification using the huggingface_hub library.
class GatedStatus(Enum):
NOT_GATED = "not_gated"
GATED = "gated"
ERROR = "error"
class AccessStatus(Enum):
AUTHORIZED = "authorized"
UNAUTHORIZED = "unauthorized"
ERROR = "error"
@dataclass
class ModelAccessResult:
model_id: str
gated: GatedStatus
access: AccessStatus | None
detail: str
ok: bool # property: True if accessible
- `is_model_gated(model_id) -> GatedStatus` -- Check gating status via `model_info()`.
- `user_has_model_access(model_id, hf_token) -> AccessStatus` -- Verify token access to a gated model.
- `check_model_access(model_id, hf_token=None) -> ModelAccessResult` -- Combined gated + access check with descriptive detail messages.
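Hypothetical usage, assuming the helpers are importable from `utilities.huggingface` and that `HF_TOKEN` is set for gated models; the model ID is only an example:

```python
import os

from utilities.huggingface import GatedStatus, check_model_access

result = check_model_access(
    "meta-llama/Llama-3.1-8B-Instruct",            # example of a gated model
    hf_token=os.environ.get("HF_TOKEN"),
)

if result.gated is GatedStatus.GATED and not result.ok:
    raise SystemExit(f"cannot pull {result.model_id}: {result.detail}")
```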
Replaces REPLACE_ENV_* tokens in .yaml.in profile templates with runtime values. Uses regex substitution (not Jinja2).
PROFILE_TOKENS: dict[str, TokenDef] = {
"LLMDBENCH_DEPLOY_CURRENT_MODEL": TokenDef("model.name", "Model name being served"),
"LLMDBENCH_DEPLOY_CURRENT_TOKENIZER": TokenDef("model.name", "Tokenizer model name"),
"LLMDBENCH_HARNESS_STACK_ENDPOINT_URL": TokenDef(None, "Endpoint URL (runtime-detected)"),
"LLMDBENCH_RUN_DATASET_DIR": TokenDef("experiment.datasetDir", "Dataset directory"),
}
- `build_env_map(plan_config=None, runtime_values=None) -> dict[str, str]` -- Build the substitution map from plan config and runtime overrides.
- `render_profile(template_content, env_map) -> str` -- Replace `REPLACE_ENV_*` tokens. Unknown tokens are left as-is.
- `render_profile_file(source_path, dest_path, env_map) -> Path` -- Render a template file and write the result.
- `apply_overrides(profile_content, overrides) -> str` -- Apply dotted `key=value` overrides to the rendered YAML. Coerces values to int/float/bool where appropriate.
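A sketch of rendering a small template; the exact `REPLACE_ENV_<NAME>` spelling in templates and the form of the `env_map` keys are assumptions here, as is the module path:

```python
from utilities.profile_renderer import render_profile   # assumed module path

template = (
    "model: REPLACE_ENV_LLMDBENCH_DEPLOY_CURRENT_MODEL\n"
    "url: REPLACE_ENV_LLMDBENCH_HARNESS_STACK_ENDPOINT_URL\n"
)
env_map = {
    "LLMDBENCH_DEPLOY_CURRENT_MODEL": "meta-llama/Llama-3.1-8B-Instruct",
    "LLMDBENCH_HARNESS_STACK_ENDPOINT_URL": "http://10.0.0.12:80",
}

print(render_profile(template, env_map))
# model: meta-llama/Llama-3.1-8B-Instruct
# url: http://10.0.0.12:80
```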
- `directory_exists_and_nonempty(path) -> bool`
- `file_exists_and_nonzero(path) -> bool`
- `create_tmp_directory(prefix=None, suffix=None, base_dir=None) -> Path`
- `create_directory(path, exist_ok=True) -> Path`
- `copy_directory(source, destination, overwrite=False)`
- `get_absolute_path(path) -> Path` -- Resolve to an absolute path, expanding `~`.
- `resolve_specification_file(name_or_path, base_dir=None) -> Path` -- Look up a spec by bare name, category/name, or path. Searches `config/specification/` directories. Raises `FileNotFoundError` (no match) or `ValueError` (ambiguous).
- `remove_directory(path)`
- `create_workspace(workspace_dir) -> Path` -- Create or ensure the workspace exists. Uses a temp dir if none is specified.
- `create_sub_dir_workload(workspace_dir, sub_dir=None) -> Path` -- Create a run-specific subdirectory with `username-timestamp` naming.
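Hypothetical calls, assuming a `utilities.os.filesystem` module path; the spec file names and workspace path are illustrative:

```python
from utilities.os.filesystem import create_sub_dir_workload, resolve_specification_file

spec = resolve_specification_file("my-profile.yaml")            # bare-name lookup under config/specification/
spec = resolve_specification_file("harness/my-profile.yaml")    # category-qualified lookup
run_dir = create_sub_dir_workload("/tmp/llmdbench-workspace")   # creates a <username>-<timestamp> subdirectory
```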
@dataclass(frozen=True)
class PlatformInfo:
system: str # e.g. "darwin", "linux"
machine: str # e.g. "x86_64", "arm64"
is_mac: bool # property
is_linux: bool # property
def get_platform_info() -> PlatformInfo: ...
def get_user_id() -> str: ... # OS username via getpass.getuser()
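Minimal usage, assuming a `utilities.os.platform` module path:

```python
from utilities.os.platform import get_platform_info, get_user_id

info = get_platform_info()
if info.is_mac and info.machine == "arm64":
    print("Running on Apple Silicon")
print(f"Run directory will be tagged with user {get_user_id()}")
```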