Skip to content

Commit

Permalink
feat: plan re-runs
Browse files Browse the repository at this point in the history
  • Loading branch information
Wroud committed Nov 27, 2024
1 parent 61dd5ad commit 827afd9
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions packages/@wroud/flow-middleware/src/MiddlewareRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export class MiddlewareRequest<Data = Record<string, any>>
{
private middlewareStates: MiddlewareStates<Data>;
private isDisposed: boolean;
private scheduledReRun: boolean;
private isRunning: boolean;

constructor(
private readonly middlewares: IMiddleware<Data>[],
Expand All @@ -34,6 +36,8 @@ export class MiddlewareRequest<Data = Record<string, any>>
this.middlewares = middlewares;
this.middlewareStates = new Map();
this.isDisposed = false;
this.scheduledReRun = false;
this.isRunning = false;
}

public use(...middleware: IMiddleware<Data>[]): this {
Expand All @@ -54,6 +58,7 @@ export class MiddlewareRequest<Data = Record<string, any>>
const executedMiddlewares: ExecutedMiddlewares<Data> = new Set();

try {
this.isRunning = true;
if (error !== undefined) {
throw error;
}
Expand All @@ -62,6 +67,12 @@ export class MiddlewareRequest<Data = Record<string, any>>
await this.handleNextError(executedMiddlewares, 0, error as Error);
} finally {
await this.cleanupSubscriptions(executedMiddlewares);
this.isRunning = false;

if (this.scheduledReRun) {
this.scheduledReRun = false;
await this.execute();
}
}
}

Expand Down Expand Up @@ -95,7 +106,11 @@ export class MiddlewareRequest<Data = Record<string, any>>

private async triggerReRun(error?: any): Promise<void> {
this.logger?.info("Re-run triggered.");
await this.execute(error);
if (this.isRunning) {
this.scheduledReRun = true;
} else {
await this.execute(error);
}
}

private async handleNextError(
Expand Down Expand Up @@ -167,7 +182,7 @@ export class MiddlewareRequest<Data = Record<string, any>>
this.logger?.info(
`Disposing subscriptions for middleware: ${middleware.name || "anonymous"}`,
);
for (const [key, { unsubscribe }] of state.entries()) {
for (const [key, { unsubscribe }] of state) {
try {
unsubscribe();
this.logger?.info(`Unsubscribed from ${key}.`);
Expand All @@ -186,7 +201,7 @@ export class MiddlewareRequest<Data = Record<string, any>>
this.isDisposed = true;
this.logger?.info("Disposing all subscriptions and request.");

for (const [middleware, state] of this.middlewareStates.entries()) {
for (const [middleware, state] of this.middlewareStates) {
this.disposeMiddlewareSubscriptions(middleware, state);
}

Expand Down

0 comments on commit 827afd9

Please sign in to comment.