diff --git a/.github/workflows/dist_testing.yml b/.github/workflows/dist_testing.yml
new file mode 100644
index 000000000000..59907ef489de
--- /dev/null
+++ b/.github/workflows/dist_testing.yml
@@ -0,0 +1,43 @@
+name: Testing distributed PyG
+
+on:  # yamllint disable-line rule:truthy
+  push:
+    branches:
+      - master
+  pull_request:
+
+jobs:
+
+  minimal_pytest:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 40
+
+      # Run the remaining steps only if distributed files have been changed.
+      - name: Get changed files
+        id: changed-files-specific
+        uses: tj-actions/changed-files@v41
+        with:
+          files: |
+            torch_geometric/distributed/**
+            test/distributed/**
+
+      - name: Setup packages
+        if: steps.changed-files-specific.outputs.any_changed == 'true'
+        uses: ./.github/actions/setup
+
+      - name: Install main package
+        if: steps.changed-files-specific.outputs.any_changed == 'true'
+        run: |
+          pip install -e .[test]
+
+      - name: Run tests
+        if: steps.changed-files-specific.outputs.any_changed == 'true'
+        timeout-minutes: 10
+        run: |
+          DIST_TEST=1 pytest test/distributed
+        shell: bash
diff --git a/test/distributed/test_dist_link_neighbor_loader.py b/test/distributed/test_dist_link_neighbor_loader.py
index 07d8960e0c88..6a78a817701a 100644
--- a/test/distributed/test_dist_link_neighbor_loader.py
+++ b/test/distributed/test_dist_link_neighbor_loader.py
@@ -16,7 +16,7 @@
     Partitioner,
 )
 from torch_geometric.distributed.partition import load_partition_info
-from torch_geometric.testing import onlyLinux, withPackage
+from torch_geometric.testing import onlyDistributedTest
 
 
 def create_dist_data(tmp_path: str, rank: int):
@@ -156,8 +156,7 @@ def dist_link_neighbor_loader_hetero(
     assert loader.channel.empty()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('num_parts', [2])
 @pytest.mark.parametrize('num_workers', [0])
 @pytest.mark.parametrize('async_sampling', [True])
@@ -203,8 +202,7 @@ def test_dist_link_neighbor_loader_homo(
     w1.join()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('num_parts', [2])
 @pytest.mark.parametrize('num_workers', [0])
 @pytest.mark.parametrize('async_sampling', [True])
diff --git a/test/distributed/test_dist_link_neighbor_sampler.py b/test/distributed/test_dist_link_neighbor_sampler.py
index 49ec3297b568..10effa8284ff 100644
--- a/test/distributed/test_dist_link_neighbor_sampler.py
+++ b/test/distributed/test_dist_link_neighbor_sampler.py
@@ -16,7 +16,7 @@
 from torch_geometric.distributed.rpc import init_rpc, shutdown_rpc
 from torch_geometric.sampler import EdgeSamplerInput, NeighborSampler
 from torch_geometric.sampler.neighbor_sampler import edge_sample
-from torch_geometric.testing import onlyLinux, withPackage
+from torch_geometric.testing import onlyDistributedTest
 
 
 def create_data(rank, world_size, time_attr: Optional[str] = None):
@@ -236,8 +236,7 @@ def dist_link_neighbor_sampler_temporal(
     assert out_dist.num_sampled_edges == out.num_sampled_edges
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('disjoint', [False, True])
 def test_dist_link_neighbor_sampler(disjoint):
     mp_context = torch.multiprocessing.get_context('spawn')
@@ -262,8 +261,7 @@ def test_dist_link_neighbor_sampler(disjoint):
     w1.join()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('seed_time', [None, torch.tensor([3, 6])])
 @pytest.mark.parametrize('temporal_strategy', ['uniform', 'last'])
 def test_dist_link_neighbor_sampler_temporal(seed_time, temporal_strategy):
@@ -289,8 +287,7 @@ def test_dist_link_neighbor_sampler_temporal(seed_time, temporal_strategy):
     w1.join()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('seed_time', [[1, 1], [3, 7]])
 @pytest.mark.parametrize('temporal_strategy', ['uniform', 'last'])
 def test_dist_neighbor_sampler_edge_level_temporal(
diff --git a/test/distributed/test_dist_neighbor_loader.py b/test/distributed/test_dist_neighbor_loader.py
index 7abb3ec212ba..31ef5103d3e8 100644
--- a/test/distributed/test_dist_neighbor_loader.py
+++ b/test/distributed/test_dist_neighbor_loader.py
@@ -15,7 +15,7 @@
     Partitioner,
 )
 from torch_geometric.distributed.partition import load_partition_info
-from torch_geometric.testing import onlyLinux, withPackage
+from torch_geometric.testing import onlyDistributedTest
 
 
 def create_dist_data(tmp_path: str, rank: int):
@@ -161,8 +161,7 @@ def dist_neighbor_loader_hetero(
     assert loader.channel.empty()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('num_parts', [2])
 @pytest.mark.parametrize('num_workers', [0])
 @pytest.mark.parametrize('async_sampling', [True])
@@ -204,8 +203,7 @@ def test_dist_neighbor_loader_homo(
     w1.join()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('num_parts', [2])
 @pytest.mark.parametrize('num_workers', [0])
 @pytest.mark.parametrize('async_sampling', [True])
diff --git a/test/distributed/test_dist_neighbor_sampler.py b/test/distributed/test_dist_neighbor_sampler.py
index bbab44911706..ac3ea9c29f0a 100644
--- a/test/distributed/test_dist_neighbor_sampler.py
+++ b/test/distributed/test_dist_neighbor_sampler.py
@@ -19,7 +19,7 @@
 from torch_geometric.distributed.rpc import init_rpc, shutdown_rpc
 from torch_geometric.sampler import NeighborSampler, NodeSamplerInput
 from torch_geometric.sampler.neighbor_sampler import node_sample
-from torch_geometric.testing import onlyLinux, withPackage
+from torch_geometric.testing import onlyDistributedTest
 
 
 def create_data(rank: int, world_size: int, time_attr: Optional[str] = None):
@@ -399,8 +399,7 @@ def dist_neighbor_sampler_temporal_hetero(
         assert out_dist.num_sampled_nodes[k] == out.num_sampled_nodes[k]
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('disjoint', [False, True])
 def test_dist_neighbor_sampler(disjoint):
     mp_context = torch.multiprocessing.get_context('spawn')
@@ -426,8 +425,7 @@ def test_dist_neighbor_sampler(disjoint):
     w1.join()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('seed_time', [None, torch.tensor([3, 6])])
 @pytest.mark.parametrize('temporal_strategy', ['uniform'])
 def test_dist_neighbor_sampler_temporal(seed_time, temporal_strategy):
@@ -454,8 +452,7 @@ def test_dist_neighbor_sampler_temporal(seed_time, temporal_strategy):
     w1.join()
 
 
-@onlyLinux
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('seed_time', [[3, 7]])
 @pytest.mark.parametrize('temporal_strategy', ['last'])
 def test_dist_neighbor_sampler_edge_level_temporal(
@@ -487,7 +484,7 @@ def test_dist_neighbor_sampler_edge_level_temporal(
     w1.join()
 
 
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('disjoint', [False, True])
 def test_dist_neighbor_sampler_hetero(tmp_path, disjoint):
     mp_context = torch.multiprocessing.get_context('spawn')
@@ -525,7 +522,7 @@ def test_dist_neighbor_sampler_hetero(tmp_path, disjoint):
     w1.join()
 
 
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('seed_time', [None, [0, 0], [2, 2]])
 @pytest.mark.parametrize('temporal_strategy', ['uniform', 'last'])
 def test_dist_neighbor_sampler_temporal_hetero(
@@ -578,7 +575,7 @@ def test_dist_neighbor_sampler_temporal_hetero(
     w1.join()
 
 
-@withPackage('pyg_lib')
+@onlyDistributedTest
 @pytest.mark.parametrize('seed_time', [[0, 0], [1, 2]])
 @pytest.mark.parametrize('temporal_strategy', ['uniform', 'last'])
 def test_dist_neighbor_sampler_edge_level_temporal_hetero(
diff --git a/test/distributed/test_dist_utils.py b/test/distributed/test_dist_utils.py
index d88bc94f21dc..780885c3ffda 100644
--- a/test/distributed/test_dist_utils.py
+++ b/test/distributed/test_dist_utils.py
@@ -2,8 +2,10 @@
 
 from torch_geometric.distributed.utils import remove_duplicates
 from torch_geometric.sampler import SamplerOutput
+from torch_geometric.testing import onlyDistributedTest
 
 
+@onlyDistributedTest
 def test_remove_duplicates():
     node = torch.tensor([0, 1, 2, 3])
     out_node = torch.tensor([0, 4, 1, 5, 1, 6, 2, 7, 3, 8])
@@ -16,6 +18,7 @@ def test_remove_duplicates():
     assert node.tolist() == [0, 1, 2, 3, 4, 5, 6, 7, 8]
 
 
+@onlyDistributedTest
 def test_remove_duplicates_disjoint():
     node = torch.tensor([0, 1, 2, 3])
     batch = torch.tensor([0, 1, 2, 3])
diff --git a/test/distributed/test_local_feature_store.py b/test/distributed/test_local_feature_store.py
index 5dd1a1c441d2..68d2b319fe09 100644
--- a/test/distributed/test_local_feature_store.py
+++ b/test/distributed/test_local_feature_store.py
@@ -1,8 +1,10 @@
 import torch
 
 from torch_geometric.distributed import LocalFeatureStore
+from torch_geometric.testing import onlyDistributedTest
 
 
+@onlyDistributedTest
 def test_local_feature_store_global_id():
     store = LocalFeatureStore()
 
@@ -29,6 +31,7 @@ def test_local_feature_store_global_id():
     assert torch.equal(out, feat[torch.tensor([3, 8, 4])])
 
 
+@onlyDistributedTest
 def test_local_feature_store_utils():
     store = LocalFeatureStore()
 
@@ -57,6 +60,7 @@ def test_local_feature_store_utils():
     assert store.get_tensor_size(attr) == (6, 3)
 
 
+@onlyDistributedTest
 def test_homogeneous_feature_store():
     node_id = torch.randperm(6)
     x = torch.randn(6, 32)
@@ -86,6 +90,7 @@ def test_homogeneous_feature_store():
     )
 
 
+@onlyDistributedTest
 def test_heterogeneous_feature_store():
     node_type = 'paper'
     edge_type = ('paper', 'to', 'paper')
diff --git a/test/distributed/test_local_graph_store.py b/test/distributed/test_local_graph_store.py
index fc36536c3694..b08a68025e59 100644
--- a/test/distributed/test_local_graph_store.py
+++ b/test/distributed/test_local_graph_store.py
@@ -1,9 +1,10 @@
 import torch
 
 from torch_geometric.distributed import LocalGraphStore
-from torch_geometric.testing import get_random_edge_index
+from torch_geometric.testing import get_random_edge_index, onlyDistributedTest
 
 
+@onlyDistributedTest
 def test_local_graph_store():
     graph_store = LocalGraphStore()
 
@@ -34,6 +35,7 @@ def test_local_graph_store():
     assert len(graph_store.get_all_edge_attrs()) == 0
 
 
+@onlyDistributedTest
 def test_homogeneous_graph_store():
     edge_id = torch.randperm(300)
     edge_index = get_random_edge_index(100, 100, 300)
@@ -63,6 +65,7 @@ def test_homogeneous_graph_store():
     )
 
 
+@onlyDistributedTest
 def test_heterogeneous_graph_store():
     edge_type = ('paper', 'to', 'paper')
     edge_id_dict = {edge_type: torch.randperm(300)}
@@ -94,6 +97,7 @@ def test_heterogeneous_graph_store():
     )
 
 
+@onlyDistributedTest
 def test_sorted_graph_store():
     edge_index_sorted = torch.tensor([[1, 7, 5, 6, 1], [0, 0, 1, 1, 2]])
     edge_id_sorted = torch.tensor([0, 1, 2, 3, 4])
diff --git a/test/distributed/test_partition.py b/test/distributed/test_partition.py
index 089dd7bc121d..46e494124916 100644
--- a/test/distributed/test_partition.py
+++ b/test/distributed/test_partition.py
@@ -1,6 +1,5 @@
 import os.path as osp
 
-import pytest
 import torch
 
 from torch_geometric.datasets import FakeDataset, FakeHeteroDataset
@@ -9,20 +8,11 @@
     LocalGraphStore,
     Partitioner,
 )
+from torch_geometric.testing import onlyDistributedTest
 from torch_geometric.typing import EdgeTypeStr
 
-try:
-    # TODO Using `pyg-lib` metis partitioning leads to some weird bugs in the
-    # CI. As such, we require `torch-sparse` for these tests for now.
-    rowptr = torch.tensor([0, 1])
-    col = torch.tensor([0])
-    torch.ops.torch_sparse.partition(rowptr, col, None, 1, True)
-    WITH_METIS = True
-except (AttributeError, RuntimeError):
-    WITH_METIS = False
-
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_partition_data(tmp_path):
     data = FakeDataset()[0]
 
@@ -68,7 +58,7 @@ def test_partition_data(tmp_path):
                        node_feats1['feats']['x'])
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_partition_hetero_data(tmp_path):
     data = FakeHeteroDataset()[0]
 
@@ -102,7 +92,7 @@ def test_partition_hetero_data(tmp_path):
         assert osp.exists(edge_feats_path)
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_partition_data_temporal(tmp_path):
     data = FakeDataset()[0]
     data.time = torch.arange(data.num_nodes)
@@ -122,7 +112,7 @@ def test_partition_data_temporal(tmp_path):
     assert torch.equal(data.time, node_feats1['time'])
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_partition_data_edge_level_temporal(tmp_path):
     data = FakeDataset(edge_dim=2)[0]
     data.edge_time = torch.arange(data.num_edges)
@@ -144,7 +134,7 @@ def test_partition_data_edge_level_temporal(tmp_path):
                        edge_feats1['edge_time'])
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_partition_hetero_data_temporal(tmp_path):
     data = FakeHeteroDataset()[0]
 
@@ -167,7 +157,7 @@ def test_partition_hetero_data_temporal(tmp_path):
         assert torch.equal(data[key].time, node_feats1[key]['time'])
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_partition_hetero_data_edge_level_temporal(tmp_path):
     data = FakeHeteroDataset(edge_dim=2)[0]
 
@@ -196,7 +186,7 @@ def test_partition_hetero_data_edge_level_temporal(tmp_path):
     )
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_from_partition_data(tmp_path):
     data = FakeDataset()[0]
 
@@ -230,7 +220,7 @@ def test_from_partition_data(tmp_path):
     assert torch.allclose(data.x[id2], x2)
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_from_partition_hetero_data(tmp_path):
     data = FakeHeteroDataset()[0]
 
@@ -257,7 +247,7 @@ def test_from_partition_hetero_data(tmp_path):
     assert node_types == set(data.node_types)
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_from_partition_temporal_data(tmp_path):
     data = FakeDataset()[0]
     data.time = torch.arange(data.num_nodes)
@@ -282,7 +272,7 @@ def test_from_partition_temporal_data(tmp_path):
     assert torch.equal(time2, data.time)
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_from_partition_edge_level_temporal_data(tmp_path):
     data = FakeDataset(edge_dim=2)[0]
     data.edge_time = torch.arange(data.num_edges)
@@ -309,7 +299,7 @@ def test_from_partition_edge_level_temporal_data(tmp_path):
     assert torch.equal(data.edge_time[edge_id2], time2)
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_from_partition_hetero_temporal_data(tmp_path):
     data = FakeHeteroDataset()[0]
 
@@ -341,7 +331,7 @@ def test_from_partition_hetero_temporal_data(tmp_path):
         assert torch.equal(times2[key], data[key].time)
 
 
-@pytest.mark.skipif(not WITH_METIS, reason='Not compiled with METIS support')
+@onlyDistributedTest
 def test_from_partition_hetero_edge_level_temporal_data(tmp_path):
     data = FakeHeteroDataset(edge_dim=2)[0]
diff --git a/test/distributed/test_rpc.py b/test/distributed/test_rpc.py
index 92dddc655892..440d18bec5bd 100644
--- a/test/distributed/test_rpc.py
+++ b/test/distributed/test_rpc.py
@@ -6,7 +6,7 @@
 from torch_geometric.distributed import LocalFeatureStore
 from torch_geometric.distributed.dist_context import DistContext
 from torch_geometric.distributed.rpc import RPCRouter
-from torch_geometric.testing import onlyLinux
+from torch_geometric.testing import onlyDistributedTest
 
 
 def run_rpc_feature_test(
@@ -80,7 +80,7 @@ def run_rpc_feature_test(
     assert rpc.rpc_is_initialized() is False
 
 
-@onlyLinux
+@onlyDistributedTest
 def test_dist_feature_lookup():
     cpu_tensor0 = torch.cat([torch.ones(128, 1024), torch.ones(128, 1024) * 2])
     cpu_tensor1 = torch.cat([torch.zeros(128, 1024), torch.zeros(128, 1024)])
diff --git a/torch_geometric/testing/__init__.py b/torch_geometric/testing/__init__.py
index 509362fa0b1a..7b6349d6094d 100644
--- a/torch_geometric/testing/__init__.py
+++ b/torch_geometric/testing/__init__.py
@@ -6,6 +6,8 @@
 from .decorators import (
     is_full_test,
     onlyFullTest,
+    is_distributed_test,
+    onlyDistributedTest,
     onlyLinux,
     noWindows,
     onlyPython,
@@ -32,6 +34,8 @@
 __all__ = [
     'is_full_test',
     'onlyFullTest',
+    'is_distributed_test',
+    'onlyDistributedTest',
     'onlyLinux',
     'noWindows',
     'onlyPython',
diff --git a/torch_geometric/testing/decorators.py b/torch_geometric/testing/decorators.py
index 964619078251..f378b6bb9ca9 100644
--- a/torch_geometric/testing/decorators.py
+++ b/torch_geometric/testing/decorators.py
@@ -28,6 +28,23 @@ def onlyFullTest(func: Callable) -> Callable:
     )(func)
 
 
+def is_distributed_test() -> bool:
+    r"""Whether to run the distributed test suite."""
+    return ((is_full_test() or os.getenv('DIST_TEST', '0') == '1')
+            and sys.platform == 'linux' and has_package('pyg_lib'))
+
+
+def onlyDistributedTest(func: Callable) -> Callable:
+    r"""A decorator to specify that this function belongs to the distributed
+    test suite.
+    """
+    import pytest
+    return pytest.mark.skipif(
+        not is_distributed_test(),
+        reason="Fast test run",
+    )(func)
+
+
 def onlyLinux(func: Callable) -> Callable:
     r"""A decorator to specify that this function should only execute on
     Linux systems.
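Note (not part of the diff): the new `is_distributed_test()` gate replaces the per-file guards above (`onlyLinux`, `withPackage('pyg_lib')`, and the `WITH_METIS` skip in `test_partition.py`) with a single opt-in switch — a test runs only when `DIST_TEST=1` is set (or during a full test run via `is_full_test()`), on Linux, with `pyg_lib` installed. A minimal sketch of how a new test would opt in; the test name and body here are hypothetical:

```python
import torch

from torch_geometric.testing import onlyDistributedTest


# Hypothetical test: skipped with reason "Fast test run" unless
# `is_distributed_test()` holds, i.e. `DIST_TEST=1` (or a full test
# run) on Linux with `pyg_lib` installed.
@onlyDistributedTest
def test_my_distributed_op():
    assert torch.arange(4).sum() == 6
```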
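To mirror the CI job locally, use the same invocation as the workflow's "Run tests" step: `DIST_TEST=1 pytest test/distributed`. Without the environment variable, every test marked `@onlyDistributedTest` is skipped with the reason "Fast test run", which keeps the default pytest run fast.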