Skip to content

Commit c9e038f

Browse files
committed
chaosaws msk ready Signed-off-by: Jorge Tapicha <[email protected]>
1 parent d11e8fd commit c9e038f

File tree

5 files changed

+280
-0
lines changed

5 files changed

+280
-0
lines changed

chaosaws/msk/__init__.py

Whitespace-only changes.

chaosaws/msk/actions.py

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from typing import List
2+
3+
from chaoslib.types import Configuration, Secrets
4+
from chaoslib.exceptions import FailedActivity
5+
from chaosaws import aws_client, get_logger
6+
from chaosaws.types import AWSResponse
7+
8+
__all__ = ["reboot_msk_broker", "delete_cluster"]
9+
10+
logger = get_logger()
11+
12+
13+
def reboot_msk_broker(
14+
cluster_arn: str,
15+
broker_ids: List[str],
16+
configuration: Configuration = None,
17+
secrets: Secrets = None,
18+
) -> AWSResponse:
19+
"""
20+
Reboot the specified brokers in an MSK cluster.
21+
"""
22+
client = aws_client("kafka", configuration, secrets)
23+
logger.debug(
24+
f"Rebooting MSK brokers: {broker_ids} in cluster {cluster_arn}"
25+
)
26+
try:
27+
return client.reboot_broker(
28+
ClusterArn=cluster_arn,
29+
BrokerIds=broker_ids
30+
)
31+
except client.exceptions.NotFoundException as e:
32+
raise FailedActivity("The specified cluster was not found") from e
33+
34+
35+
def delete_cluster(
36+
cluster_arn: str,
37+
configuration: Configuration = None,
38+
secrets: Secrets = None,
39+
) -> AWSResponse:
40+
"""
41+
Delete the given MSK cluster.
42+
"""
43+
client = aws_client("kafka", configuration, secrets)
44+
logger.debug(f"Deleting MSK cluster: {cluster_arn}")
45+
try:
46+
return client.delete_cluster(ClusterArn=cluster_arn)
47+
except client.exceptions.NotFoundException as e:
48+
raise FailedActivity("The specified cluster was not found") from e

chaosaws/msk/probes.py

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from typing import List
2+
3+
from chaoslib.types import Configuration, Secrets
4+
from chaoslib.exceptions import FailedActivity
5+
from chaosaws import aws_client, get_logger
6+
from chaosaws.types import AWSResponse
7+
8+
__all__ = ["describe_msk_cluster", "get_bootstrap_servers"]
9+
10+
logger = get_logger()
11+
12+
13+
def describe_msk_cluster(
14+
cluster_arn: str,
15+
configuration: Configuration = None,
16+
secrets: Secrets = None,
17+
) -> AWSResponse:
18+
"""
19+
Describe an MSK cluster.
20+
"""
21+
client = aws_client("kafka", configuration, secrets)
22+
logger.debug(f"Describing MSK cluster: {cluster_arn}")
23+
try:
24+
return client.describe_cluster(ClusterArn=cluster_arn)
25+
except client.exceptions.NotFoundException as e:
26+
raise FailedActivity("The specified cluster was not found") from e
27+
28+
29+
def get_bootstrap_servers(
30+
cluster_arn: str,
31+
configuration: Configuration = None,
32+
secrets: Secrets = None,
33+
) -> List[str]:
34+
"""
35+
Get the bootstrap servers for an MSK cluster.
36+
"""
37+
client = aws_client("kafka", configuration, secrets)
38+
logger.debug(f"Getting bootstrap servers for MSK cluster: {cluster_arn}")
39+
try:
40+
response = client.get_bootstrap_brokers(ClusterArn=cluster_arn)
41+
return response["BootstrapBrokerString"].split(",") if response else []
42+
except client.exceptions.NotFoundException as e:
43+
raise FailedActivity("The specified cluster was not found") from e

tests/msk/test_msk_actions.py

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from unittest.mock import MagicMock, patch
2+
import pytest
3+
from chaosaws.msk.actions import reboot_msk_broker, delete_cluster
4+
from chaoslib.exceptions import FailedActivity
5+
6+
7+
class NotFoundException(Exception):
8+
def __init__(self, message="Cluster not found"):
9+
super().__init__(f"{message}")
10+
11+
12+
@patch("chaosaws.msk.actions.aws_client", autospec=True)
13+
def test_reboot_msk_broker_success(aws_client):
14+
client = MagicMock()
15+
aws_client.return_value = client
16+
cluster_arn = "arn_msk_cluster"
17+
broker_ids = ["1"]
18+
client.reboot_broker.return_value = {
19+
"ResponseMetadata": {
20+
"HTTPStatusCode": 200
21+
}
22+
}
23+
24+
response = reboot_msk_broker(cluster_arn=cluster_arn,
25+
broker_ids=broker_ids)
26+
27+
client.reboot_broker.assert_called_with(ClusterArn=cluster_arn,
28+
BrokerIds=broker_ids)
29+
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
30+
31+
32+
@patch("chaosaws.msk.actions.aws_client", autospec=True)
33+
def test_reboot_msk_broker_not_found(aws_client):
34+
client = MagicMock()
35+
aws_client.return_value = client
36+
cluster_arn = "arn_msk_cluster"
37+
broker_ids = ["1"]
38+
39+
client.exceptions = MagicMock()
40+
client.exceptions.NotFoundException = NotFoundException
41+
client.reboot_broker.side_effect = NotFoundException(
42+
"Cluster not found"
43+
)
44+
45+
expected_error_message = "The specified cluster was not found"
46+
47+
with pytest.raises(FailedActivity) as exc_info:
48+
reboot_msk_broker(cluster_arn=cluster_arn, broker_ids=broker_ids)
49+
50+
assert expected_error_message in str(
51+
exc_info.value
52+
)
53+
54+
55+
@patch("chaosaws.msk.actions.aws_client", autospec=True)
56+
def test_delete_cluster_success(aws_client):
57+
client = MagicMock()
58+
aws_client.return_value = client
59+
cluster_arn = "arn_msk_cluster"
60+
client.delete_cluster.return_value = {
61+
"ResponseMetadata": {
62+
"HTTPStatusCode": 200
63+
}
64+
}
65+
66+
response = delete_cluster(cluster_arn=cluster_arn)
67+
68+
client.delete_cluster.assert_called_with(ClusterArn=cluster_arn)
69+
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
70+
71+
72+
@patch("chaosaws.msk.actions.aws_client", autospec=True)
73+
def test_delete_cluster_not_found(aws_client):
74+
client = MagicMock()
75+
aws_client.return_value = client
76+
cluster_arn = "arn_msk_cluster"
77+
78+
client.exceptions = MagicMock()
79+
client.exceptions.NotFoundException = NotFoundException
80+
client.delete_cluster.side_effect = NotFoundException(
81+
"Cluster not found"
82+
)
83+
84+
expected_error_message = "The specified cluster was not found"
85+
86+
with pytest.raises(FailedActivity) as exc_info:
87+
delete_cluster(cluster_arn=cluster_arn)
88+
89+
assert expected_error_message in str(
90+
exc_info.value
91+
)

