
Commit 1dd4657

Add XPU support for barrier (#96)
* Fix a GPU hang when `barrier` is the first collective call.
* Usage: the XPU package (built with COMPUTE_BACKEND=dpcpp) uses the XPU barrier; the CPU package uses the CPU barrier.
Parent: 43f28a7

File tree

6 files changed (+93, -0)

src/CMakeLists.txt (1 addition, 0 deletions)

```diff
@@ -10,6 +10,7 @@ target_compile_options(oneccl_bindings_for_pytorch PUBLIC -Wall

 if(COMPUTE_BACKEND STREQUAL "dpcpp")
   add_subdirectory(./gpu)
+  add_definitions (-DUSE_GPU)
 endif()

 target_include_directories(oneccl_bindings_for_pytorch PUBLIC ./)
```

src/ProcessGroupCCL.cpp (13 additions, 0 deletions)

```diff
@@ -349,6 +349,19 @@ c10::intrusive_ptr<C10D_Work> recv_any_source_xpu_(
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
   m.impl("recv_any_source_", recv_any_source_xpu_);
 }
+
+c10::intrusive_ptr<Work> barrier_xpu(
+    at::Tensor /* unused */,
+    const c10::intrusive_ptr<ProcessGroup>& process_group,
+    const std::vector<int64_t>& device_ids,
+    int64_t timeout) {
+  return process_group->getBackend(c10::DeviceType::XPU)
+      ->barrier(BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
+}
+
+TORCH_LIBRARY_IMPL(c10d, XPU, m) {
+  m.impl("barrier", barrier_xpu);
+}
 } // namespace ops
```
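This registration gives the c10d dispatcher an XPU kernel for the `barrier` op: `barrier_xpu` ignores the dummy tensor argument and forwards `device_ids` and the timeout to the backend's `barrier`. A minimal sketch of how the op is exercised from Python, assuming an XPU build and a launcher that has set the rendezvous environment as in `tests/test_barrier.py`:

```python
import torch.distributed as dist
import oneccl_bindings_for_pytorch  # registers the "ccl" backend

# Assumes RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT are already set
# (tests/test_barrier.py derives them from the PMI_* variables).
dist.init_process_group("ccl")

# On an XPU build this should resolve through the c10d dispatcher
# to the barrier_xpu op registered above.
dist.barrier()
```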

src/dispatch_stub.cpp (6 additions, 0 deletions)

```diff
@@ -548,7 +548,13 @@ c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> DispatchStub::recv(std::vector

 c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> DispatchStub::barrier(const BarrierOptions& opts,
                                                                         ProcessGroupCCL& pg_ccl) {
+#ifdef USE_GPU
+  std::cout << "Barrier: using xpu" << std::endl;
+  c10::DeviceType dev_type = c10::DeviceType::XPU;
+#else
+  std::cout << "Barrier: using cpu" << std::endl;
   c10::DeviceType dev_type = c10::DeviceType::CPU;
+#endif
   return get_ccl_stub(dev_type)->barrier_(opts, pg_ccl);
 }
```
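Note that the choice of barrier path is made at compile time: `USE_GPU` is defined by `src/CMakeLists.txt` only when `COMPUTE_BACKEND=dpcpp`, so an XPU package always dispatches to the XPU barrier and a CPU package to the CPU barrier, matching the usage note in the commit message. The `std::cout` messages make it easy to confirm which path a given build takes.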

src/gpu/dpcpp_ccl.cpp (32 additions, 0 deletions)

```diff
@@ -401,6 +401,9 @@ class XPUCCLStubs final: public DispatchStub {
       int tag,
       ProcessGroupCCL& pg) override;

+  c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> barrier_(const BarrierOptions& opts,
+                                                             ProcessGroupCCL& pg) override;
+
   void destroy();
   void reset() override {}
   void runLoop();
@@ -1116,6 +1119,35 @@ c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> XPUCCLStubs::recv_(std::vector
   return work;
 }

+c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> XPUCCLStubs::barrier_(const BarrierOptions& opts,
+                                                                        ProcessGroupCCL& pg) {
+
+  c10::intrusive_ptr<AsyncBarrierWork> work = c10::make_intrusive<AsyncBarrierWork>();
+
+  if (pg.ccl_member_->ccl_comms.size() == 0) {
+    std::vector<at::Device> xpu_devices{at::Device(at::kXPU)};
+    const auto key = get_key_from_devs(xpu_devices);
+    get_ccl_comms(pg, key, xpu_devices);
+  }
+
+  auto& comms_map = pg.ccl_member_->ccl_comms;
+  for (auto iter = comms_map.begin(); iter != comms_map.end(); iter++) {
+    for (size_t i = 0; i < iter->second->comms.size(); i++) {
+      work->getEvents().emplace_back(
+        call_with_lock(c10d::ProcessGroupCCL::globalMutex, [&]() {
+          if (i < iter->second->streams.size()) {
+            CCL_CHECK(return ccl::barrier(iter->second->comms[i],
+                                          iter->second->streams[i]););
+          } else {
+            CCL_CHECK(return ccl::barrier(iter->second->comms[i]););
+          }
+        })
+      );
+    }
+  }
+  return work;
+}
+
 RegisterXPUMethods xpu_register;

 }
```
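The `ccl_comms.size() == 0` branch is the actual fix for the hang described in the commit message: `ccl::barrier` needs a communicator, and none exists yet when `barrier` is the first collective issued on the group. The method now creates the XPU communicators on demand, then issues one `ccl::barrier` per communicator (on its stream when one is available). A minimal sketch of the previously hanging pattern, assuming a two-rank `mpirun` launch with the rendezvous environment set as in `tests/test_barrier.py`:

```python
import torch.distributed as dist
import oneccl_bindings_for_pytorch  # registers the "ccl" backend

# Assumes RANK/WORLD_SIZE/MASTER_ADDR/MASTER_PORT are provided by the launcher.
dist.init_process_group("ccl")

# barrier() as the very first collective: before this commit no CCL
# communicator existed yet and the call could hang on GPU; the new
# XPUCCLStubs::barrier_ creates the communicators on demand instead.
dist.barrier()
```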

tests/README.md (13 additions, 0 deletions)

````diff
@@ -23,6 +23,19 @@ For cross-nodes p2p test, run:
 mpiexec -host nodeA,nodeB -np 24 -ppn 12 python -u test_p2p_crossnodes.py --dist_url $NODE_IP --world_size 24
 ```

+## functionality validation of barrier
+For the CPU barrier, run:
+
+```bash
+mpirun -np 2 python test_barrier.py
+```
+
+For the XPU barrier (built with `COMPUTE_BACKEND=dpcpp`), run:
+
+```bash
+mpirun -np 2 python test_barrier.py --device xpu
+```
+
 ## broadcast/allreduce profiling
 To start the test_allreduce.py test, run:
````
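When the test runs, `DispatchStub::barrier` prints `Barrier: using xpu` or `Barrier: using cpu` depending on how the package was built, and each rank prints `Finish` once the barrier completes.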

tests/test_barrier.py (new file, 28 additions)

```python
import torch
import intel_extension_for_pytorch
import oneccl_bindings_for_pytorch
import torch.distributed as dist
import os
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--device', '-dev', type=str, default='cpu',
                    help='Device type to use: cpu, xpu')
args = parser.parse_args()

# Map the MPI launcher's PMI_* variables onto the variables
# torch.distributed expects for rendezvous.
os.environ['RANK'] = str(os.environ.get('PMI_RANK', 0))
os.environ['WORLD_SIZE'] = str(os.environ.get('PMI_SIZE', 1))
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'

dist.init_process_group("ccl")
rank = dist.get_rank()
size = dist.get_world_size()

if args.device == 'xpu':
    device = "xpu:{}".format(rank)
else:
    device = 'cpu'

print("Barrier using device: ", args.device)
dist.barrier()
print("Finish")
```
