
Commit 03fe4a1

liangan1 authored and chengjunlu committed
Fix merge conflict issue (#63)
1 parent d616d9b commit 03fe4a1


5 files changed: +1 -106 lines changed


src/ProcessGroupCCL.hpp

Lines changed: 0 additions & 5 deletions
@@ -175,11 +175,6 @@ class ProcessGroupCCL : public ProcessGroup
       std::vector<std::vector<at::Tensor>>& inputTensors,
       const ReduceScatterOptions& opts = ReduceScatterOptions()) override;
 
-  c10::intrusive_ptr<C10D_Work> _reduce_scatter_base(
-      at::Tensor& outputTensor,
-      at::Tensor& inputTensor,
-      const ReduceScatterOptions& opts = ReduceScatterOptions()) override;
-
   c10::intrusive_ptr<C10D_Work> _reduce_scatter_base(
       at::Tensor& outputBuffer,
       at::Tensor& inputBuffer,

src/cpu/cpu_ccl.cpp

Lines changed: 0 additions & 50 deletions
@@ -158,11 +158,6 @@ class VanillaCPU final: public DispatchStub {
                                                           const GatherOptions& opts,
                                                           ProcessGroupCCL& pg) override;
 
-  c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> _reduce_scatter_base_(at::Tensor& outputTensor,
-                                                                          at::Tensor& inputTensor,
-                                                                          const ReduceScatterOptions& opts,
-                                                                          ProcessGroupCCL& pg) override;
-
   c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> alltoall_base_(at::Tensor& outputTensor,
                                                                    at::Tensor& inputTensor,
                                                                    std::vector<int64_t>& outputSplitSizes,
@@ -352,51 +347,6 @@ c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> VanillaCPU::reduce_(std::vecto
   return work;
 }
 
-c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> VanillaCPU::_reduce_scatter_base_(at::Tensor& outputTensor,
-                                                                                    at::Tensor& inputTensor,
-                                                                                    const ReduceScatterOptions& opts,
-                                                                                    ProcessGroupCCL& pg_ccl) {
-  const int world_size = pg_ccl.getSize();
-  if (inputTensor.numel() != outputTensor.numel() * world_size) {
-    TORCH_CHECK(
-        false,
-        "input tensor must be the same size as output size times world size");
-  }
-
-  // just a wrapper to fit the collective interface
-  auto inputs = std::vector<at::Tensor> {inputTensor};
-  auto outputs = std::vector<at::Tensor> {outputTensor};
-
-  c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> work;
-  work = collective<get_ccl_comms, CPUWorkCCL>(
-    pg_ccl,
-    inputs,
-    outputs,
-    [=](at::Tensor input,
-        at::Tensor output,
-        ccl::reduce_attr attr,
-        ccl::communicator& comm) {
-
-      ccl::event ret_evt;
-      call_with_lock(c10d::ProcessGroupCCL::globalMutex, [&]() {
-        CCL_CHECK(ret_evt = ccl::reduce_scatter(input.data_ptr(),
-                                                output.data_ptr(),
-                                                (size_t) output.numel(),
-                                                cclDatatypes.at(input.scalar_type()),
-                                                cclOps.at(opts.reduceOp),
-                                                comm));
-      });
-      return ret_evt;
-
-    },
-    c10d::OpType::_REDUCE_SCATTER_BASE,
-    "oneccl_bindings_for_pytorch::cpu_work::_reduce_scatter_base");
-
-  work->debugName = std::string("cpu::_reduce_scatter_base");
-  enqueue(work);
-  return work;
-}
-
 c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> VanillaCPU::broadcast_(std::vector<at::Tensor>& tensors,
                                                                          const BroadcastOptions &opts,
                                                                          ProcessGroupCCL& pg) {

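The deleted block above is the duplicated CPU `_reduce_scatter_base_` wrapper around `ccl::reduce_scatter`; the surviving overload keeps the same behaviour, including the size check. A minimal sketch of that shape contract, assuming `pg` is an already-initialized `ProcessGroupCCL` as in the test file (illustrative names only, not part of this diff):

import torch

def reduce_scatter_base_sketch(pg, world_size):
    """Sketch of the shape contract; pg is assumed to be an already-initialized
    ProcessGroupCCL (e.g. the one the test builds), world_size its size."""
    output_t = torch.empty(4)                                    # one chunk for this rank
    input_t = torch.arange(4 * world_size, dtype=torch.float32)  # world_size chunks, flattened
    # Exactly the condition the removed wrapper enforced with TORCH_CHECK.
    assert input_t.numel() == output_t.numel() * world_size
    work = pg._reduce_scatter_base(output_t, input_t)            # same call as the kept test helper
    work.wait()
    return output_t                                              # the reduced chunk for this rank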
src/dispatch_stub.cpp

Lines changed: 0 additions & 17 deletions
@@ -259,14 +259,6 @@ class DebugCCLStub final: public DispatchStub {
     return work;
   }
 
-  c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> _reduce_scatter_base_(at::Tensor& outputTensor,
-                                                                          at::Tensor& inputTensor,
-                                                                          const ReduceScatterOptions& opts,
-                                                                          ProcessGroupCCL& pg_ccl) {
-    c10::DeviceType dev_type = inputTensor.device().type();
-    return get_ccl_stub(dev_type)->_reduce_scatter_base_(outputTensor, inputTensor, opts, pg_ccl);
-  }
-
   c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> alltoall_base_(at::Tensor& outputTensor,
                                                                    at::Tensor& inputTensor,
                                                                    std::vector<int64_t>& outputSplitSizes,
@@ -390,15 +382,6 @@ c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> DispatchStub::reduce(std::vect
 }
 
 
-c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> DispatchStub::_reduce_scatter_base(at::Tensor& outputTensor,
-                                                                                     at::Tensor& inputTensor,
-                                                                                     const ReduceScatterOptions& opts,
-                                                                                     ProcessGroupCCL& pg_ccl) {
-  checkSameType(outputTensor, {outputTensor, inputTensor});
-  c10::DeviceType dev_type = outputTensor.device().type();
-  return get_ccl_stub(dev_type)->_reduce_scatter_base_(outputTensor, inputTensor, opts, pg_ccl);
-}
-
 c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> DispatchStub::broadcast(std::vector<at::Tensor>& tensors,
                                                                           const BroadcastOptions& opts,
                                                                           ProcessGroupCCL& pg_ccl) {

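Both removed methods follow the same routing convention as the collectives that remain: `DispatchStub` resolves the tensor's device type and forwards to the stub registered for it, falling back to `fail(...)` when no backend implements the op. A rough Python analogue of that registration-and-dispatch pattern (purely illustrative, not the project's API):

# Rough Python analogue of the C++ DispatchStub routing above: each device type
# ("cpu", "xpu", ...) registers a stub, and a collective is forwarded to the stub
# matching the tensor's device. Purely illustrative.
from typing import Dict

class CollectiveStub:
    def reduce_scatter_base(self, output, input, opts=None):
        # mirrors the base-class fallback that calls fail(...) for unsupported devices
        raise RuntimeError(f"_reduce_scatter_base not supported on {output.device.type}")

_stubs: Dict[str, CollectiveStub] = {}

def register_stub(device_type: str, stub: CollectiveStub) -> None:
    _stubs[device_type] = stub

def reduce_scatter_base(output, input, opts=None):
    # mirrors DispatchStub::_reduce_scatter_base: route by the output tensor's device type
    return _stubs[output.device.type].reduce_scatter_base(output, input, opts)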
src/dispatch_stub.h

Lines changed: 0 additions & 14 deletions
@@ -55,11 +55,6 @@ class DispatchStub {
                                                                    const ReduceOptions& opts,
                                                                    ProcessGroupCCL& pg_ccl);
 
-  static c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> _reduce_scatter_base(at::Tensor& outputTensor,
-                                                                                at::Tensor& inputTensor,
-                                                                                const ReduceScatterOptions& opts,
-                                                                                ProcessGroupCCL& pg_ccl);
-
   static c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> broadcast(std::vector<at::Tensor>& tensors,
                                                                      const BroadcastOptions& opts,
                                                                      ProcessGroupCCL& pg_ccl);
@@ -122,15 +117,6 @@ class DispatchStub {
     return c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL>();
   }
 
-  virtual c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> _reduce_scatter_base_(at::Tensor& outputTensor,
-                                                                                  at::Tensor& inputTensor,
-                                                                                  const ReduceScatterOptions& opts,
-                                                                                  ProcessGroupCCL& pg_ccl) {
-    fail(outputTensor.device().type(), "_reduce_scatter_base");
-    return c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL>();
-  }
-
-
   virtual c10::intrusive_ptr<ProcessGroupCCL::AsyncWorkCCL> allgather_(std::vector<std::vector<at::Tensor>>& outputTensors,
                                                                        std::vector<at::Tensor>& inputTensors,
                                                                        const AllgatherOptions& opts,

tests/test_c10d_ccl.py

Lines changed: 1 addition & 20 deletions
@@ -482,26 +482,7 @@ def _test_reduce_scatter_base_basics(self, fn):
         def reduce_scatter_base(output_t, input_t):
             work = pg._reduce_scatter_base(output_t, input_t)
             work.wait()
-
-        # anticipate an error
-        with self.assertRaisesRegex(
-            RuntimeError,
-            "input tensor must be the same size as output size times world size",
-        ):
-            input_t = fn(torch.tensor([self.rank]))
-            output_t = fn(torch.empty((self.world_size + 1), dtype=input_t.dtype))
-            # fails the check because output_t is not correctly sized
-            reduce_scatter_base(output_t, input_t)
-
-        # anticipate an error
-        with self.assertRaisesRegex(
-            RuntimeError, "Tensors are not equal in data type"
-        ):
-            tensor = fn(torch.tensor([self.rank], dtype=torch.float))
-            output_t = fn(torch.empty((self.world_size + 1), dtype=torch.long))
-            # fails the check because the dtype is different
-            reduce_scatter_base(output_t, tensor)
-
+
 
     def test_reduce_scatter_base_basics(self):
         self._test_reduce_scatter_base_basics(lambda t: t.clone())

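The removed assertions only exercised the two error paths (mis-sized output, mismatched dtypes); the kept helper above still drives the happy path. As a reminder of what a reduce-scatter with the default sum op produces, a local, non-distributed sketch with two illustrative ranks (values are made up for the example):

import torch

# Reduce-scatter with world_size == 2 and ReduceOp.SUM (default):
# each rank ends up with the element-wise sum of its own chunk across all ranks.
# rank 0 input: [0, 1, 2, 3]    rank 1 input: [10, 11, 12, 13]
# summed input: [10, 12, 14, 16]
# rank 0 output: [10, 12]       rank 1 output: [14, 16]
inputs = [torch.tensor([0., 1., 2., 3.]), torch.tensor([10., 11., 12., 13.])]
summed = inputs[0] + inputs[1]
chunks = summed.chunk(2)                      # one output-sized chunk per rank
assert torch.equal(chunks[0], torch.tensor([10., 12.]))
assert torch.equal(chunks[1], torch.tensor([14., 16.]))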
0 commit comments
