Skip to content

Commit

Permalink
Merge pull request #25 from ecmwf/develop
Browse files Browse the repository at this point in the history
Release 0.8.1
  • Loading branch information
jameshawkes authored Sep 24, 2024
2 parents 1a881ec + cea8d9d commit 7823b55
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 102 deletions.
23 changes: 16 additions & 7 deletions polytope_server/common/datasource/polytope.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
# does it submit to any jurisdiction.
#

import copy
import json
import logging
import os
import copy

import yaml
from polytope.utility.exceptions import PolytopeError
Expand Down Expand Up @@ -61,7 +61,7 @@ def retrieve(self, request):

# Set the "pre-path" for this request
pre_path = {}
for k,v in r.items():
for k, v in r.items():
if k in self.req_single_keys:
if isinstance(v, list):
v = v[0]
Expand All @@ -70,14 +70,23 @@ def retrieve(self, request):
polytope_mars_config = copy.deepcopy(self.config)
polytope_mars_config["options"]["pre_path"] = pre_path

transforms = []
for transform in polytope_mars_config["options"]["axis_config"]:
if transform["axis_name"] in r.keys():
logging.info("Found axis {} in request".format(transform["axis_name"]))
transforms.append(transform)
if transform["axis_name"] in ("latitude", "longitude", "values"):
transforms.append(transform)

polytope_mars_config["options"]["axis_config"] = transforms

polytope_mars = PolytopeMars(
polytope_mars_config,
log_context= {
"user": request.user.realm + ':' + request.user.username,
log_context={
"user": request.user.realm + ":" + request.user.username,
"id": request.id,
})

},
)

try:
self.output = polytope_mars.extract(r)
Expand Down Expand Up @@ -111,7 +120,7 @@ def match(self, request):
raise Exception("got {} : {}, but expected one of {}".format(k, r[k], v))

# Check that there is only one value if required
for k, v in r.items():
for k, v in r.items():
if k in self.req_single_keys:
v = [v] if isinstance(v, str) else v
if len(v) > 1:
Expand Down
50 changes: 31 additions & 19 deletions polytope_server/common/staging/s3_boto3_staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import json
import logging
import random
import time
from concurrent.futures import Future, ThreadPoolExecutor

Expand Down Expand Up @@ -59,7 +58,6 @@ def submit(self, fn, /, *args, **kwargs):

class S3Staging_boto3(staging.Staging):
def __init__(self, config):

self.bucket = config.get("bucket", "default")
self.url = config.get("url", None)

Expand All @@ -76,17 +74,9 @@ def __init__(self, config):
for name in ["boto", "urllib3", "s3transfer", "boto3", "botocore", "nose"]:
logging.getLogger(name).setLevel(logging.WARNING)

prefix = "https" if self.use_ssl else "http"

if config.get("random_host", False):
self.host = config.get("random_host", {}).get("host", self.host)
index = random.randint(0, config.get("random_host", {}).get("max", 1) - 1)
# replace %%ID%% in the host with the index
self.host = self.host.replace("%%ID%%", str(index))
self.url = self.url + "/" + str(index)
logging.info(f"Using random host: {self.host}")
self.prefix = "https" if self.use_ssl else "http"

self._internal_url = f"{prefix}://{self.host}:{self.port}"
self._internal_url = f"http://{self.host}:{self.port}"

# Setup Boto3 client
self.s3_client = boto3.client(
Expand Down Expand Up @@ -125,7 +115,10 @@ def create(self, name, data, content_type):
# else using content-disposition header
try:
multipart_upload = self.s3_client.create_multipart_upload(
Bucket=self.bucket, Key=name, ContentType=content_type, ContentDisposition="attachment"
Bucket=self.bucket,
Key=name,
ContentType=content_type,
ContentDisposition="attachment",
)
upload_id = multipart_upload["UploadId"]

Expand All @@ -140,7 +133,15 @@ def create(self, name, data, content_type):
else:
for part_data in self.iterator_buffer(data, self.buffer_size):
if part_data:
futures.append(executor.submit(self.upload_part, name, part_number, part_data, upload_id))
futures.append(
executor.submit(
self.upload_part,
name,
part_number,
part_data,
upload_id,
)
)
part_number += 1

for future in futures:
Expand All @@ -153,7 +154,10 @@ def create(self, name, data, content_type):
raise ValueError("No data retrieved")

self.s3_client.complete_multipart_upload(
Bucket=self.bucket, Key=name, UploadId=upload_id, MultipartUpload={"Parts": parts}
Bucket=self.bucket,
Key=name,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)

logging.info(f"Successfully uploaded {name} in {len(parts)} parts.")
Expand All @@ -168,7 +172,11 @@ def create(self, name, data, content_type):
def upload_part(self, name, part_number, data, upload_id):
logging.debug(f"Uploading part {part_number} of {name}, {len(data)} bytes")
response = self.s3_client.upload_part(
Bucket=self.bucket, Key=name, PartNumber=part_number, UploadId=upload_id, Body=data
Bucket=self.bucket,
Key=name,
PartNumber=part_number,
UploadId=upload_id,
Body=data,
)
return {"PartNumber": part_number, "ETag": response["ETag"]}

Expand All @@ -190,14 +198,14 @@ def set_bucket_policy(self):
},
{
"Sid": "AllowListBucket",
"Effect": "Allow",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:ListBucket",
"Resource": f"arn:aws:s3:::{self.bucket}",
},
{
"Sid": "AllowGetBucketLocation",
"Effect": "Allow",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:GetBucketLocation",
"Resource": f"arn:aws:s3:::{self.bucket}",
Expand Down Expand Up @@ -239,7 +247,11 @@ def stat(self, name):

def get_url(self, name):
if self.url:
return f"{self.url}/{self.bucket}/{name}"
if self.url.startswith("http"):
# This covers both http and https
return f"{self.url}/{self.bucket}/{name}"
else:
return f"{self.prefix}://{self.url}/{self.bucket}/{name}"
return None

def get_internal_url(self, name):
Expand Down
Loading

0 comments on commit 7823b55

Please sign in to comment.