From 92ab193848e5fc24931ce0a9b16b62cb9f1eeb71 Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 17 Sep 2024 11:13:49 +0000 Subject: [PATCH 01/13] removing random_host functionality --- .../common/staging/s3_boto3_staging.py | 71 +++++++++++++------ 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/polytope_server/common/staging/s3_boto3_staging.py b/polytope_server/common/staging/s3_boto3_staging.py index 59c483a..ec05e83 100644 --- a/polytope_server/common/staging/s3_boto3_staging.py +++ b/polytope_server/common/staging/s3_boto3_staging.py @@ -33,7 +33,9 @@ class AvailableThreadPoolExecutor(ThreadPoolExecutor): - def __init__(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()): + def __init__( + self, max_workers=None, thread_name_prefix="", initializer=None, initargs=() + ): super().__init__(max_workers, thread_name_prefix, initializer, initargs) self._running_worker_futures: set[Future] = set() @@ -59,7 +61,6 @@ def submit(self, fn, /, *args, **kwargs): class S3Staging_boto3(staging.Staging): def __init__(self, config): - self.bucket = config.get("bucket", "default") self.url = config.get("url", None) @@ -78,14 +79,6 @@ def __init__(self, config): prefix = "https" if self.use_ssl else "http" - if config.get("random_host", False): - self.host = config.get("random_host", {}).get("host", self.host) - index = random.randint(0, config.get("random_host", {}).get("max", 1) - 1) - # replace %%ID%% in the host with the index - self.host = self.host.replace("%%ID%%", str(index)) - self.url = self.url + "/" + str(index) - logging.info(f"Using random host: {self.host}") - self._internal_url = f"{prefix}://{self.host}:{self.port}" # Setup Boto3 client @@ -117,7 +110,9 @@ def __init__(self, config): self.host, self.s3_client, self.bucket, self.get_type() ) - logging.info(f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}") + logging.info( + f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}" + ) def create(self, name, data, content_type): name = name + ".grib" @@ -125,7 +120,10 @@ def create(self, name, data, content_type): # else using content-disposition header try: multipart_upload = self.s3_client.create_multipart_upload( - Bucket=self.bucket, Key=name, ContentType=content_type, ContentDisposition="attachment" + Bucket=self.bucket, + Key=name, + ContentType=content_type, + ContentDisposition="attachment", ) upload_id = multipart_upload["UploadId"] @@ -136,11 +134,21 @@ def create(self, name, data, content_type): with AvailableThreadPoolExecutor(max_workers=self.max_threads) as executor: executor.wait_for_available_worker() if not data: - logging.info(f"No data provided. Uploading a single empty part for {name}.") + logging.info( + f"No data provided. Uploading a single empty part for {name}." + ) else: for part_data in self.iterator_buffer(data, self.buffer_size): if part_data: - futures.append(executor.submit(self.upload_part, name, part_number, part_data, upload_id)) + futures.append( + executor.submit( + self.upload_part, + name, + part_number, + part_data, + upload_id, + ) + ) part_number += 1 for future in futures: @@ -149,11 +157,16 @@ def create(self, name, data, content_type): if not parts: logging.warning(f"No parts uploaded for {name}. 
Aborting upload.") - self.s3_client.abort_multipart_upload(Bucket=self.bucket, Key=name, UploadId=upload_id) + self.s3_client.abort_multipart_upload( + Bucket=self.bucket, Key=name, UploadId=upload_id + ) raise ValueError("No data retrieved") self.s3_client.complete_multipart_upload( - Bucket=self.bucket, Key=name, UploadId=upload_id, MultipartUpload={"Parts": parts} + Bucket=self.bucket, + Key=name, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, ) logging.info(f"Successfully uploaded {name} in {len(parts)} parts.") @@ -162,13 +175,19 @@ def create(self, name, data, content_type): except ClientError as e: logging.error(f"Failed to upload {name}: {e}") if "upload_id" in locals(): - self.s3_client.abort_multipart_upload(Bucket=self.bucket, Key=name, UploadId=upload_id) + self.s3_client.abort_multipart_upload( + Bucket=self.bucket, Key=name, UploadId=upload_id + ) raise def upload_part(self, name, part_number, data, upload_id): logging.debug(f"Uploading part {part_number} of {name}, {len(data)} bytes") response = self.s3_client.upload_part( - Bucket=self.bucket, Key=name, PartNumber=part_number, UploadId=upload_id, Body=data + Bucket=self.bucket, + Key=name, + PartNumber=part_number, + UploadId=upload_id, + Body=data, ) return {"PartNumber": part_number, "ETag": response["ETag"]} @@ -251,10 +270,14 @@ def get_type(self): def list(self): try: resources = [] - data = self.s3_client.list_objects_v2(Bucket=self.bucket, MaxKeys=999999999999999) + data = self.s3_client.list_objects_v2( + Bucket=self.bucket, MaxKeys=999999999999999 + ) if data.get("contents", {}).get("IsTruncated ncated", False): - logging.warning("Truncated list of objects. Some objects may not be listed.") + logging.warning( + "Truncated list of objects. Some objects may not be listed." 
+ ) if "Contents" not in data: # No objects in the bucket return resources @@ -270,8 +293,12 @@ def wipe(self): delete_objects = [{"Key": obj} for obj in objects_to_delete] if delete_objects: try: - logging.info(f"Deleting {len(delete_objects)} : {delete_objects} objects from {self.bucket}") - self.s3_client.delete_objects(Bucket=self.bucket, Delete={"Objects": delete_objects}) + logging.info( + f"Deleting {len(delete_objects)} : {delete_objects} objects from {self.bucket}" + ) + self.s3_client.delete_objects( + Bucket=self.bucket, Delete={"Objects": delete_objects} + ) except ClientError as e: logging.error(f"Error deleting objects: {e}") raise From 8fb58e703369d9d62f819690920be65af81c22ab Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 17 Sep 2024 11:21:02 +0000 Subject: [PATCH 02/13] refactoring url to have better customization in deployment --- polytope_server/common/staging/s3_boto3_staging.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/polytope_server/common/staging/s3_boto3_staging.py b/polytope_server/common/staging/s3_boto3_staging.py index ec05e83..9a59a45 100644 --- a/polytope_server/common/staging/s3_boto3_staging.py +++ b/polytope_server/common/staging/s3_boto3_staging.py @@ -77,9 +77,9 @@ def __init__(self, config): for name in ["boto", "urllib3", "s3transfer", "boto3", "botocore", "nose"]: logging.getLogger(name).setLevel(logging.WARNING) - prefix = "https" if self.use_ssl else "http" + self.prefix = "https" if self.use_ssl else "http" - self._internal_url = f"{prefix}://{self.host}:{self.port}" + self._internal_url = f"http://{self.host}:{self.port}" # Setup Boto3 client self.s3_client = boto3.client( @@ -258,7 +258,11 @@ def stat(self, name): def get_url(self, name): if self.url: - return f"{self.url}/{self.bucket}/{name}" + if self.url.startswith("http"): + # This covers both http and https + return f"{self.url}/{self.bucket}/{name}" + else: + return f"{self.prefix}://{self.url}/{self.bucket}/{name}" return None def get_internal_url(self, name): From 7cef9c1afaa1f07986dab3f90c09ff42258a60c4 Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 17 Sep 2024 11:22:54 +0000 Subject: [PATCH 03/13] removing unnecessary import --- polytope_server/common/staging/s3_boto3_staging.py | 1 - 1 file changed, 1 deletion(-) diff --git a/polytope_server/common/staging/s3_boto3_staging.py b/polytope_server/common/staging/s3_boto3_staging.py index 9a59a45..2be4536 100644 --- a/polytope_server/common/staging/s3_boto3_staging.py +++ b/polytope_server/common/staging/s3_boto3_staging.py @@ -20,7 +20,6 @@ import json import logging -import random import time from concurrent.futures import Future, ThreadPoolExecutor From f13d4d30c2f7a8df7e6ea9110b16ba3cc8f06b47 Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 17 Sep 2024 11:26:10 +0000 Subject: [PATCH 04/13] black --- .../common/staging/s3_boto3_staging.py | 36 +++++-------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/polytope_server/common/staging/s3_boto3_staging.py b/polytope_server/common/staging/s3_boto3_staging.py index 2be4536..aa04fea 100644 --- a/polytope_server/common/staging/s3_boto3_staging.py +++ b/polytope_server/common/staging/s3_boto3_staging.py @@ -32,9 +32,7 @@ class AvailableThreadPoolExecutor(ThreadPoolExecutor): - def __init__( - self, max_workers=None, thread_name_prefix="", initializer=None, initargs=() - ): + def __init__(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()): super().__init__(max_workers, thread_name_prefix, initializer, 
initargs) self._running_worker_futures: set[Future] = set() @@ -109,9 +107,7 @@ def __init__(self, config): self.host, self.s3_client, self.bucket, self.get_type() ) - logging.info( - f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}" - ) + logging.info(f"Opened data staging at {self.host}:{self.port} with bucket {self.bucket}") def create(self, name, data, content_type): name = name + ".grib" @@ -133,9 +129,7 @@ def create(self, name, data, content_type): with AvailableThreadPoolExecutor(max_workers=self.max_threads) as executor: executor.wait_for_available_worker() if not data: - logging.info( - f"No data provided. Uploading a single empty part for {name}." - ) + logging.info(f"No data provided. Uploading a single empty part for {name}.") else: for part_data in self.iterator_buffer(data, self.buffer_size): if part_data: @@ -156,9 +150,7 @@ def create(self, name, data, content_type): if not parts: logging.warning(f"No parts uploaded for {name}. Aborting upload.") - self.s3_client.abort_multipart_upload( - Bucket=self.bucket, Key=name, UploadId=upload_id - ) + self.s3_client.abort_multipart_upload(Bucket=self.bucket, Key=name, UploadId=upload_id) raise ValueError("No data retrieved") self.s3_client.complete_multipart_upload( @@ -174,9 +166,7 @@ def create(self, name, data, content_type): except ClientError as e: logging.error(f"Failed to upload {name}: {e}") if "upload_id" in locals(): - self.s3_client.abort_multipart_upload( - Bucket=self.bucket, Key=name, UploadId=upload_id - ) + self.s3_client.abort_multipart_upload(Bucket=self.bucket, Key=name, UploadId=upload_id) raise def upload_part(self, name, part_number, data, upload_id): @@ -273,14 +263,10 @@ def get_type(self): def list(self): try: resources = [] - data = self.s3_client.list_objects_v2( - Bucket=self.bucket, MaxKeys=999999999999999 - ) + data = self.s3_client.list_objects_v2(Bucket=self.bucket, MaxKeys=999999999999999) if data.get("contents", {}).get("IsTruncated ncated", False): - logging.warning( - "Truncated list of objects. Some objects may not be listed." - ) + logging.warning("Truncated list of objects. 
Some objects may not be listed.") if "Contents" not in data: # No objects in the bucket return resources @@ -296,12 +282,8 @@ def wipe(self): delete_objects = [{"Key": obj} for obj in objects_to_delete] if delete_objects: try: - logging.info( - f"Deleting {len(delete_objects)} : {delete_objects} objects from {self.bucket}" - ) - self.s3_client.delete_objects( - Bucket=self.bucket, Delete={"Objects": delete_objects} - ) + logging.info(f"Deleting {len(delete_objects)} : {delete_objects} objects from {self.bucket}") + self.s3_client.delete_objects(Bucket=self.bucket, Delete={"Objects": delete_objects}) except ClientError as e: logging.error(f"Error deleting objects: {e}") raise From 85131606415313b2079feb24f1a79e98e4710c76 Mon Sep 17 00:00:00 2001 From: sametd Date: Fri, 20 Sep 2024 09:52:33 +0000 Subject: [PATCH 05/13] refactoring of url in the minibased s3 --- polytope_server/common/staging/s3_staging.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/polytope_server/common/staging/s3_staging.py b/polytope_server/common/staging/s3_staging.py index 6f11e44..8f87747 100644 --- a/polytope_server/common/staging/s3_staging.py +++ b/polytope_server/common/staging/s3_staging.py @@ -81,7 +81,7 @@ def __init__(self, config): secure = config.get("secure", False) self.url = config.get("url", None) internal_url = "{}:{}".format(self.host, self.port) - secure = config.get("use_ssl", False) + use_ssl = config.get("use_ssl", False) if access_key == "" or secret_key == "": self.client = Minio( @@ -97,7 +97,8 @@ def __init__(self, config): secure=secure, ) - self.internal_url = ("https://" if secure else "http://") + internal_url + self.prefix = ("https://" if use_ssl else "http://") + internal_url + self.internal_url = f"http://{self.host}:{self.port}" try: self.client.make_bucket(self.bucket) @@ -213,10 +214,13 @@ def collect_metric_info(self): return self.storage_metric_collector.collect().serialize() def get_url(self, name): - if self.url is None: - return None - url = "{}/{}/{}".format(self.url, self.bucket, name) - return url + if self.url: + if self.url.startswith("http"): + # This covers both http and https + return f"{self.url}/{self.bucket}/{name}" + else: + return f"{self.prefix}://{self.url}/{self.bucket}/{name}" + return None def get_internal_url(self, name): url = "{}/{}/{}".format(self.internal_url, self.bucket, name) From 94edea5a59a05d00b3aeb22d65129032348c65f6 Mon Sep 17 00:00:00 2001 From: sametd Date: Fri, 20 Sep 2024 10:04:19 +0000 Subject: [PATCH 06/13] isort and black --- polytope_server/common/datasource/polytope.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/polytope_server/common/datasource/polytope.py b/polytope_server/common/datasource/polytope.py index 66cb197..aa67167 100644 --- a/polytope_server/common/datasource/polytope.py +++ b/polytope_server/common/datasource/polytope.py @@ -18,10 +18,10 @@ # does it submit to any jurisdiction. 
# +import copy import json import logging import os -import copy import yaml from polytope.utility.exceptions import PolytopeError @@ -61,7 +61,7 @@ def retrieve(self, request): # Set the "pre-path" for this request pre_path = {} - for k,v in r.items(): + for k, v in r.items(): if k in self.req_single_keys: if isinstance(v, list): v = v[0] @@ -70,14 +70,13 @@ def retrieve(self, request): polytope_mars_config = copy.deepcopy(self.config) polytope_mars_config["options"]["pre_path"] = pre_path - polytope_mars = PolytopeMars( polytope_mars_config, - log_context= { - "user": request.user.realm + ':' + request.user.username, + log_context={ + "user": request.user.realm + ":" + request.user.username, "id": request.id, - }) - + }, + ) try: self.output = polytope_mars.extract(r) @@ -111,7 +110,7 @@ def match(self, request): raise Exception("got {} : {}, but expected one of {}".format(k, r[k], v)) # Check that there is only one value if required - for k, v in r.items(): + for k, v in r.items(): if k in self.req_single_keys: v = [v] if isinstance(v, str) else v if len(v) > 1: From f397e8af3ad2644df94c46d2f6392bd272673d5e Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 24 Sep 2024 19:31:49 +0000 Subject: [PATCH 07/13] correct policy --- polytope_server/common/staging/s3_boto3_staging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polytope_server/common/staging/s3_boto3_staging.py b/polytope_server/common/staging/s3_boto3_staging.py index aa04fea..7cf43d8 100644 --- a/polytope_server/common/staging/s3_boto3_staging.py +++ b/polytope_server/common/staging/s3_boto3_staging.py @@ -198,14 +198,14 @@ def set_bucket_policy(self): }, { "Sid": "AllowListBucket", - "Effect": "Allow", + "Effect": "Deny", "Principal": "*", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{self.bucket}", }, { "Sid": "AllowGetBucketLocation", - "Effect": "Allow", + "Effect": "Deny", "Principal": "*", "Action": "s3:GetBucketLocation", "Resource": f"arn:aws:s3:::{self.bucket}", From d270507e5b84e70b779f8b9d250f95ca9bf0c06a Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 24 Sep 2024 19:32:13 +0000 Subject: [PATCH 08/13] near complete rewrite of minio based client --- polytope_server/common/staging/s3_staging.py | 187 +++++++++++++------ 1 file changed, 125 insertions(+), 62 deletions(-) diff --git a/polytope_server/common/staging/s3_staging.py b/polytope_server/common/staging/s3_staging.py index 8f87747..95c7f26 100644 --- a/polytope_server/common/staging/s3_staging.py +++ b/polytope_server/common/staging/s3_staging.py @@ -27,20 +27,33 @@ # ####################################################################### -import copy import json import logging import time +import warnings +from collections import namedtuple from concurrent.futures import Future, ThreadPoolExecutor -import minio from minio import Minio -from minio.definitions import UploadPart -from minio.error import BucketAlreadyExists, BucketAlreadyOwnedByYou, NoSuchKey +from minio.error import S3Error from ..metric_collector import S3StorageMetricCollector from . import staging +# Ensure that DeprecationWarnings are displayed +warnings.simplefilter("always", DeprecationWarning) + +warnings.warn( + f"The '{__name__}' module is deprecated and will be removed in a future version. 
" + "Please migrate to the new module 's3_boto3' to avoid disruption.", + DeprecationWarning, + stacklevel=1, +) + + +# Defining a named tuple to represent a part with part_number and etag +Part = namedtuple("Part", ["part_number", "etag"]) + class AvailableThreadPoolExecutor(ThreadPoolExecutor): @@ -81,7 +94,7 @@ def __init__(self, config): secure = config.get("secure", False) self.url = config.get("url", None) internal_url = "{}:{}".format(self.host, self.port) - use_ssl = config.get("use_ssl", False) + self.use_ssl = config.get("use_ssl", False) if access_key == "" or secret_key == "": self.client = Minio( @@ -97,16 +110,17 @@ def __init__(self, config): secure=secure, ) - self.prefix = ("https://" if use_ssl else "http://") + internal_url + self.prefix = "https" if self.use_ssl else "http" self.internal_url = f"http://{self.host}:{self.port}" try: self.client.make_bucket(self.bucket) self.client.set_bucket_policy(self.bucket, self.bucket_policy()) - except BucketAlreadyExists: - pass - except BucketAlreadyOwnedByYou: - pass + except S3Error as err: + if err.code in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + pass + else: + raise self.storage_metric_collector = S3StorageMetricCollector(endpoint, self.client, self.bucket, self.get_type()) @@ -114,60 +128,102 @@ def __init__(self, config): "Opened data staging at {}:{}/{}, locatable from {}".format(self.host, self.port, self.bucket, self.url) ) - def upload_part(self, part_number, buf, metadata, name, upload_id): - logging.debug(f"Uploading part {part_number} ({len(buf)} bytes) of {name}") - etag = self.client._do_put_object( - self.bucket, name, buf, len(buf), part_number=part_number, metadata=metadata, upload_id=upload_id - ) - return etag, len(buf) - def create(self, name, data, content_type): - url = self.get_url(name) - logging.info("Putting to staging: {}".format(name)) + name = name + ".grib" + try: + # Prepare headers for content type and content disposition + headers = { + "Content-Type": content_type, + "Content-Disposition": "attachment", + } + + # Initiate a multipart upload + upload_id = self.client._create_multipart_upload( + bucket_name=self.bucket, + object_name=name, + headers=headers, + ) - metadata = minio.helpers.amzprefix_user_metadata({}) - metadata["Content-Type"] = content_type + parts = [] + part_number = 1 + futures = [] + + with AvailableThreadPoolExecutor(max_workers=self.max_threads) as executor: + executor.wait_for_available_worker() + if not data: + logging.info(f"No data provided. Uploading a single empty part for {name}.") + # Upload an empty part + result = self.upload_part(name, part_number, b"", upload_id) + parts.append(result) + else: + # Ensure 'data' is an iterable of bytes objects + if isinstance(data, bytes): + data_iter = [data] # Wrap bytes object in a list to make it iterable + elif hasattr(data, "read"): + # If 'data' is a file-like object, read it in chunks + data_iter = iter(lambda: data.read(self.buffer_size), b"") + elif hasattr(data, "__iter__"): + data_iter = data # Assume it's already an iterable of bytes + else: + raise TypeError("data must be bytes, a file-like object, or an iterable over bytes") + + for part_data in self.iterator_buffer(data_iter, self.buffer_size): + if part_data: + futures.append( + executor.submit( + self.upload_part, + name, + part_number, + part_data, + upload_id, + ) + ) + part_number += 1 + + for future in futures: + result = future.result() + parts.append(result) + + if not parts: + logging.warning(f"No parts uploaded for {name}. 
Aborting upload.") + self.client._abort_multipart_upload(self.bucket, name, upload_id) + raise ValueError("No data retrieved") + + # Complete multipart upload + self.client._complete_multipart_upload( + bucket_name=self.bucket, + object_name=name, + upload_id=upload_id, + parts=parts, + ) - upload_id = self.client._new_multipart_upload(self.bucket, name, metadata) + logging.info(f"Successfully uploaded {name} in {len(parts)} parts.") + return self.get_url(name) - parts = {} - part_number = 1 - futures = [] + except S3Error as e: + logging.error(f"Failed to upload {name}: {e}") + if "upload_id" in locals(): + self.client._abort_multipart_upload(self.bucket, name, upload_id) + raise - with AvailableThreadPoolExecutor(max_workers=self.max_threads) as executor: - executor.wait_for_available_worker() - for buf in self.iterator_buffer(data, self.buffer_size): - if len(buf) == 0: - break - future = executor.submit(self.upload_part, copy.copy(part_number), buf, metadata, name, upload_id) - futures.append((future, part_number)) - part_number += 1 + def upload_part(self, name, part_number, data, upload_id): + logging.debug(f"Uploading part {part_number} of {name}, {len(data)} bytes") - try: - for future, part_number in futures: - etag, size = future.result() - parts[part_number] = UploadPart(self.bucket, name, upload_id, part_number, etag, None, size) - except Exception as e: - logging.error(f"Error uploading parts: {str(e)}") - self.client._remove_incomplete_upload(self.bucket, name, upload_id) - raise + # 'data' is expected to be a bytes object + if not isinstance(data, bytes): + raise TypeError(f"'data' must be bytes, got {type(data)}") - # Completing upload - try: - logging.info(parts) - try: - self.client._complete_multipart_upload(self.bucket, name, upload_id, parts) - except Exception: - time.sleep(5) - self.client._complete_multipart_upload(self.bucket, name, upload_id, parts) - - except Exception as e: - logging.error(f"Error completing multipart upload: {str(e)}") - self.client._remove_incomplete_upload(self.bucket, name, upload_id) - raise + response = self.client._upload_part( + bucket_name=self.bucket, + object_name=name, + data=data, + headers=None, + upload_id=upload_id, + part_number=part_number, + ) + etag = response.replace('"', "") # Remove any quotes from the ETag - logging.info("Put to {}".format(url)) - return url + return Part(part_number=part_number, etag=etag) def read(self, name): try: @@ -175,8 +231,11 @@ def read(self, name): if response.status == 200: return response.data logging.error("Could not read object {}, returned with status: {}".format(name, response.status)) - except NoSuchKey: - raise KeyError() + except S3Error as err: + if err.code == "NoSuchKey": + raise KeyError() + else: + raise def delete(self, name): if not self.query(name): @@ -189,15 +248,19 @@ def query(self, name): try: self.client.stat_object(self.bucket, name) return True - except NoSuchKey: - return False + except S3Error as err: + if err.code == "NoSuchKey": + return def stat(self, name): try: obj = self.client.stat_object(self.bucket, name) return obj.content_type, obj.size - except NoSuchKey: - raise KeyError() + except S3Error as err: + if err.code == "NoSuchKey": + raise KeyError() + else: + raise def list(self): resources = [] From 3583150397f69cfd5457d93b482750ad784157ce Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 24 Sep 2024 19:49:19 +0000 Subject: [PATCH 09/13] minio client endpoint is arranged and should_set_policy introduced to support seaweedfs --- 
polytope_server/common/staging/s3_staging.py | 23 ++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/polytope_server/common/staging/s3_staging.py b/polytope_server/common/staging/s3_staging.py index 95c7f26..933002e 100644 --- a/polytope_server/common/staging/s3_staging.py +++ b/polytope_server/common/staging/s3_staging.py @@ -87,42 +87,47 @@ def __init__(self, config): self.port = config.get("port", "8000") self.max_threads = config.get("max_threads", 20) self.buffer_size = config.get("buffer_size", 20 * 1024 * 1024) - endpoint = "{}:{}".format(self.host, self.port) access_key = config.get("access_key", "") secret_key = config.get("secret_key", "") self.bucket = config.get("bucket", "default") secure = config.get("secure", False) self.url = config.get("url", None) - internal_url = "{}:{}".format(self.host, self.port) + self.internal_url = f"http://{self.host}:{self.port}" self.use_ssl = config.get("use_ssl", False) + self.should_set_policy = config.get("should_set_policy", False) + + #remove the protocol from the internal_url, both http and https can be removed + endpoint = self.internal_url.split("://")[-1] if access_key == "" or secret_key == "": self.client = Minio( - internal_url, + endpoint, secure=secure, ) else: self.client = Minio( - internal_url, + endpoint, access_key=access_key, secret_key=secret_key, secure=secure, ) self.prefix = "https" if self.use_ssl else "http" - self.internal_url = f"http://{self.host}:{self.port}" try: self.client.make_bucket(self.bucket) - self.client.set_bucket_policy(self.bucket, self.bucket_policy()) + if self.should_set_policy: + self.client.set_bucket_policy(self.bucket, self.bucket_policy()) except S3Error as err: if err.code in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): pass else: raise - self.storage_metric_collector = S3StorageMetricCollector(endpoint, self.client, self.bucket, self.get_type()) + self.storage_metric_collector = S3StorageMetricCollector( + self.internal_url, self.client, self.bucket, self.get_type() + ) logging.info( "Opened data staging at {}:{}/{}, locatable from {}".format(self.host, self.port, self.bucket, self.url) @@ -313,14 +318,14 @@ def bucket_policy(self): }, { "Sid": "AllowListBucket", - "Effect": "Allow", + "Effect": "Deny", "Principal": "*", "Action": "s3:ListBucket", "Resource": f"arn:aws:s3:::{self.bucket}", }, { "Sid": "AllowGetBucketLocation", - "Effect": "Allow", + "Effect": "Deny", "Principal": "*", "Action": "s3:GetBucketLocation", "Resource": f"arn:aws:s3:::{self.bucket}", From 5a289fbe1eaa8d927812abcf2e3e228e7e4e1a66 Mon Sep 17 00:00:00 2001 From: sametd Date: Tue, 24 Sep 2024 20:01:22 +0000 Subject: [PATCH 10/13] black --- polytope_server/common/staging/s3_staging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polytope_server/common/staging/s3_staging.py b/polytope_server/common/staging/s3_staging.py index 933002e..0976c4e 100644 --- a/polytope_server/common/staging/s3_staging.py +++ b/polytope_server/common/staging/s3_staging.py @@ -95,8 +95,8 @@ def __init__(self, config): self.internal_url = f"http://{self.host}:{self.port}" self.use_ssl = config.get("use_ssl", False) self.should_set_policy = config.get("should_set_policy", False) - - #remove the protocol from the internal_url, both http and https can be removed + + # remove the protocol from the internal_url, both http and https can be removed endpoint = self.internal_url.split("://")[-1] if access_key == "" or secret_key == "": From 7d20cf23affbf419d64d023516de8f3180f4fc24 Mon Sep 17 
00:00:00 2001 From: jameshawkes Date: Tue, 24 Sep 2024 20:32:38 +0000 Subject: [PATCH 11/13] allows handling transforms in polytope datasource which don't match the request --- polytope_server/common/datasource/polytope.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/polytope_server/common/datasource/polytope.py b/polytope_server/common/datasource/polytope.py index 66cb197..48c1597 100644 --- a/polytope_server/common/datasource/polytope.py +++ b/polytope_server/common/datasource/polytope.py @@ -70,6 +70,18 @@ def retrieve(self, request): polytope_mars_config = copy.deepcopy(self.config) polytope_mars_config["options"]["pre_path"] = pre_path + transforms = [] + for transform in polytope_mars_config["options"]["axis_config"]: + if transform["axis_name"] in r.keys(): + logging.info("Found axis {} in request".format(transform["axis_name"])) + transforms.append(transform) + if transform["axis_name"] in ("latitude", "longitude", "values"): + transforms.append(transform) + + + polytope_mars_config["options"]["axis_config"] = transforms + + polytope_mars = PolytopeMars( polytope_mars_config, From b2424b347955910cfd925016e7042525c0da25d7 Mon Sep 17 00:00:00 2001 From: jameshawkes Date: Tue, 24 Sep 2024 20:35:57 +0000 Subject: [PATCH 12/13] black --- polytope_server/common/datasource/polytope.py | 1 - 1 file changed, 1 deletion(-) diff --git a/polytope_server/common/datasource/polytope.py b/polytope_server/common/datasource/polytope.py index b1eb872..9808d8e 100644 --- a/polytope_server/common/datasource/polytope.py +++ b/polytope_server/common/datasource/polytope.py @@ -78,7 +78,6 @@ def retrieve(self, request): if transform["axis_name"] in ("latitude", "longitude", "values"): transforms.append(transform) - polytope_mars_config["options"]["axis_config"] = transforms polytope_mars = PolytopeMars( From cea8d9d14e266f79e2d525263a73ad3bd30ff953 Mon Sep 17 00:00:00 2001 From: jameshawkes Date: Tue, 24 Sep 2024 20:50:58 +0000 Subject: [PATCH 13/13] bump version --- polytope_server/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polytope_server/version.py b/polytope_server/version.py index aefc76a..039aa6d 100644 --- a/polytope_server/version.py +++ b/polytope_server/version.py @@ -20,4 +20,4 @@ # Single-source Polytope version number -__version__ = "0.8.0" +__version__ = "0.8.1"
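# --- Illustrative sketch (editorial addition, not part of the patch series) ---
# Patches 02 and 05 refactor get_url() in both staging backends so that a fully
# qualified "url" config value is used verbatim, while a bare host falls back to
# the http/https prefix derived from "use_ssl". A minimal standalone sketch of
# that resolution logic is below; the config values shown are hypothetical and
# only illustrate the two branches introduced in this series.

def resolve_staging_url(config: dict, bucket: str, name: str):
    """Sketch of the URL resolution behaviour added by patches 02 and 05."""
    url = config.get("url")                 # e.g. "https://staging.example.int" or "data.example.int"
    use_ssl = config.get("use_ssl", False)
    prefix = "https" if use_ssl else "http"
    if not url:
        return None
    if url.startswith("http"):
        # Covers both http:// and https:// URLs supplied verbatim in config
        return f"{url}/{bucket}/{name}"
    # Bare host: prepend the scheme implied by use_ssl
    return f"{prefix}://{url}/{bucket}/{name}"

# Example with hypothetical values:
#   resolve_staging_url({"url": "data.example.int", "use_ssl": True}, "default", "req-123.grib")
#   -> "https://data.example.int/default/req-123.grib"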