Skip to content

Commit

Permalink
fix bufr2bufr
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Jan 10, 2024
1 parent a19a9f3 commit 4386dda
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 68 deletions.
10 changes: 7 additions & 3 deletions wis2box_api/plugins/process/bufr2bufr.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ def execute(self, data):
input_bytes = base64.b64decode(encoded_data_bytes)
obs_bufr = ObservationDataBUFR(input_bytes)
LOGGER.info(f'Size of input_bytes: {len(input_bytes)}')
output_items = obs_bufr.process_data()
for output_item in output_items:
LOGGER.info(f'Output item: {output_item}')
except Exception as err:
return handle_error(f'bufr2bufr raised Exception: {err}') # noqa

try:
output_items = obs_bufr.process_data()
except Exception as err:
msg = f'ObservationDataBUFR.process_data raised Exception: {err}'
LOGGER.error(msg)
return handle_error(msg)

return data_handler.process_items(output_items)
184 changes: 123 additions & 61 deletions wis2box_api/wis2box/bufr4.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from datetime import datetime
import logging
from pathlib import Path
import tempfile

from bufr2geojson import BUFRParser
Expand Down Expand Up @@ -52,6 +51,14 @@
'typicalSecond'
]

HEADERS = ["edition", "masterTableNumber", "bufrHeaderCentre",
"bufrHeaderSubCentre", "updateSequenceNumber", "dataCategory",
"internationalDataSubCategory", "dataSubCategory",
"masterTablesVersionNumber", "localTablesVersionNumber",
"typicalYear", "typicalMonth", "typicalDay", "typicalHour",
"typicalMinute", "typicalSecond",
"numberOfSubsets", "observedData", "compressedData"]


class ObservationDataBUFR():
"""Oservation data in bufr format"""
Expand All @@ -67,8 +74,9 @@ def __init__(self, input_bytes: bytes) -> None:

self.input_bytes = input_bytes
self.stations = Stations()
self.output_items = []

