Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes, move spinlock to cc, minor changes #42

Open
wants to merge 61 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
9894178
Fix cygwin win32 workaround
jkunstwald May 26, 2021
6a49c26
Add allocator support to MPMCQueue, assert enqueue success
jkunstwald Aug 1, 2021
2ec342e
Use sized integer types
jkunstwald Sep 2, 2021
e1f2ae3
Remove td SpinLock, use cc::spin_lock
jkunstwald Sep 5, 2021
b192294
Deprecate num_physical_cores
jkunstwald Sep 7, 2021
a02283d
Remove new functions from experimental namespace
jkunstwald Sep 9, 2021
820eb61
Fix bug in counter handle, double default task size
jkunstwald Sep 10, 2021
1f92145
Add submit_batched_on_counter
jkunstwald Sep 10, 2021
ed2eb2b
Fix submit_batched_n
jkunstwald Oct 1, 2021
8c40227
Adapt to rlog api
jkunstwald Oct 8, 2021
e88deae
WIP: Minor rewrite
Nov 12, 2021
5bfe3f4
Continue rewrite
jkunstwald Nov 12, 2021
64839e0
Continue rewrite, clean up scheduler TU (1 of ?)
jkunstwald Nov 13, 2021
3a804ef
Continue rewrite, clean up scheduler TU (2 of ?)
jkunstwald Nov 13, 2021
85df78f
Continue rewrite, clean up scheduler TU (3 of ?)
jkunstwald Nov 13, 2021
5da6aea
Continue rewrite, clean up scheduler TU (4 of ?)
jkunstwald Nov 13, 2021
763eecb
Continue rewrite, clean up scheduler TU (5 of ?)
jkunstwald Nov 13, 2021
e7a901c
Clean up Task, fix possible misalignment issues, add allocator support
jkunstwald Nov 13, 2021
a44bd89
Add missing cc forward declarations
jkunstwald Nov 13, 2021
7112ab8
Fix new waiting mechanism, tentatively enable it
jkunstwald Nov 15, 2021
492fc81
Fix race in counterAddWaitingFiber
jkunstwald Nov 15, 2021
9ad5400
Continue API rewrite
Nov 18, 2021
502dda6
Add AutoCounter, add td::createCounterDependency
jkunstwald Nov 18, 2021
50f529c
Fix some issues in submission headers
Nov 19, 2021
4d24866
Minor changes
Nov 19, 2021
3bcc135
Add comment about difficulties in submitMethod
Nov 19, 2021
aec2075
Add td::getApproximateCounterValue
jkunstwald Feb 18, 2022
dd54bc3
Remove pinned wait limits per thread, add public compare-and-swap fun…
jkunstwald Mar 3, 2022
2e6d23b
Adapt to cc allocator location change
jkunstwald Mar 5, 2022
deaac0b
Add td::CriticalSection
jkunstwald Mar 5, 2022
26ed8d6
Minor comment addition
jkunstwald Apr 12, 2022
839a05c
Fix compilation on clang
jkunstwald Apr 23, 2022
de257fa
Remove old scheduler.hh/cc
jkunstwald Apr 23, 2022
f784af1
Fix warnings
May 6, 2022
80e06f1
Fix submitBatched for zero elements or max batch count
jkunstwald May 22, 2022
362ffb6
Remove windows include from threading header
Sep 21, 2022
b6084cc
Remove unused math_intrin.hh header
Sep 22, 2022
2fd32b6
Remove heavy includes from Fiber.hh
Sep 22, 2022
eec0421
Clean up native fiber and threading abstraction
Sep 26, 2022
95e3f79
Fix compilation error in Task.hh
Sep 27, 2022
3f61328
Minor change
Sep 29, 2022
b41b6ee
Minor change
Oct 4, 2022
8e684b4
Improve submitTasks perf for empty submits
jkunstwald Oct 8, 2022
db78514
Update to latest rlog API
Feb 22, 2023
2c84dd5
Use cc::mpmc_queue, remove own copy
May 24, 2023
87fa323
Add moodycamel concurrent queue
May 24, 2023
8c7bb9e
Add tests from arcana samples, fix compatibility
May 24, 2023
954b040
Use moodycamel queue (replacing yukov MPMC), clean up scheduler
May 24, 2023
2c57c3b
Unify naming (1/?)
May 24, 2023
5798ba0
Execute all resumable fibers first before dequeuing new tasks
May 24, 2023
d9123c8
Add task priorities
May 24, 2023
09e5679
Rename fiber.hh to Fiber.hh
Oct 10, 2023
a7bae53
Remove lowercase fiber.hh from VCS
Oct 10, 2023
dd83f3c
Fix compilation on clang
Oct 10, 2023
f2be4b1
Fix compilation on clang - 2/?
Oct 10, 2023
b2e76c8
Fix compilation on clang - 3/?
Oct 10, 2023
4611377
Add task template, configurable lambda signature
Oct 13, 2023
adbfd81
Task: minor fix
Oct 13, 2023
c09676c
Make OS sleep timeout configurable
jkunstwald Jun 25, 2024
2f15520
Threading: Close Win32 handle when joining threads
jkunstwald Oct 8, 2024
32eea70
Merge remote-tracking branch 'origin/develop' into kw/develop
jkunstwald Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ option(TD_NO_WAITS "If enabled, worker threads do not wait/sleep when idle, mini
# define library

file(GLOB_RECURSE SOURCES "src/*.cc")
file(GLOB_RECURSE HEADERS "src/*.hh")
file(GLOB_RECURSE HEADERS "src/*.hh" "src/*.h")

arcana_add_library(TD task-dispatcher SOURCES HEADERS)

Expand Down
62 changes: 62 additions & 0 deletions licenses/moodycamel.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
This license file applies to everything in this repository except that which
is explicitly annotated as being written by other authors, i.e. the Boost
queue (included in the benchmarks for comparison), Intel's TBB library (ditto),
dlib::pipe (ditto),
the CDSChecker tool (used for verification), the Relacy model checker (ditto),
and Jeff Preshing's semaphore implementation (used in the blocking queue) which
has a zlib license (embedded in lightweightsempahore.h).

---

Simplified BSD License:

Copyright (c) 2013-2016, Cameron Desrochers.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

- Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

---

I have also chosen to dual-license under the Boost Software License as an alternative to
the Simplified BSD license above:

Boost Software License - Version 1.0 - August 17th, 2003

Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:

The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
34 changes: 34 additions & 0 deletions src/task-dispatcher/AutoCounter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "AutoCounter.hh"

#include "Scheduler.hh"

td::AutoCounter::operator td::CounterHandle() &
{
if (!handle.isValid())
{
handle = td::acquireCounter();
}

return handle;
}

int32_t td::waitForCounter(AutoCounter& autoCounter, bool bPinned)
{
if (!autoCounter.handle.isValid())
{
// return immediately for uninitialized syncs
return 0;
}

// perform real wait
int32_t const res = td::waitForCounter(autoCounter.handle, bPinned);

// this call must not be raced on
if (td::releaseCounterIfOnZero(autoCounter.handle))
{
// mark the sync as uninitialized
autoCounter.handle.invalidate();
}

return res;
}
35 changes: 35 additions & 0 deletions src/task-dispatcher/AutoCounter.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <clean-core/assert.hh>

#include <task-dispatcher/CounterHandle.hh>
#include <task-dispatcher/common/api.hh>

namespace td
{
// An automatically managed CounterHandle
// acquires a counter on first usage and releases it when waiting
struct TD_API AutoCounter
{
AutoCounter() = default;

AutoCounter(AutoCounter&& rhs) noexcept : handle(rhs.handle) { rhs.handle.invalidate(); }
AutoCounter& operator=(AutoCounter&& rhs) noexcept
{
CC_ASSERT(!handle.isValid() && "Must call td::waitForCounter() on AutoCounter before dropping it");
handle = rhs.handle;
rhs.handle.invalidate();
return *this;
}

~AutoCounter() { CC_ASSERT(!handle.isValid() && "Must call td::waitForCounter() on AutoCounter before dropping it"); }

operator CounterHandle() &;

[[deprecated("For less restricted usage, use raw CounterHandle")]] AutoCounter(AutoCounter const&) = delete;
[[deprecated("For less restricted usage, use raw CounterHandle")]] AutoCounter& operator=(AutoCounter const&) = delete;
[[deprecated("Would immediately drop AutoCounter")]] operator CounterHandle() && = delete;

CounterHandle handle = {};
};
}
96 changes: 96 additions & 0 deletions src/task-dispatcher/BatchedSubmission.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

#include <type_traits>

#include <clean-core/allocator.hh>
#include <clean-core/assert.hh>
#include <clean-core/enable_if.hh>
#include <clean-core/forward.hh>
#include <clean-core/utility.hh>

#include <task-dispatcher/CounterHandle.hh>
#include <task-dispatcher/Scheduler.hh>
#include <task-dispatcher/container/Task.hh>

//
// Batched submission
//
// larger header than the raw scheduler

namespace td
{
// submits multiple tasks calling a lambda "void f(uint32_t i)" from 0 to num - 1
template <class F>
void submitNumbered(CounterHandle counter, F&& func, uint32_t numElements, cc::allocator* scratch, ETaskPriority priority = ETaskPriority::Default)
{
static_assert(std::is_invocable_v<F, unsigned>, "function must be invocable with index argument");
static_assert(std::is_same_v<std::invoke_result_t<F, uint32_t>, void>, "return must be void");

td::Task* tasks = scratch->new_array_sized<td::Task>(numElements);

for (uint32_t i = 0u; i < numElements; ++i)
{
tasks[i].initWithLambda([=] { func(i); });
}

td::submitTasks(counter, cc::span{tasks, numElements}, priority);
scratch->delete_array_sized(tasks, numElements);
}

// submits tasks calling a lambda "void f(uint start, uint end, uint batchIdx)" for evenly sized batches from 0 to num - 1
// maxNumBatches: maximum amount of batches to partition the range into
// returns amount of batches
template <class F>
uint32_t submitBatched(CounterHandle counter, F&& func, uint32_t numElements, uint32_t maxNumBatches, cc::allocator* scratch, ETaskPriority priority = ETaskPriority::Default)
{
static_assert(std::is_invocable_v<F, uint32_t, uint32_t, uint32_t>, "function must be invocable with element start, end, and index argument");

if (numElements == 0 || maxNumBatches == 0)
{
return 0;
}

uint32_t const batchSize = cc::int_div_ceil(numElements, maxNumBatches);
uint32_t const numBatches = cc::int_div_ceil(numElements, batchSize);
CC_ASSERT(numBatches <= maxNumBatches && "programmer error");

td::Task* tasks = scratch->new_array_sized<td::Task>(numBatches);

for (uint32_t batch = 0u, start = 0u, end = cc::min(batchSize, numElements); //
batch < numBatches; //
++batch, start = batch * batchSize, end = cc::min((batch + 1) * batchSize, numElements))
{
tasks[batch].initWithLambda([=] { func(start, end, batch); });
}

td::submitTasks(counter, cc::span{tasks, numBatches}, priority);
scratch->delete_array_sized(tasks, numBatches);
return numBatches;
}

// submits batched tasks calling a lambda "void f(T& value, uint32_t idx, uint32_t batchIdx)" for each element in the span
// data in 'values' must stay alive until all tasks ran through
// returns amount of batches
template <class T, class F>
uint32_t submitBatchedOnArray(CounterHandle counter, F&& func, cc::span<T> values, uint32_t maxNumBatches, cc::allocator* scratch, ETaskPriority priority = ETaskPriority::Default)
{
static_assert(std::is_invocable_v<F, T&, uint32_t, uint32_t>, "function must be invocable with element reference");
static_assert(std::is_same_v<std::invoke_result_t<F, T&, uint32_t, uint32_t>, void>, "return must be void");

T* const pValues = values.data();
uint32_t const numValues = uint32_t(values.size());

// copy the lambda
return td::submitBatched(
counter,
[func, pValues](uint32_t start, uint32_t end, uint32_t batchIdx)
{
for (uint32_t i = start; i < end; ++i)
{
func(pValues[i], i, batchIdx);
}
},
numValues, maxNumBatches, scratch, priority);
}

}
75 changes: 75 additions & 0 deletions src/task-dispatcher/CallableSubmission.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

#include <type_traits> // for is_invocable, is_class, is_member_function_pointer_v
#include <utility> // for tuple_size

#include <clean-core/apply.hh>
#include <clean-core/enable_if.hh>
#include <clean-core/forward.hh>
#include <clean-core/move.hh>
#include <clean-core/tuple.hh>

#include <task-dispatcher/CounterHandle.hh>
#include <task-dispatcher/Scheduler.hh>
#include <task-dispatcher/container/Task.hh>

//
// Callable submission
//
// large header, only required for exotic callables
// prefer LambdaSubmission.hh if possible

namespace td
{
// submit a task based on any callable (lambda, funcptr, type with operator()), with arguments passed to it
// arguments are moved into the task capture immediately
template <class F, class... Args>
void submitCallable(CounterHandle counter, F&& func, Args&&... args)
{
static_assert(std::is_invocable_v<F, Args...>, "function must be invocable with the given args");
static_assert(std::is_same_v<std::invoke_result_t<F, Args...>, void>, "return must be void");

// A lambda calling func(args...), moving the arguments into the capture (instead of copying)
Task dispatch(
[func, tup = cc::tuple(cc::move(args)...)]
{
// "apply" the argument tuple to go back to a parameter pack
cc::apply(
[&func](auto&&... args)
{
// actually call the callable with expanded arguments
func(decltype(args)(args)...);
},
tup);
});

td::submitTasks(counter, cc::span{dispatch});
}

// submit a task based on a member function, with arguments passed to it
// arguments are moved into the task capture immediately
template <class F, class FObj, class... Args, cc::enable_if<std::is_member_function_pointer_v<F>> = true>
void submitMethod(CounterHandle counter, FObj* pInst, F pMemberFunc, Args&&... args)
{
// NOTE: the SFINAE and static asserts here wouldn't be necessary if we could just use void (FObj::*pMemberFunc)(Args...)
// as the type for the member function ptr but that creates ambiguity about 'Args...'
static_assert(std::is_invocable_v<F, FObj, Args...>, "function must be invocable with the given args");
static_assert(std::is_same_v<std::invoke_result_t<F, FObj, Args...>, void>, "return must be void");

// A lambda calling pInst->func(args...), moving the arguments into the capture (instead of copying)
Task dispatch(
[pMemberFunc, pInst, tup = cc::tuple(cc::move(args)...)]
{
// "apply" the argument tuple to go back to a parameter pack
cc::apply(
[&pMemberFunc, pInst](auto&&... args)
{
// actually call the method with expanded arguments
(pInst->*pMemberFunc)(decltype(args)(args)...);
},
tup);
});

td::submitTasks(counter, cc::span{dispatch});
}
}
23 changes: 23 additions & 0 deletions src/task-dispatcher/CounterHandle.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <stdint.h>

namespace td
{
// Handle to a counter, the core synchronization mechanism
struct CounterHandle
{
uint32_t _value = 0;

CounterHandle() = default;
explicit constexpr CounterHandle(uint32_t val) : _value(val) {}

void invalidate() & { _value = 0; }
bool isValid() const { return _value != 0; }

bool operator==(CounterHandle rhs) const { return _value == rhs._value; }
bool operator!=(CounterHandle rhs) const { return _value != rhs._value; }
};

inline constexpr CounterHandle NullCounterHandle = CounterHandle{};
}
55 changes: 55 additions & 0 deletions src/task-dispatcher/CriticalSection.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "CriticalSection.hh"

#include <clean-core/assert.hh>

#include "Scheduler.hh"

namespace
{
enum CritSecState : int32_t
{
CritSecState_Unlocked = 0,
CritSecState_Locked
};
}

td::CriticalSection::CriticalSection()
{
CC_ASSERT(td::isInsideScheduler() && "td::CriticalSection must only be used from inside a scheduler");
mCounter = td::acquireCounter();
}

td::CriticalSection::~CriticalSection() { destroy(); }

void td::CriticalSection::lock(bool pinned) noexcept
{
while (true)
{
// try to acquire
int32_t prevState = td::compareAndSwapCounter(mCounter, CritSecState_Unlocked, CritSecState_Locked);

if (prevState == CritSecState_Unlocked)
{
// CAS successful
return;
}

// wait until this is unlocked, then retry CAS
td::waitForCounter(mCounter, pinned);
}
}

void td::CriticalSection::unlock() noexcept
{
int32_t prevState = td::compareAndSwapCounter(mCounter, CritSecState_Locked, CritSecState_Unlocked);
CC_ASSERT(prevState == CritSecState_Locked && "unlocked a critical section that was already unlocked");
}

void td::CriticalSection::destroy()
{
if (mCounter.isValid())
{
td::releaseCounter(mCounter);
mCounter = {};
}
}
Loading