From caa6859b14ffcc4886bbb76c8be5d4a1864a5691 Mon Sep 17 00:00:00 2001 From: Aleksei Potsetsuev Date: Wed, 27 Nov 2024 14:40:14 +0800 Subject: [PATCH] feat: support asynchronous subscriptions --- packages/@wroud/flow-middleware/CHANGELOG.md | 4 - .../flow-middleware/src/FlowMiddleware.ts | 17 - .../flow-middleware/src/MiddlewareRequest.ts | 348 +++++++----------- .../src/interfaces/IErrorMiddleware.ts | 4 +- .../src/interfaces/IFlowMiddleware.ts | 18 - .../src/interfaces/IMiddleware.ts | 8 +- .../src/interfaces/IMiddlewareRequest.ts | 11 - .../src/interfaces/IMiddlewareSubscribe.ts | 9 + 8 files changed, 140 insertions(+), 279 deletions(-) create mode 100644 packages/@wroud/flow-middleware/src/interfaces/IMiddlewareSubscribe.ts diff --git a/packages/@wroud/flow-middleware/CHANGELOG.md b/packages/@wroud/flow-middleware/CHANGELOG.md index 3005c7c..acdd1fc 100644 --- a/packages/@wroud/flow-middleware/CHANGELOG.md +++ b/packages/@wroud/flow-middleware/CHANGELOG.md @@ -27,10 +27,6 @@ All notable changes to this project will be documented in this file. ## 0.2.0 (2024-11-26) -### ✨ Features - -- base implementation ([74bb7e2](https://github.com/Wroud/foundation/commit/74bb7e2)) - ### 🩹 Fixes - do not copy data object ([02ce20a](https://github.com/Wroud/foundation/commit/02ce20a)) diff --git a/packages/@wroud/flow-middleware/src/FlowMiddleware.ts b/packages/@wroud/flow-middleware/src/FlowMiddleware.ts index c1d16e4..cd70354 100644 --- a/packages/@wroud/flow-middleware/src/FlowMiddleware.ts +++ b/packages/@wroud/flow-middleware/src/FlowMiddleware.ts @@ -5,10 +5,6 @@ import { MiddlewareRequest } from "./MiddlewareRequest.js"; import type { IErrorMiddleware } from "./interfaces/IErrorMiddleware.js"; import type { ILogger } from "@wroud/api-logger"; -/** - * Singleton class to register and manage middlewares. - * @template Data - The shape of the request data. - */ export class FlowMiddleware> implements IFlowMiddleware { @@ -20,29 +16,16 @@ export class FlowMiddleware> this.errorMiddlewares = []; } - /** - * Registers a middleware globally. - * @param {Middleware} middleware - The middleware function to register. - */ public use(...middleware: IMiddleware[]): this { this.middlewares.push(...middleware); return this; } - /** - * Registers an error-handling middleware globally. - * @param {ErrorMiddleware} errorMiddleware - The error middleware function to register. - */ public error(...errorMiddleware: IErrorMiddleware[]): this { this.errorMiddlewares.push(...errorMiddleware); return this; } - /** - * Creates a new MiddlewareRequest instance. - * @param {Data} initialData - Initial data for the request. - * @returns {MiddlewareRequest} A new MiddlewareRequest instance. - */ public createRequest( initialData: Data = {} as Data, ): IMiddlewareRequest { diff --git a/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts b/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts index 5a75e11..0d6b38e 100644 --- a/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts +++ b/packages/@wroud/flow-middleware/src/MiddlewareRequest.ts @@ -2,29 +2,29 @@ import type { ILogger } from "@wroud/api-logger"; import type { IErrorMiddleware } from "./interfaces/IErrorMiddleware.js"; import type { IMiddleware } from "./interfaces/IMiddleware.js"; import type { IMiddlewareRequest } from "./interfaces/IMiddlewareRequest.js"; +import type { IMiddlewareUnsubscribe } from "./interfaces/IMiddlewareSubscribe.js"; -type MiddlewareStates = Map, Map void>>; -type ErrorMiddlewareStates = Map< - IErrorMiddleware, - Map void> +interface IMiddlewareState { + active: boolean; + dependencies?: any[]; + unsubscribe: IMiddlewareUnsubscribe; +} + +type MiddlewareStates = Map< + IMiddleware | IErrorMiddleware, + Map +>; + +type ExecutedMiddlewares = Set< + IMiddleware | IErrorMiddleware >; -/** - * Class representing a middleware request. - * @template Data - The shape of the request data. - */ export class MiddlewareRequest> implements IMiddlewareRequest { private middlewareStates: MiddlewareStates; - private errorMiddlewareStates: ErrorMiddlewareStates; private isDisposed: boolean; - /** - * Creates an instance of MiddlewareRequest. - * @param {Middleware[]} middlewares - Array of middleware functions. - * @param {Data} data - Initial data for the request. - */ constructor( private readonly middlewares: IMiddleware[], private readonly errorMiddlewares: IErrorMiddleware[], @@ -33,227 +33,128 @@ export class MiddlewareRequest> ) { this.middlewares = middlewares; this.middlewareStates = new Map(); - this.errorMiddlewareStates = new Map(); this.isDisposed = false; } - /** - * Executes the middleware chain. - */ public async execute(): Promise { if (this.isDisposed) { throw new Error("Cannot execute a disposed request."); } - try { - const newMiddlewareStates: MiddlewareStates = new Map(); - - for (let i = 0; i < this.middlewares.length; i++) { - const middleware = this.middlewares[i]!; - const currentState = - this.middlewareStates.get(middleware) || - new Map void>(); - - /** - * Subscribes to an external event. - * @param {string} key - Unique key for the subscription. - * @param {() => () => void} subscribeFn - Function that sets up the subscription and returns an unsubscribe function. - */ - const subscribe = ( - key: string, - subscribeFn: () => () => void, - ): void => { - if (currentState.has(key)) { - return; // Subscription already exists. - } - - const unsubscribe = subscribeFn(); - currentState.set(key, unsubscribe); - }; - - await middleware( - this.data, - () => this.executeNext(i + 1), - this.triggerReRun.bind(this), - subscribe, - ); - - newMiddlewareStates.set(middleware, currentState); - } + const executedMiddlewares: ExecutedMiddlewares = new Set(); - this.setMiddlewareStates(newMiddlewareStates); + try { + await this.executeNext(executedMiddlewares, 0); } catch (error) { - await this.handleError(error as Error); + await this.handleNextError(executedMiddlewares, 0, error as Error); + } finally { + await this.cleanupSubscriptions(executedMiddlewares); } } - /** - * Executes the next middleware in the chain. - * @param {number} nextIndex - Index of the next middleware to execute. - */ - private async executeNext(nextIndex: number): Promise { - if (nextIndex >= this.middlewares.length) return; + private async executeNext( + executedMiddlewares: ExecutedMiddlewares, + nextIndex: number, + ): Promise { + if (nextIndex >= this.middlewares.length) { + return; + } const middleware = this.middlewares[nextIndex]!; - const currentState = - this.middlewareStates.get(middleware) || new Map void>(); - - /** - * Subscribes to an external event. - * @param {string} key - Unique key for the subscription. - * @param {() => () => void} subscribeFn - Function that sets up the subscription and returns an unsubscribe function. - */ - const subscribe = (key: string, subscribeFn: () => () => void): void => { - if (currentState.has(key)) { - return; // Subscription already exists. - } - - const unsubscribe = subscribeFn(); - currentState.set(key, unsubscribe); - }; - - try { - await middleware( - this.data, - () => this.executeNext(nextIndex + 1), - this.triggerReRun.bind(this), - subscribe, - ); + let currentState = this.middlewareStates.get(middleware); + if (!currentState) { + currentState = new Map(); this.middlewareStates.set(middleware, currentState); - } catch (error) { - await this.handleError(error as Error); } + + this.markMiddlewareSubscriptionsInactive(currentState); + + await middleware( + this.data, + () => this.executeNext(executedMiddlewares, nextIndex + 1), + this.triggerReRun.bind(this), + subscribe.bind(null, currentState), + ); + + executedMiddlewares.add(middleware); } - /** - * Triggers a re-execution of the middleware chain. - */ private async triggerReRun(): Promise { - if (this.isDisposed) return; this.logger?.info("Re-run triggered."); await this.execute(); } - /** - * Handles errors by executing error-handling middlewares. - * @param {Error} error - The error to handle. - */ - private async handleError(error: Error): Promise { - if (this.isDisposed) { - this.logger?.error("Cannot handle error for a disposed request:", error); - return; - } - - if (this.errorMiddlewares.length === 0) { + private async handleNextError( + executedMiddlewares: ExecutedMiddlewares, + nextIndex: number, + error: Error, + ): Promise { + if (nextIndex >= this.errorMiddlewares.length) { this.logger?.error("Unhandled error:", error); - return; + throw error; } try { - const newErrorMiddlewareStates: ErrorMiddlewareStates = new Map(); - - for (let i = 0; i < this.errorMiddlewares.length; i++) { - const errorMiddleware = this.errorMiddlewares[i]!; - const currentState = - this.errorMiddlewareStates.get(errorMiddleware) || - new Map void>(); - - /** - * Subscribes to an external event. - * @param {string} key - Unique key for the subscription. - * @param {() => () => void} subscribeFn - Function that sets up the subscription and returns an unsubscribe function. - */ - const subscribe = ( - key: string, - subscribeFn: () => () => void, - ): void => { - if (currentState.has(key)) { - return; // Subscription already exists. - } - - const unsubscribe = subscribeFn(); - currentState.set(key, unsubscribe); - }; - - await errorMiddleware( - error, - this.data, - () => this.execute(), - this.triggerReRun.bind(this), - subscribe, - ); + const errorMiddleware = this.errorMiddlewares[nextIndex]!; + let currentState = this.middlewareStates.get(errorMiddleware); - newErrorMiddlewareStates.set(errorMiddleware, currentState); + if (!currentState) { + currentState = new Map(); + this.middlewareStates.set(errorMiddleware, currentState); } - this.setErrorMiddlewareStates(newErrorMiddlewareStates); - } catch (err) { - this.logger?.error("Error in error-handling middleware:", err); - } - } + await errorMiddleware( + error, + this.data, + () => this.handleNextError(executedMiddlewares, nextIndex + 1, error), + this.triggerReRun.bind(this), + subscribe.bind(null, currentState), + ); - private setMiddlewareStates( - newMiddlewareStates: MiddlewareStates, - ): void { - this.cleanupSubscriptions(newMiddlewareStates); - this.middlewareStates = newMiddlewareStates; + executedMiddlewares.add(errorMiddleware); + } catch (error) { + await this.handleNextError( + executedMiddlewares, + nextIndex + 1, + error as Error, + ); + } } - private setErrorMiddlewareStates( - newErrorMiddlewareStates: ErrorMiddlewareStates, + private markMiddlewareSubscriptionsInactive( + state: Map, ): void { - this.cleanupErrorSubscriptions(newErrorMiddlewareStates); - this.errorMiddlewareStates = newErrorMiddlewareStates; + for (const middlewareState of state.values()) { + middlewareState.active = false; + } } - /** - * Cleans up subscriptions that are no longer active in regular middlewares. - * @param {Map, Map void>>} newMiddlewareStates - The updated middleware states after execution. - */ - private cleanupSubscriptions( - newMiddlewareStates: Map, Map void>>, - ): void { + private async cleanupSubscriptions( + executedMiddlewares: ExecutedMiddlewares, + ): Promise { for (const [middleware, state] of this.middlewareStates.entries()) { - if (!newMiddlewareStates.has(middleware)) { + if (executedMiddlewares.has(middleware)) { + for (const [key, subscription] of state) { + if (!subscription.active) { + await subscription.unsubscribe(); + state.delete(key); + } + } + } else { this.disposeMiddlewareSubscriptions(middleware, state); } } } - /** - * Cleans up subscriptions that are no longer active in error middlewares. - * @param {Map, Map void>>} newErrorMiddlewareStates - The updated error middleware states after execution. - */ - private cleanupErrorSubscriptions( - newErrorMiddlewareStates: Map< - IErrorMiddleware, - Map void> - >, - ): void { - for (const [ - errorMiddleware, - state, - ] of this.errorMiddlewareStates.entries()) { - if (!newErrorMiddlewareStates.has(errorMiddleware)) { - this.disposeErrorMiddlewareSubscriptions(errorMiddleware, state); - } - } - } - - /** - * Disposes all subscriptions for a specific regular middleware. - * @param {Middleware} middleware - The middleware whose subscriptions are to be disposed. - * @param {Map void>} state - The subscription state of the middleware. - */ private disposeMiddlewareSubscriptions( - middleware: IMiddleware, - state: Map void>, + middleware: IMiddleware | IErrorMiddleware, + state: Map, ): void { this.logger?.info( `Disposing subscriptions for middleware: ${middleware.name || "anonymous"}`, ); - for (const [key, unsubscribe] of state.entries()) { + for (const [key, { unsubscribe }] of state.entries()) { try { unsubscribe(); this.logger?.info(`Unsubscribed from ${key}.`); @@ -264,52 +165,53 @@ export class MiddlewareRequest> state.clear(); } - /** - * Disposes all subscriptions for a specific error middleware. - * @param {ErrorMiddleware} errorMiddleware - The error middleware whose subscriptions are to be disposed. - * @param {Map void>} state - The subscription state of the error middleware. - */ - private disposeErrorMiddlewareSubscriptions( - errorMiddleware: IErrorMiddleware, - state: Map void>, - ): void { - this.logger?.info( - `Disposing subscriptions for error middleware: ${errorMiddleware.name || "anonymous"}`, - ); - for (const [key, unsubscribe] of state.entries()) { - try { - unsubscribe(); - this.logger?.info(`Unsubscribed from ${key}.`); - } catch (error) { - this.logger?.error(`Error unsubscribing from ${key}:`, error); - } - } - state.clear(); - } - - /** - * Disposes the request and all active subscriptions. - */ public dispose(): void { - if (this.isDisposed) return; + if (this.isDisposed) { + return; + } this.isDisposed = true; this.logger?.info("Disposing all subscriptions and request."); - // Dispose regular middlewares for (const [middleware, state] of this.middlewareStates.entries()) { this.disposeMiddlewareSubscriptions(middleware, state); } - // Dispose error middlewares - for (const [ - errorMiddleware, - state, - ] of this.errorMiddlewareStates.entries()) { - this.disposeErrorMiddlewareSubscriptions(errorMiddleware, state); - } - this.middlewareStates.clear(); - this.errorMiddlewareStates.clear(); } } + +function areDependenciesEqual( + depsA: any[] | undefined, + depsB: any[] | undefined, +): boolean { + if (depsA === undefined && depsB === undefined) return true; + if (depsA === undefined || depsB === undefined) return false; + if (depsA.length === 0 && depsB.length === 0) return true; + if (depsA.length !== depsB.length) return false; + for (let i = 0; i < depsA.length; i++) { + if (depsA[i] !== depsB[i]) return false; + } + return true; +} + +async function subscribe( + state: Map, + key: string, + subscribeFn: () => IMiddlewareUnsubscribe | Promise, + dependencies?: any[], +): Promise { + const existingSubscription = state.get(key); + + if (areDependenciesEqual(existingSubscription?.dependencies, dependencies)) { + return; + } + + if (existingSubscription) { + await existingSubscription.unsubscribe(); + state.delete(key); + } + + const unsubscribe = await subscribeFn(); + state.set(key, { dependencies, unsubscribe, active: true }); +} diff --git a/packages/@wroud/flow-middleware/src/interfaces/IErrorMiddleware.ts b/packages/@wroud/flow-middleware/src/interfaces/IErrorMiddleware.ts index b41c512..3153ea6 100644 --- a/packages/@wroud/flow-middleware/src/interfaces/IErrorMiddleware.ts +++ b/packages/@wroud/flow-middleware/src/interfaces/IErrorMiddleware.ts @@ -1,3 +1,5 @@ +import type { IMiddlewareSubscribe } from "./IMiddlewareSubscribe.js"; + /** * Type definition for an Error Middleware function. * @template Data - The shape of the request data. @@ -8,6 +10,6 @@ export interface IErrorMiddleware> { data: Data, next: () => Promise, triggerReRun: () => Promise, - subscribe: (key: string, subscribeFn: () => () => void) => void, + subscribe: IMiddlewareSubscribe, ): Promise; } diff --git a/packages/@wroud/flow-middleware/src/interfaces/IFlowMiddleware.ts b/packages/@wroud/flow-middleware/src/interfaces/IFlowMiddleware.ts index a3ddb32..4b07b71 100644 --- a/packages/@wroud/flow-middleware/src/interfaces/IFlowMiddleware.ts +++ b/packages/@wroud/flow-middleware/src/interfaces/IFlowMiddleware.ts @@ -2,27 +2,9 @@ import type { IErrorMiddleware } from "./IErrorMiddleware.js"; import type { IMiddleware } from "./IMiddleware.js"; import type { IMiddlewareRequest } from "./IMiddlewareRequest.js"; -/** - * Interface for the FlowMiddleware singleton. - * @template Data - The shape of the request data. - */ export interface IFlowMiddleware> { - /** - * Registers a middleware globally. - * @param {Middleware} middleware - The middleware function to register. - */ use(...middleware: IMiddleware[]): this; - - /** - * Registers an error-handling middleware globally. - * @param {ErrorMiddleware} errorMiddleware - The error middleware function to register. - */ error(...errorMiddleware: IErrorMiddleware[]): this; - /** - * Creates a new MiddlewareRequest instance. - * @param {Data} initialData - Initial data for the request. - * @returns {IMiddlewareRequest} A new MiddlewareRequest instance. - */ createRequest(initialData?: Data): IMiddlewareRequest; } diff --git a/packages/@wroud/flow-middleware/src/interfaces/IMiddleware.ts b/packages/@wroud/flow-middleware/src/interfaces/IMiddleware.ts index 26adbc6..5ce20af 100644 --- a/packages/@wroud/flow-middleware/src/interfaces/IMiddleware.ts +++ b/packages/@wroud/flow-middleware/src/interfaces/IMiddleware.ts @@ -1,12 +1,10 @@ -/** - * Type definition for a Middleware function. - * @template Data - The shape of the request data. - */ +import type { IMiddlewareSubscribe } from "./IMiddlewareSubscribe.js"; + export interface IMiddleware> { ( data: Data, next: () => Promise, triggerReRun: () => Promise, - subscribe: (key: string, subscribeFn: () => () => void) => void, + subscribe: IMiddlewareSubscribe, ): Promise; } diff --git a/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareRequest.ts b/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareRequest.ts index bffbd8e..25ed41f 100644 --- a/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareRequest.ts +++ b/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareRequest.ts @@ -1,15 +1,4 @@ -/** - * Interface for the MiddlewareRequest class. - * @template Data - The shape of the request data. - */ export interface IMiddlewareRequest> { - /** - * Executes the middleware chain. - */ execute(): Promise; - - /** - * Disposes the request and all active subscriptions. - */ dispose(): void; } diff --git a/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareSubscribe.ts b/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareSubscribe.ts new file mode 100644 index 0000000..2f653c2 --- /dev/null +++ b/packages/@wroud/flow-middleware/src/interfaces/IMiddlewareSubscribe.ts @@ -0,0 +1,9 @@ +export type IMiddlewareUnsubscribe = (() => void) | (() => Promise); + +export interface IMiddlewareSubscribe { + ( + key: string, + subscribeFn: () => IMiddlewareUnsubscribe | Promise, + dependencies?: any[], + ): void; +}