Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EXPERIMENTAL] Add Actor.runSynchronously() to synchronously wait for a closure to execute on an actor #77749

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/swift/ABI/MetadataValues.h
Original file line number Diff line number Diff line change
@@ -2586,6 +2586,10 @@ enum class JobKind : size_t {
DefaultActorOverride,
NullaryContinuation,
IsolatedDeinit,

/// A job that represents synchronous code that is waiting to run on the
/// actor.
SynchronousWait,
};

/// The priority of a job. Higher priorities are larger values.
28 changes: 28 additions & 0 deletions include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
@@ -192,6 +192,34 @@ class NullaryContinuationJob : public Job {
}
};

using SwiftNullaryClosure = SWIFT_CC(swift) void (SWIFT_CONTEXT void *);

/// A job that represents synchronously waiting to run a (synchronous) job
/// on an actor. Only the thread that initiated the synchronous work can
/// complete this job.
class SynchronousWaitJob : public Job {
SwiftNullaryClosure *closure;
void *closureContext;
ConditionVariable cond;
std::atomic<bool> finished;

public:
SynchronousWaitJob(
JobPriority priority, SwiftNullaryClosure *closure, void *closureContext
) : Job({JobKind::SynchronousWait, priority}, &process),
closure(closure), closureContext(closureContext), finished(false) {}

SWIFT_CC(swiftasync)
static void process(Job *job);

void waitUntilFinished();
void signalFinished();

static bool classof(const Job *job) {
return job->Flags.getKind() == JobKind::SynchronousWait;
}
};

/// Describes type information and offers value methods for an arbitrary concrete
/// type in a way that's compatible with regular Swift and embedded Swift. In
/// regular Swift, just holds a Metadata pointer and dispatches to the value
21 changes: 21 additions & 0 deletions include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
@@ -847,6 +847,27 @@ swift_distributedActor_remote_initialize(const Metadata *actorType);
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_defaultActor_enqueue(Job *job, DefaultActor *actor);

/// Synchronously wait for the given closure (function pointer + context) to
/// execute on the given actor.
///
/// The operation is executed at the given priority, and is priority-ordered
/// in the same way as all other work on the actor. If the calling thread
/// manages to take the actor lock, it will execute jobs up until this
/// operation is completed, then return. Otherwise, it will block waiting for
/// a different thread to execute this operation.
///
/// This operation is only available for default actors (ones that do not have
/// custom executors), and will trap if provided with an actor that has a
/// custom executor. Note that the blocking nature of this operation makes it
/// prone to deadlock, so it should be used sparingly.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
void swift_actor_synchronous_wait(
HeapObject *actor,
JobPriority priority,
SwiftNullaryClosure *closure,
void *closureContext
);

/// Check if the actor is a distributed 'remote' actor instance.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
bool swift_distributed_actor_is_remote(HeapObject *actor);
141 changes: 135 additions & 6 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
@@ -1180,7 +1180,7 @@ class DefaultActorImpl

#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
/// Enqueue a job onto the actor.
void enqueue(Job *job, JobPriority priority);
void enqueue(Job *job, JobPriority priority, SynchronousWaitJob *waitingForJob);

/// Enqueue a stealer for the given task since it has been escalated to the
/// new priority
@@ -1219,6 +1219,7 @@ class DefaultActorImpl
}
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */


private:
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
@@ -1360,7 +1361,15 @@ void DefaultActorImpl::scheduleActorProcessJob(
swift_task_enqueueGlobal(job);
}

void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
static void defaultActorDrain(
DefaultActorImpl *actor, SynchronousWaitJob *waitingForJob
);