tests/msk/test_msk_probes.py

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
from unittest.mock import MagicMock, patch
2+
import pytest
3+
from chaosaws.msk.probes import describe_msk_cluster, get_bootstrap_servers
4+
from chaoslib.exceptions import FailedActivity
5+
6+
7+
class NotFoundException(Exception):
8+
def __init__(self, message="Cluster not found"):
9+
super().__init__(f"{message}")
10+
11+
12+
@patch("chaosaws.msk.probes.aws_client", autospec=True)
13+
def test_describe_msk_cluster_success(aws_client):
14+
client = MagicMock()
15+
aws_client.return_value = client
16+
cluster_arn = "arn_msk_cluster"
17+
expected_response = {"ClusterInfo": {"ClusterArn": cluster_arn}}
18+
19+
client.describe_cluster.return_value = expected_response
20+
21+
response = describe_msk_cluster(cluster_arn=cluster_arn)
22+
23+
client.describe_cluster.assert_called_with(ClusterArn=cluster_arn)
24+
assert response == expected_response
25+
26+
27+
@patch("chaosaws.msk.probes.aws_client", autospec=True)
28+
def test_describe_msk_cluster_not_found(aws_client):
29+
client = MagicMock()
30+
aws_client.return_value = client
31+
cluster_arn = "arn_msk_cluster"
32+
33+
client.exceptions = MagicMock()
34+
client.exceptions.NotFoundException = NotFoundException
35+
client.describe_cluster.side_effect = NotFoundException(
36+
"Cluster not found"
37+
)
38+
39+
expected_error_message = "The specified cluster was not found"
40+
41+
with pytest.raises(FailedActivity) as exc_info:
42+
describe_msk_cluster(cluster_arn=cluster_arn)
43+
44+
assert expected_error_message in str(
45+
exc_info.value
46+
)
47+
48+
49+
@patch("chaosaws.msk.probes.aws_client", autospec=True)
50+
def test_get_bootstrap_servers_success(aws_client):
51+
client = MagicMock()
52+
aws_client.return_value = client
53+
cluster_arn = "arn_msk_cluster"
54+
bootstrap_servers = "broker1,broker2,broker3"
55+
expected_response = {"BootstrapBrokerString": bootstrap_servers}
56+
57+
client.get_bootstrap_brokers.return_value = expected_response
58+
59+
response = get_bootstrap_servers(cluster_arn=cluster_arn)
60+
61+
client.get_bootstrap_brokers.assert_called_with(ClusterArn=cluster_arn)
62+
assert response == bootstrap_servers.split(",")
63+
64+
65+
@patch("chaosaws.msk.probes.aws_client", autospec=True)
66+
def test_get_bootstrap_servers_empty_response(aws_client):
67+
client = MagicMock()
68+
aws_client.return_value = client
69+
cluster_arn = "arn_msk_cluster"
70+
71+
client.get_bootstrap_brokers.return_value = None
72+
73+
response = get_bootstrap_servers(cluster_arn=cluster_arn)
74+
75+
client.get_bootstrap_brokers.assert_called_with(ClusterArn=cluster_arn)
76+
assert response == []
77+
78+
79+
@patch("chaosaws.msk.probes.aws_client", autospec=True)
80+
def test_get_bootstrap_server_cluster_not_found(aws_client):
81+
client = MagicMock()
82+
aws_client.return_value = client
83+
cluster_arn = "arn_msk_cluster"
84+
85+
client.exceptions = MagicMock()
86+
client.exceptions.NotFoundException = NotFoundException
87+
client.get_bootstrap_brokers.side_effect = NotFoundException(
88+
"Cluster not found"
89+
)
90+
91+
expected_error_message = "The specified cluster was not found"
92+
93+
with pytest.raises(FailedActivity) as exc_info:
94+
get_bootstrap_servers(cluster_arn=cluster_arn)
95+
96+
assert expected_error_message in str(
97+
exc_info.value
98+
)

0 commit comments

Comments
 (0)