diff --git a/cdk/infrastructure/constructs/backend.py b/cdk/infrastructure/constructs/backend.py index 8344093980..5321ea5cac 100644 --- a/cdk/infrastructure/constructs/backend.py +++ b/cdk/infrastructure/constructs/backend.py @@ -39,6 +39,7 @@ from infrastructure.constructs.queue import InvalidationQueue from infrastructure.constructs.queue import TransactionQueue +from infrastructure.constructs.queue import DeduplicationQueue from infrastructure.constructs.flag import FeatureFlagService @@ -72,6 +73,7 @@ class BackendProps: opensearch_multiplexer: Multiplexer transaction_queue: TransactionQueue invalidation_queue: InvalidationQueue + deduplication_queue: DeduplicationQueue feature_flag_service: FeatureFlagService ini_name: str cpu: int @@ -121,8 +123,10 @@ def __init__( self._allow_task_to_put_events_on_bus() self._allow_task_to_send_messages_to_transaction_queue() self._allow_task_to_send_messages_to_invalidation_queue() + self._allow_task_to_send_messages_to_deduplication_queue() self._allow_task_to_send_messages_to_transaction_dead_letter_queue() self._allow_task_to_send_messages_to_invalidation_dead_letter_queue() + self._allow_task_to_send_messages_to_deduplication_dead_letter_queue() self._allow_task_to_download_from_files_buckets() self._allow_task_to_upload_to_files_buckets() self._allow_task_to_read_upload_files_user_access_keys_secret() @@ -258,8 +262,10 @@ def _add_application_container_to_task(self) -> None: 'OPENSEARCH_FOR_WRITING_URL': self.opensearch_for_writing.url, 'TRANSACTION_QUEUE_URL': self.props.transaction_queue.queue.queue_url, 'INVALIDATION_QUEUE_URL': self.props.invalidation_queue.queue.queue_url, + 'DEDUPLICATION_QUEUE_URL': self.props.deduplication_queue.queue.queue_url, 'TRANSACTION_DEAD_LETTER_QUEUE_URL': self.props.transaction_queue.dead_letter_queue.queue_url, 'INVALIDATION_DEAD_LETTER_QUEUE_URL': self.props.invalidation_queue.dead_letter_queue.queue_url, + 'DEDUPLICATION_DEAD_LETTER_QUEUE_URL': self.props.deduplication_queue.dead_letter_queue.queue_url, 'UPLOAD_USER_ACCESS_KEYS_SECRET_ARN': self.props.existing_resources.upload_igvf_files_user_access_keys.secret.secret_arn, 'RESTRICTED_UPLOAD_USER_ACCESS_KEYS_SECRET_ARN': self.props.existing_resources.upload_igvf_restricted_files_user_access_keys.secret.secret_arn, 'APPCONFIG_APPLICATION': self.props.feature_flag_service.application.name, @@ -309,6 +315,11 @@ def _allow_task_to_send_messages_to_invalidation_queue(self) -> None: self.fargate_service.task_definition.task_role ) + def _allow_task_to_send_messages_to_deduplication_queue(self) -> None: + self.props.deduplication_queue.queue.grant_send_messages( + self.fargate_service.task_definition.task_role + ) + def _allow_task_to_send_messages_to_transaction_dead_letter_queue(self) -> None: self.props.transaction_queue.dead_letter_queue.grant_send_messages( self.fargate_service.task_definition.task_role @@ -319,6 +330,11 @@ def _allow_task_to_send_messages_to_invalidation_dead_letter_queue(self) -> None self.fargate_service.task_definition.task_role ) + def _allow_task_to_send_messages_to_deduplication_dead_letter_queue(self) -> None: + self.props.deduplication_queue.dead_letter_queue.grant_send_messages( + self.fargate_service.task_definition.task_role + ) + def _allow_task_to_download_from_files_buckets(self) -> None: self.fargate_service.task_definition.task_role.add_managed_policy( self.props.existing_resources.bucket_access_policies.download_igvf_files_policy diff --git a/cdk/infrastructure/constructs/indexer.py b/cdk/infrastructure/constructs/indexer.py index c70bd638f6..b7b207d963 100644 --- a/cdk/infrastructure/constructs/indexer.py +++ b/cdk/infrastructure/constructs/indexer.py @@ -27,6 +27,7 @@ from infrastructure.constructs.queue import TransactionQueue from infrastructure.constructs.queue import InvalidationQueue +from infrastructure.constructs.queue import DeduplicationQueue from infrastructure.constructs.tasks.deduplicatequeue import DeduplicateInvalidationQueue from infrastructure.constructs.tasks.deduplicatequeue import DeduplicateInvalidationQueueProps @@ -98,6 +99,7 @@ class IndexerProps: cluster: ICluster transaction_queue: TransactionQueue invalidation_queue: InvalidationQueue + deduplication_queue: DeduplicationQueue opensearch_multiplexer: Multiplexer use_opensearch_named: str backend_url: str @@ -293,7 +295,8 @@ def _run_deduplicate_invalidation_queue_automatically(self) -> None: existing_resources=self.props.existing_resources, cluster=self.props.cluster, invalidation_queue=self.props.invalidation_queue, - number_of_workers=100, + deduplication_queue=self.props.deduplication_queue, + number_of_workers=120, minutes_to_wait_between_runs=60, cpu=1024, memory_limit_mib=2048, diff --git a/cdk/infrastructure/constructs/queue.py b/cdk/infrastructure/constructs/queue.py index a16fdf9468..51bf18de20 100644 --- a/cdk/infrastructure/constructs/queue.py +++ b/cdk/infrastructure/constructs/queue.py @@ -120,3 +120,33 @@ def _add_alarms(self) -> None: oldest_message_in_seconds_threshold=86400, ), ) + + +class DeduplicationQueue(QueueBase): + + def __init__( + self, + scope: Construct, + construct_id: str, + *, + props: QueueProps, + **kwargs: Any + ) -> None: + super().__init__( + scope, + construct_id, + props=props, + **kwargs, + ) + + def _add_alarms(self) -> None: + QueueAlarms( + self, + 'DeduplicationQueueAlarms', + props=QueueAlarmsProps( + existing_resources=self.props.existing_resources, + queue=self.queue, + dead_letter_queue=self.dead_letter_queue, + oldest_message_in_seconds_threshold=86400, + ), + ) diff --git a/cdk/infrastructure/constructs/tasks/deduplicatequeue.py b/cdk/infrastructure/constructs/tasks/deduplicatequeue.py index 7ec9035d3f..5c7d346c0c 100644 --- a/cdk/infrastructure/constructs/tasks/deduplicatequeue.py +++ b/cdk/infrastructure/constructs/tasks/deduplicatequeue.py @@ -27,6 +27,7 @@ from infrastructure.constructs.existing.types import ExistingResources from infrastructure.constructs.queue import InvalidationQueue +from infrastructure.constructs.queue import DeduplicationQueue from typing import Any @@ -39,6 +40,7 @@ class DeduplicateInvalidationQueueProps: existing_resources: ExistingResources cluster: ICluster invalidation_queue: InvalidationQueue + deduplication_queue: DeduplicationQueue number_of_workers: int minutes_to_wait_between_runs: int cpu: int @@ -68,7 +70,8 @@ def __init__( self._define_docker_asset() self._define_log_driver_for_application_container() self._define_scheduled_fargate_task() - self._allow_task_to_consume_messages_from_invalidation_queue() + self._allow_task_to_read_write_from_invalidation_queue() + self._allow_task_to_read_write_from_deduplication_queue() self._define_manual_event_rule() def _define_event_source(self) -> None: @@ -99,6 +102,7 @@ def _define_scheduled_fargate_task(self) -> None: image=self.application_image, environment={ 'QUEUE_URL': self.props.invalidation_queue.queue.queue_url, + 'STORAGE_QUEUE_URL': self.props.deduplication_queue.queue.queue_url, 'NUM_WORKERS': f'{self.props.number_of_workers}', }, log_driver=self.application_log_driver, @@ -116,10 +120,21 @@ def _define_scheduled_fargate_task(self) -> None: ), ) - def _allow_task_to_consume_messages_from_invalidation_queue(self) -> None: + def _allow_task_to_read_write_from_invalidation_queue(self) -> None: self.props.invalidation_queue.queue.grant_consume_messages( self.scheduled_fargate_task.task_definition.task_role ) + self.props.invalidation_queue.queue.grant_send_messages( + self.scheduled_fargate_task.task_definition.task_role + ) + + def _allow_task_to_read_write_from_deduplication_queue(self) -> None: + self.props.deduplication_queue.queue.grant_consume_messages( + self.scheduled_fargate_task.task_definition.task_role + ) + self.props.deduplication_queue.queue.grant_send_messages( + self.scheduled_fargate_task.task_definition.task_role + ) def _define_manual_event_rule(self) -> None: self.event_rule = Rule( diff --git a/cdk/infrastructure/stacks/backend.py b/cdk/infrastructure/stacks/backend.py index 3b0413d8a3..d6ae212257 100644 --- a/cdk/infrastructure/stacks/backend.py +++ b/cdk/infrastructure/stacks/backend.py @@ -15,6 +15,7 @@ from infrastructure.constructs.queue import QueueProps from infrastructure.constructs.queue import TransactionQueue from infrastructure.constructs.queue import InvalidationQueue +from infrastructure.constructs.queue import DeduplicationQueue from infrastructure.constructs.flag import FeatureFlagServiceProps from infrastructure.constructs.flag import FeatureFlagService @@ -60,6 +61,13 @@ def __init__( existing_resources=self.existing_resources, ), ) + self.deduplication_queue = DeduplicationQueue( + self, + 'DeduplicationQueue', + props=QueueProps( + existing_resources=self.existing_resources, + ), + ) self.feature_flag_service = FeatureFlagService( self, 'FeatureFlags', @@ -79,6 +87,7 @@ def __init__( opensearch_multiplexer=opensearch_multiplexer, transaction_queue=self.transaction_queue, invalidation_queue=self.invalidation_queue, + deduplication_queue=self.deduplication_queue, feature_flag_service=self.feature_flag_service, ) ) @@ -91,6 +100,7 @@ def __init__( cluster=self.backend.fargate_service.cluster, transaction_queue=self.transaction_queue, invalidation_queue=self.invalidation_queue, + deduplication_queue=self.deduplication_queue, opensearch_multiplexer=opensearch_multiplexer, use_opensearch_named=self.backend.props.write_to_opensearch_named, backend_url=f'https://{self.backend.domain_name}', diff --git a/cdk/tests/conftest.py b/cdk/tests/conftest.py index e1f2bf0664..ac5b649466 100644 --- a/cdk/tests/conftest.py +++ b/cdk/tests/conftest.py @@ -452,6 +452,19 @@ def invalidation_queue(stack, existing_resources): ) +@pytest.fixture +def deduplication_queue(stack, existing_resources): + from infrastructure.constructs.queue import QueueProps + from infrastructure.constructs.queue import DeduplicationQueue + return DeduplicationQueue( + stack, + 'DeduplicationQueue', + props=QueueProps( + existing_resources=existing_resources, + ), + ) + + @pytest.fixture def feature_flag_service(stack, config): from infrastructure.constructs.flag import FeatureFlagServiceProps diff --git a/cdk/tests/unit/snapshots/test_constructs_indexer/test_constructs_indexer_indexer_match_snapshot/indexer_constructs_template.json b/cdk/tests/unit/snapshots/test_constructs_indexer/test_constructs_indexer_indexer_match_snapshot/indexer_constructs_template.json index 81d446b013..c9417523f5 100644 --- a/cdk/tests/unit/snapshots/test_constructs_indexer/test_constructs_indexer_indexer_match_snapshot/indexer_constructs_template.json +++ b/cdk/tests/unit/snapshots/test_constructs_indexer/test_constructs_indexer_indexer_match_snapshot/indexer_constructs_template.json @@ -273,6 +273,97 @@ }, "Type": "AWS::IAM::Role" }, + "DeduplicationQueue199CF29F": { + "DeletionPolicy": "Delete", + "Properties": { + "RedrivePolicy": { + "deadLetterTargetArn": { + "Fn::GetAtt": [ + "DeduplicationQueueDeadLetterQueueC5D62AD6", + "Arn" + ] + }, + "maxReceiveCount": 10 + }, + "VisibilityTimeout": 120 + }, + "Type": "AWS::SQS::Queue", + "UpdateReplacePolicy": "Delete" + }, + "DeduplicationQueueDeadLetterQueueC5D62AD6": { + "DeletionPolicy": "Delete", + "Properties": { + "MessageRetentionPeriod": 1209600 + }, + "Type": "AWS::SQS::Queue", + "UpdateReplacePolicy": "Delete" + }, + "DeduplicationQueueDeduplicationQueueAlarmsDeadLetterQueueHasMessagesAlarmDEF79C23": { + "Properties": { + "AlarmActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "ComparisonOperator": "GreaterThanOrEqualToThreshold", + "Dimensions": [ + { + "Name": "QueueName", + "Value": { + "Fn::GetAtt": [ + "DeduplicationQueueDeadLetterQueueC5D62AD6", + "QueueName" + ] + } + } + ], + "EvaluationPeriods": 1, + "MetricName": "ApproximateNumberOfMessagesVisible", + "Namespace": "AWS/SQS", + "OKActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "Period": 300, + "Statistic": "Maximum", + "Threshold": 1 + }, + "Type": "AWS::CloudWatch::Alarm" + }, + "DeduplicationQueueDeduplicationQueueAlarmsQueueHasOldMessagesAlarm2251106E": { + "Properties": { + "AlarmActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "ComparisonOperator": "GreaterThanOrEqualToThreshold", + "Dimensions": [ + { + "Name": "QueueName", + "Value": { + "Fn::GetAtt": [ + "DeduplicationQueue199CF29F", + "QueueName" + ] + } + } + ], + "EvaluationPeriods": 1, + "MetricName": "ApproximateAgeOfOldestMessage", + "Namespace": "AWS/SQS", + "OKActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "Period": 300, + "Statistic": "Maximum", + "Threshold": 86400 + }, + "Type": "AWS::CloudWatch::Alarm" + }, "DownloadManagedPolicyF8118CAF": { "Properties": { "Description": "", @@ -417,14 +508,20 @@ "Ref": "InvalidationQueue8614463D" } }, + { + "Name": "STORAGE_QUEUE_URL", + "Value": { + "Ref": "DeduplicationQueue199CF29F" + } + }, { "Name": "NUM_WORKERS", - "Value": "100" + "Value": "120" } ], "Essential": true, "Image": { - "Fn::Sub": "${AWS::AccountId}.dkr.ecr.${AWS::Region}.${AWS::URLSuffix}/cdk-hnb659fds-container-assets-${AWS::AccountId}-${AWS::Region}:a827b71c585b071ab4fecd5a1bc9b8fcd8f90d6dc80e3d026fc84c02b7a2b2d3" + "Fn::Sub": "${AWS::AccountId}.dkr.ecr.${AWS::Region}.${AWS::URLSuffix}/cdk-hnb659fds-container-assets-${AWS::AccountId}-${AWS::Region}:b64424101f6719672add2ffa1e7e04bdb394d7015003ad99b4ebf5cc89e8ce07" }, "LogConfiguration": { "LogDriver": "awslogs", @@ -700,6 +797,50 @@ "Arn" ] } + }, + { + "Action": [ + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "InvalidationQueue8614463D", + "Arn" + ] + } + }, + { + "Action": [ + "sqs:ReceiveMessage", + "sqs:ChangeMessageVisibility", + "sqs:GetQueueUrl", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "DeduplicationQueue199CF29F", + "Arn" + ] + } + }, + { + "Action": [ + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "DeduplicationQueue199CF29F", + "Arn" + ] + } } ], "Version": "2012-10-17" diff --git a/cdk/tests/unit/snapshots/test_constructs_tasks_deduplicatequeue/test_constructs_tasks_deduplicatequeue/deduplicatequeue_task_template.json b/cdk/tests/unit/snapshots/test_constructs_tasks_deduplicatequeue/test_constructs_tasks_deduplicatequeue/deduplicatequeue_task_template.json index 1959f77d4c..bf0e825b45 100644 --- a/cdk/tests/unit/snapshots/test_constructs_tasks_deduplicatequeue/test_constructs_tasks_deduplicatequeue/deduplicatequeue_task_template.json +++ b/cdk/tests/unit/snapshots/test_constructs_tasks_deduplicatequeue/test_constructs_tasks_deduplicatequeue/deduplicatequeue_task_template.json @@ -33,6 +33,97 @@ } }, "Resources": { + "DeduplicationQueue199CF29F": { + "DeletionPolicy": "Delete", + "Properties": { + "RedrivePolicy": { + "deadLetterTargetArn": { + "Fn::GetAtt": [ + "DeduplicationQueueDeadLetterQueueC5D62AD6", + "Arn" + ] + }, + "maxReceiveCount": 10 + }, + "VisibilityTimeout": 120 + }, + "Type": "AWS::SQS::Queue", + "UpdateReplacePolicy": "Delete" + }, + "DeduplicationQueueDeadLetterQueueC5D62AD6": { + "DeletionPolicy": "Delete", + "Properties": { + "MessageRetentionPeriod": 1209600 + }, + "Type": "AWS::SQS::Queue", + "UpdateReplacePolicy": "Delete" + }, + "DeduplicationQueueDeduplicationQueueAlarmsDeadLetterQueueHasMessagesAlarmDEF79C23": { + "Properties": { + "AlarmActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "ComparisonOperator": "GreaterThanOrEqualToThreshold", + "Dimensions": [ + { + "Name": "QueueName", + "Value": { + "Fn::GetAtt": [ + "DeduplicationQueueDeadLetterQueueC5D62AD6", + "QueueName" + ] + } + } + ], + "EvaluationPeriods": 1, + "MetricName": "ApproximateNumberOfMessagesVisible", + "Namespace": "AWS/SQS", + "OKActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "Period": 300, + "Statistic": "Maximum", + "Threshold": 1 + }, + "Type": "AWS::CloudWatch::Alarm" + }, + "DeduplicationQueueDeduplicationQueueAlarmsQueueHasOldMessagesAlarm2251106E": { + "Properties": { + "AlarmActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "ComparisonOperator": "GreaterThanOrEqualToThreshold", + "Dimensions": [ + { + "Name": "QueueName", + "Value": { + "Fn::GetAtt": [ + "DeduplicationQueue199CF29F", + "QueueName" + ] + } + } + ], + "EvaluationPeriods": 1, + "MetricName": "ApproximateAgeOfOldestMessage", + "Namespace": "AWS/SQS", + "OKActions": [ + { + "Ref": "TestTopic339EC197" + } + ], + "Period": 300, + "Statistic": "Maximum", + "Threshold": 86400 + }, + "Type": "AWS::CloudWatch::Alarm" + }, "DeduplicatorRunDeduplicateInvalidationQueueB8C28A49": { "Properties": { "EventPattern": { @@ -157,6 +248,12 @@ "Ref": "InvalidationQueue8614463D" } }, + { + "Name": "STORAGE_QUEUE_URL", + "Value": { + "Ref": "DeduplicationQueue199CF29F" + } + }, { "Name": "NUM_WORKERS", "Value": "50" @@ -164,7 +261,7 @@ ], "Essential": true, "Image": { - "Fn::Sub": "${AWS::AccountId}.dkr.ecr.${AWS::Region}.${AWS::URLSuffix}/cdk-hnb659fds-container-assets-${AWS::AccountId}-${AWS::Region}:a827b71c585b071ab4fecd5a1bc9b8fcd8f90d6dc80e3d026fc84c02b7a2b2d3" + "Fn::Sub": "${AWS::AccountId}.dkr.ecr.${AWS::Region}.${AWS::URLSuffix}/cdk-hnb659fds-container-assets-${AWS::AccountId}-${AWS::Region}:b64424101f6719672add2ffa1e7e04bdb394d7015003ad99b4ebf5cc89e8ce07" }, "LogConfiguration": { "LogDriver": "awslogs", @@ -440,6 +537,50 @@ "Arn" ] } + }, + { + "Action": [ + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "InvalidationQueue8614463D", + "Arn" + ] + } + }, + { + "Action": [ + "sqs:ReceiveMessage", + "sqs:ChangeMessageVisibility", + "sqs:GetQueueUrl", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "DeduplicationQueue199CF29F", + "Arn" + ] + } + }, + { + "Action": [ + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "DeduplicationQueue199CF29F", + "Arn" + ] + } } ], "Version": "2012-10-17" diff --git a/cdk/tests/unit/test_constructs_backend.py b/cdk/tests/unit/test_constructs_backend.py index c6dc8bb35b..9d7f1ec238 100644 --- a/cdk/tests/unit/test_constructs_backend.py +++ b/cdk/tests/unit/test_constructs_backend.py @@ -13,6 +13,7 @@ def test_constructs_backend_initialize_backend_construct( opensearch_multiplexer, transaction_queue, invalidation_queue, + deduplication_queue, feature_flag_service, ): from infrastructure.constructs.backend import Backend @@ -53,6 +54,7 @@ def test_constructs_backend_initialize_backend_construct( opensearch_multiplexer=opensearch_multiplexer, transaction_queue=transaction_queue, invalidation_queue=invalidation_queue, + deduplication_queue=deduplication_queue, feature_flag_service=feature_flag_service, cpu=2048, memory_limit_mib=4096, @@ -239,6 +241,12 @@ def test_constructs_backend_initialize_backend_construct( 'Ref': 'InvalidationQueue8614463D' } }, + { + 'Name': 'DEDUPLICATION_QUEUE_URL', + 'Value': { + 'Ref': 'DeduplicationQueue199CF29F' + } + }, { 'Name': 'TRANSACTION_DEAD_LETTER_QUEUE_URL', 'Value': { @@ -251,6 +259,12 @@ def test_constructs_backend_initialize_backend_construct( 'Ref': 'InvalidationQueueDeadLetterQueueFE5C594E' } }, + { + 'Name': 'DEDUPLICATION_DEAD_LETTER_QUEUE_URL', + 'Value': { + 'Ref': 'DeduplicationQueueDeadLetterQueueC5D62AD6' + } + }, { 'Name': 'UPLOAD_USER_ACCESS_KEYS_SECRET_ARN', 'Value': { @@ -619,6 +633,20 @@ def test_constructs_backend_initialize_backend_construct( ] } }, + { + 'Action': [ + 'sqs:SendMessage', + 'sqs:GetQueueAttributes', + 'sqs:GetQueueUrl' + ], + 'Effect': 'Allow', + 'Resource': { + 'Fn::GetAtt': [ + 'DeduplicationQueue199CF29F', + 'Arn' + ] + } + }, { 'Action': [ 'sqs:SendMessage', @@ -647,6 +675,20 @@ def test_constructs_backend_initialize_backend_construct( ] } }, + { + 'Action': [ + 'sqs:SendMessage', + 'sqs:GetQueueAttributes', + 'sqs:GetQueueUrl' + ], + 'Effect': 'Allow', + 'Resource': { + 'Fn::GetAtt': [ + 'DeduplicationQueueDeadLetterQueueC5D62AD6', + 'Arn' + ] + } + }, { 'Action': [ 'secretsmanager:GetSecretValue', @@ -914,6 +956,7 @@ def test_constructs_backend_backend_construct_define_domain_name( opensearch_multiplexer, transaction_queue, invalidation_queue, + deduplication_queue, feature_flag_service, ): from infrastructure.config import Config @@ -954,6 +997,7 @@ def test_constructs_backend_backend_construct_define_domain_name( opensearch_multiplexer=opensearch_multiplexer, transaction_queue=transaction_queue, invalidation_queue=invalidation_queue, + deduplication_queue=deduplication_queue, feature_flag_service=feature_flag_service, cpu=2048, memory_limit_mib=4096, @@ -989,6 +1033,7 @@ def test_constructs_backend_backend_construct_define_domain_name( opensearch_multiplexer=opensearch_multiplexer, transaction_queue=transaction_queue, invalidation_queue=invalidation_queue, + deduplication_queue=deduplication_queue, feature_flag_service=feature_flag_service, cpu=2048, memory_limit_mib=4096, diff --git a/cdk/tests/unit/test_constructs_indexer.py b/cdk/tests/unit/test_constructs_indexer.py index b5595b7fd8..306d93d085 100644 --- a/cdk/tests/unit/test_constructs_indexer.py +++ b/cdk/tests/unit/test_constructs_indexer.py @@ -10,6 +10,7 @@ def test_constructs_indexer_initialize_indexer( application_load_balanced_fargate_service, transaction_queue, invalidation_queue, + deduplication_queue, opensearch_multiplexer, ): from infrastructure.constructs.indexer import InvalidationServiceProps @@ -25,6 +26,7 @@ def test_constructs_indexer_initialize_indexer( cluster=application_load_balanced_fargate_service.cluster, transaction_queue=transaction_queue, invalidation_queue=invalidation_queue, + deduplication_queue=deduplication_queue, opensearch_multiplexer=opensearch_multiplexer, use_opensearch_named='Opensearch', backend_url='some-url.test', @@ -41,7 +43,7 @@ def test_constructs_indexer_initialize_indexer( assert isinstance(indexer, Indexer) template.resource_count_is( 'AWS::SQS::Queue', - 4, + 6, ) template.has_resource_properties( 'AWS::SQS::Queue', @@ -73,6 +75,21 @@ def test_constructs_indexer_initialize_indexer( 'VisibilityTimeout': 120 } ) + template.has_resource_properties( + 'AWS::SQS::Queue', + { + 'RedrivePolicy': { + 'deadLetterTargetArn': { + 'Fn::GetAtt': [ + 'DeduplicationQueueDeadLetterQueueC5D62AD6', + 'Arn' + ] + }, + 'maxReceiveCount': 10 + }, + 'VisibilityTimeout': 120 + } + ) template.has_resource_properties( 'AWS::EC2::SecurityGroupIngress', { @@ -427,7 +444,7 @@ def test_constructs_indexer_initialize_indexer( ) template.resource_count_is( 'AWS::CloudWatch::Alarm', - 18 + 20 ) cpu_scaling_resources = template.find_resources( 'AWS::ApplicationAutoScaling::ScalingPolicy', @@ -473,6 +490,7 @@ def test_constructs_indexer_indexer_match_snapshot( application_load_balanced_fargate_service, transaction_queue, invalidation_queue, + deduplication_queue, opensearch_multiplexer, snapshot, ): @@ -490,6 +508,7 @@ def test_constructs_indexer_indexer_match_snapshot( cluster=application_load_balanced_fargate_service.cluster, transaction_queue=transaction_queue, invalidation_queue=invalidation_queue, + deduplication_queue=deduplication_queue, opensearch_multiplexer=opensearch_multiplexer, use_opensearch_named='Opensearch', backend_url='some-url.test', diff --git a/cdk/tests/unit/test_constructs_queue.py b/cdk/tests/unit/test_constructs_queue.py index fa1c835c14..242e108f63 100644 --- a/cdk/tests/unit/test_constructs_queue.py +++ b/cdk/tests/unit/test_constructs_queue.py @@ -84,3 +84,25 @@ def test_constructs_queue_initialize_invalidation_queue(stack, existing_resource 'AWS::CloudWatch::Alarm', 2 ) + + +def test_constructs_queue_initialize_deduplication_queue(stack, existing_resources): + from infrastructure.constructs.queue import QueueProps + from infrastructure.constructs.queue import DeduplicationQueue + deduplication_queue = DeduplicationQueue( + stack, + 'DeduplicationQueue', + props=QueueProps( + existing_resources=existing_resources, + ), + ) + assert isinstance(deduplication_queue, DeduplicationQueue) + template = Template.from_stack(stack) + template.resource_count_is( + 'AWS::SQS::Queue', + 2 + ) + template.resource_count_is( + 'AWS::CloudWatch::Alarm', + 2 + ) diff --git a/cdk/tests/unit/test_constructs_tasks_deduplicatequeue.py b/cdk/tests/unit/test_constructs_tasks_deduplicatequeue.py index 44466d8f65..67e72ec047 100644 --- a/cdk/tests/unit/test_constructs_tasks_deduplicatequeue.py +++ b/cdk/tests/unit/test_constructs_tasks_deduplicatequeue.py @@ -11,6 +11,7 @@ def test_constructs_tasks_deduplicatequeue( existing_resources, application_load_balanced_fargate_service, invalidation_queue, + deduplication_queue, snapshot, ): from infrastructure.constructs.tasks.deduplicatequeue import DeduplicateInvalidationQueue @@ -23,6 +24,7 @@ def test_constructs_tasks_deduplicatequeue( existing_resources=existing_resources, cluster=application_load_balanced_fargate_service.cluster, invalidation_queue=invalidation_queue, + deduplication_queue=deduplication_queue, number_of_workers=50, minutes_to_wait_between_runs=25, cpu=512, diff --git a/cdk/tests/unit/test_stacks_backend.py b/cdk/tests/unit/test_stacks_backend.py index 58d0dc07ef..695478057e 100644 --- a/cdk/tests/unit/test_stacks_backend.py +++ b/cdk/tests/unit/test_stacks_backend.py @@ -154,7 +154,7 @@ def test_stacks_backend_initialize_backend_stack(config): ) template.resource_count_is( 'AWS::SQS::Queue', - 4 + 6 ) template.resource_count_is( 'AWS::ECS::Service', diff --git a/docker-compose.test-indexer.yml b/docker-compose.test-indexer.yml index b5c92d3bec..f2026b1c36 100644 --- a/docker-compose.test-indexer.yml +++ b/docker-compose.test-indexer.yml @@ -53,8 +53,10 @@ services: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - DEDUPLICATION_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - TRANSACTION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/transaction-dead-letter-queue - INVALIDATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/invalidation-dead-letter-queue + - DEDUPLICATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/deduplication-dead-letter-queue volumes: - ".:/igvfd" - "/igvfd/src/igvfd.egg-info" @@ -130,6 +132,7 @@ services: environment: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - STORAGE_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - NUM_WORKERS=20 - AWS_ACCESS_KEY_ID=testing - AWS_SECRET_ACCESS_KEY=testing @@ -147,8 +150,10 @@ services: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - DEDUPLICATION_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - TRANSACTION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/transaction-dead-letter-queue - INVALIDATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/invalidation-dead-letter-queue + - DEDUPLICATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/deduplication-dead-letter-queue volumes: - ".:/igvfd" - "/igvfd/src/igvfd.egg-info" diff --git a/docker-compose.test.yml b/docker-compose.test.yml index e3d69e14d6..36b1624829 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -31,8 +31,10 @@ services: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - DEDUPLICATION_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - TRANSACTION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/transaction-dead-letter-queue - INVALIDATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/invalidation-dead-letter-queue + - DEDUPLICATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/deduplication-dead-letter-queue volumes: - ".:/igvfd" - "/igvfd/src/igvfd.egg-info" diff --git a/docker-compose.yml b/docker-compose.yml index d026ffdd17..43317a69cd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,10 +51,12 @@ services: image: igvfd-pyramid environment: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue + - TRANSACTION_QUEUE_URL=http://localhost:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - DEDUPLICATION_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - TRANSACTION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/transaction-dead-letter-queue - INVALIDATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/invalidation-dead-letter-queue + - DEDUPLICATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/deduplication-dead-letter-queue volumes: - ".:/igvfd" - "/igvfd/src/igvfd.egg-info" @@ -73,10 +75,12 @@ services: image: igvfd-pyramid environment: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue + - TRANSACTION_QUEUE_URL=http://localhost:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - DEDUPLICATION_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - TRANSACTION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/transaction-dead-letter-queue - INVALIDATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/invalidation-dead-letter-queue + - DEDUPLICATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/deduplication-dead-letter-queue volumes: - ".:/igvfd" - "/igvfd/src/igvfd.egg-info" @@ -96,10 +100,12 @@ services: image: igvfd-pyramid environment: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue + - TRANSACTION_QUEUE_URL=http://localhost:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - DEDUPLICATION_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - TRANSACTION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/transaction-dead-letter-queue - INVALIDATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/invalidation-dead-letter-queue + - DEDUPLICATION_DEAD_LETTER_QUEUE_URL=http://localhost:4566/000000000000/deduplication-dead-letter-queue volumes: - ".:/igvfd" - "/igvfd/src/igvfd.egg-info" @@ -131,7 +137,7 @@ services: environment: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - OPENSEARCH_URL=http://opensearch:9200 - - TRANSACTION_QUEUE_URL=http://localstack:4566/000000000000/transaction-queue + - TRANSACTION_QUEUE_URL=http://localhost:4566/000000000000/transaction-queue - INVALIDATION_QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue - RESOURCES_INDEX=snovault-resources volumes: @@ -167,6 +173,7 @@ services: environment: - LOCALSTACK_ENDPOINT_URL=http://localstack:4566 - QUEUE_URL=http://localhost:4566/000000000000/invalidation-queue + - STORAGE_QUEUE_URL=http://localhost:4566/000000000000/deduplication-queue - NUM_WORKERS=20 - AWS_ACCESS_KEY_ID=testing - AWS_SECRET_ACCESS_KEY=testing diff --git a/docker/dedup/Dockerfile b/docker/dedup/Dockerfile index f2f0769b46..36eca12c5c 100644 --- a/docker/dedup/Dockerfile +++ b/docker/dedup/Dockerfile @@ -1,6 +1,6 @@ FROM public.ecr.aws/docker/library/golang:1.21.5-bookworm AS builder -ARG VERSION=v1.1.0 +ARG VERSION=v2.0.0 WORKDIR /build diff --git a/docker/dedup/entrypoint.sh b/docker/dedup/entrypoint.sh index 311cb8cb40..5fc44481d2 100644 --- a/docker/dedup/entrypoint.sh +++ b/docker/dedup/entrypoint.sh @@ -1,2 +1,3 @@ -#!/bin/sh +#!/bin/bash + exec "$@" diff --git a/docker/dedup/run-forever.sh b/docker/dedup/run-forever.sh index 35f78032ea..f17da4df2d 100644 --- a/docker/dedup/run-forever.sh +++ b/docker/dedup/run-forever.sh @@ -1,4 +1,4 @@ #!/bin/bash echo "Starting deduplicator" sleep 60 -./dedup -numWorkers=${NUM_WORKERS} -queueURL=${QUEUE_URL} -runForever -secondsToSleepBetweenRuns=600 +./dedup -numWorkers=${NUM_WORKERS} -queueURL=${QUEUE_URL} -storageQueueURL=${STORAGE_QUEUE_URL} -runForever -secondsToSleepBetweenRuns=600 diff --git a/docker/dedup/run-once.sh b/docker/dedup/run-once.sh index a3fa0e90f2..1f26faeea1 100644 --- a/docker/dedup/run-once.sh +++ b/docker/dedup/run-once.sh @@ -1,3 +1,3 @@ #!/bin/bash echo "Running once" -./dedup -numWorkers=${NUM_WORKERS} -queueURL=${QUEUE_URL} +./dedup -numWorkers=${NUM_WORKERS} -queueURL=${QUEUE_URL} -storageQueueURL=${STORAGE_QUEUE_URL} diff --git a/docker/localstack/create_localstack.sh b/docker/localstack/create_localstack.sh index 187f02e61e..1b3cbd74b9 100644 --- a/docker/localstack/create_localstack.sh +++ b/docker/localstack/create_localstack.sh @@ -12,7 +12,12 @@ INVALIDATION_DLQ_ARN=$(awslocal sqs get-queue-attributes --queue-url $INVALIDATI echo $INVALIDATION_DLQ_URL -echo 'Creating SQS' +DEDUPLICATION_DLQ_URL=$(awslocal sqs create-queue --queue-name 'deduplication-dead-letter-queue' --query 'QueueUrl' --output text) +DEDUPLICATION_DLQ_ARN=$(awslocal sqs get-queue-attributes --queue-url $DEDUPLICATION_DLQ_URL --attribute-names QueueArn --query 'Attributes.QueueArn' --output text) + +echo $DEDUPLICATION_DLQ_URL +echo 'Creating SQS' awslocal sqs create-queue --queue-name transaction-queue --attributes '{"RedrivePolicy": "{\"deadLetterTargetArn\":\"'$TRANSACTION_DLQ_ARN'\",\"maxReceiveCount\":\"3\"}", "VisibilityTimeout": "60"}' awslocal sqs create-queue --queue-name invalidation-queue --attributes '{"RedrivePolicy": "{\"deadLetterTargetArn\":\"'$INVALIDATION_DLQ_ARN'\",\"maxReceiveCount\":\"3\"}", "VisibilityTimeout": "120"}' +awslocal sqs create-queue --queue-name deduplication-queue --attributes '{"RedrivePolicy": "{\"deadLetterTargetArn\":\"'$DEDUPLICATION_DLQ_ARN'\",\"maxReceiveCount\":\"3\"}", "VisibilityTimeout": "600"}' diff --git a/setup.cfg b/setup.cfg index 434c4e34c1..c51bd9a626 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,7 +52,7 @@ install_requires = xlrd==1.2.0 zope.interface==5.3.0 zope.sqlalchemy==1.2 - snovault@git+https://github.com/IGVF-DACC/snovault.git@IGVF-2102-dataset-summary-agg + snovault@git+https://github.com/IGVF-DACC/snovault.git@add-memory-queue [options.extras_require] test = diff --git a/src/igvfd/__init__.py b/src/igvfd/__init__.py index 5587a81920..6a1220be21 100644 --- a/src/igvfd/__init__.py +++ b/src/igvfd/__init__.py @@ -158,6 +158,11 @@ def configure_invalidation_queue(config): configure_invalidation_queue(config) +def configure_deduplication_queue(config): + from snovault.app import configure_deduplication_queue + configure_deduplication_queue(config) + + def configure_transaction_dead_letter_queue(config): from snovault.app import configure_transaction_dead_letter_queue configure_transaction_dead_letter_queue(config) @@ -168,6 +173,11 @@ def configure_invalidation_dead_letter_queue(config): configure_invalidation_dead_letter_queue(config) +def configure_deduplication_dead_letter_queue(config): + from snovault.app import configure_deduplication_dead_letter_queue + configure_deduplication_dead_letter_queue(config) + + def session(config): """ To create a session secret on the server: $ cat /dev/urandom | head -c 256 | base64 > session-secret.b64 @@ -235,8 +245,10 @@ def main(global_config, **local_config): config.include(configure_sqs_client) config.include(configure_transaction_queue) config.include(configure_invalidation_queue) + config.include(configure_deduplication_queue) config.include(configure_transaction_dead_letter_queue) config.include(configure_invalidation_dead_letter_queue) + config.include(configure_deduplication_dead_letter_queue) config.include('snovault') config.commit() # commit so search can override listing