diff --git a/packages/parser/src/schemas/dynamodb.ts b/packages/parser/src/schemas/dynamodb.ts index 00170ac67..5d735ee5b 100644 --- a/packages/parser/src/schemas/dynamodb.ts +++ b/packages/parser/src/schemas/dynamodb.ts @@ -151,69 +151,103 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ * @example * ```json * { - * "Records": [ - * { - * "eventID": "1", - * "eventVersion": "1.0", - * "dynamodb": { - * "ApproximateCreationDateTime": 1693997155.0, - * "Keys": { - * "Id": { - * "N": "101" - * } - * }, - * "NewImage": { - * "Message": { - * "S": "New item!" - * }, - * "Id": { - * "N": "101" - * } + * "Records":[{ + * "eventID":"1", + * "eventName":"INSERT", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "NewImage":{ + * "Message":{ + * "S":"New item!" * }, - * "StreamViewType": "NEW_AND_OLD_IMAGES", - * "SequenceNumber": "111", - * "SizeBytes": 26 + * "Id":{ + * "N":"101" + * } * }, - * "awsRegion": "us-west-2", - * "eventName": "INSERT", - * "eventSourceARN": "eventsource_arn", - * "eventSource": "aws:dynamodb" + * "SequenceNumber":"111", + * "SizeBytes":26, + * "StreamViewType":"NEW_AND_OLD_IMAGES" * }, - * { - * "eventID": "2", - * "eventVersion": "1.0", - * "dynamodb": { - * "OldImage": { - * "Message": { - * "S": "New item!" - * }, - * "Id": { - * "N": "101" - * } + * "eventSourceARN":"stream-ARN" + * }, + * { + * "eventID":"2", + * "eventName":"MODIFY", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "NewImage":{ + * "Message":{ + * "S":"This item has changed" * }, - * "SequenceNumber": "222", - * "Keys": { - * "Id": { - * "N": "101" - * } + * "Id":{ + * "N":"101" + * } + * }, + * "OldImage":{ + * "Message":{ + * "S":"New item!" * }, - * "SizeBytes": 59, - * "NewImage": { - * "Message": { - * "S": "This item has changed" - * }, - * "Id": { - * "N": "101" - * } + * "Id":{ + * "N":"101" + * } + * }, + * "SequenceNumber":"222", + * "SizeBytes":59, + * "StreamViewType":"NEW_AND_OLD_IMAGES" + * }, + * "eventSourceARN":"stream-ARN" + * }, + * { + * "eventID":"3", + * "eventName":"REMOVE", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "OldImage":{ + * "Message":{ + * "S":"This item has changed" * }, - * "StreamViewType": "NEW_AND_OLD_IMAGES" + * "Id":{ + * "N":"101" + * } * }, - * "awsRegion": "us-west-2", - * "eventName": "MODIFY", - * "eventSourceARN": "source_arn", - * "eventSource": "aws:dynamodb" - * } - * ] + * "SequenceNumber":"333", + * "SizeBytes":38, + * "StreamViewType":"NEW_AND_OLD_IMAGES" + * }, + * "eventSourceARN":"stream-ARN" + * }], + * "window": { + * "start": "2020-07-30T17:00:00Z", + * "end": "2020-07-30T17:05:00Z" + * }, + * "state": { + * "1": "state1" + * }, + * "shardId": "shard123456789", + * "eventSourceARN": "stream-ARN", + * "isFinalInvokeForWindow": false, + * "isWindowTerminatedEarly": false * } * ``` * @@ -222,6 +256,17 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ */ const DynamoDBStreamSchema = z.object({ Records: z.array(DynamoDBStreamRecord).min(1), + window: z + .object({ + start: z.string().datetime(), + end: z.string().datetime(), + }) + .optional(), + state: z.record(z.string(), z.string()).optional(), + shardId: z.string().optional(), + eventSourceARN: z.string().optional(), + isFinalInvokeForWindow: z.boolean().optional(), + isWindowTerminatedEarly: z.boolean().optional(), }); export { diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 8e2172430..9e0b8ba8e 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -70,41 +70,48 @@ const KinesisDynamoDBStreamSchema = z.object({ * "partitionKey": "1", * "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", * "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", - * "approximateArrivalTimestamp": 1545084650.987 + * "approximateArrivalTimestamp": 1607497475.000 * }, * "eventSource": "aws:kinesis", * "eventVersion": "1.0", * "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", * "eventName": "aws:kinesis:record", - * "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - * "awsRegion": "us-east-2", - * "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" - * }, - * { - * "kinesis": { - * "kinesisSchemaVersion": "1.0", - * "partitionKey": "1", - * "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", - * "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", - * "approximateArrivalTimestamp": 1545084711.166 - * }, - * "eventSource": "aws:kinesis", - * "eventVersion": "1.0", - * "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", - * "eventName": "aws:kinesis:record", - * "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - * "awsRegion": "us-east-2", - * "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + * "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + * "awsRegion": "us-east-1", + * "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" * } - * ] + * ], + * "window": { + * "start": "2020-12-09T07:04:00Z", + * "end": "2020-12-09T07:06:00Z" + * }, + * "state": { + * "1": 282, + * "2": 715 + * }, + * "shardId": "shardId-000000000006", + * "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + * "isFinalInvokeForWindow": false, + * "isWindowTerminatedEarly": false * } *``` * @see {@link types.KinesisDataStreamEvent | KinesisDataStreamEvent} - * @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-event-example} + * @see {@link https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html#streams-tumbling-processing} * */ const KinesisDataStreamSchema = z.object({ Records: z.array(KinesisDataStreamRecord).min(1), + window: z + .object({ + start: z.string().datetime(), + end: z.string().datetime(), + }) + .optional(), + state: z.record(z.string(), z.unknown()).optional(), + shardId: z.string().optional(), + eventSourceARN: z.string().optional(), + isFinalInvokeForWindow: z.boolean().optional(), + isWindowTerminatedEarly: z.boolean().optional(), }); export { diff --git a/packages/parser/tests/unit/schema/dynamodb.test.ts b/packages/parser/tests/unit/schema/dynamodb.test.ts index 62b4ab8d0..d10940b28 100644 --- a/packages/parser/tests/unit/schema/dynamodb.test.ts +++ b/packages/parser/tests/unit/schema/dynamodb.test.ts @@ -112,4 +112,85 @@ describe('Schema: DynamoDB', () => { // Act & Assess expect(() => DynamoDBStreamSchema.parse(event)).toThrow(); }); + + it('parses a DynamoDB Stream with tumbling window event', () => { + // Prepare + const event = structuredClone(baseEvent); + event.window = { + start: '2020-07-30T17:00:00Z', + end: '2020-07-30T17:05:00Z', + }; + event.state = { + '1': 'state1', + }; + event.shardId = 'shard123456789'; + event.eventSourceARN = 'stream-ARN'; + event.isFinalInvokeForWindow = false; + event.isWindowTerminatedEarly = false; + + // Act + const result = DynamoDBStreamSchema.parse(event); + + // Assess + expect(result).toStrictEqual({ + Records: [ + { + eventID: '1', + eventVersion: '1.0', + dynamodb: { + ApproximateCreationDateTime: 1693997155.0, + Keys: { + Id: 101, + }, + NewImage: { + Message: 'New item!', + Id: 101, + }, + StreamViewType: 'NEW_IMAGE', + SequenceNumber: '111', + SizeBytes: 26, + }, + awsRegion: 'us-west-2', + eventName: 'INSERT', + eventSourceARN: 'eventsource_arn', + eventSource: 'aws:dynamodb', + }, + { + eventID: '2', + eventVersion: '1.0', + dynamodb: { + OldImage: { + Message: 'New item!', + Id: 101, + }, + SequenceNumber: '222', + Keys: { + Id: 101, + }, + SizeBytes: 59, + NewImage: { + Message: 'This item has changed', + Id: 101, + }, + StreamViewType: 'NEW_AND_OLD_IMAGES', + }, + awsRegion: 'us-west-2', + eventName: 'MODIFY', + eventSourceARN: 'source_arn', + eventSource: 'aws:dynamodb', + }, + ], + window: { + start: '2020-07-30T17:00:00Z', + end: '2020-07-30T17:05:00Z', + }, + state: { + '1': 'state1', + }, + shardId: 'shard123456789', + eventSourceARN: 'stream-ARN', + isFinalInvokeForWindow: false, + isWindowTerminatedEarly: false, + }); + }); }); diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 94f7bc944..0f8817abf 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -167,6 +167,41 @@ describe('Schema: Kinesis', () => { expect(parsed).toStrictEqual(transformedInput); }); + it('parses Kinesis event with tumbling window', () => { + // Prepare + const testEvent = structuredClone(kinesisStreamEvent); + testEvent.window = { + start: '2020-07-30T17:00:00Z', + end: '2020-07-30T17:05:00Z', + }; + testEvent.state = { + '1': 'state1', + }; + testEvent.shardId = 'shard123456789'; + testEvent.eventSourceARN = 'stream-ARN'; + testEvent.isFinalInvokeForWindow = false; + testEvent.isWindowTerminatedEarly = false; + + // Act + const parsed = KinesisDataStreamSchema.parse(testEvent); + + const transformedInput = { + ...testEvent, + Records: testEvent.Records.map((record, index) => { + return { + ...record, + kinesis: { + ...record.kinesis, + data: Buffer.from(record.kinesis.data, 'base64').toString(), + }, + }; + }), + }; + + // Assess + expect(parsed).toStrictEqual(transformedInput); + }); + it('throws if cannot parse SQS record of KinesisFirehoseSqsRecord', () => { // Prepare const testEvent = getTestEvent({