diff --git a/include/swift/ABI/MetadataValues.h b/include/swift/ABI/MetadataValues.h index 4da3e79343f3b..cb1c97ddfdf32 100644 --- a/include/swift/ABI/MetadataValues.h +++ b/include/swift/ABI/MetadataValues.h @@ -2586,6 +2586,10 @@ enum class JobKind : size_t { DefaultActorOverride, NullaryContinuation, IsolatedDeinit, + + /// A job that represents synchronous code that is waiting to run on the + /// actor. + SynchronousWait, }; /// The priority of a job. Higher priorities are larger values. diff --git a/include/swift/ABI/Task.h b/include/swift/ABI/Task.h index e450332b6c3d6..befd70a7189be 100644 --- a/include/swift/ABI/Task.h +++ b/include/swift/ABI/Task.h @@ -192,6 +192,34 @@ class NullaryContinuationJob : public Job { } }; +using SwiftNullaryClosure = SWIFT_CC(swift) void (SWIFT_CONTEXT void *); + +/// A job that represents synchronously waiting to run a (synchronous) job +/// on an actor. Only the thread that initiated the synchronous work can +/// complete this job. +class SynchronousWaitJob : public Job { + SwiftNullaryClosure *closure; + void *closureContext; + ConditionVariable cond; + std::atomic finished; + +public: + SynchronousWaitJob( + JobPriority priority, SwiftNullaryClosure *closure, void *closureContext + ) : Job({JobKind::SynchronousWait, priority}, &process), + closure(closure), closureContext(closureContext), finished(false) {} + + SWIFT_CC(swiftasync) + static void process(Job *job); + + void waitUntilFinished(); + void signalFinished(); + + static bool classof(const Job *job) { + return job->Flags.getKind() == JobKind::SynchronousWait; + } +}; + /// Describes type information and offers value methods for an arbitrary concrete /// type in a way that's compatible with regular Swift and embedded Swift. In /// regular Swift, just holds a Metadata pointer and dispatches to the value diff --git a/include/swift/Runtime/Concurrency.h b/include/swift/Runtime/Concurrency.h index 774333e67034d..30ee58ca02cb2 100644 --- a/include/swift/Runtime/Concurrency.h +++ b/include/swift/Runtime/Concurrency.h @@ -847,6 +847,27 @@ swift_distributedActor_remote_initialize(const Metadata *actorType); SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) void swift_defaultActor_enqueue(Job *job, DefaultActor *actor); +/// Synchronously wait for the given closure (function pointer + context) to +/// execute on the given actor. +/// +/// The operation is executed at the given priority, and is priority-ordered +/// in the same way as all other work on the actor. If the calling thread +/// manages to take the actor lock, it will execute jobs up until this +/// operation is completed, then return. Otherwise, it will block waiting for +/// a different thread to execute this operation. +/// +/// This operation is only available for default actors (ones that do not have +/// custom executors), and will trap if provided with an actor that has a +/// custom executor. Note that the blocking nature of this operation makes it +/// prone to deadlock, so it should be used sparingly. +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +void swift_actor_synchronous_wait( + HeapObject *actor, + JobPriority priority, + SwiftNullaryClosure *closure, + void *closureContext +); + /// Check if the actor is a distributed 'remote' actor instance. SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) bool swift_distributed_actor_is_remote(HeapObject *actor); diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 17299015c1748..b05dbf9f8a736 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -1180,7 +1180,7 @@ class DefaultActorImpl #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS /// Enqueue a job onto the actor. - void enqueue(Job *job, JobPriority priority); + void enqueue(Job *job, JobPriority priority, SynchronousWaitJob *waitingForJob); /// Enqueue a stealer for the given task since it has been escalated to the /// new priority @@ -1219,6 +1219,7 @@ class DefaultActorImpl } #endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */ + private: #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION @@ -1360,7 +1361,15 @@ void DefaultActorImpl::scheduleActorProcessJob( swift_task_enqueueGlobal(job); } -void DefaultActorImpl::enqueue(Job *job, JobPriority priority) { +static void defaultActorDrain( + DefaultActorImpl *actor, SynchronousWaitJob *waitingForJob +); + +void DefaultActorImpl::enqueue(Job *job, JobPriority priority, SynchronousWaitJob *waitingForJob) { + // If we've been told that we are waiting for a job, it has to be the + // job we're enqueing. + assert(!waitingForJob || waitingForJob == job); + // We can do relaxed loads here, we are just using the current head in the // atomic state and linking that into the new job we are inserting, we don't // need acquires @@ -1428,6 +1437,15 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) { if (!oldState.isScheduled() && newState.isScheduled()) { // We took responsibility to schedule the actor for the first time. See // also ownership rule (1) + + // When there is a synchronous wait job, drain the queue now to + // complete the synchronous wait job rather than scheduling an actor + // processing job. This thread can't do any other work until the + // synchronous wait job completes anyway. + if (waitingForJob) { + return defaultActorDrain(this, waitingForJob); + } + return scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor); } @@ -1637,7 +1655,15 @@ Job *DefaultActorImpl::drainOne() { // At the point of return from the job execution, we may not be holding the lock // of the same actor that we had started off with, so we need to reevaluate what // the current actor is -static void defaultActorDrain(DefaultActorImpl *actor) { +// +// When this operation encounters a synchronous wait job, it will compare +// that job's address against the given waitingForJob. If they are the +// same, this thread is responsible for completing the synchronously waiting +// job: once it has been executed, this function will unlock the actor and +// return. +static void defaultActorDrain( + DefaultActorImpl *actor, SynchronousWaitJob *waitingForJob +) { SWIFT_TASK_DEBUG_LOG("Draining default actor %p", actor); DefaultActorImpl *currentActor = actor; @@ -1659,12 +1685,21 @@ static void defaultActorDrain(DefaultActorImpl *actor) { /*taskExecutor, will be replaced per each job. */ TaskExecutorRef::undefined()); + // If we are waiting for a specific job, prevent switching off of this + // actor to a new one. In other words, we want the thread to follow the + // actor (to this job) and not the task (which we normally do). + if (waitingForJob) + trackingInfo.disallowSwitching(); + while (true) { Job *job = currentActor->drainOne(); if (job == NULL) { // No work left to do, try unlocking the actor. This may fail if there is // work concurrently enqueued in which case, we'd try again in the loop - if (currentActor->unlock(false)) { + // + // If we're waiting for a job, then force the unlock so we are guaranteed + // to exit. + if (currentActor->unlock(waitingForJob != nullptr)) { break; } } else { @@ -1673,10 +1708,28 @@ static void defaultActorDrain(DefaultActorImpl *actor) { trackingInfo.setTaskExecutor(taskExecutor); } + // Keep track of whether this was a synchronous wait job. If so, it + // won't get deallocated until after we've signaled it. + auto waitJob = dyn_cast(job); + // This thread is now going to follow the task on this actor. It may hop off // the actor runJobInEstablishedExecutorContext(job); + // If the job we just finished was a synchronous wait job, signal it. + if (waitJob) { +#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + waitJob->signalFinished(); +#endif + + // If we just finished up the synchronous wait job for this thread, + // we're done. Unlock the actor. + if (waitJob == waitingForJob) { + currentActor->unlock(true); + break; + } + } + // We could have come back from the job on a generic executor and not as // part of a default actor. If so, there is no more work left for us to do // here. @@ -1711,7 +1764,7 @@ void ProcessOutOfLineJob::process(Job *job) { DefaultActorImpl *actor = self->Actor; swift_cxx_deleteObject(self); - return defaultActorDrain(actor); // 'return' forces tail call + return defaultActorDrain(actor, nullptr); // 'return' forces tail call } #endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */ @@ -2084,7 +2137,7 @@ void swift::swift_defaultActor_enqueue(Job *job, DefaultActor *_actor) { #if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS assert(false && "Should not enqueue onto default actor in actor as locks model"); #else - asImpl(_actor)->enqueue(job, job->getPriority()); + asImpl(_actor)->enqueue(job, job->getPriority(), /*waitingForJob=*/nullptr); #endif } @@ -2344,6 +2397,53 @@ class IsolatedDeinitJob : public Job { } }; } // namespace + +void SynchronousWaitJob::process(Job *_job) { + auto *job = cast(_job); + + auto *closure = job->closure; + auto *closureContext = job->closureContext; + + closure(closureContext); +} + +void SynchronousWaitJob::waitUntilFinished() { +#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + assert(false && "Not used when actors are locks"); +#else + SWIFT_TASK_DEBUG_LOG("Waiting until synchronous job %p is finished", this); + + // This lock really protects nothing but we need to hold it + // while calling the condition wait. + cond.lock(); + + // Condition variables can have spurious wakeups so we need to check this in + // a do-while loop. + while (true) { + bool isFinished = finished.load(std::memory_order_relaxed); + if (isFinished) + break; + + cond.wait(); + } + + cond.unlock(); +#endif +} + +void SynchronousWaitJob::signalFinished() { +#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + assert(false && "Not used when actors are locks"); +#else + SWIFT_TASK_DEBUG_LOG("Signaling completion of synchronous job %p", this); + + cond.withLock([&] { + finished.store(true, std::memory_order_relaxed); + cond.signal(); + }); +#endif +} + #endif SWIFT_CC(swift) @@ -2574,3 +2674,32 @@ bool swift::swift_distributed_actor_is_remote(HeapObject *_actor) { bool DefaultActorImpl::isDistributedRemote() { return this->isDistributedRemoteActor; } + +void swift::swift_actor_synchronous_wait( + HeapObject *actor, + JobPriority priority, + SwiftNullaryClosure *closure, + void *closureContext +) SWIFT_CC(swift) { + // We only permit this operation on default actors. + auto metadata = cast(actor->metadata); + if (!isDefaultActorClass(metadata)) { + swift_Concurrency_fatalError(0, "cannot synchronously wait on a non-default actor\n"); + } + + auto *defaultActor = static_cast(actor); + +#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS + (void)defaultActor->tryLock(false); + closure(closureContext); + (void)defaultActor->unlock(false); +#else + SynchronousWaitJob job(priority, closure, closureContext); + + // Enqueue the job on the actor. + asImpl(defaultActor)->enqueue(&job, job.getPriority(), &job); + + // Wait until it finishes. + job.waitUntilFinished(); +#endif +} diff --git a/stdlib/public/Concurrency/Actor.swift b/stdlib/public/Concurrency/Actor.swift index 02b522ea830c6..be2bb81bdbec2 100644 --- a/stdlib/public/Concurrency/Actor.swift +++ b/stdlib/public/Concurrency/Actor.swift @@ -88,6 +88,14 @@ public func _defaultActorDestroy(_ actor: AnyObject) @usableFromInline internal func _enqueueOnMain(_ job: UnownedJob) +@available(SwiftStdlib 6.1, *) +@_silgen_name("swift_actor_synchronous_wait") +func _actorSynchronousWait( + _ actor: AnyObject, + priority: UInt8, + body: () -> Void +) + #if $Macros /// Produce a reference to the actor to which the enclosing code is /// isolated, or `nil` if the code is nonisolated. @@ -109,3 +117,47 @@ public func extractIsolation( return Builtin.extractFunctionIsolation(fn) } #endif + +extension Actor { + /// Synchronously wait for the given closure (function pointer + context) to + /// execute on the given actor and return its result. + /// + /// The operation is executed at the given priority, and is in the same way as + /// all other work on the actor. If the calling thread manages to take the + /// actor lock, it will execute jobs up until this operation is completed, + /// then return. Otherwise, it will block waiting for a different thread to + /// execute this operation. + /// + /// This operation is only available for default actors (ones that do not have + /// custom executors), and will trap if provided with an actor that has a + /// custom executor. Note that the blocking nature of this operation makes it + /// prone to deadlock, so it should be used sparingly. + /// + /// - Parameters: + /// - priority: The priority of the synchronous operation. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - operation: The synchronous operation to perform. + @_spi(Experimental) + @available(SwiftStdlib 6.1, *) + nonisolated + public func runSynchronously( + priority: TaskPriority? = nil, + operation: sending (isolated Self) throws(E) -> T + ) throws(E) -> T { + let priority = priority ?? Task.currentPriority + typealias IsolatedOperation = (isolated Self) throws(E) -> T + typealias NonisolatedOperation = (Self) throws(E) -> T + return try withoutActuallyEscaping(operation) { (operation: @escaping IsolatedOperation) throws(E) in + let nonisolatedOperation = unsafeBitCast(operation, to: NonisolatedOperation.self) + var result: Result? = nil + _actorSynchronousWait(self, priority: priority.rawValue) { + do throws(E) { + result = try .success(nonisolatedOperation(self)) + } catch { + result = .failure(error) + } + } + return try result.unsafelyUnwrapped.get() + } + } +} diff --git a/test/Concurrency/Runtime/run_synchronously.swift b/test/Concurrency/Runtime/run_synchronously.swift new file mode 100644 index 0000000000000..c0657aadb47b4 --- /dev/null +++ b/test/Concurrency/Runtime/run_synchronously.swift @@ -0,0 +1,155 @@ +// RUN: %target-run-simple-swift + +// REQUIRES: executable_test +// REQUIRES: concurrency + +// REQUIRES: concurrency_runtime +// UNSUPPORTED: back_deployment_runtime + +// This test exercises the Actor.runSynchronously() operation that runs a +// synchronous block on an actor from synchronous code, blocking the thread +// until it has completed. + +@_spi(Experimental) import _Concurrency + +#if canImport(Darwin) +import Darwin +#endif + +class Box { + var value: T + init(value: T) { + self.value = value + } +} + +/// Actor whose sole purpose is to count calls to nextCount(), and also build up +/// a string representation of all of the values it as counted. +actor CountingActor { + var count = 0 + + // Put the string representation of the count into a box so that we get + // dynamic exclusivity checking for the string value in there that is + // being constantly mutated. This will help catch bugs where we aren't + // providing mutual exclusivity on the actor. + let stringCount: Box = Box(value: "0") + + func nextCount() -> Int { + count = count + 1 + stringCount.value += " \(count)" + return count + } +} + +/// Blocking synchronous call to the actor's nextCount(). +@available(SwiftStdlib 6.1, *) +func blockingNextCount(_ myActor: CountingActor, offsetValue: Int) -> Int { + return myActor.runSynchronously { myActor in + myActor.nextCount() + } +} + +/// Perform blocking calls to nextCount() on the given actor `iterations` times. +@available(SwiftStdlib 6.1, *) +func syncQueryActor(_ myActor: CountingActor, iterations: Int, offsetValue: Int) { + var previous = blockingNextCount(myActor, offsetValue: offsetValue) + for _ in 1.. previous) + previous = current + } +} + +/// Perform asynchronous calls to nextCount() on the given actor `iterations` +/// times. +func asyncQueryActor(_ myActor: CountingActor, iterations: Int, offsetValue: Int) async { + var previous = await myActor.nextCount() + for _ in 1.. previous, "Current \(current) vs. previous \(previous)") + previous = current + } +} + +// Test configuration. +let tasksPerPriorityBucket = 500 +let iterationsPerTask = 100 + +let myActor = CountingActor() + +// Kick off a whole bunch of tasks at various priority levels. Most of them +// asynchronously call the nextCount() operation on the actor repeatedly, but we +// have one task per priority bucket perform blocking synchronous operations. +var allTasks: [Task] = [] +for priority: TaskPriority in [.background, .utility, .low, .medium, .high, .userInitiated] { + for i in 0.. 0 { + ((index - 1) * progressBarWidth) / numTasks + } else { + 0 + } + let printed = index * progressBarWidth / numTasks + + for _ in priorPrinted..