Skip to content

Commit

Permalink
IGVF-2241-add-deduplication-queue (#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
keenangraham authored Dec 17, 2024
1 parent 97e59c6 commit b8f8589
Show file tree
Hide file tree
Showing 23 changed files with 508 additions and 19 deletions.
16 changes: 16 additions & 0 deletions cdk/infrastructure/constructs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from infrastructure.constructs.queue import InvalidationQueue
from infrastructure.constructs.queue import TransactionQueue
from infrastructure.constructs.queue import DeduplicationQueue

from infrastructure.constructs.flag import FeatureFlagService

Expand Down Expand Up @@ -72,6 +73,7 @@ class BackendProps:
opensearch_multiplexer: Multiplexer
transaction_queue: TransactionQueue
invalidation_queue: InvalidationQueue
deduplication_queue: DeduplicationQueue
feature_flag_service: FeatureFlagService
ini_name: str
cpu: int
Expand Down Expand Up @@ -121,8 +123,10 @@ def __init__(
self._allow_task_to_put_events_on_bus()
self._allow_task_to_send_messages_to_transaction_queue()
self._allow_task_to_send_messages_to_invalidation_queue()
self._allow_task_to_send_messages_to_deduplication_queue()
self._allow_task_to_send_messages_to_transaction_dead_letter_queue()
self._allow_task_to_send_messages_to_invalidation_dead_letter_queue()
self._allow_task_to_send_messages_to_deduplication_dead_letter_queue()
self._allow_task_to_download_from_files_buckets()
self._allow_task_to_upload_to_files_buckets()
self._allow_task_to_read_upload_files_user_access_keys_secret()
Expand Down Expand Up @@ -258,8 +262,10 @@ def _add_application_container_to_task(self) -> None:
'OPENSEARCH_FOR_WRITING_URL': self.opensearch_for_writing.url,
'TRANSACTION_QUEUE_URL': self.props.transaction_queue.queue.queue_url,
'INVALIDATION_QUEUE_URL': self.props.invalidation_queue.queue.queue_url,
'DEDUPLICATION_QUEUE_URL': self.props.deduplication_queue.queue.queue_url,
'TRANSACTION_DEAD_LETTER_QUEUE_URL': self.props.transaction_queue.dead_letter_queue.queue_url,
'INVALIDATION_DEAD_LETTER_QUEUE_URL': self.props.invalidation_queue.dead_letter_queue.queue_url,
'DEDUPLICATION_DEAD_LETTER_QUEUE_URL': self.props.deduplication_queue.dead_letter_queue.queue_url,
'UPLOAD_USER_ACCESS_KEYS_SECRET_ARN': self.props.existing_resources.upload_igvf_files_user_access_keys.secret.secret_arn,
'RESTRICTED_UPLOAD_USER_ACCESS_KEYS_SECRET_ARN': self.props.existing_resources.upload_igvf_restricted_files_user_access_keys.secret.secret_arn,
'APPCONFIG_APPLICATION': self.props.feature_flag_service.application.name,
Expand Down Expand Up @@ -309,6 +315,11 @@ def _allow_task_to_send_messages_to_invalidation_queue(self) -> None:
self.fargate_service.task_definition.task_role
)

def _allow_task_to_send_messages_to_deduplication_queue(self) -> None:
self.props.deduplication_queue.queue.grant_send_messages(
self.fargate_service.task_definition.task_role
)

def _allow_task_to_send_messages_to_transaction_dead_letter_queue(self) -> None:
self.props.transaction_queue.dead_letter_queue.grant_send_messages(
self.fargate_service.task_definition.task_role
Expand All @@ -319,6 +330,11 @@ def _allow_task_to_send_messages_to_invalidation_dead_letter_queue(self) -> None
self.fargate_service.task_definition.task_role
)

def _allow_task_to_send_messages_to_deduplication_dead_letter_queue(self) -> None:
self.props.deduplication_queue.dead_letter_queue.grant_send_messages(
self.fargate_service.task_definition.task_role
)

def _allow_task_to_download_from_files_buckets(self) -> None:
self.fargate_service.task_definition.task_role.add_managed_policy(
self.props.existing_resources.bucket_access_policies.download_igvf_files_policy
Expand Down
5 changes: 4 additions & 1 deletion cdk/infrastructure/constructs/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from infrastructure.constructs.queue import TransactionQueue
from infrastructure.constructs.queue import InvalidationQueue
from infrastructure.constructs.queue import DeduplicationQueue

from infrastructure.constructs.tasks.deduplicatequeue import DeduplicateInvalidationQueue
from infrastructure.constructs.tasks.deduplicatequeue import DeduplicateInvalidationQueueProps
Expand Down Expand Up @@ -98,6 +99,7 @@ class IndexerProps:
cluster: ICluster
transaction_queue: TransactionQueue
invalidation_queue: InvalidationQueue
deduplication_queue: DeduplicationQueue
opensearch_multiplexer: Multiplexer
use_opensearch_named: str
backend_url: str
Expand Down Expand Up @@ -293,7 +295,8 @@ def _run_deduplicate_invalidation_queue_automatically(self) -> None:
existing_resources=self.props.existing_resources,
cluster=self.props.cluster,
invalidation_queue=self.props.invalidation_queue,
number_of_workers=100,
deduplication_queue=self.props.deduplication_queue,
number_of_workers=120,
minutes_to_wait_between_runs=60,
cpu=1024,
memory_limit_mib=2048,
Expand Down
30 changes: 30 additions & 0 deletions cdk/infrastructure/constructs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,33 @@ def _add_alarms(self) -> None:
oldest_message_in_seconds_threshold=86400,
),
)


