diff --git a/src/sqs/sqs.test.ts b/src/sqs/sqs.test.ts index 09677ab32..cd8936373 100644 --- a/src/sqs/sqs.test.ts +++ b/src/sqs/sqs.test.ts @@ -4,7 +4,7 @@ import { waitUntil } from "test/utils/wait-until"; import { statsd } from "config/statsd"; import { sqsQueueMetrics } from "config/metric-names"; import { AWSError, Request as AwsRequest, Service, Response } from "aws-sdk"; -import { BaseMessagePayload, SQSMessageContext } from "~/src/sqs/sqs.types"; +import { BaseMessagePayload } from "~/src/sqs/sqs.types"; import { preemptiveRateLimitCheck } from "utils/preemptive-rate-limit"; import { when } from "jest-when"; import { SendMessageResult } from "aws-sdk/clients/sqs"; @@ -259,97 +259,4 @@ describe("SQS", () => { }); }); - describe("deleteStaleMessages", () => { - - // Mock the SQSMessageContext object - const context = { - log: { - warn: jest.fn(), - error: jest.fn() - } - } as unknown as SQSMessageContext; - - beforeEach(() => { - queue = createSqsQueue(1); - queue.start(); - }); - - // Test case for when the message is not from the targeted queue - it("should return false when message is not from targeted queue", async () => { - const message = { - Body: JSON.stringify({}), - MessageId: "12345" - }; - const result = await queue.deleteStaleMessages(message, context); - expect(result).toBe(false); - }); - - // Test case for when the message does not have a body - it("should return false when message has no body", async () => { - const message = { - MessageId: "12345" - }; - const result = await queue.deleteStaleMessages(message, context); - expect(result).toBe(false); - }); - - // Test case for when the message is from the targeted queue and is stale - it("should delete stale message and return true", async () => { - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 2 * 24 * 60 * 60 * 1000 // Two days ago - }), - MessageId: "12345" - }; - const deleteMessage = jest.fn(); - const mockThis = { - queueName: "deployment", - deleteMessage - }; - const result = await queue.deleteStaleMessages.call(mockThis, message, context); - expect(result).toBe(true); - expect(deleteMessage).toHaveBeenCalledWith(context); - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(context.log.warn).toHaveBeenCalledWith( - { deletedMessageId: "12345" }, - "Deleted stale message from deployment queue" - ); - }); - - // Test case for when the message is from the targeted queue and is not stale - it("should return false when message is not stale", async () => { - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 12 * 60 * 60 * 1000 // 12 hours ago - }), - MessageId: "12345" - }; - const result = await queue.deleteStaleMessages(message, context); - expect(result).toBe(false); - }); - - // Test case for when deleting the message fails - it("should return false and log an error when deleting the message fails", async () => { - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 2 * 24 * 60 * 60 * 1000 // Two days ago - }), - MessageId: "12345" - }; - const deleteMessage = jest.fn().mockRejectedValue(new Error("Failed to delete message")); - const mockThis = { - queueName: "deployment", - deleteMessage - }; - const result = await queue.deleteStaleMessages.call(mockThis, message, context); - expect(result).toBe(false); - expect(deleteMessage).toHaveBeenCalledWith(context); - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(context.log.error).toHaveBeenCalledWith( - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - { error: expect.any(Error), deletedMessageId: "12345" }, - "Failed to delete stale message from deployment queue" - ); - }); - }); }); diff --git a/src/sqs/sqs.ts b/src/sqs/sqs.ts index d78ed6b11..fd28f0d6b 100644 --- a/src/sqs/sqs.ts +++ b/src/sqs/sqs.ts @@ -17,7 +17,6 @@ const MAX_MESSAGE_VISIBILITY_TIMEOUT_SEC: number = 12 * 60 * 60 - 1; const DEFAULT_LONG_POLLING_INTERVAL = 4; const PROCESSING_DURATION_HISTOGRAM_BUCKETS = "10_100_500_1000_2000_3000_5000_10000_30000_60000"; const EXTRA_VISIBILITY_TIMEOUT_DELAY = 2; -const ONE_DAY_MILLI = 24 * 60 * 60 * 1000; const isNotAFailure = (errorHandlingResult: ErrorHandlingResult) => { return !errorHandlingResult.isFailure; @@ -244,42 +243,6 @@ export class SqsQueue { } } - public async deleteStaleMessages(message: Message, context: SQSMessageContext): Promise { - const TARGETED_QUEUES = ["deployment"]; - if (!message?.Body || !TARGETED_QUEUES.includes(this.queueName)) { - return false; - } - - const messageBody = JSON.parse(message.Body) as { webhookReceived?: number }; - const webhookReceived = messageBody?.webhookReceived; - if (!webhookReceived) { - context.log.warn( - { deletedMessageId: message.MessageId }, - `No webhookReceived timestamp found in message from ${this.queueName} queue` - ); - return false; - } - - if (Date.now() - webhookReceived > ONE_DAY_MILLI) { - try { - await this.deleteMessage(context); - context.log.warn( - { deletedMessageId: message.MessageId }, - `Deleted stale message from ${this.queueName} queue` - ); - return true; - } catch (error: unknown) { - context.log.error( - { error, deletedMessageId: message.MessageId }, - `Failed to delete stale message from ${this.queueName} queue` - ); - return false; - } - } - - return false; - } - private async executeMessage(message: Message, listenerContext: SQSContext): Promise { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const payload: MessagePayload = message.Body ? JSON.parse(message.Body) : {}; @@ -321,7 +284,6 @@ export class SqsQueue { try { const messageProcessingStartTime = Date.now(); - if (await this.deleteStaleMessages(message, context)) return; const rateLimitCheckResult = await preemptiveRateLimitCheck(context, this); if (rateLimitCheckResult.isExceedThreshold) {