diff --git a/packages/modules/module-replication/src/api-handlers/replicate.ts b/packages/modules/module-replication/src/api-handlers/replicate.ts index 005c518bf2..87fdfd3292 100644 --- a/packages/modules/module-replication/src/api-handlers/replicate.ts +++ b/packages/modules/module-replication/src/api-handlers/replicate.ts @@ -15,6 +15,15 @@ const checkInput = (input: any) => { } } +function canIgnoreError(error: any) { + return ( + error != null && + (error.name === 'ServiceBusyError' || + error.name === 'RequestTimeoutError' || + error.name === 'AlreadyDisposedError') + ) +} + const handler = async (req: ResolveRequest, res: ResolveResponse) => { let input try { @@ -35,13 +44,16 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { res.status(202) res.end('Replication has been started') } catch (error) { - try { - await req.resolve.eventstoreAdapter.setReplicationStatus({ - status: 'error', - statusData: error, - }) - } catch (e) { - error.message += e.message + if (!canIgnoreError(error)) { + try { + await req.resolve.eventstoreAdapter.setReplicationStatus({ + status: 'error', + statusData: error, + }) + } catch (e) { + error.message += '\n' + error.message += e.message + } } res.status(500) @@ -63,12 +75,13 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { lastEvent: input.events[input.events.length - 1], }) } catch (error) { - try { - await req.resolve.eventstoreAdapter.setReplicationStatus({ - status: 'error', - statusData: error, - }) - } catch (e) {} + if (!canIgnoreError(error)) + try { + await req.resolve.eventstoreAdapter.setReplicationStatus({ + status: 'error', + statusData: error, + }) + } catch (e) {} } await req.resolve.broadcastEvent() } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts index e23bc1efe9..47b117711d 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts @@ -14,7 +14,7 @@ import { } from './frozen-errors' import loadEvents from './load-events' import getNextCursor from './get-next-cursor' -import throwBadCursor from './throw-bad-cursor' +import loadEventsByTimestampResult from './load-events-by-timestamp-result' import snapshotTrigger from './snapshot-trigger' import incrementalImport from './incremental-import' import importSecretsStream from './import-secrets' @@ -94,7 +94,7 @@ export { EventstoreFrozenError, AlreadyFrozenError as EventstoreAlreadyFrozenError, AlreadyUnfrozenError as EventstoreAlreadyUnfrozenError, - throwBadCursor, + loadEventsByTimestampResult, getNextCursor, snapshotTrigger, iots, diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/load-events-by-timestamp-result.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/load-events-by-timestamp-result.ts new file mode 100644 index 0000000000..3195bc29b9 --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/load-events-by-timestamp-result.ts @@ -0,0 +1,20 @@ +import type { StoredEventBatchPointer, StoredEvent } from './types' + +const timestampCursorMessage = + 'Cursor cannot be used when reading by timestamp boundary' + +const loadEventsByTimestampResult = ( + events: StoredEvent[] +): StoredEventBatchPointer => { + return { + get cursor(): string { + throw new Error(timestampCursorMessage) + }, + events, + toJSON: function () { + return { events: this.events, cursor: timestampCursorMessage } + }, + } as StoredEventBatchPointer +} + +export default loadEventsByTimestampResult diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/throw-bad-cursor.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/throw-bad-cursor.ts deleted file mode 100644 index bc0d6c4758..0000000000 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/throw-bad-cursor.ts +++ /dev/null @@ -1,5 +0,0 @@ -const throwBadCursor = (): void => { - throw new Error('Cursor cannot be used when reading by timestamp boundary') -} - -export default throwBadCursor diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-timestamp.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-timestamp.ts index e67c254d16..de08a62a56 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-timestamp.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-timestamp.ts @@ -1,7 +1,7 @@ import { TimestampFilter, StoredEventBatchPointer, - throwBadCursor, + loadEventsByTimestampResult, } from '@resolve-js/eventstore-base' import createQuery from './create-query' import { AdapterPool } from './types' @@ -28,12 +28,7 @@ const loadEventsByTimestamp = async ( events.push(shapeEvent(event)) } - return { - get cursor() { - return throwBadCursor() as any - }, - events, - } + return loadEventsByTimestampResult(events) } export default loadEventsByTimestamp diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/load-events-by-timestamp.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/load-events-by-timestamp.ts index 9c27f41ba5..78ce0ee849 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/load-events-by-timestamp.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/load-events-by-timestamp.ts @@ -1,7 +1,7 @@ import { TimestampFilter, StoredEventBatchPointer, - throwBadCursor, + loadEventsByTimestampResult, } from '@resolve-js/eventstore-base' import { AdapterPool } from './types' @@ -47,12 +47,7 @@ const loadEventsByTimestamp = async ( events.push(shapeEvent(event)) } - return { - get cursor() { - return throwBadCursor() as any - }, - events, - } + return loadEventsByTimestampResult(events) } export default loadEventsByTimestamp diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-timestamp.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-timestamp.ts index 49d0652f92..7f2c1b0d73 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-timestamp.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-timestamp.ts @@ -1,7 +1,7 @@ import { TimestampFilter, - throwBadCursor, StoredEventBatchPointer, + loadEventsByTimestampResult, } from '@resolve-js/eventstore-base' import { AdapterPool } from './types' @@ -53,12 +53,7 @@ const loadEventsByTimestamp = async ( events.push(shapeEvent(event)) } - return { - get cursor() { - return throwBadCursor() as any - }, - events, - } + return loadEventsByTimestampResult(events) } export default loadEventsByTimestamp diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts index 1decf3ab8e..c3056a0cff 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts @@ -86,7 +86,9 @@ const build: ExternalMethods['build'] = async ( const state = await basePool.getReplicationState(basePool) if (state.status === 'error') { log.error( - `Refuse to start or continue replication with error state: ${state.statusData}` + `Refuse to start or continue replication with error state: ${JSON.stringify( + state.statusData + )}` ) return } else if (state.status === 'serviceError') { diff --git a/packages/runtime/runtimes/runtime-aws-serverless/src/lambda-worker.ts b/packages/runtime/runtimes/runtime-aws-serverless/src/lambda-worker.ts index 3969a414e0..b98747279c 100644 --- a/packages/runtime/runtimes/runtime-aws-serverless/src/lambda-worker.ts +++ b/packages/runtime/runtimes/runtime-aws-serverless/src/lambda-worker.ts @@ -341,10 +341,10 @@ export const lambdaWorker = async ( runtime?.monitoring?.group({ Part: 'Internal' }).error(error) if (error instanceof Error) { - log.error('error', error.message) - log.error('error', error.stack) + log.error('Lambda top-level error message:', error.message) + log.error('Lambda top-level error stack:', error.stack) } else { - log.error(JSON.stringify(error)) + log.error('Lambda unknown top-level error:', JSON.stringify(error)) } throw error diff --git a/packages/tools/scripts/src/process_manager.js b/packages/tools/scripts/src/process_manager.js index 8a21cf9f21..ba009ba98d 100644 --- a/packages/tools/scripts/src/process_manager.js +++ b/packages/tools/scripts/src/process_manager.js @@ -10,7 +10,7 @@ export const processRegister = (command, opts) => { return process } -export const processStopAll = () => { +export const processStopAll = (error) => { const promises = [] for (const process of processes) { promises.push( @@ -25,6 +25,10 @@ export const processStopAll = () => { } processes.length = 0 + if (error) { + // eslint-disable-next-line no-console + console.error(error) + } return Promise.all(promises) } diff --git a/tests/eventstore-filter-events/index.test.ts b/tests/eventstore-filter-events/index.test.ts index dc2a3e1026..53b7f4d822 100644 --- a/tests/eventstore-filter-events/index.test.ts +++ b/tests/eventstore-filter-events/index.test.ts @@ -340,4 +340,30 @@ describe(`${adapterFactory.name}. Eventstore adapter events filtering`, () => { ) } }) + + test('should cast result loaded by timestamp to JSON without errors', async () => { + const result = await adapter.loadEvents({ + limit: 1, + startTime: storedEvents[0].timestamp, + finishTime: storedEvents[countEvents / 3 - 1].timestamp, + }) + expect(() => JSON.stringify(result)).not.toThrow() + }) + + test('should throw when passing both cursor and times', async () => { + await expect( + adapter.loadEvents({ + limit: 1, + startTime: 0, + cursor: null, + }) + ).rejects.toThrow() + await expect( + adapter.loadEvents({ + limit: 1, + finishTime: 0, + cursor: null, + }) + ).rejects.toThrow() + }) })