# transform return an array of output data
# return an array of output data
def process_data(
self
) -> list:
Expand All @@ -78,15 +86,13 @@ def process_data(
:returns: `list` of output data
"""

LOGGER.debug('Procesing BUFR data')
LOGGER.debug('Proccessing BUFR data')

# FIXME: figure out how to pass a bytestring to ecCodes BUFR reader
tmp = tempfile.NamedTemporaryFile()
with open(tmp.name, 'wb') as f:
f.write(self.input_bytes)

output_data = []

# workflow
# check for multiple messages
# split messages and process
Expand All @@ -96,17 +102,16 @@ def process_data(
data = codes_bufr_new_from_file(fh)
if data is not None:
try:
output = self.transform_message(data)
output_data.append(output)
self.transform_message(data)
except Exception as err:
msg = f'Error in transform_message: {err}'
LOGGER.error(msg)
output = {
self.output_items.append({
'errors': [msg],
}
output_data.append(output)
'warnings': []
})
codes_release(data)
return output_data
return self.output_items

def transform_message(self, bufr_in: int) -> None:
"""
Expand All @@ -121,33 +126,75 @@ def transform_message(self, bufr_in: int) -> None:
try:
codes_set(bufr_in, 'unpack', True)
except Exception as err:
LOGGER.error(f'Error unpacking message: {err}')
raise err

num_subsets = codes_get(bufr_in, 'numberOfSubsets')
LOGGER.debug(f'Found {num_subsets} subsets')

msg = f'Error unpacking message: {err}'
LOGGER.error(msg)
self.output_items.append({
'errors': [msg],
'warnings': []
})
return

# get descriptors present in the file
descriptors = codes_get_array(bufr_in, "expandedDescriptors").tolist()

# prepare the headers for the new messages
headers = {}
for header in HEADERS:
headers[header] = codes_get(bufr_in, header)
# original to be splitted by subset, so set the number of subsets to 1
headers['numberOfSubsets'] = 1
# set the master table version number
table_version = max(
28, codes_get(bufr_in, 'masterTablesVersionNumber')
)

outUE = codes_get_array(bufr_in, 'unexpandedDescriptors').tolist()
if 301150 not in outUE:
outUE.insert(0, 301150)
headers['masterTablesVersionNumber'] = table_version
# set the unexpanded descriptors
out_ue = codes_get_array(bufr_in, 'unexpandedDescriptors').tolist()
if 301150 not in out_ue:
out_ue.insert(0, 301150)
headers['unexpandedDescriptors'] = out_ue

# loop over the subsets, create a new message for each
num_subsets = codes_get(bufr_in, 'numberOfSubsets')
LOGGER.debug(f'Found {num_subsets} subsets')

for i in range(num_subsets):
idx = i + 1
LOGGER.debug(f'Processing subset {idx}')

LOGGER.debug('Copying template BUFR')
subset_out = codes_clone(TEMPLATE)
codes_set(subset_out, 'masterTablesVersionNumber', table_version)
codes_set_array(subset_out, 'unexpandedDescriptors', outUE)

LOGGER.debug('Extracting subset')
codes_set(bufr_in, 'extractSubset', idx)
codes_set(bufr_in, 'doExtractSubsets', 1)

# copy the replication factors
if 31000 in descriptors:
short_replication_factors = codes_get_array(bufr_in, "shortDelayedDescriptorReplicationFactor").tolist() # noqa
if 31001 in descriptors:
replication_factors = codes_get_array(bufr_in, "delayedDescriptorReplicationFactor").tolist() # noqa
if 31002 in descriptors:
extended_replication_factors = codes_get_array(bufr_in, "extendedDelayedDescriptorReplicationFactor").tolist() # noqa

LOGGER.debug('Copying template BUFR')
subset_out = codes_clone(TEMPLATE)

# set the replication factors, this needs to be done before
# setting the unexpanded descriptors
if 31000 in descriptors:
codes_set_array(subset_out, "inputShortDelayedDescriptorReplicationFactor", short_replication_factors) # noqa
if 31001 in descriptors:
codes_set_array(subset_out, "inputDelayedDescriptorReplicationFactor", replication_factors) # noqa
if 31002 in descriptors:
codes_set_array(subset_out, "inputExtendedDelayedDescriptorReplicationFactor", extended_replication_factors) # noqa

# we need to copy all the headers, not just the
# unexpandedDescriptors and MT number

for k, v in headers.items():
if isinstance(v, list):
codes_set_array(subset_out, k, v)
else:
codes_set(subset_out, k, v)

LOGGER.debug('Cloning subset to new message')
subset = codes_clone(bufr_in)
self.transform_subset(subset, subset_out)
Expand All @@ -169,43 +216,57 @@ def transform_subset(self, subset: int, subset_out: int) -> None:
# - check for time,
# - if temporal extent, use end time
# - set times in header
# - write a separate BUFR
# - write a separate BUFR message for each subset
# keep track of errors and warnings
warnings = []
errors = []

parser = BUFRParser(raise_on_error=True)
LOGGER.debug('Parsing subset')
try:
parser.as_geojson(subset, id='')
except Exception as err:
LOGGER.warning(err)

warnings.append(err)
try:
temp_wsi = parser.get_wsi()
temp_tsi = parser.get_tsi()
except Exception as err:
LOGGER.warning(err)
warnings.append(err)

try:
location = parser.get_location()
if location is None or None in location['coordinates']:
msg = 'Missing location in BUFR'
LOGGER.info(msg)
raise Exception(msg)
except Exception as err:
LOGGER.warning(err)
msg = f'Can not parse location from subset with wsi={temp_wsi}: {err}' # noqa
LOGGER.info(msg)

try:
data_date = parser.get_time()
except Exception as err:
LOGGER.error(err)
raise err

del parser
except Exception:
msg = f"Error parsing time from subset with wsi={temp_wsi}, skip this subset" # noqa
errors.append(msg)
self.output_items.append({
'errors': errors,
'warnings': warnings
})
return

LOGGER.debug(f'Processing temp_wsi: {temp_wsi}, temp_tsi: {temp_tsi}')
wsi = self.stations.get_valid_wsi(wsi=temp_wsi, tsi=temp_tsi)
if wsi is None:
msg = 'Station not in station list: '
msg += f'wsi={temp_wsi} tsi={temp_tsi}; skipping'
LOGGER.error(msg)
output = {
'errors': [msg]
}
return output
errors.append(msg)
self.output_items.append({
'errors': errors,
'warnings': warnings
})
return

try:
LOGGER.debug('Copying wsi to BUFR')
Expand All @@ -217,10 +278,9 @@ def transform_subset(self, subset: int, subset_out: int) -> None:
codes_bufr_copy_data(subset, subset_out)

if location is None or None in location['coordinates']:
msg = 'Missing coordinates in BUFR, setting from station report' # noqa
LOGGER.warning(msg)
msg = 'Missing coordinates in BUFR, using coordinates from station metadata' # noqa
warnings.append(msg)
location = self.stations.get_geometry(wsi)
LOGGER.debug(f'New coordinates: {location}')
long, lat, elev = location.get('coordinates')
codes_set(subset_out, '#1#longitude', long)
codes_set(subset_out, '#1#latitude', lat)
Expand All @@ -243,26 +303,28 @@ def transform_subset(self, subset: int, subset_out: int) -> None:
try:
bufr4 = codes_get_message(subset_out)
except Exception as err:
LOGGER.error(f'Error writing bufr4: {err}')
raise err
output = {
errors.append(err)
self.output_items.append({
'errors': errors,
'warnings': warnings
})
self.output_items.append({
'bufr4': bufr4,
'_meta': {
'identifier': rmk,
'wigos_station_identifier': wsi,
'data_date': isodate,
'geometry': location
}
}
return output
'id': rmk,
'properties': {
'wigos_station_identifier': wsi,
'datetime': isodate,
'geometry': location
}
},
'errors': errors,
'warnings': warnings
})
except Exception as err:
msg = f'Error processing subset: {err}'
LOGGER.error(msg)
output = {
'errors': [msg]
}
return output

def get_local_filepath(self, date_):
yyyymmdd = date_.strftime('%Y-%m-%d')
return Path(yyyymmdd) / 'wis' / self.topic_hierarchy.dirpath
errors.append(msg)
self.output_items.append({
'errors': errors,
'warnings': warnings
})
7 changes: 6 additions & 1 deletion wis2box_api/wis2box/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,16 @@ def process_items(self, output_items: []):
continue

filename = f'{identifier}.{fmt}'
geometry = None
if 'geometry' in item['_meta']:
geometry = item['_meta']['geometry']
elif 'geometry' in item['_meta']['properties']:
geometry = item['_meta']['properties']['geometry']
meta = {
'id': identifier,
'wigos_station_identifier': wsi,
'data_date': data_date.isoformat(),
'geometry': item['_meta']['geometry'],
'geometry': geometry,
}
data.append(
{
Expand Down
5 changes: 2 additions & 3 deletions wis2box_api/wis2box/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ def get_valid_wsi(self, wsi, tsi=None) -> str:
if wsi in self.stations:
return wsi
elif tsi is not None:
for station in self.stations:
# check if the tsi is contain in the string
if tsi in station['properties']['wigos_station_identifier']:
for station in self.stations.values():
if station['properties']['traditional_station_identifier'] == tsi: # noqa
return station['properties']['wigos_station_identifier']
return None

Expand Down

0 comments on commit 4386dda

Please sign in to comment.