Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.18 system test] MCG bucket notification #11251

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ocs_ci/ocs/bucket_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,23 @@ def s3_delete_object(s3_obj, bucketname, object_key, versionid=None):
return s3_obj.s3_client.delete_object(Bucket=bucketname, Key=object_key)


def s3_put_object_tagging(s3_obj, bucketname, object_key, tags):
    """
    Boto3 client based object tagging

    Args:
        s3_obj (MCG): MCG object
        bucketname (str): Name of the bucket
        object_key (str): Key of the object
        tags (list): List of key/value pair dicts to apply as the object's tag-set

    Returns:
        dict: Response from the boto3 put_object_tagging call

    """
    tag_set = {"TagSet": tags}
    response = s3_obj.s3_client.put_object_tagging(
        Bucket=bucketname,
        Key=object_key,
        Tagging=tag_set,
    )
    return response


def s3_put_bucket_website(s3_obj, bucketname, website_config):
"""
Boto3 client based Put bucket website function
Expand Down
3 changes: 2 additions & 1 deletion tests/cross_functional/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,8 @@ def finalizer():
), f"Failed to scale up the statefulset {constants.NOOBAA_DB_STATEFULSET}"

try:
restore_pvc_objs[0].delete()
for restore_pvc_obj in restore_pvc_objs:
restore_pvc_obj.delete()
except CommandFailed as ex:
if f'"{restore_pvc_objs[0].name}" not found' not in str(ex):
raise ex
Expand Down
281 changes: 281 additions & 0 deletions tests/cross_functional/system_test/test_mcg_bucket_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
import pytest

from concurrent.futures import ThreadPoolExecutor, as_completed

from ocs_ci.ocs import constants
from ocs_ci.framework.pytest_customization.marks import ignore_leftover_label
from ocs_ci.ocs.bucket_utils import (
write_random_test_objects_to_bucket,
s3_delete_object,
s3_put_object_tagging,
expire_objects_in_bucket,
)
from ocs_ci.ocs.resources.bucket_notifications_manager import (
BucketNotificationsManager,
logger,
)
from ocs_ci.ocs.resources.mcg_lifecycle_policies import (
LifecyclePolicy,
ExpirationRule,
LifecycleFilter,
)
from ocs_ci.ocs.resources.pod import (
get_pod_node,
get_pods_having_label,
Pod,
get_noobaa_core_pod,
)
from ocs_ci.utility.utils import TimeoutSampler


@ignore_leftover_label(constants.CUSTOM_MCG_LABEL)
class TestBucketNotificationSystemTest:
    """
    System test for the MCG bucket notification feature: several buckets are
    created with notifications enabled for multiple s3 operations, and the
    expected Kafka events are verified while NooBaa/Kafka disruptive
    operations run in parallel.
    """

    @pytest.fixture(autouse=True, scope="class")
    def notify_manager(self, request, pvc_factory_class):
        """
        Set up Kafka and the BucketNotificationsManager
        Note that the dependency on the pvc_factory_class fixture is necessary
        to guarantee the correct teardown order. Otherwise the pvc factory teardown
        might fail when deleting a PVC that the BucketNotificationsManager is still using.

        Returns:
            BucketNotificationsManager: An instance of the BucketNotificationsManager class

        """
        notif_manager = BucketNotificationsManager()
        notif_manager.pvc_factory = pvc_factory_class
        # Register cleanup before setup so a partial setup still gets torn down
        request.addfinalizer(notif_manager.cleanup)

        notif_manager.setup_kafka()
        return notif_manager

    def verify_events(
        self, notify_manager, topic, bucket_names, event_name, timeout=120, sleep=5
    ):
        """
        Poll the Kafka topic until the given event has been seen for every bucket.

        Args:
            notify_manager (BucketNotificationsManager): Notifications manager instance
            topic (str): Kafka topic to poll for events
            bucket_names (list): Names of the buckets expected to emit the event.
                NOTE: this list is mutated — each verified bucket is removed from it,
                so callers should pass a copy (e.g. ``buckets[:]``).
            event_name (str): Event name to look for, e.g. "ObjectCreated:Put"
            timeout (int): Total time in seconds to keep polling for events
            sleep (int): Seconds to wait between polls

        Raises:
            TimeoutExpiredError: If the event was not seen for all buckets in time

        """
        for events in TimeoutSampler(
            timeout=timeout,
            sleep=sleep,
            func=notify_manager.get_events,
            topic=topic,
        ):
            for event in events:
                if (
                    event["eventName"] == event_name
                    and event["s3"]["bucket"]["name"] in bucket_names
                ):
                    logger.info(
                        f'{event_name} event found for the bucket {event["s3"]["bucket"]["name"]}'
                    )
                    bucket_names.remove(event["s3"]["bucket"]["name"])

            if len(bucket_names) == 0:
                logger.info(f"Verified {event_name} for all the buckets.")
                break

    def test_bucket_notification_system_test(
        self,
        nodes,
        notify_manager,
        bucket_factory,
        reduce_expiration_interval,
        awscli_pod,
        test_directory_setup,
        mcg_obj,
        noobaa_db_backup_and_recovery,
        snapshot_factory,
        setup_mcg_bg_features,
        validate_mcg_bg_features,
    ):
        """
        This is to test the MCG bucket notification feature on multiple MCG buckets
        with bucket notification enabled for multiple s3 operations. We also perform some
        Noobaa specific disruptive operations to verify notification is not affected

        Steps:
        1. Enable bucket notifications on the NooBaa CR
        2. Create a Kafka topic and add a Kafka notification connection to the NooBaa CR
        3. Create 5 buckets and configure bucket notification for each of the s3 operations
        4. Perform object upload and verify object upload events are received for all the buckets
        5. Tag an object in each bucket and restart noobaa pod nodes.
           Verify ObjectTagging:Put events have occurred for all the buckets.
        6. Remove an object from each bucket and restart kafka pods.
           Verify the ObjectRemoved:Delete event has occurred
        7. Enable object expiration for 1 day and expire the objects manually.
           Perform Noobaa db backup and recovery.
           Make sure notifications are updated for the LifecycleExpiration:Delete event

        """

        NUM_OF_BUCKETS = 5
        # Subscribe each bucket to every notification-capable s3 operation
        events_to_notify = [
            "s3:ObjectCreated:*",
            "s3:ObjectRemoved:*",
            "s3:LifecycleExpiration:*",
            "s3:ObjectRestore:*",
            "s3:ObjectTagging:*",
        ]

        # feature_setup_map = setup_mcg_bg_features(
        #     num_of_buckets=5,
        #     object_amount=5,
        #     is_disruptive=True,
        #     skip_any_features=["nsfs", "rgw kafka", "caching"],
        # )

        # 1. Enable bucket notification on Noobaa CR
        notify_manager.enable_bucket_notifs_on_cr(use_provided_pvc=True)

        # 2. Create kafka topic and add it to the Noobaa CR
        topic = notify_manager.create_kafka_topic()
        secret, conn_config_path = notify_manager.create_kafka_conn_secret(topic)
        notify_manager.add_notif_conn_to_noobaa_cr(secret)

        # 3. Create the five buckets that are needed for the testing
        buckets_created = []
        for i in range(NUM_OF_BUCKETS):
            bucket = bucket_factory()[0]
            logger.info(f"Enabling bucket notification for the bucket {bucket.name}")
            notify_manager.put_bucket_notification(
                awscli_pod,
                mcg_obj,
                bucket.name,
                events=events_to_notify,
                conn_config_path=conn_config_path,
            )
            buckets_created.append(bucket.name)

        # 4. Write some objects to all the buckets and verify the events
        # have occurred for all the buckets.
        # Object names are identical across buckets, so obj_written from the
        # last iteration is valid for every bucket below.
        obj_written = []
        for bucket in buckets_created:
            obj_written = write_random_test_objects_to_bucket(
                awscli_pod,
                bucket,
                file_dir=test_directory_setup.origin_dir,
                amount=2,
                mcg_obj=mcg_obj,
            )
        self.verify_events(
            notify_manager,
            topic,
            bucket_names=buckets_created[:],
            event_name="ObjectCreated:Put",
        )

        with ThreadPoolExecutor(max_workers=5) as executor:

            # 5. Tag object from the bucket and restart noobaa pod nodes
            # Verify ObjectTagging:Put events have occurred for all the buckets
            noobaa_pod_nodes = [get_pod_node(get_noobaa_core_pod())]
            future_objs = []
            for bucket in buckets_created:
                logger.info(f"Tagging object {obj_written[1]} from the bucket {bucket}")
                future_objs.append(
                    executor.submit(
                        s3_put_object_tagging,
                        mcg_obj,
                        bucket,
                        obj_written[1],
                        [{"Key": "type", "Value": "media"}],
                    )
                )

            logger.info("Stopping Noobaa pod nodes")
            nodes.stop_nodes(nodes=noobaa_pod_nodes)
            # Wait for all tagging operations to finish (re-raises any failure)
            for future in as_completed(future_objs):
                future.result()

            logger.info(
                "Verifying if ObjectTagging:Put events has occurred for all the buckets"
            )
            self.verify_events(
                notify_manager,
                topic,
                bucket_names=buckets_created[:],
                event_name="ObjectTagging:Put",
                timeout=1200,
                sleep=30,
            )

            logger.info("Starting Noobaa pod nodes")
            nodes.start_nodes(nodes=noobaa_pod_nodes)

            # 6. Remove object from the bucket and restart kafka pods
            # Verify ObjectRemoved event has occurred
            kafka_kind_label = "strimzi.io/kind=Kafka"
            kafka_pods = [
                Pod(**pod_info)
                for pod_info in get_pods_having_label(label=kafka_kind_label)
            ]
            future_objs = []
            for bucket in buckets_created:
                # NOTE: the object being deleted is the tagged object obj_written[1]
                # (the log message previously claimed obj_written[0], which was wrong)
                logger.info(f"Deleting object {obj_written[1]} from bucket {bucket}")
                future_objs.append(
                    executor.submit(
                        s3_delete_object,
                        mcg_obj,
                        bucket,
                        obj_written[1],
                    )
                )

            logger.info("Restarting Kafka pods")
            for pod in kafka_pods:
                logger.info(f"Deleting pod {pod.name}")
                pod.delete()
            # Wait for all delete operations to finish (re-raises any failure)
            for future in as_completed(future_objs):
                future.result()

            logger.info(
                "Verifying if ObjectRemoved:Delete events has occurred for all buckets"
            )
            self.verify_events(
                notify_manager,
                topic,
                bucket_names=buckets_created[:],
                event_name="ObjectRemoved:Delete",
                timeout=300,
                sleep=30,
            )

        # 7. Enable object expiration for 1 day and expire the objects manually
        # Perform noobaa db backup and recovery. Make sure notifications are updated
        # for the LifecycleExpiration:Delete event

        for bucket in buckets_created:
            logger.info(f"Setting object expiration on bucket {bucket}")
            lifecycle_policy = LifecyclePolicy(
                ExpirationRule(days=1, filter=LifecycleFilter())
            )
            mcg_obj.s3_client.put_bucket_lifecycle_configuration(
                Bucket=bucket, LifecycleConfiguration=lifecycle_policy.as_dict()
            )
            # Backdate the objects so the 1-day rule applies immediately
            expire_objects_in_bucket(bucket)

        reduce_expiration_interval(interval=5)
        logger.info("Reduced the object expiration interval to 5 minutes")

        # Backup noobaa db and recovery
        noobaa_db_backup_and_recovery(snapshot_factory=snapshot_factory)
        logger.info("Noobaa db backup and recovery is completed")

        logger.info(
            "Verifying if LifecycleExpiration:Delete events has occurred for all the buckets"
        )
        self.verify_events(
            notify_manager,
            topic,
            bucket_names=buckets_created[:],
            event_name="LifecycleExpiration:Delete",
            timeout=600,
            sleep=30,
        )

        # validate_mcg_bg_features(
        #     feature_setup_map,
        #     run_in_bg=False,
        #     skip_any_features=["nsfs", "rgw kafka", "caching"],
        #     object_amount=5,
        # )
        # logger.info("No issues seen with the MCG bg feature validation")
Loading