class DeduplicationQueue(QueueBase):

def __init__(
self,
scope: Construct,
construct_id: str,
*,
props: QueueProps,
**kwargs: Any
) -> None:
super().__init__(
scope,
construct_id,
props=props,
**kwargs,
)

def _add_alarms(self) -> None:
QueueAlarms(
self,
'DeduplicationQueueAlarms',
props=QueueAlarmsProps(
existing_resources=self.props.existing_resources,
queue=self.queue,
dead_letter_queue=self.dead_letter_queue,
oldest_message_in_seconds_threshold=86400,
),
)
19 changes: 17 additions & 2 deletions cdk/infrastructure/constructs/tasks/deduplicatequeue.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from infrastructure.constructs.existing.types import ExistingResources

from infrastructure.constructs.queue import InvalidationQueue
from infrastructure.constructs.queue import DeduplicationQueue

from typing import Any

Expand All @@ -39,6 +40,7 @@ class DeduplicateInvalidationQueueProps:
existing_resources: ExistingResources
cluster: ICluster
invalidation_queue: InvalidationQueue
deduplication_queue: DeduplicationQueue
number_of_workers: int
minutes_to_wait_between_runs: int
cpu: int
Expand Down Expand Up @@ -68,7 +70,8 @@ def __init__(
self._define_docker_asset()
self._define_log_driver_for_application_container()
self._define_scheduled_fargate_task()
self._allow_task_to_consume_messages_from_invalidation_queue()
self._allow_task_to_read_write_from_invalidation_queue()
self._allow_task_to_read_write_from_deduplication_queue()
self._define_manual_event_rule()

def _define_event_source(self) -> None:
Expand Down Expand Up @@ -99,6 +102,7 @@ def _define_scheduled_fargate_task(self) -> None:
image=self.application_image,
environment={
'QUEUE_URL': self.props.invalidation_queue.queue.queue_url,
'STORAGE_QUEUE_URL': self.props.deduplication_queue.queue.queue_url,
'NUM_WORKERS': f'{self.props.number_of_workers}',
},
log_driver=self.application_log_driver,
Expand All @@ -116,10 +120,21 @@ def _define_scheduled_fargate_task(self) -> None:
),
)

def _allow_task_to_consume_messages_from_invalidation_queue(self) -> None:
def _allow_task_to_read_write_from_invalidation_queue(self) -> None:
self.props.invalidation_queue.queue.grant_consume_messages(
self.scheduled_fargate_task.task_definition.task_role
)
self.props.invalidation_queue.queue.grant_send_messages(
self.scheduled_fargate_task.task_definition.task_role
)

def _allow_task_to_read_write_from_deduplication_queue(self) -> None:
self.props.deduplication_queue.queue.grant_consume_messages(
self.scheduled_fargate_task.task_definition.task_role
)
self.props.deduplication_queue.queue.grant_send_messages(
self.scheduled_fargate_task.task_definition.task_role
)

def _define_manual_event_rule(self) -> None:
self.event_rule = Rule(
Expand Down
10 changes: 10 additions & 0 deletions cdk/infrastructure/stacks/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from infrastructure.constructs.queue import QueueProps
from infrastructure.constructs.queue import TransactionQueue
from infrastructure.constructs.queue import InvalidationQueue
from infrastructure.constructs.queue import DeduplicationQueue

from infrastructure.constructs.flag import FeatureFlagServiceProps
from infrastructure.constructs.flag import FeatureFlagService
Expand Down Expand Up @@ -60,6 +61,13 @@ def __init__(
existing_resources=self.existing_resources,
),
)
self.deduplication_queue = DeduplicationQueue(
self,
'DeduplicationQueue',
props=QueueProps(
existing_resources=self.existing_resources,
),
)
self.feature_flag_service = FeatureFlagService(
self,
'FeatureFlags',
Expand All @@ -79,6 +87,7 @@ def __init__(
opensearch_multiplexer=opensearch_multiplexer,
transaction_queue=self.transaction_queue,
invalidation_queue=self.invalidation_queue,
deduplication_queue=self.deduplication_queue,
feature_flag_service=self.feature_flag_service,
)
)
Expand All @@ -91,6 +100,7 @@ def __init__(
cluster=self.backend.fargate_service.cluster,
transaction_queue=self.transaction_queue,
invalidation_queue=self.invalidation_queue,
deduplication_queue=self.deduplication_queue,
opensearch_multiplexer=opensearch_multiplexer,
use_opensearch_named=self.backend.props.write_to_opensearch_named,
backend_url=f'https://{self.backend.domain_name}',
Expand Down
13 changes: 13 additions & 0 deletions cdk/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,19 @@ def invalidation_queue(stack, existing_resources):
)


@pytest.fixture
def deduplication_queue(stack, existing_resources):
from infrastructure.constructs.queue import QueueProps
from infrastructure.constructs.queue import DeduplicationQueue
return DeduplicationQueue(
stack,
'DeduplicationQueue',
props=QueueProps(
existing_resources=existing_resources,
),
)


@pytest.fixture
def feature_flag_service(stack, config):
from infrastructure.constructs.flag import FeatureFlagServiceProps
Expand Down
Loading

0 comments on commit b8f8589

Please sign in to comment.