Skip to content

Commit

Permalink
F!! Use match-case and switch to classmethods.
Browse files Browse the repository at this point in the history
  • Loading branch information
chapinb committed Jan 1, 2025
1 parent a30a592 commit 157723d
Showing 1 changed file with 35 additions and 31 deletions.
66 changes: 35 additions & 31 deletions src/luminaut/tools/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ def populate_permissive_ingress_security_group_rules(


class ExtractEventsFromConfigDiffs:
@staticmethod
@classmethod
def generate_events_from_aws_config(
cls,
resource_type: models.ResourceType,
resource_id: str,
config_item: models.AwsConfigItem,
Expand All @@ -280,7 +281,7 @@ def generate_events_from_aws_config(

diff_as_dict = asdict(config_item.diff_to_prior)
if resource_type == models.ResourceType.EC2_Instance:
ExtractEventsFromConfigDiffs.process_ec2_instance(
cls.process_ec2_instance(
config_item.config_capture_time,
diff_as_dict,
events,
Expand All @@ -289,42 +290,45 @@ def generate_events_from_aws_config(
)
return events

@staticmethod
@classmethod
def process_ec2_instance(
config_capture_time, diff_as_dict, events, resource_id, resource_type
cls, config_capture_time, diff_as_dict, events, resource_id, resource_type
):
changes = diff_as_dict["changed"]
for key, value in changes.items():
event_type = None
message = None

if key == "state":
event_type = models.TimelineEventType.COMPUTE_INSTANCE_STATE_CHANGE
message = ExtractEventsFromConfigDiffs._format_ec2_state_change_message(
"changed", value
)
elif key == "security_groups":
event_type = models.TimelineEventType.SECURITY_GROUP_ASSOCIATION_CHANGE
message = ExtractEventsFromConfigDiffs._format_ec2_sg_change_message(
value
)
elif key == "launch_time":
event_type = (
models.TimelineEventType.COMPUTE_INSTANCE_LAUNCH_TIME_UPDATED
)
message = ExtractEventsFromConfigDiffs._format_ec2_string_field_change_message(
"Launch time", value
)
elif key == "public_dns_name":
event_type = models.TimelineEventType.COMPUTE_INSTANCE_NETWORKING_CHANGE
message = ExtractEventsFromConfigDiffs._format_ec2_string_field_change_message(
"Public DNS name", value
)
elif key == "public_ip_address":
event_type = models.TimelineEventType.COMPUTE_INSTANCE_NETWORKING_CHANGE
message = ExtractEventsFromConfigDiffs._format_ec2_string_field_change_message(
"Public IP address", value
)
match key:
case "state":
event_type = models.TimelineEventType.COMPUTE_INSTANCE_STATE_CHANGE
message = cls._format_ec2_state_change_message("changed", value)
case "security_groups":
event_type = (
models.TimelineEventType.SECURITY_GROUP_ASSOCIATION_CHANGE
)
message = cls._format_ec2_sg_change_message(value)
case "launch_time":
event_type = (
models.TimelineEventType.COMPUTE_INSTANCE_LAUNCH_TIME_UPDATED
)
message = cls._format_ec2_string_field_change_message(
"Launch time", value
)
case "public_dns_name":
event_type = (
models.TimelineEventType.COMPUTE_INSTANCE_NETWORKING_CHANGE
)
message = cls._format_ec2_string_field_change_message(
"Public DNS name", value
)
case "public_ip_address":
event_type = (
models.TimelineEventType.COMPUTE_INSTANCE_NETWORKING_CHANGE
)
message = cls._format_ec2_string_field_change_message(
"Public IP address", value
)

if event_type and message:
events.append(
Expand Down

0 comments on commit 157723d

Please sign in to comment.