Skip to content

Commit

Permalink
scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jtjeferreira committed Dec 27, 2023
1 parent 2ce9333 commit 9f0ba72
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,17 @@ private[kinesis] class KinesisSchedulerSourceStage(
failStage(SchedulerUnexpectedShutdown(e))
}
override def postStop(): Unit =
schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown())(materializer.executionContext))
schedulerOpt.foreach(
scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown())(materializer.executionContext)
)

protected def executionContext(attributes: Attributes): ExecutionContext = {
val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match {
case ActorAttributes.Dispatcher("") =>
ActorAttributes.IODispatcher
case d => d
}) match {
case d@ActorAttributes.IODispatcher =>
case d @ ActorAttributes.IODispatcher =>
// this one is not a dispatcher id, but is a config path pointing to the dispatcher id
materializer.system.settings.config.getString(d.dispatcher)
case d => d.dispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object KinesisSchedulerSource {
schedulerBuilder: ShardRecordProcessorFactory => Scheduler,
settings: KinesisSchedulerSourceSettings
): Source[CommittableRecord, Future[Scheduler]] =
Source.fromGraph(new KinesisSchedulerSourceStage(settings, schedulerBuilder))
Source.fromGraph(new KinesisSchedulerSourceStage(settings, schedulerBuilder))

def sharded(
schedulerBuilder: ShardRecordProcessorFactory => Scheduler,
Expand Down

0 comments on commit 9f0ba72

Please sign in to comment.