diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index 4db626e2a88f1..87dec7e866ee2 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -260,14 +260,54 @@ def _validate_input(self, target: Target): raise ValueError("BatchParameters.JobName is required for Batch target") -class ContainerTargetSender(TargetSender): +class ECSTargetSender(TargetSender): def send_event(self, event): - raise NotImplementedError("ECS target is not yet implemented") + ecs_parameters = self.target.get("EcsParameters", {}) + task_definition_arn = ecs_parameters.get("TaskDefinitionArn") + + # Extract network configuration if it exists + vpc_configuration = ecs_parameters.get("NetworkConfiguration", {}).get( + "awsvpcConfiguration", {} + ) + aws_vpc_configuration = { + "subnets": vpc_configuration.get("Subnets"), + "securityGroups": vpc_configuration.get("SecurityGroups"), + "assignPublicIp": vpc_configuration.get("AssignPublicIp"), + } + + kwargs = { + "launchType": ecs_parameters.get("LaunchType"), + "networkConfiguration": {"awsvpcConfiguration": aws_vpc_configuration} + if aws_vpc_configuration + else None, + "count": ecs_parameters.get("TaskCount"), + "platformVersion": ecs_parameters.get("PlatformVersion"), + "group": ecs_parameters.get("Group"), + "capacityProviderStrategy": ecs_parameters.get("CapacityProviderStrategy"), + "enableECSManagedTags": ecs_parameters.get("EnableECSManagedTags"), + "enableExecuteCommand": ecs_parameters.get("EnableExecuteCommand"), + "propagateTags": ecs_parameters.get("PropagateTags"), + "referenceId": ecs_parameters.get("ReferenceId"), + "tags": ecs_parameters.get("Tags"), + } + + # Remove any keys with a value of None + kwargs = {k: v for k, v in kwargs.items() if v is not None} + + self.client.run_task(taskDefinition=task_definition_arn, cluster=self.arn, **kwargs) def _validate_input(self, target: Target): super()._validate_input(target) if not collections.get_safe(target, "$.EcsParameters.TaskDefinitionArn"): raise ValueError("EcsParameters.TaskDefinitionArn is required for ECS target") + ecs_parameters = target.get("EcsParameters", {}) + if ecs_parameters.get("LaunchType", {}) == "FARGATE": + if not ecs_parameters.get("NetworkConfiguration", {}): + raise ValueError("NetworkConfiguration is required for FARGATE LaunchType") + if not ecs_parameters.get("NetworkConfiguration", {}).get("awsvpcConfiguration", {}): + raise ValueError("awsvpcConfiguration is required for FARGATE LaunchType") + if ecs_parameters.get("CapacityProviderStrategy") and ecs_parameters.get("LaunchType"): + raise ValueError("only LaunchType or CapacityProviderStrategy can be provided") class EventsTargetSender(TargetSender): @@ -441,7 +481,7 @@ class TargetSenderFactory: "apigateway": ApiGatewayTargetSender, "appsync": AppSyncTargetSender, "batch": BatchTargetSender, - "ecs": ContainerTargetSender, + "ecs": ECSTargetSender, "events": EventsTargetSender, "firehose": FirehoseTargetSender, "kinesis": KinesisTargetSender, diff --git a/localstack-core/localstack/testing/pytest/fixtures.py b/localstack-core/localstack/testing/pytest/fixtures.py index d2c2adb794bfd..5bc66b0fb325b 100644 --- a/localstack-core/localstack/testing/pytest/fixtures.py +++ b/localstack-core/localstack/testing/pytest/fixtures.py @@ -1715,40 +1715,6 @@ def _create_delivery_stream(**kwargs): LOG.info("Failed to delete delivery stream %s", delivery_stream_name) -@pytest.fixture -def events_create_rule(aws_client): - rules = [] - - def _create_rule(**kwargs): - rule_name = kwargs["Name"] - bus_name = kwargs.get("EventBusName", "") - pattern = kwargs.get("EventPattern", {}) - schedule = kwargs.get("ScheduleExpression", "") - rule_arn = aws_client.events.put_rule( - Name=rule_name, - EventBusName=bus_name, - EventPattern=json.dumps(pattern), - ScheduleExpression=schedule, - )["RuleArn"] - rules.append({"name": rule_name, "bus": bus_name}) - return rule_arn - - yield _create_rule - - for rule in rules: - targets = aws_client.events.list_targets_by_rule( - Rule=rule["name"], EventBusName=rule["bus"] - )["Targets"] - - targetIds = [target["Id"] for target in targets] - if len(targetIds) > 0: - aws_client.events.remove_targets( - Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds - ) - - aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"]) - - @pytest.fixture def ses_configuration_set(aws_client): configuration_set_names = [] @@ -2220,6 +2186,171 @@ def factory(**kwargs): aws_client.route53.delete_hosted_zone(Id=zone_id) +############################### +# Events (EventBridge) fixtures +############################### + + +@pytest.fixture +def events_create_event_bus(aws_client, region_name, account_id): + event_bus_names = [] + + def _create_event_bus(**kwargs): + if "Name" not in kwargs: + kwargs["Name"] = f"test-event-bus-{short_uid()}" + + response = aws_client.events.create_event_bus(**kwargs) + event_bus_names.append(kwargs["Name"]) + return response + + yield _create_event_bus + + for event_bus_name in event_bus_names: + try: + response = aws_client.events.list_rules(EventBusName=event_bus_name) + rules = [rule["Name"] for rule in response["Rules"]] + + # Delete all rules for the current event bus + for rule in rules: + try: + response = aws_client.events.list_targets_by_rule( + Rule=rule, EventBusName=event_bus_name + ) + targets = [target["Id"] for target in response["Targets"]] + + # Remove all targets for the current rule + if targets: + for target in targets: + aws_client.events.remove_targets( + Rule=rule, EventBusName=event_bus_name, Ids=[target] + ) + + aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name) + except Exception as e: + LOG.warning(f"Failed to delete rule {rule}: {e}") + + # Delete archives for event bus + event_source_arn = ( + f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}" + ) + response = aws_client.events.list_archives(EventSourceArn=event_source_arn) + archives = [archive["ArchiveName"] for archive in response["Archives"]] + for archive in archives: + try: + aws_client.events.delete_archive(ArchiveName=archive) + except Exception as e: + LOG.warning(f"Failed to delete archive {archive}: {e}") + + aws_client.events.delete_event_bus(Name=event_bus_name) + except Exception as e: + LOG.warning(f"Failed to delete event bus {event_bus_name}: {e}") + + +@pytest.fixture +def events_put_rule(aws_client): + rules = [] + + def _put_rule(**kwargs): + if "Name" not in kwargs: + kwargs["Name"] = f"rule-{short_uid()}" + + response = aws_client.events.put_rule(**kwargs) + rules.append((kwargs["Name"], kwargs.get("EventBusName", "default"))) + return response + + yield _put_rule + + for rule, event_bus_name in rules: + try: + response = aws_client.events.list_targets_by_rule( + Rule=rule, EventBusName=event_bus_name + ) + targets = [target["Id"] for target in response["Targets"]] + + # Remove all targets for the current rule + if targets: + for target in targets: + aws_client.events.remove_targets( + Rule=rule, EventBusName=event_bus_name, Ids=[target] + ) + + aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name) + except Exception as e: + LOG.warning(f"Failed to delete rule {rule}: {e}") + + +@pytest.fixture +def events_create_rule(aws_client): + rules = [] + + def _create_rule(**kwargs): + rule_name = kwargs["Name"] + bus_name = kwargs.get("EventBusName", "") + pattern = kwargs.get("EventPattern", {}) + schedule = kwargs.get("ScheduleExpression", "") + rule_arn = aws_client.events.put_rule( + Name=rule_name, + EventBusName=bus_name, + EventPattern=json.dumps(pattern), + ScheduleExpression=schedule, + )["RuleArn"] + rules.append({"name": rule_name, "bus": bus_name}) + return rule_arn + + yield _create_rule + + for rule in rules: + targets = aws_client.events.list_targets_by_rule( + Rule=rule["name"], EventBusName=rule["bus"] + )["Targets"] + + targetIds = [target["Id"] for target in targets] + if len(targetIds) > 0: + aws_client.events.remove_targets( + Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds + ) + + aws_client.events.delete_rule(Name=rule["name"], EventBusName=rule["bus"]) + + +@pytest.fixture +def sqs_as_events_target(aws_client, sqs_get_queue_arn): + queue_urls = [] + + def _sqs_as_events_target(queue_name: str | None = None) -> tuple[str, str]: + if not queue_name: + queue_name = f"tests-queue-{short_uid()}" + sqs_client = aws_client.sqs + queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"] + queue_urls.append(queue_url) + queue_arn = sqs_get_queue_arn(queue_url) + policy = { + "Version": "2012-10-17", + "Id": f"sqs-eventbridge-{short_uid()}", + "Statement": [ + { + "Sid": f"SendMessage-{short_uid()}", + "Effect": "Allow", + "Principal": {"Service": "events.amazonaws.com"}, + "Action": "sqs:SendMessage", + "Resource": queue_arn, + } + ], + } + sqs_client.set_queue_attributes( + QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)} + ) + return queue_url, queue_arn + + yield _sqs_as_events_target + + for queue_url in queue_urls: + try: + aws_client.sqs.delete_queue(QueueUrl=queue_url) + except Exception as e: + LOG.debug("error cleaning up queue %s: %s", queue_url, e) + + @pytest.fixture def clean_up( aws_client, diff --git a/tests/aws/services/events/conftest.py b/tests/aws/services/events/conftest.py index 8c8423c9d64c7..04100d3928b0b 100644 --- a/tests/aws/services/events/conftest.py +++ b/tests/aws/services/events/conftest.py @@ -11,60 +11,7 @@ LOG = logging.getLogger(__name__) - -@pytest.fixture -def events_create_event_bus(aws_client, region_name, account_id): - event_bus_names = [] - - def _create_event_bus(**kwargs): - if "Name" not in kwargs: - kwargs["Name"] = f"test-event-bus-{short_uid()}" - - response = aws_client.events.create_event_bus(**kwargs) - event_bus_names.append(kwargs["Name"]) - return response - - yield _create_event_bus - - for event_bus_name in event_bus_names: - try: - response = aws_client.events.list_rules(EventBusName=event_bus_name) - rules = [rule["Name"] for rule in response["Rules"]] - - # Delete all rules for the current event bus - for rule in rules: - try: - response = aws_client.events.list_targets_by_rule( - Rule=rule, EventBusName=event_bus_name - ) - targets = [target["Id"] for target in response["Targets"]] - - # Remove all targets for the current rule - if targets: - for target in targets: - aws_client.events.remove_targets( - Rule=rule, EventBusName=event_bus_name, Ids=[target] - ) - - aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name) - except Exception as e: - LOG.warning(f"Failed to delete rule {rule}: {e}") - - # Delete archives for event bus - event_source_arn = ( - f"arn:aws:events:{region_name}:{account_id}:event-bus/{event_bus_name}" - ) - response = aws_client.events.list_archives(EventSourceArn=event_source_arn) - archives = [archive["ArchiveName"] for archive in response["Archives"]] - for archive in archives: - try: - aws_client.events.delete_archive(ArchiveName=archive) - except Exception as e: - LOG.warning(f"Failed to delete archive {archive}: {e}") - - aws_client.events.delete_event_bus(Name=event_bus_name) - except Exception as e: - LOG.warning(f"Failed to delete event bus {event_bus_name}: {e}") +# some fixtures are shared in localstack/testing/pytest/fixtures.py @pytest.fixture @@ -118,39 +65,6 @@ def _create_role_event_bus_to_bus(): yield _create_role_event_bus_to_bus -@pytest.fixture -def events_put_rule(aws_client): - rules = [] - - def _put_rule(**kwargs): - if "Name" not in kwargs: - kwargs["Name"] = f"rule-{short_uid()}" - - response = aws_client.events.put_rule(**kwargs) - rules.append((kwargs["Name"], kwargs.get("EventBusName", "default"))) - return response - - yield _put_rule - - for rule, event_bus_name in rules: - try: - response = aws_client.events.list_targets_by_rule( - Rule=rule, EventBusName=event_bus_name - ) - targets = [target["Id"] for target in response["Targets"]] - - # Remove all targets for the current rule - if targets: - for target in targets: - aws_client.events.remove_targets( - Rule=rule, EventBusName=event_bus_name, Ids=[target] - ) - - aws_client.events.delete_rule(Name=rule, EventBusName=event_bus_name) - except Exception as e: - LOG.warning(f"Failed to delete rule {rule}: {e}") - - @pytest.fixture def events_create_archive(aws_client, region_name, account_id): archives = [] @@ -240,44 +154,6 @@ def wait_for_archive_event_count(): yield _put_event_to_archive -@pytest.fixture -def create_sqs_events_target(aws_client, sqs_get_queue_arn): - queue_urls = [] - - def _create_sqs_events_target(queue_name: str | None = None) -> tuple[str, str]: - if not queue_name: - queue_name = f"tests-queue-{short_uid()}" - sqs_client = aws_client.sqs - queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"] - queue_urls.append(queue_url) - queue_arn = sqs_get_queue_arn(queue_url) - policy = { - "Version": "2012-10-17", - "Id": f"sqs-eventbridge-{short_uid()}", - "Statement": [ - { - "Sid": f"SendMessage-{short_uid()}", - "Effect": "Allow", - "Principal": {"Service": "events.amazonaws.com"}, - "Action": "sqs:SendMessage", - "Resource": queue_arn, - } - ], - } - sqs_client.set_queue_attributes( - QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)} - ) - return queue_url, queue_arn - - yield _create_sqs_events_target - - for queue_url in queue_urls: - try: - aws_client.sqs.delete_queue(QueueUrl=queue_url) - except Exception as e: - LOG.debug("error cleaning up queue %s: %s", queue_url, e) - - @pytest.fixture def events_allow_event_rule_to_sqs_queue(aws_client): def _allow_event_rule(sqs_queue_url, sqs_queue_arn, event_rule_arn) -> None: @@ -307,7 +183,7 @@ def _allow_event_rule(sqs_queue_url, sqs_queue_arn, event_rule_arn) -> None: @pytest.fixture def put_events_with_filter_to_sqs( - aws_client, events_create_event_bus, events_put_rule, create_sqs_events_target + aws_client, events_create_event_bus, events_put_rule, sqs_as_events_target ): def _put_events_with_filter_to_sqs( pattern: dict, @@ -322,7 +198,7 @@ def _put_events_with_filter_to_sqs( event_bus_name = f"test-bus-{short_uid()}" events_create_event_bus(Name=event_bus_name) - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() events_put_rule( Name=rule_name, diff --git a/tests/aws/services/events/test_archive_and_replay.py b/tests/aws/services/events/test_archive_and_replay.py index 9526f31233528..6d6bcfcf4ff6c 100644 --- a/tests/aws/services/events/test_archive_and_replay.py +++ b/tests/aws/services/events/test_archive_and_replay.py @@ -369,7 +369,7 @@ def test_start_list_describe_canceled_replay( event_bus_type, events_create_default_or_custom_event_bus, events_put_rule, - create_sqs_events_target, + sqs_as_events_target, put_event_to_archive, aws_client, snapshot, @@ -390,7 +390,7 @@ def test_start_list_describe_canceled_replay( rule_arn = response["RuleArn"] # setup sqs target - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() target_id = f"target-{short_uid()}" aws_client.events.put_targets( Rule=rule_name, diff --git a/tests/aws/services/events/test_events.py b/tests/aws/services/events/test_events.py index 471bc39216ed7..71ab822ddf837 100644 --- a/tests/aws/services/events/test_events.py +++ b/tests/aws/services/events/test_events.py @@ -914,7 +914,7 @@ def test_put_events_bus_to_bus( self, strategy, monkeypatch, - create_sqs_events_target, + sqs_as_events_target, events_create_event_bus, events_put_rule, aws_client, @@ -990,7 +990,7 @@ def test_put_events_bus_to_bus( ) # Create sqs target - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() # Rule and target bus 2 to sqs rule_name_bus_two = f"rule-{short_uid()}" diff --git a/tests/aws/services/events/test_events_inputs.py b/tests/aws/services/events/test_events_inputs.py index fa4dc82b85d6a..3029b91f78a5d 100644 --- a/tests/aws/services/events/test_events_inputs.py +++ b/tests/aws/services/events/test_events_inputs.py @@ -29,9 +29,9 @@ reason="V1 provider does not support this feature", ) def test_put_event_input_path_and_input_transformer( - create_sqs_events_target, events_create_event_bus, events_put_rule, aws_client, snapshot + sqs_as_events_target, events_create_event_bus, events_put_rule, aws_client, snapshot ): - _, queue_arn = create_sqs_events_target() + _, queue_arn = sqs_as_events_target() bus_name = f"test-bus-{short_uid()}" events_create_event_bus(Name=bus_name) @@ -153,14 +153,14 @@ def test_put_events_with_input_path_max_level_depth( def test_put_events_with_input_path_multiple_targets( self, aws_client, - create_sqs_events_target, + sqs_as_events_target, events_create_event_bus, events_put_rule, snapshot, ): # prepare target queues - queue_url_1, queue_arn_1 = create_sqs_events_target() - queue_url_2, queue_arn_2 = create_sqs_events_target() + queue_url_1, queue_arn_1 = sqs_as_events_target() + queue_url_2, queue_arn_2 = sqs_as_events_target() bus_name = f"test-bus-{short_uid()}" events_create_event_bus(Name=bus_name) @@ -344,13 +344,13 @@ def test_put_events_with_input_transformer_input_template_json( ) def test_put_events_with_input_transformer_missing_keys( self, - create_sqs_events_target, + sqs_as_events_target, events_create_event_bus, events_put_rule, aws_client_factory, snapshot, ): - _, queue_arn = create_sqs_events_target() + _, queue_arn = sqs_as_events_target() bus_name = f"test-bus-{short_uid()}" events_create_event_bus(Name=bus_name) @@ -400,7 +400,7 @@ def test_put_events_with_input_transformer_missing_keys( def test_input_transformer_predefined_variables( self, input_template, - create_sqs_events_target, + sqs_as_events_target, events_create_event_bus, events_put_rule, aws_client, @@ -409,7 +409,7 @@ def test_input_transformer_predefined_variables( # https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-transform-target-input.html#eb-transform-input-predefined # prepare target queues - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() bus_name = f"test-bus-{short_uid()}" events_create_event_bus(Name=bus_name) diff --git a/tests/aws/services/events/test_events_patterns.py b/tests/aws/services/events/test_events_patterns.py index 33147780e315c..c63dbe111bec7 100644 --- a/tests/aws/services/events/test_events_patterns.py +++ b/tests/aws/services/events/test_events_patterns.py @@ -363,13 +363,13 @@ def test_put_events_with_rule_pattern_exists_false( @markers.aws.validated def test_put_event_with_content_base_rule_in_pattern( self, - create_sqs_events_target, + sqs_as_events_target, events_create_event_bus, events_put_rule, snapshot, aws_client, ): - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() # Create event bus event_bus_name = f"event-bus-{short_uid()}" diff --git a/tests/aws/services/events/test_events_schedule.py b/tests/aws/services/events/test_events_schedule.py index d416858ee2e99..f46972b0b8fb5 100644 --- a/tests/aws/services/events/test_events_schedule.py +++ b/tests/aws/services/events/test_events_schedule.py @@ -83,13 +83,13 @@ def test_put_rule_with_invalid_schedule_rate(self, schedule_expression, aws_clie @markers.aws.validated def tests_schedule_rate_target_sqs( self, - create_sqs_events_target, + sqs_as_events_target, events_put_rule, aws_client, snapshot, ): queue_name = f"test-queue-{short_uid()}" - queue_url, queue_arn = create_sqs_events_target(queue_name) + queue_url, queue_arn = sqs_as_events_target(queue_name) bus_name = "default" rule_name = f"test-rule-{short_uid()}" @@ -143,9 +143,9 @@ def tests_schedule_rate_target_sqs( @markers.aws.validated def tests_schedule_rate_custom_input_target_sqs( - self, create_sqs_events_target, events_put_rule, aws_client, snapshot + self, sqs_as_events_target, events_put_rule, aws_client, snapshot ): - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() bus_name = "default" rule_name = f"test-rule-{short_uid()}" @@ -306,12 +306,12 @@ def tests_put_rule_with_schedule_cron( @pytest.mark.skip("Flaky, target time can be 1min off message time") def test_schedule_cron_target_sqs( self, - create_sqs_events_target, + sqs_as_events_target, events_put_rule, aws_client, snapshot, ): - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() schedule_cron, target_datetime = get_cron_expression( 1 diff --git a/tests/aws/services/events/test_events_targets.py b/tests/aws/services/events/test_events_targets.py index 8d0e6af0f10f5..201cfdd848069 100644 --- a/tests/aws/services/events/test_events_targets.py +++ b/tests/aws/services/events/test_events_targets.py @@ -50,7 +50,7 @@ def test_put_events_with_target_events( account_id, events_put_rule, create_role_event_bus_source_to_bus_target, - create_sqs_events_target, + sqs_as_events_target, aws_client, snapshot, ): @@ -114,7 +114,7 @@ def test_put_events_with_target_events( EventPattern=json.dumps(TEST_EVENT_PATTERN), ) - queue_url, queue_arn = create_sqs_events_target() + queue_url, queue_arn = sqs_as_events_target() target_id = f"target-{short_uid()}" aws_client.events.put_targets( Rule=rule_name_target_to_sqs, diff --git a/tests/aws/services/ssm/test_ssm.py b/tests/aws/services/ssm/test_ssm.py index 79f51ab704107..210b2b363bbb5 100644 --- a/tests/aws/services/ssm/test_ssm.py +++ b/tests/aws/services/ssm/test_ssm.py @@ -169,7 +169,7 @@ def test_get_inexistent_maintenance_window(self, aws_client): @markers.aws.needs_fixing # TODO: remove parameters, set correct parameter prefix name, use events_create_event_bus and events_put_rule fixture, - # remove clean_up, use create_sqs_events_target fixture, use snapshot + # remove clean_up, use sqs_as_events_target fixture, use snapshot @pytest.mark.parametrize("strategy", ["standard", "domain", "path"]) def test_trigger_event_on_systems_manager_change( self, monkeypatch, aws_client, clean_up, strategy