Merge pull request #317 from nasa/issue_302
Issue 302: Temporal start date when revision_date query
hhlee445 authored Nov 27, 2022
2 parents ac5d334 + 6f7d467 commit 37b5d19
Showing 31 changed files with 90 additions and 192 deletions.
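In brief: this change removes the --isl-bucket argument (and the corresponding ISL_BUCKET_NAME environment wiring), normalizes single quotes to double quotes throughout daac_data_subscriber.py, and adds a --temporal-start-date option so that a revision_date query can also be bounded by granule acquisition time. A hypothetical invocation of the new flag (the collection, provider, and date values are illustrative, not taken from this commit) might look like:

    python daac_data_subscriber.py query -c HLSL30 -p LPCLOUD --temporal-start-date 2021-01-14T00:00:00Z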
5 changes: 0 additions & 5 deletions cluster_provisioning/modules/common/main.tf
@@ -2395,7 +2395,6 @@ resource "aws_lambda_permission" "event-misfire_lambda" {
# "JOB_QUEUE": "${var.project}-job_worker-hls_data_download",
# "JOB_TYPE": local.hls_download_job_type,
# "JOB_RELEASE": var.pcm_branch,
# "ISL_BUCKET_NAME": local.isl_bucket,
# "ENDPOINT": "OPS",
# "SMOKE_RUN": "true",
# "DRY_RUN": "true"
@@ -2451,7 +2450,6 @@ resource "aws_lambda_function" "hlsl30_query_timer" {
"JOB_QUEUE": "opera-job_worker-hls_data_query",
"JOB_TYPE": local.hlsl30_query_job_type,
"JOB_RELEASE": var.pcm_branch,
"ISL_BUCKET_NAME": local.isl_bucket,
"MINUTES": var.hlsl30_query_timer_trigger_frequency,
"PROVIDER": var.hls_provider,
"ENDPOINT": "OPS",
@@ -2487,7 +2485,6 @@ resource "aws_lambda_function" "hlss30_query_timer" {
"JOB_QUEUE": "opera-job_worker-hls_data_query",
"JOB_TYPE": local.hlss30_query_job_type,
"JOB_RELEASE": var.pcm_branch,
"ISL_BUCKET_NAME": local.isl_bucket,
"PROVIDER": var.hls_provider,
"ENDPOINT": "OPS",
"MINUTES": var.hlss30_query_timer_trigger_frequency,
@@ -2569,7 +2566,6 @@ resource "aws_lambda_permission" "hlss30_query_timer" {
# "JOB_QUEUE": "${var.project}-job_worker-slc_data_download",
# "JOB_TYPE": local.slc_download_job_type,
# "JOB_RELEASE": var.pcm_branch,
# "ISL_BUCKET_NAME": local.isl_bucket,
# "ENDPOINT": "OPS",
# "SMOKE_RUN": "true",
# "DRY_RUN": "true"
@@ -2625,7 +2621,6 @@ resource "aws_lambda_function" "slcs1a_query_timer" {
"JOB_QUEUE": "opera-job_worker-slc_data_query",
"JOB_TYPE": local.slcs1a_query_job_type,
"JOB_RELEASE": var.pcm_branch,
"ISL_BUCKET_NAME": local.isl_bucket,
"MINUTES": var.slcs1a_query_timer_trigger_frequency,
"PROVIDER": var.slc_provider,
"ENDPOINT": "OPS",
99 changes: 50 additions & 49 deletions data_subscriber/daac_data_subscriber.py
100755 → 100644
@@ -57,13 +57,13 @@ def rebuild_auth(self, prepared_request, response):
headers = prepared_request.headers
url = prepared_request.url

- if 'Authorization' in headers:
+ if "Authorization" in headers:
original_parsed = requests.utils.urlparse(response.request.url)
redirect_parsed = requests.utils.urlparse(url)
if (original_parsed.hostname != redirect_parsed.hostname) and \
redirect_parsed.hostname != self.auth_host and \
original_parsed.hostname != self.auth_host:
- del headers['Authorization']
+ del headers["Authorization"]


async def run(argv: list[str]):
@@ -75,8 +75,8 @@ async def run(argv: list[str]):
raise v

settings = SettingsConf().cfg
- edl = settings['DAAC_ENVIRONMENTS'][args.endpoint]['EARTHDATA_LOGIN']
- cmr = settings['DAAC_ENVIRONMENTS'][args.endpoint]['BASE_URL']
+ edl = settings["DAAC_ENVIRONMENTS"][args.endpoint]["EARTHDATA_LOGIN"]
+ cmr = settings["DAAC_ENVIRONMENTS"][args.endpoint]["BASE_URL"]
netloc = urlparse(f"https://{edl}").netloc
provider_esconn_map = {"LPCLOUD": get_hls_catalog_connection(logging.getLogger(__name__)),
"ASF": get_slc_catalog_connection(logging.getLogger(__name__))}
@@ -87,7 +87,7 @@ async def run(argv: list[str]):
update_url_index(es_conn, f.readlines(), None, None, None)
exit(0)

- loglevel = 'DEBUG' if args.verbose else 'INFO'
+ loglevel = "DEBUG" if args.verbose else "INFO"
logging.basicConfig(level=loglevel)
logging.info("Log level set to " + loglevel)

@@ -146,7 +146,7 @@ def create_parser():
provider = {"positionals": ["-p", "--provider"],
"kwargs": {"dest": "provider",
"choices": ["LPCLOUD", "ASF"],
"default": 'LPCLOUD',
"default": "LPCLOUD",
"help": "Specify a provider for collection search. Default is LPCLOUD."}}

collection = {"positionals": ["-c", "--collection-shortname"],
@@ -183,11 +183,6 @@ def create_parser():
"script as a cron, this value should be equal to or greater than how often your "
"cron runs (default: 60 minutes)."}}

- isl_bucket = {"positionals": ["-i", "--isl-bucket"],
- "kwargs": {"dest": "isl_bucket",
- "required": True,
- "help": "The incoming storage location s3 bucket where data products will be downloaded."}}
-
transfer_protocol = {"positionals": ["-x", "--transfer-protocol"],
"kwargs": {"dest": "transfer_protocol",
"choices": ["s3", "https"],
@@ -233,6 +228,12 @@ def create_parser():
"action": "store_true",
"help": "Toggle for using temporal range rather than revision date (range) in the query."}}

+ temporal_start_date = {"positionals": ["--temporal-start-date"],
+ "kwargs": {"dest": "temporal_start_date",
+ "default": None,
+ "help": "The ISO datetime after which data should be retrieved. Only valid when --use-temporal is false/omitted. For example, "
+ "--temporal-start-date 2021-01-14T00:00:00Z"}}
+
native_id = {"positionals": ["--native-id"],
"kwargs": {"dest": "native_id",
"help": "The native ID of a single product granule to be queried, overriding other query arguments if present."}}
@@ -250,20 +251,20 @@ def create_parser():
"help": "The native ID of a single product granule to be queried, overriding other query arguments if present."}}

full_parser = subparsers.add_parser("full")
- full_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes, isl_bucket,
+ full_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes,
transfer_protocol, dry_run, smoke_run, no_schedule_download, release_version, job_queue,
- chunk_size, batch_ids, use_temporal, native_id]
+ chunk_size, batch_ids, use_temporal, temporal_start_date, native_id]
_add_arguments(full_parser, full_parser_arg_list)

query_parser = subparsers.add_parser("query")
- query_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes, isl_bucket,
+ query_parser_arg_list = [verbose, endpoint, provider, collection, start_date, end_date, bbox, minutes,
dry_run, smoke_run, no_schedule_download, release_version, job_queue, chunk_size,
- native_id, use_temporal]
+ native_id, use_temporal, temporal_start_date]
_add_arguments(query_parser, query_parser_arg_list)

download_parser = subparsers.add_parser("download")
- download_parser_arg_list = [verbose, file, endpoint, provider, isl_bucket, transfer_protocol, dry_run, smoke_run,
- batch_ids, start_date, end_date, use_temporal]
+ download_parser_arg_list = [verbose, file, endpoint, provider, transfer_protocol, dry_run, smoke_run,
+ batch_ids, start_date, end_date, use_temporal, temporal_start_date]
_add_arguments(download_parser, download_parser_arg_list)

return parser
@@ -289,7 +290,7 @@ def validate(args):


def _validate_bounds(bbox):
- bounds = bbox.split(',')
+ bounds = bbox.split(",")
value_error = ValueError(
f"Error parsing bounds: {bbox}. Format is <W Longitude>,<S Latitude>,<E Longitude>,<N Latitude> without spaces")

@@ -303,9 +304,9 @@ def _validate_bounds(bbox):
raise value_error


- def _validate_date(date, prefix='start'):
+ def _validate_date(date, prefix="start"):
try:
- datetime.strptime(date, '%Y-%m-%dT%H:%M:%SZ')
+ datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
raise ValueError(
f"Error parsing {prefix} date: {date}. Format must be like 2021-01-14T00:00:00Z")
@@ -364,10 +365,10 @@ def _get_tokens(edl: str, username: str, password: str) -> list[dict]:
def _revoke_expired_tokens(token_list: list[dict], edl: str, username: str, password: str) -> None:
for token_dict in token_list:
now = datetime.utcnow().date()
- expiration_date = datetime.strptime(token_dict['expiration_date'], "%m/%d/%Y").date()
+ expiration_date = datetime.strptime(token_dict["expiration_date"], "%m/%d/%Y").date()

if expiration_date <= now:
- _delete_token(edl, username, password, token_dict['access_token'])
+ _delete_token(edl, username, password, token_dict["access_token"])
del token_dict


@@ -380,7 +381,7 @@ def _create_token(edl: str, username: str, password: str) -> str:
response_content = create_response.json()

if "error" in response_content.keys():
- raise Exception(response_content['error'])
+ raise Exception(response_content["error"])

token = response_content["access_token"]

@@ -391,7 +392,7 @@ def _delete_token(edl: str, username: str, password: str, token: str) -> None:
url = f"https://{edl}/api/users/revoke_token"
try:
resp = requests.post(url, auth=HTTPBasicAuth(username, password),
- params={'token': token})
+ params={"token": token})
resp.raise_for_status()
except Exception as e:
logging.warning(f"Error deleting the token: {e}")
@@ -472,11 +473,6 @@ async def run_query(args, token, es_conn, cmr, job_id, settings):
release_version=args.release_version,
provider=args.provider,
params=[
- {
- "name": "isl_bucket_name",
- "value": f"--isl-bucket={args.isl_bucket}",
- "from": "value"
- },
{
"name": "batch_ids",
"value": "--batch-ids " + " ".join(chunk_batch_ids) if chunk_batch_ids else "",
@@ -560,43 +556,48 @@ def query_cmr(args, token, cmr, settings, timerange: DateTimeRange, now: datetime):

request_url = f"https://{cmr}/search/granules.umm_json"
params = {
- 'page_size': PAGE_SIZE,
- 'sort_key': "-start_date",
- 'provider': args.provider,
- 'ShortName': args.collection,
- 'token': token,
- 'bounding_box': args.bbox,
+ "page_size": PAGE_SIZE,
+ "sort_key": "-start_date",
+ "provider": args.provider,
+ "ShortName": args.collection,
+ "token": token,
+ "bounding_box": args.bbox,
}

if args.native_id:
- params['native-id'] = args.native_id
+ params["native-id"] = args.native_id

# derive and apply param "temporal"
now_date = now.strftime("%Y-%m-%dT%H:%M:%SZ")
temporal_range = _get_temporal_range(timerange.start_date, timerange.end_date, now_date)
logging.info("Temporal Range: " + temporal_range)

if args.use_temporal:
- params['temporal'] = temporal_range
+ params["temporal"] = temporal_range
else:
params["revision_date"] = temporal_range

+ # if a temporal start-date is provided, set temporal
+ if args.temporal_start_date:
+ logging.info(f"{args.temporal_start_date=}")
+ params["temporal"] = dateutil.parser.isoparse(args.temporal_start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
+
logging.info(f"{request_url=} {params=}")
product_granules, search_after = _request_search(args, request_url, params)

while search_after:
granules, search_after = _request_search(args, request_url, params, search_after=search_after)
product_granules.extend(granules)

- if args.collection in settings['SHORTNAME_FILTERS']:
+ if args.collection in settings["SHORTNAME_FILTERS"]:
product_granules = [granule
for granule in product_granules
if _match_identifier(settings, args, granule)]

logging.info(f"Found {str(len(product_granules))} total granules")

for granule in product_granules:
- granule['filtered_urls'] = _filter_granules(granule, args)
+ granule["filtered_urls"] = _filter_granules(granule, args)

return product_granules

@@ -616,17 +617,17 @@ def _get_temporal_range(start: str, end: str, now: str):


def _request_search(args, request_url, params, search_after=None):
- response = requests.get(request_url, params=params, headers={'CMR-Search-After': search_after}) \
+ response = requests.get(request_url, params=params, headers={"CMR-Search-After": search_after}) \
if search_after else requests.get(request_url, params=params)

results = response.json()
- items = results.get('items')
- next_search_after = response.headers.get('CMR-Search-After')
+ items = results.get("items")
+ next_search_after = response.headers.get("CMR-Search-After")

collection_identifier_map = {"HLSL30": "LANDSAT_PRODUCT_ID",
"HLSS30": "PRODUCT_URI"}

- if items and 'umm' in items[0]:
+ if items and "umm" in items[0]:
return [{"granule_id": item.get("umm").get("GranuleUR"),
"provider": item.get("meta").get("provider-id"),
"production_datetime": item.get("umm").get("DataGranule").get("ProductionDateTime"),
@@ -668,8 +669,8 @@ def _filter_granules(granule, args):


def _match_identifier(settings, args, granule) -> bool:
- for filter in settings['SHORTNAME_FILTERS'][args.collection]:
- if re.match(filter, granule['identifier']):
+ for filter in settings["SHORTNAME_FILTERS"][args.collection]:
+ if re.match(filter, granule["identifier"]):
return True

return False
@@ -861,7 +862,7 @@ def download_from_asf(
logging.info("downloading associated orbit file")
dataset_dir = extract_one_to_one(product, settings_cfg, working_dir=Path.cwd())
stage_orbit_file_args = stage_orbit_file.get_parser().parse_args([
- f'--output-directory={str(dataset_dir)}',
+ f"--output-directory={str(dataset_dir)}",
str(product_filepath)
])
stage_orbit_file.main(stage_orbit_file_args)
@@ -1088,7 +1089,7 @@ def _https_transfer(url, bucket_name, token, staging_area=""):
upload_end_time = datetime.utcnow()
upload_duration = upload_end_time - upload_start_time
upload_stats = {"file_name": file_name,
"file_size (in bytes)": r.headers.get('Content-Length'),
"file_size (in bytes)": r.headers.get("Content-Length"),
"upload_duration (in seconds)": upload_duration.total_seconds(),
"upload_start_time": _convert_datetime(upload_start_time),
"upload_end_time": _convert_datetime(upload_end_time)}
@@ -1147,7 +1148,7 @@ def _s3_download(url, s3, tmp_dir, staging_area=""):
file_name = PurePath(url).name
target_key = str(Path(staging_area, file_name))

source = url[len("s3://"):].partition('/')
source = url[len("s3://"):].partition("/")
source_bucket = source[0]
source_key = source[2]

@@ -1167,5 +1168,5 @@ def _s3_upload(url, bucket_name, tmp_dir, staging_area=""):
return target_key


- if __name__ == '__main__':
+ if __name__ == "__main__":
asyncio.run(run(sys.argv))
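To make the new query behavior concrete, here is a minimal sketch of the time-window logic that query_cmr now applies. The standalone helper build_time_params is hypothetical (the shipped code sets these keys inline on its params dict), but the branching mirrors the diff above:

    # Sketch of query_cmr's time-window selection, mirroring the diff above.
    # build_time_params is a hypothetical helper, not part of the module.
    import dateutil.parser

    def build_time_params(use_temporal, temporal_range, temporal_start_date=None):
        params = {}
        if use_temporal:
            params["temporal"] = temporal_range       # filter by acquisition time
        else:
            params["revision_date"] = temporal_range  # filter by CMR revision time
        # --temporal-start-date adds an acquisition-time floor on top of a
        # revision_date query (and would override a --use-temporal range).
        if temporal_start_date:
            params["temporal"] = dateutil.parser.isoparse(
                temporal_start_date).strftime("%Y-%m-%dT%H:%M:%SZ")
        return params

For example, build_time_params(False, "2022-11-20T00:00:00Z,2022-11-27T00:00:00Z", "2021-01-14T00:00:00Z") yields both a revision_date range and a temporal floor, which is the issue 302 use case.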
5 changes: 0 additions & 5 deletions docker/hysds-io.json.hls_download
@@ -10,11 +10,6 @@
"type": "text",
"default": "--endpoint=OPS"
},
- {
- "name": "isl_bucket_name",
- "from": "submitter",
- "placeholder": "e.g. --isl-bucket=<isl_bucket>"
- },
{
"name": "batch_ids",
"from": "submitter",
11 changes: 6 additions & 5 deletions docker/hysds-io.json.hlsl30_query
@@ -44,11 +44,6 @@
"type": "text",
"default": "--job-queue=opera-job_worker-hls_data_download"
},
- {
- "name": "isl_bucket_name",
- "from": "submitter",
- "placeholder": "e.g. --isl-bucket=<isl_bucket>"
- },
{
"name": "chunk_size",
"from": "submitter",
@@ -74,6 +69,12 @@
"from": "submitter",
"placeholder": "e.g. --use-temporal",
"optional": true
},
+ {
+ "name": "temporal_start_datetime",
+ "from": "submitter",
+ "placeholder": "e.g. --temporal-start-date=1970-01-01T00:00:00Z",
+ "optional": true
+ }
]
}
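Note the naming mismatch: the hysds-io parameter is temporal_start_datetime, while the flag it carries is --temporal-start-date. Judging from the placeholder convention used throughout these specs (the submitter's value appears to be passed through verbatim as a command-line token), a submitter would supply the full flag, for example:

    --temporal-start-date=2021-01-14T00:00:00Z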
5 changes: 0 additions & 5 deletions docker/hysds-io.json.hlsl30_query_minutes
@@ -42,11 +42,6 @@
"type": "text",
"default": "--job-queue=opera-job_worker-hls_data_download"
},
- {
- "name": "isl_bucket_name",
- "from": "submitter",
- "placeholder": "e.g. --isl-bucket=<isl_bucket>"
- },
{
"name": "chunk_size",
"from": "submitter",
5 changes: 0 additions & 5 deletions docker/hysds-io.json.hlsl30_query_native_id
@@ -41,11 +41,6 @@
"type": "text",
"default": "--job-queue=opera-job_worker-hls_data_download"
},
- {
- "name": "isl_bucket_name",
- "from": "submitter",
- "placeholder": "e.g. --isl-bucket=<isl_bucket>"
- },
{
"name": "chunk_size",
"from": "submitter",
(Diffs for the remaining 26 changed files are not shown.)
