From 4386dda7f097f1b0a67f48f2f40aac5868f0a2a7 Mon Sep 17 00:00:00 2001
From: Maaike
Date: Wed, 10 Jan 2024 10:49:45 +0100
Subject: [PATCH] fix bufr2bufr

---
 wis2box_api/plugins/process/bufr2bufr.py |  10 +-
 wis2box_api/wis2box/bufr4.py             | 186 +++++++++++++++--------
 wis2box_api/wis2box/handle.py            |   7 +-
 wis2box_api/wis2box/station.py           |   5 +-
 4 files changed, 140 insertions(+), 68 deletions(-)

diff --git a/wis2box_api/plugins/process/bufr2bufr.py b/wis2box_api/plugins/process/bufr2bufr.py
index 329bd8a..1d2359b 100644
--- a/wis2box_api/plugins/process/bufr2bufr.py
+++ b/wis2box_api/plugins/process/bufr2bufr.py
@@ -130,10 +130,14 @@ def execute(self, data):
             input_bytes = base64.b64decode(encoded_data_bytes)
             obs_bufr = ObservationDataBUFR(input_bytes)
             LOGGER.info(f'Size of input_bytes: {len(input_bytes)}')
-            output_items = obs_bufr.process_data()
-            for output_item in output_items:
-                LOGGER.info(f'Output item: {output_item}')
         except Exception as err:
             return handle_error(f'bufr2bufr raised Exception: {err}')  # noqa
 
+        try:
+            output_items = obs_bufr.process_data()
+        except Exception as err:
+            msg = f'ObservationDataBUFR.process_data raised Exception: {err}'
+            LOGGER.error(msg)
+            return handle_error(msg)
+
         return data_handler.process_items(output_items)
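For context: the process receives its BUFR payload base64-encoded in the request body, and execute() decodes it back to raw bytes before constructing ObservationDataBUFR. A minimal sketch of that round trip, in which the file name and the 'data' key are illustrative assumptions rather than the plugin's actual request schema:

import base64

# hypothetical input file; any BUFR4 message will do
with open('observation.bufr4', 'rb') as f:
    raw = f.read()

# client side: base64-encode the bytes so they can travel in a JSON body
payload = {'data': base64.b64encode(raw).decode('utf-8')}

# server side, as in execute() above: recover the original bytes
input_bytes = base64.b64decode(payload['data'])
assert input_bytes == raw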
diff --git a/wis2box_api/wis2box/bufr4.py b/wis2box_api/wis2box/bufr4.py
index 9a95faf..12e1cfa 100644
--- a/wis2box_api/wis2box/bufr4.py
+++ b/wis2box_api/wis2box/bufr4.py
@@ -21,7 +21,6 @@
 
 from datetime import datetime
 import logging
-from pathlib import Path
 import tempfile
 
 from bufr2geojson import BUFRParser
@@ -52,6 +51,14 @@
     'typicalSecond'
 ]
 
+HEADERS = ["edition", "masterTableNumber", "bufrHeaderCentre",
+           "bufrHeaderSubCentre", "updateSequenceNumber", "dataCategory",
+           "internationalDataSubCategory", "dataSubCategory",
+           "masterTablesVersionNumber", "localTablesVersionNumber",
+           "typicalYear", "typicalMonth", "typicalDay", "typicalHour",
+           "typicalMinute", "typicalSecond",
+           "numberOfSubsets", "observedData", "compressedData"]
+
 
 class ObservationDataBUFR():
     """Oservation data in bufr format"""
@@ -67,8 +74,9 @@ def __init__(self, input_bytes: bytes) -> None:
 
         self.input_bytes = input_bytes
         self.stations = Stations()
+        self.output_items = []
 
-    # transform return an array of output data
+    # return an array of output data
     def process_data(
             self
     ) -> list:
@@ -78,15 +86,13 @@ def process_data(
 
         :returns: `list` of output data
         """
-        LOGGER.debug('Procesing BUFR data')
+        LOGGER.debug('Processing BUFR data')
 
         # FIXME: figure out how to pass a bytestring to ecCodes BUFR reader
         tmp = tempfile.NamedTemporaryFile()
         with open(tmp.name, 'wb') as f:
             f.write(self.input_bytes)
 
-        output_data = []
-
         # workflow
         # check for multiple messages
         # split messages and process
@@ -96,17 +102,16 @@ def process_data(
             data = codes_bufr_new_from_file(fh)
             if data is not None:
                 try:
-                    output = self.transform_message(data)
-                    output_data.append(output)
+                    self.transform_message(data)
                 except Exception as err:
                     msg = f'Error in transform_message: {err}'
                     LOGGER.error(msg)
-                    output = {
+                    self.output_items.append({
                         'errors': [msg],
-                    }
-                    output_data.append(output)
+                        'warnings': []
+                    })
                 codes_release(data)
-        return output_data
+        return self.output_items
 
     def transform_message(self, bufr_in: int) -> None:
         """
@@ -121,33 +126,75 @@ def transform_message(self, bufr_in: int) -> None:
 
         try:
             codes_set(bufr_in, 'unpack', True)
         except Exception as err:
-            LOGGER.error(f'Error unpacking message: {err}')
-            raise err
-
-        num_subsets = codes_get(bufr_in, 'numberOfSubsets')
-        LOGGER.debug(f'Found {num_subsets} subsets')
-
+            msg = f'Error unpacking message: {err}'
+            LOGGER.error(msg)
+            self.output_items.append({
+                'errors': [msg],
+                'warnings': []
+            })
+            return
+
+        # get the descriptors present in the file
+        descriptors = codes_get_array(bufr_in, "expandedDescriptors").tolist()
+
+        # prepare the headers for the new messages
+        headers = {}
+        for header in HEADERS:
+            headers[header] = codes_get(bufr_in, header)
+        # the original is split per subset, so each new message has 1 subset
+        headers['numberOfSubsets'] = 1
+        # set the master table version number
         table_version = max(
             28, codes_get(bufr_in, 'masterTablesVersionNumber')
         )
-
-        outUE = codes_get_array(bufr_in, 'unexpandedDescriptors').tolist()
-        if 301150 not in outUE:
-            outUE.insert(0, 301150)
+        headers['masterTablesVersionNumber'] = table_version
+        # set the unexpanded descriptors
+        out_ue = codes_get_array(bufr_in, 'unexpandedDescriptors').tolist()
+        if 301150 not in out_ue:
+            out_ue.insert(0, 301150)
+        headers['unexpandedDescriptors'] = out_ue
+
+        # loop over the subsets, creating a new message for each
+        num_subsets = codes_get(bufr_in, 'numberOfSubsets')
+        LOGGER.debug(f'Found {num_subsets} subsets')
 
         for i in range(num_subsets):
             idx = i + 1
             LOGGER.debug(f'Processing subset {idx}')
-            LOGGER.debug('Copying template BUFR')
-            subset_out = codes_clone(TEMPLATE)
-            codes_set(subset_out, 'masterTablesVersionNumber', table_version)
-            codes_set_array(subset_out, 'unexpandedDescriptors', outUE)
-
             LOGGER.debug('Extracting subset')
             codes_set(bufr_in, 'extractSubset', idx)
             codes_set(bufr_in, 'doExtractSubsets', 1)
+            # read the delayed replication factors from the input subset
+            if 31000 in descriptors:
+                short_replication_factors = codes_get_array(bufr_in, "shortDelayedDescriptorReplicationFactor").tolist()  # noqa
+            if 31001 in descriptors:
+                replication_factors = codes_get_array(bufr_in, "delayedDescriptorReplicationFactor").tolist()  # noqa
+            if 31002 in descriptors:
+                extended_replication_factors = codes_get_array(bufr_in, "extendedDelayedDescriptorReplicationFactor").tolist()  # noqa
+
+            LOGGER.debug('Copying template BUFR')
+            subset_out = codes_clone(TEMPLATE)
+
+            # set the replication factors; this needs to happen before
+            # setting the unexpanded descriptors
+            if 31000 in descriptors:
+                codes_set_array(subset_out, "inputShortDelayedDescriptorReplicationFactor", short_replication_factors)  # noqa
+            if 31001 in descriptors:
+                codes_set_array(subset_out, "inputDelayedDescriptorReplicationFactor", replication_factors)  # noqa
+            if 31002 in descriptors:
+                codes_set_array(subset_out, "inputExtendedDelayedDescriptorReplicationFactor", extended_replication_factors)  # noqa
+
+            # copy all the headers, not just unexpandedDescriptors
+            # and masterTablesVersionNumber
+
+            for k, v in headers.items():
+                if isinstance(v, list):
+                    codes_set_array(subset_out, k, v)
+                else:
+                    codes_set(subset_out, k, v)
 
             LOGGER.debug('Cloning subset to new message')
             subset = codes_clone(bufr_in)
             self.transform_subset(subset, subset_out)
@@ -169,43 +216,58 @@ def transform_subset(self, subset: int, subset_out: int) -> None:
         # - check for time,
         # - if temporal extent, use end time
         # - set times in header
-        # - write a separate BUFR
+        # - write a separate BUFR message for each subset
+
+        # keep track of errors and warnings
+        warnings = []
+        errors = []
+
         parser = BUFRParser(raise_on_error=True)
         LOGGER.debug('Parsing subset')
         try:
             parser.as_geojson(subset, id='')
         except Exception as err:
             LOGGER.warning(err)
-
+            warnings.append(str(err))
         try:
             temp_wsi = parser.get_wsi()
             temp_tsi = parser.get_tsi()
         except Exception as err:
             LOGGER.warning(err)
+            warnings.append(str(err))
 
         try:
             location = parser.get_location()
+            if location is None or None in location['coordinates']:
+                msg = 'Missing location in BUFR'
+                LOGGER.info(msg)
+                raise Exception(msg)
         except Exception as err:
-            LOGGER.warning(err)
+            msg = f'Cannot parse location from subset with wsi={temp_wsi}: {err}'  # noqa
+            LOGGER.info(msg)
+            location = None
 
         try:
             data_date = parser.get_time()
-        except Exception as err:
-            LOGGER.error(err)
-            raise err
-
-        del parser
+        except Exception:
+            msg = f"Error parsing time from subset with wsi={temp_wsi}, skipping this subset"  # noqa
+            errors.append(msg)
+            self.output_items.append({
+                'errors': errors,
+                'warnings': warnings
+            })
+            return
 
         LOGGER.debug(f'Processing temp_wsi: {temp_wsi}, temp_tsi: {temp_tsi}')
         wsi = self.stations.get_valid_wsi(wsi=temp_wsi, tsi=temp_tsi)
 
         if wsi is None:
             msg = 'Station not in station list: '
             msg += f'wsi={temp_wsi} tsi={temp_tsi}; skipping'
-            LOGGER.error(msg)
-            output = {
-                'errors': [msg]
-            }
-            return output
+            errors.append(msg)
+            self.output_items.append({
+                'errors': errors,
+                'warnings': warnings
+            })
+            return
 
         try:
             LOGGER.debug('Copying wsi to BUFR')
@@ -217,10 +279,9 @@ def transform_subset(self, subset: int, subset_out: int) -> None:
             codes_bufr_copy_data(subset, subset_out)
 
             if location is None or None in location['coordinates']:
-                msg = 'Missing coordinates in BUFR, setting from station report'  # noqa
-                LOGGER.warning(msg)
+                msg = 'Missing coordinates in BUFR, using coordinates from station metadata'  # noqa
+                warnings.append(msg)
                 location = self.stations.get_geometry(wsi)
-                LOGGER.debug(f'New coordinates: {location}')
                 long, lat, elev = location.get('coordinates')
                 codes_set(subset_out, '#1#longitude', long)
                 codes_set(subset_out, '#1#latitude', lat)
@@ -243,26 +304,29 @@ def transform_subset(self, subset: int, subset_out: int) -> None:
 
             try:
                 bufr4 = codes_get_message(subset_out)
             except Exception as err:
-                LOGGER.error(f'Error writing bufr4: {err}')
-                raise err
-            output = {
+                errors.append(f'Error encoding BUFR: {err}')
+                self.output_items.append({
+                    'errors': errors,
+                    'warnings': warnings
+                })
+                return
+            self.output_items.append({
                 'bufr4': bufr4,
                 '_meta': {
-                    'identifier': rmk,
-                    'wigos_station_identifier': wsi,
-                    'data_date': isodate,
-                    'geometry': location
-                }
-            }
-            return output
+                    'id': rmk,
+                    'properties': {
+                        'wigos_station_identifier': wsi,
+                        'datetime': isodate,
+                        'geometry': location
+                    }
+                },
+                'errors': errors,
+                'warnings': warnings
+            })
         except Exception as err:
             msg = f'Error processing subset: {err}'
-            LOGGER.error(msg)
-            output = {
-                'errors': [msg]
-            }
-            return output
-
-    def get_local_filepath(self, date_):
-        yyyymmdd = date_.strftime('%Y-%m-%d')
-        return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath
+            errors.append(msg)
+            self.output_items.append({
+                'errors': errors,
+                'warnings': warnings
+            })
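The core of the rewrite above is the ecCodes subset-extraction pattern: unpack the input handle, mark one subset, trigger extraction, then clone the handle to obtain a single-subset message. A standalone sketch of that pattern, assuming the eccodes Python bindings are installed and 'multi.bufr4' is a hypothetical multi-subset file:

from eccodes import (codes_bufr_new_from_file, codes_clone, codes_get,
                     codes_get_message, codes_release, codes_set)

with open('multi.bufr4', 'rb') as fh:
    bufr = codes_bufr_new_from_file(fh)

codes_set(bufr, 'unpack', True)
num_subsets = codes_get(bufr, 'numberOfSubsets')

for idx in range(1, num_subsets + 1):
    # mark the subset to extract, then trigger the extraction
    codes_set(bufr, 'extractSubset', idx)
    codes_set(bufr, 'doExtractSubsets', 1)
    # cloning after doExtractSubsets yields a single-subset message
    subset = codes_clone(bufr)
    single = codes_get_message(subset)  # encoded single-subset BUFR bytes
    codes_release(subset)

codes_release(bufr)

The patch builds on this by also carrying the delayed replication factors across: ecCodes requires the input*DelayedDescriptorReplicationFactor keys to be set on the output handle before unexpandedDescriptors, which is why the replication factors are read and written ahead of the header loop.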
diff --git a/wis2box_api/wis2box/handle.py b/wis2box_api/wis2box/handle.py
index d9fe1a0..7aae370 100644
--- a/wis2box_api/wis2box/handle.py
+++ b/wis2box_api/wis2box/handle.py
@@ -141,11 +141,16 @@ def process_items(self, output_items: []):
                 continue
 
             filename = f'{identifier}.{fmt}'
+            geometry = None
+            if 'geometry' in item['_meta']:
+                geometry = item['_meta']['geometry']
+            elif 'geometry' in item['_meta'].get('properties', {}):
+                geometry = item['_meta']['properties']['geometry']
             meta = {
                 'id': identifier,
                 'wigos_station_identifier': wsi,
                 'data_date': data_date.isoformat(),
-                'geometry': item['_meta']['geometry'],
+                'geometry': geometry,
             }
             data.append(
                 {
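The fallback above exists because transform_subset now nests station metadata under '_meta.properties' (with 'datetime' and 'geometry'), while older producers put 'geometry' directly under '_meta'. A small sketch of the lookup against both shapes, using made-up items:

from typing import Optional

old_style = {'_meta': {'geometry': {'type': 'Point',
                                    'coordinates': [5.18, 52.1, 4.0]}}}
new_style = {'_meta': {'properties': {'geometry': {
    'type': 'Point', 'coordinates': [5.18, 52.1, 4.0]}}}}


def extract_geometry(item: dict) -> Optional[dict]:
    # prefer the legacy top-level key, then the nested properties key
    meta = item['_meta']
    if 'geometry' in meta:
        return meta['geometry']
    return meta.get('properties', {}).get('geometry')


assert extract_geometry(old_style) == extract_geometry(new_style)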
diff --git a/wis2box_api/wis2box/station.py b/wis2box_api/wis2box/station.py
index 61cd8ec..3eb0f67 100644
--- a/wis2box_api/wis2box/station.py
+++ b/wis2box_api/wis2box/station.py
@@ -63,9 +63,8 @@ def get_valid_wsi(self, wsi, tsi=None) -> str:
 
         if wsi in self.stations:
             return wsi
         elif tsi is not None:
-            for station in self.stations:
-                # check if the tsi is contain in the string
-                if tsi in station['properties']['wigos_station_identifier']:
+            for station in self.stations.values():
+                if station['properties']['traditional_station_identifier'] == tsi:  # noqa
                     return station['properties']['wigos_station_identifier']
         return None
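To illustrate the behavioural fix: the old loop iterated the stations mapping directly and substring-matched the TSI against the WIGOS identifier, so a partial identifier such as '6260' could match station 0-20000-0-06260; the new code iterates the station records and compares the stored traditional_station_identifier exactly. A sketch with made-up station records:

# hypothetical station list keyed by WIGOS station identifier (WSI)
stations = {
    '0-20000-0-06260': {'properties': {
        'wigos_station_identifier': '0-20000-0-06260',
        'traditional_station_identifier': '06260'}},
    '0-20000-0-06261': {'properties': {
        'wigos_station_identifier': '0-20000-0-06261',
        'traditional_station_identifier': '06261'}},
}


def get_valid_wsi(wsi, tsi=None):
    # mirrors the patched lookup: exact WSI hit first, then exact TSI match
    if wsi in stations:
        return wsi
    if tsi is not None:
        for station in stations.values():
            if station['properties']['traditional_station_identifier'] == tsi:
                return station['properties']['wigos_station_identifier']
    return None


assert get_valid_wsi('0-20000-0-06260') == '0-20000-0-06260'
assert get_valid_wsi('0-20000-0-99999', tsi='06261') == '0-20000-0-06261'
# substring matching would have accepted this partial TSI; exact match rejects
assert get_valid_wsi('0-20000-0-99999', tsi='6260') is None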