Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add msk probes and actions #148

Merged
merged 5 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased][]

### Added

* MSK probes `chaosaws.msk.probes.describe_msk_cluster` `chaosaws.msk.probes.get_bootstrap_servers`
* MSK actions `chaosaws.msk.actions.reboot_msk_broker` `chaosaws.msk.actions.delete_cluster`

[Unreleased]: https://github.com/chaostoolkit-incubator/chaostoolkit-aws/compare/0.33.0...HEAD

## [0.33.0][] - 2024-02-26
Expand Down
2 changes: 2 additions & 0 deletions chaosaws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ def load_exported_activities() -> List[DiscoveredActivities]:
activities.extend(discover_actions("chaosaws.fis.actions"))
activities.extend(discover_probes("chaosaws.s3.probes"))
activities.extend(discover_actions("chaosaws.s3.actions"))
activities.extend(discover_probes("chaosaws.msk.probes"))
activities.extend(discover_actions("chaosaws.msk.actions"))
activities.extend(
discover_activities("chaosaws.s3.controls.upload", "control")
)
Expand Down
Empty file added chaosaws/msk/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions chaosaws/msk/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import List

from chaoslib.types import Configuration, Secrets
from chaoslib.exceptions import FailedActivity

from chaosaws import aws_client, get_logger
from chaosaws.types import AWSResponse

__all__ = ["reboot_msk_broker", "delete_cluster"]

logger = get_logger()


def reboot_msk_broker(
cluster_arn: str,
broker_ids: List[str],
configuration: Configuration = None,
secrets: Secrets = None,
) -> AWSResponse:
"""
Reboot the specified brokers in an MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(
f"Rebooting MSK brokers: {broker_ids} in cluster {cluster_arn}"
)
try:
return client.reboot_broker(
ClusterArn=cluster_arn,
BrokerIds=broker_ids
)
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found" )


def delete_cluster(
cluster_arn: str,
configuration: Configuration = None,
secrets: Secrets = None,
) -> AWSResponse:
"""
Delete the given MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(f"Deleting MSK cluster: {cluster_arn}")
try:
return client.delete_cluster(ClusterArn=cluster_arn)
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found")
44 changes: 44 additions & 0 deletions chaosaws/msk/probes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import List

from chaoslib.types import Configuration, Secrets
from chaoslib.exceptions import FailedActivity

from chaosaws import aws_client, get_logger
from chaosaws.types import AWSResponse

__all__ = ["describe_msk_cluster", "get_bootstrap_servers"]

logger = get_logger()


def describe_msk_cluster(
cluster_arn: str,
configuration: Configuration = None,
secrets: Secrets = None,
) -> AWSResponse:
"""
Describe an MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(f"Describing MSK cluster: {cluster_arn}")
try:
return client.describe_cluster(ClusterArn=cluster_arn)
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found")


def get_bootstrap_servers(
cluster_arn: str,
configuration: Configuration = None,
secrets: Secrets = None,
) -> List[str]:
"""
Get the bootstrap servers for an MSK cluster.
"""
client = aws_client("kafka", configuration, secrets)
logger.debug(f"Getting bootstrap servers for MSK cluster: {cluster_arn}")
try:
response = client.get_bootstrap_brokers(ClusterArn=cluster_arn)
return response["BootstrapBrokerString"].split(",") if response else []
except client.exceptions.NotFoundException:
raise FailedActivity("The specified cluster was not found")
91 changes: 91 additions & 0 deletions tests/msk/test_msk_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from unittest.mock import MagicMock, patch
import pytest
from chaosaws.msk.actions import reboot_msk_broker, delete_cluster
from chaoslib.exceptions import FailedActivity


class NotFoundException(Exception):
def __init__(self, message="Cluster not found"):
super().__init__(f"{message}")


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_reboot_msk_broker_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
broker_ids = ["1"]
client.reboot_broker.return_value = {
"ResponseMetadata": {
"HTTPStatusCode": 200
}
}

response = reboot_msk_broker(cluster_arn=cluster_arn,
broker_ids=broker_ids)

client.reboot_broker.assert_called_with(ClusterArn=cluster_arn,
BrokerIds=broker_ids)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_reboot_msk_broker_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
broker_ids = ["1"]

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.reboot_broker.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
reboot_msk_broker(cluster_arn=cluster_arn, broker_ids=broker_ids)

assert expected_error_message in str(
exc_info.value
)


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_delete_cluster_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
client.delete_cluster.return_value = {
"ResponseMetadata": {
"HTTPStatusCode": 200
}
}

response = delete_cluster(cluster_arn=cluster_arn)

client.delete_cluster.assert_called_with(ClusterArn=cluster_arn)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200


@patch("chaosaws.msk.actions.aws_client", autospec=True)
def test_delete_cluster_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.delete_cluster.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
delete_cluster(cluster_arn=cluster_arn)

assert expected_error_message in str(
exc_info.value
)
84 changes: 84 additions & 0 deletions tests/msk/test_msk_probes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from unittest.mock import MagicMock, patch
import pytest
from chaosaws.msk.probes import describe_msk_cluster, get_bootstrap_servers
from chaoslib.exceptions import FailedActivity


class NotFoundException(Exception):
def __init__(self, message="Cluster not found"):
super().__init__(f"{message}")


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_describe_msk_cluster_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
expected_response = {"ClusterInfo": {"ClusterArn": cluster_arn}}

client.describe_cluster.return_value = expected_response

response = describe_msk_cluster(cluster_arn=cluster_arn)

client.describe_cluster.assert_called_with(ClusterArn=cluster_arn)
assert response == expected_response


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_describe_msk_cluster_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.describe_cluster.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
describe_msk_cluster(cluster_arn=cluster_arn)

assert expected_error_message in str(
exc_info.value
)


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_get_bootstrap_servers_success(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"
bootstrap_servers = "broker1,broker2,broker3"
expected_response = {"BootstrapBrokerString": bootstrap_servers}

client.get_bootstrap_brokers.return_value = expected_response

response = get_bootstrap_servers(cluster_arn=cluster_arn)

client.get_bootstrap_brokers.assert_called_with(ClusterArn=cluster_arn)
assert response == bootstrap_servers.split(",")


@patch("chaosaws.msk.probes.aws_client", autospec=True)
def test_get_bootstrap_server_cluster_not_found(aws_client):
client = MagicMock()
aws_client.return_value = client
cluster_arn = "arn_msk_cluster"

client.exceptions = MagicMock()
client.exceptions.NotFoundException = NotFoundException
client.get_bootstrap_brokers.side_effect = NotFoundException(
"Cluster not found"
)

expected_error_message = "The specified cluster was not found"

with pytest.raises(FailedActivity) as exc_info:
get_bootstrap_servers(cluster_arn=cluster_arn)

assert expected_error_message in str(
exc_info.value
)
Loading