From 827afd9557821cdfe0609195ced121ee3db83c08 Mon Sep 17 00:00:00 2001 From: Aleksei Potsetsuev Date: Wed, 27 Nov 2024 17:52:18 +0800 Subject: [PATCH] feat: plan re-runs --- .../flow-middleware/src/MiddlewareRequest.ts | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts b/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts index f1e1dcc..e2b46d8 100644 --- a/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts +++ b/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts @@ -24,6 +24,8 @@ export class MiddlewareRequest> { private middlewareStates: MiddlewareStates; private isDisposed: boolean; + private scheduledReRun: boolean; + private isRunning: boolean; constructor( private readonly middlewares: IMiddleware[], @@ -34,6 +36,8 @@ export class MiddlewareRequest> this.middlewares = middlewares; this.middlewareStates = new Map(); this.isDisposed = false; + this.scheduledReRun = false; + this.isRunning = false; } public use(...middleware: IMiddleware[]): this { @@ -54,6 +58,7 @@ export class MiddlewareRequest> const executedMiddlewares: ExecutedMiddlewares = new Set(); try { + this.isRunning = true; if (error !== undefined) { throw error; } @@ -62,6 +67,12 @@ export class MiddlewareRequest> await this.handleNextError(executedMiddlewares, 0, error as Error); } finally { await this.cleanupSubscriptions(executedMiddlewares); + this.isRunning = false; + + if (this.scheduledReRun) { + this.scheduledReRun = false; + await this.execute(); + } } } @@ -95,7 +106,11 @@ export class MiddlewareRequest> private async triggerReRun(error?: any): Promise { this.logger?.info("Re-run triggered."); - await this.execute(error); + if (this.isRunning) { + this.scheduledReRun = true; + } else { + await this.execute(error); + } } private async handleNextError( @@ -167,7 +182,7 @@ export class MiddlewareRequest> this.logger?.info( `Disposing subscriptions for middleware: ${middleware.name || "anonymous"}`, ); - for (const [key, { unsubscribe }] of state.entries()) { + for (const [key, { unsubscribe }] of state) { try { unsubscribe(); this.logger?.info(`Unsubscribed from ${key}.`); @@ -186,7 +201,7 @@ export class MiddlewareRequest> this.isDisposed = true; this.logger?.info("Disposing all subscriptions and request."); - for (const [middleware, state] of this.middlewareStates.entries()) { + for (const [middleware, state] of this.middlewareStates) { this.disposeMiddlewareSubscriptions(middleware, state); }