Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/gpu_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,17 @@ jobs:
- name: Install torchforge
run: pip install uv && uv pip install . && uv pip install .[dev]
- name: Run unit tests with coverage
# TODO add all tests
run: pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv
run: PYTHONPATH=. pytest tests/unit_tests --cov=. --cov-report=xml --durations=20 -vv
- name: Run integration tests with coverage
env:
TORCHSTORE_RDMA_ENABLED: "0" # Disable RDMA on CI to avoid hangs
run: |
# set -e ensures the CI fails if any test file fails (not just the last one)
set -e
# Run each test file in a separate process to avoid Monarch state leakage
for test_file in tests/integration_tests/test_*.py; do
echo "Running $test_file"
PYTHONPATH=. pytest "$test_file" --cov=. --cov-append --cov-report=xml --durations=20 -vvs
done
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v3
33 changes: 33 additions & 0 deletions tests/integration_tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Integration Tests

This directory contains end-to-end integration tests for Forge components.

## Running Tests

### Important: Monarch Cleanup Issues

Monarch has seen issues in the past with proper cleanup between tests, so **integration tests should NOT be run all together in a single pytest invocation**. Running multiple integration tests in the same process can cause state leakage and test failures.

### Recommended Approach

**Run individual test files:**
```bash
PYTHONPATH=. pytest tests/integration_tests/test_grpo_e2e.py -vv
PYTHONPATH=. pytest tests/integration_tests/test_policy_update.py -vv
```

**Run all integration tests (each in separate process):**
```bash
for f in tests/integration_tests/test_*.py; do
PYTHONPATH=. pytest "$f" -vv
done
```

**Run a specific test:**
```bash
PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::TestGRPOEndToEnd::test_grpo_smoke_test
```

## CI Integration

The CI pipeline (`.github/workflows/gpu_test.yaml`) automatically runs all integration tests, executing each test file in a separate process for isolation.
1 change: 1 addition & 0 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.


import argparse

import pytest
Expand Down
124 changes: 124 additions & 0 deletions tests/integration_tests/disabled_test_grpo_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""A simple smoke test that runs the GRPO loop for 3 steps.

Run this with:
PYTHONPATH=. pytest -s tests/integration_tests/test_grpo_e2e.py::test_grpo_smoke_test


"""

import logging
import shutil
from pathlib import Path

import monarch
import monarch.actor

import pytest
import torch

from forge.util.config import resolve_hf_hub_paths
from omegaconf import DictConfig, OmegaConf

# Temporary workaround - without this, proc_mesh.stop
# will raise an exit code 1 failing all other tests.
monarch.actor.unhandled_fault_hook = lambda failure: None


logger: logging.Logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

requires_cuda = pytest.mark.skipif(
not torch.cuda.is_available(),
reason="CUDA not available",
)

TEST_CHECKPOINT_DIR = "/tmp/grpo_test_checkpoint"


def _load_config(config_path: str) -> DictConfig:
"""Load and resolve config from YAML file."""
cfg = None
try:
cfg = OmegaConf.load(config_path)
except Exception as e:
pytest.fail(f"Failed to load config file {config_path}: {e}")

assert isinstance(cfg, DictConfig)
cfg = resolve_hf_hub_paths(cfg)
return cfg


def _cleanup_checkpoint_dir():
"""Clean up test checkpoint directory."""
path = Path(TEST_CHECKPOINT_DIR)
if path.exists() and path.is_dir():
try:
shutil.rmtree(path)
logger.info(f"Successfully removed {TEST_CHECKPOINT_DIR}")
except Exception as e:
logger.error(f"Failed to remove {TEST_CHECKPOINT_DIR}: {e}")


class TestGRPOEndToEnd:
"""End-to-end integration tests for GRPO training loop."""

@pytest.mark.asyncio
@pytest.mark.timeout(600) # 10 minute timeout to prevent hanging
@requires_cuda
async def test_grpo_smoke_test(self):
"""
Smoke test for GRPO training loop.