void DefaultActorImpl::enqueue(Job *job, JobPriority priority, SynchronousWaitJob *waitingForJob) {
// If we've been told that we are waiting for a job, it has to be the
// job we're enqueing.
assert(!waitingForJob || waitingForJob == job);

// We can do relaxed loads here, we are just using the current head in the
// atomic state and linking that into the new job we are inserting, we don't
// need acquires
@@ -1428,6 +1437,15 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
if (!oldState.isScheduled() && newState.isScheduled()) {
// We took responsibility to schedule the actor for the first time. See
// also ownership rule (1)

// When there is a synchronous wait job, drain the queue now to
// complete the synchronous wait job rather than scheduling an actor
// processing job. This thread can't do any other work until the
// synchronous wait job completes anyway.
if (waitingForJob) {
return defaultActorDrain(this, waitingForJob);
}

return scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
}

@@ -1637,7 +1655,15 @@ Job *DefaultActorImpl::drainOne() {
// At the point of return from the job execution, we may not be holding the lock
// of the same actor that we had started off with, so we need to reevaluate what
// the current actor is
static void defaultActorDrain(DefaultActorImpl *actor) {
//
// When this operation encounters a synchronous wait job, it will compare
// that job's address against the given waitingForJob. If they are the
// same, this thread is responsible for completing the synchronously waiting
// job: once it has been executed, this function will unlock the actor and
// return.
static void defaultActorDrain(
DefaultActorImpl *actor, SynchronousWaitJob *waitingForJob
) {
SWIFT_TASK_DEBUG_LOG("Draining default actor %p", actor);
DefaultActorImpl *currentActor = actor;

@@ -1659,12 +1685,21 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
/*taskExecutor, will be replaced per each job. */
TaskExecutorRef::undefined());

// If we are waiting for a specific job, prevent switching off of this
// actor to a new one. In other words, we want the thread to follow the
// actor (to this job) and not the task (which we normally do).
if (waitingForJob)
trackingInfo.disallowSwitching();

while (true) {
Job *job = currentActor->drainOne();
if (job == NULL) {
// No work left to do, try unlocking the actor. This may fail if there is
// work concurrently enqueued in which case, we'd try again in the loop
if (currentActor->unlock(false)) {
//
// If we're waiting for a job, then force the unlock so we are guaranteed
// to exit.
if (currentActor->unlock(waitingForJob != nullptr)) {
break;
}
} else {
@@ -1673,10 +1708,28 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
trackingInfo.setTaskExecutor(taskExecutor);
}

// Keep track of whether this was a synchronous wait job. If so, it
// won't get deallocated until after we've signaled it.
auto waitJob = dyn_cast<SynchronousWaitJob>(job);

// This thread is now going to follow the task on this actor. It may hop off
// the actor
runJobInEstablishedExecutorContext(job);

// If the job we just finished was a synchronous wait job, signal it.
if (waitJob) {
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
waitJob->signalFinished();
#endif

// If we just finished up the synchronous wait job for this thread,
// we're done. Unlock the actor.
if (waitJob == waitingForJob) {
currentActor->unlock(true);
break;
}
}

// We could have come back from the job on a generic executor and not as
// part of a default actor. If so, there is no more work left for us to do
// here.
@@ -1711,7 +1764,7 @@ void ProcessOutOfLineJob::process(Job *job) {
DefaultActorImpl *actor = self->Actor;

swift_cxx_deleteObject(self);
return defaultActorDrain(actor); // 'return' forces tail call
return defaultActorDrain(actor, nullptr); // 'return' forces tail call
}

#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
@@ -2084,7 +2137,7 @@ void swift::swift_defaultActor_enqueue(Job *job, DefaultActor *_actor) {
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
assert(false && "Should not enqueue onto default actor in actor as locks model");
#else
asImpl(_actor)->enqueue(job, job->getPriority());
asImpl(_actor)->enqueue(job, job->getPriority(), /*waitingForJob=*/nullptr);
#endif
}

@@ -2344,6 +2397,53 @@ class IsolatedDeinitJob : public Job {
}
};
} // namespace

void SynchronousWaitJob::process(Job *_job) {
auto *job = cast<SynchronousWaitJob>(_job);

auto *closure = job->closure;
auto *closureContext = job->closureContext;

closure(closureContext);
}

void SynchronousWaitJob::waitUntilFinished() {
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
assert(false && "Not used when actors are locks");
#else
SWIFT_TASK_DEBUG_LOG("Waiting until synchronous job %p is finished", this);

// This lock really protects nothing but we need to hold it
// while calling the condition wait.
cond.lock();

// Condition variables can have spurious wakeups so we need to check this in
// a do-while loop.
while (true) {
bool isFinished = finished.load(std::memory_order_relaxed);
if (isFinished)
break;

cond.wait();
}

cond.unlock();
#endif
}

void SynchronousWaitJob::signalFinished() {
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
assert(false && "Not used when actors are locks");
#else
SWIFT_TASK_DEBUG_LOG("Signaling completion of synchronous job %p", this);

cond.withLock([&] {
finished.store(true, std::memory_order_relaxed);
cond.signal();
});
#endif
}

#endif

SWIFT_CC(swift)
@@ -2574,3 +2674,32 @@ bool swift::swift_distributed_actor_is_remote(HeapObject *_actor) {
bool DefaultActorImpl::isDistributedRemote() {
return this->isDistributedRemoteActor;
}

void swift::swift_actor_synchronous_wait(
HeapObject *actor,
JobPriority priority,
SwiftNullaryClosure *closure,
void *closureContext
) SWIFT_CC(swift) {
// We only permit this operation on default actors.
auto metadata = cast<ClassMetadata>(actor->metadata);
if (!isDefaultActorClass(metadata)) {
swift_Concurrency_fatalError(0, "cannot synchronously wait on a non-default actor\n");
}

auto *defaultActor = static_cast<DefaultActor*>(actor);

#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
(void)defaultActor->tryLock(false);
closure(closureContext);
(void)defaultActor->unlock(false);
#else
SynchronousWaitJob job(priority, closure, closureContext);

// Enqueue the job on the actor.
asImpl(defaultActor)->enqueue(&job, job.getPriority(), &job);

// Wait until it finishes.
job.waitUntilFinished();
#endif
}
52 changes: 52 additions & 0 deletions stdlib/public/Concurrency/Actor.swift
Original file line number Diff line number Diff line change
@@ -88,6 +88,14 @@ public func _defaultActorDestroy(_ actor: AnyObject)
@usableFromInline
internal func _enqueueOnMain(_ job: UnownedJob)

@available(SwiftStdlib 6.1, *)
@_silgen_name("swift_actor_synchronous_wait")
func _actorSynchronousWait(
_ actor: AnyObject,
priority: UInt8,
body: () -> Void
)

#if $Macros
/// Produce a reference to the actor to which the enclosing code is
/// isolated, or `nil` if the code is nonisolated.
@@ -109,3 +117,47 @@ public func extractIsolation<each Arg, Result>(
return Builtin.extractFunctionIsolation(fn)
}
#endif

extension Actor {
/// Synchronously wait for the given closure (function pointer + context) to
/// execute on the given actor and return its result.
///
/// The operation is executed at the given priority, and is in the same way as
/// all other work on the actor. If the calling thread manages to take the
/// actor lock, it will execute jobs up until this operation is completed,
/// then return. Otherwise, it will block waiting for a different thread to
/// execute this operation.
///
/// This operation is only available for default actors (ones that do not have
/// custom executors), and will trap if provided with an actor that has a
/// custom executor. Note that the blocking nature of this operation makes it
/// prone to deadlock, so it should be used sparingly.
///
/// - Parameters:
/// - priority: The priority of the synchronous operation.
/// Pass `nil` to use the priority from `Task.currentPriority`.
/// - operation: The synchronous operation to perform.
@_spi(Experimental)
@available(SwiftStdlib 6.1, *)
nonisolated
public func runSynchronously<T, E>(
priority: TaskPriority? = nil,
operation: sending (isolated Self) throws(E) -> T
) throws(E) -> T {
let priority = priority ?? Task.currentPriority
typealias IsolatedOperation = (isolated Self) throws(E) -> T
typealias NonisolatedOperation = (Self) throws(E) -> T
return try withoutActuallyEscaping(operation) { (operation: @escaping IsolatedOperation) throws(E) in
let nonisolatedOperation = unsafeBitCast(operation, to: NonisolatedOperation.self)
var result: Result<T, E>? = nil
_actorSynchronousWait(self, priority: priority.rawValue) {
do throws(E) {
result = try .success(nonisolatedOperation(self))
} catch {
result = .failure(error)
}
}
return try result.unsafelyUnwrapped.get()
}
}
}
155 changes: 155 additions & 0 deletions test/Concurrency/Runtime/run_synchronously.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// RUN: %target-run-simple-swift

// REQUIRES: executable_test
// REQUIRES: concurrency

// REQUIRES: concurrency_runtime
// UNSUPPORTED: back_deployment_runtime

// This test exercises the Actor.runSynchronously() operation that runs a
// synchronous block on an actor from synchronous code, blocking the thread
// until it has completed.

@_spi(Experimental) import _Concurrency

#if canImport(Darwin)
import Darwin
#endif

class Box<T> {
var value: T
init(value: T) {
self.value = value
}
}

/// Actor whose sole purpose is to count calls to nextCount(), and also build up
/// a string representation of all of the values it as counted.
actor CountingActor {
var count = 0

// Put the string representation of the count into a box so that we get
// dynamic exclusivity checking for the string value in there that is
// being constantly mutated. This will help catch bugs where we aren't
// providing mutual exclusivity on the actor.
let stringCount: Box<String> = Box(value: "0")

func nextCount() -> Int {
count = count + 1
stringCount.value += " \(count)"
return count
}
}

/// Blocking synchronous call to the actor's nextCount().
@available(SwiftStdlib 6.1, *)
func blockingNextCount(_ myActor: CountingActor, offsetValue: Int) -> Int {
return myActor.runSynchronously { myActor in
myActor.nextCount()
}
}

/// Perform blocking calls to nextCount() on the given actor `iterations` times.
@available(SwiftStdlib 6.1, *)
func syncQueryActor(_ myActor: CountingActor, iterations: Int, offsetValue: Int) {
var previous = blockingNextCount(myActor, offsetValue: offsetValue)
for _ in 1..<iterations {
let current = blockingNextCount(myActor, offsetValue: offsetValue)
assert(current > previous)
previous = current
}
}

/// Perform asynchronous calls to nextCount() on the given actor `iterations`
/// times.
func asyncQueryActor(_ myActor: CountingActor, iterations: Int, offsetValue: Int) async {
var previous = await myActor.nextCount()
for _ in 1..<iterations {
let current = await myActor.nextCount()
if current <= previous {
print("Current \(current) vs. previous \(previous)")
}

//assert(current > previous, "Current \(current) vs. previous \(previous)")
previous = current
}
}

// Test configuration.
let tasksPerPriorityBucket = 500
let iterationsPerTask = 100

let myActor = CountingActor()

// Kick off a whole bunch of tasks at various priority levels. Most of them
// asynchronously call the nextCount() operation on the actor repeatedly, but we
// have one task per priority bucket perform blocking synchronous operations.
var allTasks: [Task<Void, Never>] = []
for priority: TaskPriority in [.background, .utility, .low, .medium, .high, .userInitiated] {
for i in 0..<tasksPerPriorityBucket {
allTasks.append(Task.detached(priority: priority) {
if i < 1, #available(SwiftStdlib 6.1, *) {
syncQueryActor(myActor, iterations: iterationsPerTask, offsetValue: i)
} else {
await asyncQueryActor(myActor, iterations: iterationsPerTask, offsetValue: i)
}
}
)
}
}

func flush() {
#if canImport(Darwin)
fflush(stdout)
#endif
}

// Have the main thread synchronously call the nextCount() operation on the
// actor a lot.
if #available(SwiftStdlib 6.1, *) {
syncQueryActor(myActor, iterations: iterationsPerTask, offsetValue: 17)
} else {
await asyncQueryActor(myActor, iterations: iterationsPerTask, offsetValue: 17)
}
print("Finished main actor tasks")

// Wait for all of the tasks to finish up.
print("[", terminator: "")
flush()

let progressBarWidth = 60
let numTasks = allTasks.count
allTasks.shuffle()
for (index, task) in allTasks.enumerated() {
await task.value

let priorPrinted = if index > 0 {
((index - 1) * progressBarWidth) / numTasks
} else {
0
}
let printed = index * progressBarWidth / numTasks

for _ in priorPrinted..<printed {
print("#", terminator: "")
flush()
}
}
print("]")
print("Verifying result...", terminator: "")


let expectedCount = (numTasks + 1) * iterationsPerTask
var expectedString = "0"
for i in 1...expectedCount {
expectedString += " "
expectedString += String(i)
}
let actualString = await myActor.stringCount.value
if actualString != expectedString {
print("Expected: \(expectedString)")
print("Actual: \(actualString)")
}
assert(actualString == expectedString)

print(" DONE!")