
Fix compatibility issues with the latest torch #77


Status: Draft. Wants to merge 1 commit into base branch torch_ccl_dev_2.6.
48 changes: 35 additions & 13 deletions src/ProcessGroupCCL.cpp
@@ -94,14 +94,15 @@ std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<C10D_Work>> allreduce_xpu_(
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     const c10::intrusive_ptr<ReduceOp>& reduce_op,
     const c10::optional<at::Tensor>& sparse_indices,
+    bool asyncOp,
     int64_t timeout) {
   auto tensor_vec = tensors.vec();
   auto work =
       process_group->getBackend(c10::DeviceType::XPU)
           ->allreduce(
               tensor_vec,
               c10d::AllreduceOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
+                  *reduce_op.get(), std::chrono::milliseconds(timeout), asyncOp});
 
   // Return input tensors as output tensors to make inplace allreduce look like
   // a functional API, so that make_fx can correctly build the dependencies in
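
This same change repeats through the rest of the diff: the upstream c10d op schemas now thread an `asyncOp` flag through every collective, so each XPU wrapper gains a `bool asyncOp` parameter and forwards it via the corresponding options struct. Where the options are built with an aggregate initializer, the diff relies on `asyncOp` being declared immediately after `timeout` in the upstream struct, so it can simply be appended and any later members keep their defaults. A minimal sketch of that idiom, using stand-in types rather than the real c10d headers:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

struct ReduceOp {            // stand-in for c10d::ReduceOp
  int op = 0;
};

struct AllreduceOptions {    // stand-in for c10d::AllreduceOptions
  ReduceOp reduceOp{};
  std::chrono::milliseconds timeout{300000};
  bool asyncOp = true;       // assumption: declared right after timeout upstream
};

int main() {
  ReduceOp sum{0};
  std::int64_t timeout = 5000;
  bool asyncOp = false;

  // Same shape as the diff: one aggregate initializer, asyncOp appended last.
  AllreduceOptions opts{sum, std::chrono::milliseconds(timeout), asyncOp};

  std::cout << "timeout=" << opts.timeout.count()
            << "ms asyncOp=" << opts.asyncOp << "\n";  // timeout=5000ms asyncOp=0
}
```

Because aggregate initialization is positional, this only stays correct as long as the upstream member order holds; the named-assignment variant used by some wrappers below avoids that coupling.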
@@ -140,11 +141,13 @@ c10::intrusive_ptr<C10D_Work> allreduce_coalesced_xpu_(
     at::TensorList tensors,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     const c10::intrusive_ptr<ReduceOp>& reduce_op,
+    bool asyncOp,
     int64_t timeout) {
   auto tensor_vec = tensors.vec();
   AllreduceCoalescedOptions opts = AllreduceCoalescedOptions{};
   opts.reduceOp = *reduce_op.get();
   opts.timeout = std::chrono::milliseconds(timeout);
+  opts.asyncOp = asyncOp;
 
   return process_group->getBackend(c10::DeviceType::XPU)
       ->allreduce_coalesced(tensor_vec, opts);
@@ -160,6 +163,7 @@ c10::intrusive_ptr<C10D_Work> reduce_xpu_(
     const c10::intrusive_ptr<ReduceOp>& reduce_op,
     int64_t root_rank,
     int64_t root_tensor,
+    bool asyncOp,
     int64_t timeout) {
   auto tensor_vec = tensors.vec();
   return process_group->getBackend(c10::DeviceType::XPU)
@@ -169,7 +173,7 @@ c10::intrusive_ptr<C10D_Work> reduce_xpu_(
               *reduce_op.get(),
               root_rank,
               root_tensor,
-              std::chrono::milliseconds(timeout)});
+              std::chrono::milliseconds(timeout), asyncOp});
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
@@ -181,14 +185,15 @@ allgather_xpu_(
     const std::vector<std::vector<at::Tensor>>& output_tensors,
     at::TensorList input_tensors,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
+    bool asyncOp,
     int64_t timeout) {
   auto input_tensors_vec = input_tensors.vec();
   auto work =
       process_group->getBackend(c10::DeviceType::XPU)
           ->allgather(
               const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
               input_tensors_vec,
-              AllgatherOptions{std::chrono::milliseconds(timeout)});
+              AllgatherOptions{std::chrono::milliseconds(timeout), asyncOp});
 
   // Copy output tensors (not storage) so that this can be used in a functional
   // manner
@@ -234,12 +239,15 @@ TORCH_LIBRARY_IMPL(c10d, XPU, m) {
 c10::intrusive_ptr<c10d::Work> allgather_into_tensor_coalesced_xpu_(
     at::TensorList outputs,
     at::TensorList inputs,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
+    const c10::intrusive_ptr<ProcessGroup>& process_group,
+    bool asyncOp) {
 
   auto output_vec = outputs.vec();
   auto input_vec = inputs.vec();
+  AllgatherOptions opts = AllgatherOptions{};
+  opts.asyncOp = asyncOp;
   return process_group->getBackend(c10::DeviceType::XPU)
-      ->allgather_into_tensor_coalesced(output_vec, input_vec);
+      ->allgather_into_tensor_coalesced(output_vec, input_vec, opts);
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
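
Where the backend call previously took no options argument at all, as in the coalesced allgather wrapper above (and the one that follows), the diff default-constructs the options struct and sets `asyncOp` by name. Named assignment is order-independent and leaves every other field at its default, which is the safer idiom when only one field matters. A small stand-in sketch of the same idiom (not the real c10d types):

```cpp
#include <chrono>
#include <iostream>

struct AllgatherOptions {   // stand-in for c10d::AllgatherOptions
  std::chrono::milliseconds timeout{300000};
  bool asyncOp = true;
};

int main() {
  AllgatherOptions opts = AllgatherOptions{};  // every field at its default
  opts.asyncOp = false;                        // name the one field that changes
  std::cout << "timeout=" << opts.timeout.count()
            << "ms asyncOp=" << opts.asyncOp << "\n";  // timeout keeps its default
}
```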
@@ -249,12 +257,16 @@ TORCH_LIBRARY_IMPL(c10d, XPU, m) {
 c10::intrusive_ptr<C10D_Work> allgather_coalesced_xpu_(
     const std::vector<std::vector<at::Tensor>>& output_lists,
     const at::TensorList& input_list,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
+    const c10::intrusive_ptr<ProcessGroup>& process_group,
+    bool asyncOp) {
   auto input_list_vec = input_list.vec();
+  AllgatherOptions opts = AllgatherOptions{};
+  opts.asyncOp = asyncOp;
   return process_group->getBackend(c10::DeviceType::XPU)
       ->allgather_coalesced(
           const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
-          input_list_vec);
+          input_list_vec,
+          opts);
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
@@ -266,13 +278,14 @@ c10::intrusive_ptr<C10D_Work> gather_xpu_(
     const at::TensorList& input_tensors,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     int64_t root_rank,
+    bool asyncOp,
     int64_t timeout) {
   auto input_tensors_vec = input_tensors.vec();
   return process_group->getBackend(c10::DeviceType::XPU)
       ->gather(
           const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
           input_tensors_vec,
-          GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
+          GatherOptions{root_rank, std::chrono::milliseconds(timeout), asyncOp});
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
@@ -329,6 +342,7 @@ reduce_scatter_xpu_(
     const std::vector<std::vector<at::Tensor>>& input_tensors,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     const c10::intrusive_ptr<ReduceOp>& reduce_op,
+    bool asyncOp,
     int64_t timeout) {
   auto output_tensors_vec = output_tensors.vec();
   auto work =
@@ -337,7 +351,7 @@ reduce_scatter_xpu_(
               output_tensors_vec,
               const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
               ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
+                  *reduce_op.get(), std::chrono::milliseconds(timeout), asyncOp});
 
   return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<C10D_Work>>(
       output_tensors_vec, work);
@@ -394,6 +408,7 @@ c10::intrusive_ptr<C10D_Work> reduce_scatter_tensor_coalesced_xpu_(
     at::TensorList inputs,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     const c10::intrusive_ptr<ReduceOp>& reduce_op,
+    bool asyncOp,
     int64_t timeout) {
   auto output_vec = outputs.vec();
   auto input_vec = inputs.vec();
@@ -402,7 +417,7 @@ c10::intrusive_ptr<C10D_Work> reduce_scatter_tensor_coalesced_xpu_(
           output_vec,
           input_vec,
           ReduceScatterOptions{
-              *reduce_op.get(), std::chrono::milliseconds(timeout)});
+              *reduce_op.get(), std::chrono::milliseconds(timeout), asyncOp});
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
@@ -415,14 +430,15 @@ c10::intrusive_ptr<C10D_Work> alltoall_base_xpu_(
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     std::vector<int64_t> output_split_sizes,
     std::vector<int64_t> input_split_sizes,
+    bool asyncOp,
     int64_t timeout) {
   return process_group->getBackend(c10::DeviceType::XPU)
       ->alltoall_base(
           output,
           input,
           output_split_sizes,
           input_split_sizes,
-          AllToAllOptions{std::chrono::milliseconds(timeout)});
+          AllToAllOptions{std::chrono::milliseconds(timeout), asyncOp});
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
@@ -433,14 +449,15 @@ std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<C10D_Work>> alltoall_xpu_(
     const at::TensorList& output_tensors,
     const at::TensorList& input_tensors,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
+    bool asyncOp,
     int64_t timeout) {
   auto output_tensors_vec = output_tensors.vec();
   auto input_tensors_vec = input_tensors.vec();
   auto work = process_group->getBackend(c10::DeviceType::XPU)
       ->alltoall(
           output_tensors_vec,
           input_tensors_vec,
-          AllToAllOptions{std::chrono::milliseconds(timeout)});
+          AllToAllOptions{std::chrono::milliseconds(timeout), asyncOp});
   return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<C10D_Work>>(
       std::move(output_tensors_vec), work);
 }
@@ -494,9 +511,14 @@ c10::intrusive_ptr<Work> barrier_xpu(
     at::Tensor /* unused */,
     const c10::intrusive_ptr<ProcessGroup>& process_group,
     const std::vector<int64_t>& device_ids,
+    bool asyncOp,
     int64_t timeout) {
+  BarrierOptions opts = BarrierOptions{};
+  opts.device_ids = device_ids;
+  opts.timeout = std::chrono::milliseconds(timeout);
+  opts.asyncOp = asyncOp;
   return process_group->getBackend(c10::DeviceType::XPU)
-      ->barrier(BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
+      ->barrier(opts);
 }
 
 TORCH_LIBRARY_IMPL(c10d, XPU, m) {
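
For context on what the forwarded flag controls: each c10d collective returns a Work handle, and `asyncOp` records whether the caller requested a non-blocking launch (`async_op=True` on the Python side), in which case completing the operation via `wait()` is the caller's responsibility. The toy model below fakes a backend to illustrate that contract; it is a sketch of the semantics, not code from this repository:

```cpp
#include <iostream>

struct Work {          // stand-in for c10d::Work
  bool completed = false;
  void wait() { completed = true; }
};

struct FakeBackend {   // stand-in for a ProcessGroup backend
  Work launch(bool asyncOp) {
    Work w;
    if (!asyncOp) {
      w.wait();        // synchronous path: complete before returning
    }
    return w;          // asynchronous path: caller must wait() on the handle
  }
};

int main() {
  FakeBackend backend;
  Work sync_work = backend.launch(/*asyncOp=*/false);
  Work async_work = backend.launch(/*asyncOp=*/true);
  std::cout << "sync done=" << sync_work.completed
            << ", async done=" << async_work.completed << "\n";  // 1, 0
  async_work.wait();   // the async caller is responsible for completion
}
```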