This test runs the full GRPO pipeline for 3 training steps to verify:
- All actors and services initialize correctly
- Rollout loop generates completions
- Rewards are evaluated
- Reference model computes logprobs
- Replay buffer collects and batches experiences
- Trainer updates weights
- Policy receives weight updates
- Training completes successfully
"""
logger.info("=" * 80)
logger.info("Starting GRPO smoke test")
logger.info("=" * 80)

try:
# Load test config
config_path = "tests/integration_tests/fixtures/grpo_smoke_test.yaml"
cfg = _load_config(config_path)

logger.info("Starting GRPO smoke test with config:")
logger.info(f" Model: {cfg.model}")
logger.info(f" Group size: {cfg.group_size}")
logger.info(f" Batch size: {cfg.local_batch_size}")
logger.info(f" Training steps: {cfg.trainer.training.steps}")
logger.info(
f" Max req/res tokens: {cfg.max_req_tokens}/{cfg.max_res_tokens}"
)

# Import main here to avoid issues with module-level imports
from apps.grpo.main import main

logger.info("Starting main training loop...")
# Run the main training loop
# This should run for exactly 3 steps and then exit cleanly
await main(cfg)

logger.info("Main training loop completed successfully")
logger.info("GRPO smoke test completed successfully!")

except Exception as e:
logger.error(f"GRPO smoke test failed with error: {e}")
raise
finally:
# Cleanup
logger.info("Cleaning up test checkpoint directory...")
_cleanup_checkpoint_dir()
logger.info("Cleanup complete")
logger.info("=" * 80)
147 changes: 147 additions & 0 deletions tests/integration_tests/fixtures/grpo_smoke_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Minimal GRPO configuration for integration testing
# This config is designed to run quickly with minimal resources

# Global configuration
group_size: 2 # Minimal group size
local_batch_size: 2 # Minimal batch size
max_req_tokens: 128 # Reduced from 1024
max_res_tokens: 128 # Reduced from 1024
model: "Qwen/Qwen3-1.7B"
off_by_n: 1

# Main loop configuration
rollout_threads: 1
training_threads: 1

# Observability configuration - console only for tests
metric_logging:
console:
logging_mode: global_reduce

# Dataset configuration - streaming with limited samples
dataset:
path: "openai/gsm8k"
revision: "main"
data_split: "train"
streaming: true # Required by DatasetActor.sample()
model: ${model}

# Policy configuration
policy:
engine_args:
model: ${model}
tensor_parallel_size: 1
pipeline_parallel_size: 1
enforce_eager: true # Eager mode for simpler testing
sampling_params:
n: ${group_size}
max_tokens: ${max_res_tokens}
temperature: 1.0
top_p: 1.0

# Trainer configuration
trainer:
model:
name: qwen3
flavor: 1.7B
hf_assets_path: hf://${model}
optimizer:
name: AdamW
lr: 1e-5
eps: 1e-8
lr_scheduler:
warmup_steps: 1
training:
local_batch_size: ${local_batch_size}
seq_len: ${sum:${max_req_tokens},${max_res_tokens}}
max_norm: 1.0
steps: 1
dtype: bfloat16
gc_freq: 1
compile:
enable: false
parallelism:
data_parallel_replicate_degree: 1
data_parallel_shard_degree: 1
tensor_parallel_degree: 1
pipeline_parallel_degree: 1
context_parallel_degree: 1
expert_parallel_degree: 1
disable_loss_parallel: true
checkpoint:
enable: true
folder: /tmp/grpo_test_checkpoint
initial_load_path: hf://${model}
initial_load_in_hf: true
last_save_in_hf: false # Don't save back to HF in tests
interval: 500
async_mode: "disabled"
activation_checkpoint:
mode: selective
selective_ac_option: op

# Replay buffer configuration
replay_buffer:
batch_size: ${local_batch_size}
max_policy_age: ${off_by_n}
dp_size: ${trainer.parallelism.data_parallel_shard_degree}

# Reference model configuration
ref_model:
model:
name: qwen3
flavor: 1.7B
hf_assets_path: hf://${model}
training:
seq_len: ${trainer.training.seq_len}
dtype: bfloat16
gc_freq: 1
compile:
enable: false
parallelism:
data_parallel_replicate_degree: 1
data_parallel_shard_degree: 1
tensor_parallel_degree: 1
pipeline_parallel_degree: 1
context_parallel_degree: 1
expert_parallel_degree: 1
checkpoint:
enable: true
initial_load_path: hf://${model}
initial_load_in_hf: true

# All resource allocations
services:
policy:
procs: ${policy.engine_args.tensor_parallel_size}
num_replicas: 1
mesh_name: policy
with_gpus: true
ref_model:
procs: 1
num_replicas: 1
mesh_name: ref_model
with_gpus: true
reward_actor:
procs: 1
num_replicas: 1
mesh_name: reward_actor
with_gpus: false

actors:
dataset:
procs: 1
with_gpus: false
mesh_name: dataset
trainer:
procs: 1
with_gpus: true
mesh_name: trainer
replay_buffer:
procs: 1
with_gpus: false
mesh_name: replay_buffer
compute_advantages:
procs: 1
with_gpus: false
mesh_name: compute_advantages
Loading
Loading