-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.py
650 lines (585 loc) · 30.8 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
"""
Sentinel-1 EW reader.
The script provides Sentinel1Product and Sentinel1Band classes.
Sentinel1Product class describes the product and consists of two Sentinel1Band classes, landmask
information (and function to find it), location of borders (x_min and x_max).
Sentinel1Band class describes a band of Sentinel-1 product.
In addition to band data, the class includes information about noise, calibration parameters,
geolocation grid and functions to calculate these parameters.
NOTE: currently incidence angle correction can not be turned off for HH band.
@author: Dmitrii Murashkin
"""
import os
import zipfile
from xml.etree import ElementTree
from datetime import datetime
from multiprocessing.pool import ThreadPool
from io import BytesIO
from functools import partial
import numpy as np
from scipy.interpolate import RectBivariateSpline
from scipy.interpolate import griddata
from scipy.interpolate import interp1d
from PIL import Image
from .utils import scene_time
Image.MAX_IMAGE_PIXELS = None # turn off the warning about large image size
class Sentinel1Band(object):
    """ Represents a Sentinel-1 band of a Sentinel-1 product.
    It is initialized with paths for data, annotation, calibration and noise files as well as with the band name.
    It has the following attributes:
        des - band designator or short name: 'hh' or 'hv'
        data_path - path to the tiff file that contains band data (or a ZipExtFile instance)
        noise_path - path to the xml file that contains noise LUT (or a ZipExtFile instance)
        calibration_path - path to the xml file that contains calibration parameters LUT (or a ZipExtFile instance)
        annotation_path - path to the xml file with annotation (or a ZipExtFile instance)
        denoised - flag, showing if data has been denoised (to prevent double noise removal)
    Image band max and min values are taken from kmeans cluster analysis of a set of images.
    For more information look into 'gray_level_reduction.py'
    The following methods are available:
        read_data() -- should be executed first
        read_noise()
        read_calibration()
        subtract_noise()
        incidence_angle_correction(elevation_angle)
    """
    def __init__(self, data_path, annotation_path, calibration_path, noise_path, band_name, scale_noise):
        self.des = band_name.lower()
        # Display range in dB per polarization (from k-means clustering of many scenes).
        self.img_max = 4 if self.des == 'hh' else -15
        self.img_min = -29 if self.des == 'hh' else -32
        # Slope used by incidence_angle_correction(); per-polarization constant.
        self.incidence_angle_correction_coefficient = 0.213 if self.des == 'hh' else 0.053
        self.data_path = data_path
        self.noise_path = noise_path
        self.calibration_path = calibration_path
        self.annotation_path = annotation_path
        # Guards against double noise subtraction.
        self.denoised = False
        # If True, read_noise() additionally rescales noise per scalloping patch.
        self.scale_noise = scale_noise
def read_data(self):
    """ Read the band GeoTIFF into self.data as float32.

    Works both with a filesystem path (str) and a file-like object
    (e.g. zipfile.ZipExtFile) in self.data_path.
    Side effects: sets self.data, self.X / self.Y (shape), resets
    self.denoised and computes self.nodata_mask (True where pixel == 0).
    """
    # isinstance is the idiomatic type test (was: type(...) is str).
    if isinstance(self.data_path, str):
        img = Image.open(self.data_path)
    else:
        # ZipExtFile is not seekable enough for PIL: buffer it fully first.
        img = Image.open(BytesIO(self.data_path.read()))
    self.data = np.array(img, dtype=np.float32)
    self.denoised = False
    self.X, self.Y = self.data.shape
    # Boolean comparison replaces the redundant np.where(cond, True, False).
    self.nodata_mask = self.data == 0
def read_noise(self, azimuth_noise=True):
    """ Read noise table from the band noise file, interpolate it for the entire image.
    self.noise has same shape as self.data

    azimuth_noise -- if True, also apply azimuth (scalloping) noise vectors,
                     which exist only in products acquired after 13 March 2018.
    Returns False (after a message) if read_data() has not been executed yet.
    """
    if not hasattr(self, 'X') or not hasattr(self, 'Y'):
        print('Read data first.')
        return False
    """ First, deal with noise in the range direction. """
    noise_file = ElementTree.parse(self.noise_path).getroot()
    try:
        # Rewind the file object so it can be parsed again later;
        # plain string paths have no seek(), hence the broad except.
        self.noise_path.seek(0)
    except Exception:
        pass
    # Flatten all noise vectors: values, their range (y) pixels and azimuth (x) lines.
    noise = np.array([j for i in noise_file[1] for j in i[3].text.split(' ')], dtype=np.float32)
    noise_y = np.array([j for i in noise_file[1] for j in i[2].text.split(' ')], dtype=np.int16)
    noise_x = np.array([i[1].text for i in noise_file[1] for j in range(int(i[2].get('count')))], dtype=np.int16)
    """
    2D interpolation:
    RectBivariateSpline can be used for regular grid only, this is not the option for
    Sentinel-1 since noise data can contain differend number of values for each row.
    interp2d introduces horisontal stripes into noise data
    griddata seems to be the best solution
    """
    x_new = np.arange(0, self.X, 1, dtype=np.int16)
    y_new = np.arange(0, self.Y, 1, dtype=np.int16)
    xx, yy = np.meshgrid(y_new, x_new)
    self.noise = griddata(np.vstack((noise_y, noise_x)).transpose(), noise, (xx, yy),
                          method='linear', fill_value=0).astype(np.float32)
    """ if noise data has incorrect units (before July 2015) than scale it:
    noise_scaled = noise * k_noise * DN
    where k_noise is 56065.87 (given at a ESA document),
    DN is given in the band calibration file (index 6)
    """
    # Heuristic: correctly-scaled noise is >= 1 somewhere; tiny values mean old units.
    if self.noise.max() < 1:
        cf = ElementTree.parse(self.calibration_path).getroot()
        DN = float(cf[2][0][6].text.split(' ')[0])
        self.noise *= 56065.87 * DN
    """ Second, take into account noise in the azimuth direction (if possible).
    According https://qc.sentinel1.eo.esa.int/ipf/ only products taken after 13 March 2018 contain this information. """
    if azimuth_noise:
        try:
            self._read_azimuth_noise(noise_file)
            if self.scale_noise:
                self.scale_noise_patches()
            self.noise *= self.azimuth_noise
        except Exception:
            # Expected (not an error) for pre-2018 products that lack the azimuth section.
            print('Failed to read azimuth noise for {0} (this is normal for Sentinel-1 scenes taken before 13 March 2018).'.format(self.noise_path))
def scale_noise_patches(self):
    """ Empirically rescale the range-noise LUT per sub-swath.

    For every sub-swath except the rightmost one (which has no right-hand
    neighbour), estimate a scale factor by comparing pixel sums in the two
    columns just inside the sub-swath boundary against the two columns just
    outside it, and apply that factor to self.noise within the sub-swath.
    Requires self.data, self.noise and self.scalloping_lut to be present.
    """
    cla = 1  # global tweak of the estimated factor; 1 = use it as-is
    """ Calculate scale factors"""
    # Iterate sub-swaths right-to-left so comparison columns belong to a
    # neighbour that has not been rescaled yet.
    swath_list = sorted(list(set([item['swath'] for item in self.scalloping_lut])), reverse=True)
    for swath in swath_list:
        patches = [item for item in self.scalloping_lut if item['swath'] == swath]
        if patches[0]['sample_max'] >= self.Y - 1:
            continue  # rightmost sub-swath: no neighbour to compare against
        """ Calculate scale factor"""
        curr_data = []
        next_data = []
        curr_noise = []
        next_noise = []
        pixel_count = []
        for patch in patches:
            # Clip very bright targets so they do not dominate the sums.
            th = 300 if self.des == 'hv' else 700
            nd = self.data[patch['line_min']:patch['line_max'], patch['sample_max'] + 1:patch['sample_max'] + 3]
            nd[nd > th] = th
            if nd.sum() == 0:
                continue  # neighbour columns are all nodata -> skip this patch
            next_data.append(nd.sum())
            cd = self.data[patch['line_min']:patch['line_max'], patch['sample_max'] - 1:patch['sample_max'] + 1]
            cd[cd > th] = th
            curr_data.append(cd.sum())
            curr_noise.append(self.noise[patch['line_min']:patch['line_max'], patch['sample_max'] - 1:patch['sample_max'] + 1].sum())
            next_noise.append(self.noise[patch['line_min']:patch['line_max'], patch['sample_max'] + 1:patch['sample_max'] + 3].sum())
            pixel_count.append(cd.size)
        size = sum(pixel_count)
        # NOTE(review): if every patch was skipped, size == 0 and this divides
        # by zero — confirm upstream data always has valid boundary columns.
        scale = ((sum(curr_data) / size)**2 - (sum(next_data) / size)**2 + sum(next_noise) / size) / (sum(curr_noise) / size) * cla
        # scale = ((sum(curr_data) / size) - (sum(next_data) / size) + sum(next_noise) / size) / (sum(curr_noise) / size) * cla
        """ Apply scale factor"""
        for patch in patches:
            self.noise[patch['line_min']:patch['line_max'], patch['sample_min']:patch['sample_max']] *= scale
def _read_azimuth_noise(self, noise_file):
    """ Read scalloping noise data.
    The noise file should be passed here for support of zip-archives.
    If .SAFE folder is used as input for the Sentinel1Product then noise_file can be taken from self.noise_file.
    If scalloping noise information exists, also use patches to produce a precise nodata mask.
    Raises (IndexError and alike) when the product has no azimuth-noise
    section; the caller (read_noise) catches that case.
    """
    # One LUT entry per rectangular patch: bounding box, azimuth line indices
    # and the noise values sampled along those lines.
    self.scalloping_lut = [{'line_min': int(i[1].text), 'line_max': int(i[3].text), 'sample_min': int(i[2].text), 'sample_max': int(i[4].text),
                            'lines': np.array(i[5].text.split(' '), dtype=np.int16), 'noise': np.array(i[6].text.split(' '), dtype=np.float32),
                            'swath': i[0].text} for i in noise_file[2]]
    """ Interpolate scalloping noise """
    self.azimuth_noise = np.zeros((self.X, self.Y), dtype=np.float32)
    nodata_mask = np.ones((self.X, self.Y), dtype=bool)
    for patch in self.scalloping_lut:
        if len(patch['noise']) == 1:
            # A single sample cannot be interpolated: hold it constant over the patch.
            noise_line = np.repeat(patch['noise'][0], patch['line_max'] - patch['line_min'] + 1)
        else:
            scalloping = interp1d(patch['lines'], patch['noise'], kind='linear', fill_value='extrapolate')
            noise_line = scalloping(np.arange(patch['line_min'], patch['line_max'] + 1))
        # Broadcast the 1-D azimuth profile across the patch's range extent.
        self.azimuth_noise[patch['line_min']:patch['line_max'] + 1, patch['sample_min']:patch['sample_max'] + 1] = noise_line[:, np.newaxis]
        # Anything covered by a patch is valid data.
        nodata_mask[patch['line_min']:patch['line_max'] + 1, patch['sample_min']:patch['sample_max'] + 1] = False
    self.nodata_mask = nodata_mask
def read_calibration(self):
    """ Read calibration table from product folder.
    cal_par - calibration parameter number: 3 - SigmaNought, 4 - BetaNought,
    5 - gamma, 6 - dn. These parameters are given in the band calibration file
    self.calibration has same shape as self.data
    All 4 parameters are read, then only sigma is interpolated for the entire image.
    Returns False (after a message) if read_data() has not been executed yet.
    """
    if not hasattr(self, 'X') or not hasattr(self, 'Y'):
        print('Read data first.')
        return False
    calibration_file = ElementTree.parse(self.calibration_path).getroot()
    # LUT grid size: number of azimuth rows x number of range samples per row.
    calibration_x = int(calibration_file[2].get('count'))
    calibration_y = int(calibration_file[2][0][2].get('count'))
    result = []
    for cal_par in [3, 4, 5, 6]:
        calibration = np.array([i[cal_par].text.split(' ') for i in calibration_file[2]], dtype=np.float32).ravel()
        result.append(np.array(calibration).reshape(calibration_x, calibration_y))
    self.sigma0, self.beta0, self.gamma, self.dn = result
    # Pixel coordinates of the LUT nodes; the grid is regular, so a spline is valid here.
    self.calibration_azimuth_list = [int(i) for i in calibration_file[2][0][2].text.split(' ')]
    self.calibration_range_list = [int(i) for i in [j[1].text for j in calibration_file[2]]]
    # gamma_interp = RectBivariateSpline(self.calibration_range_list, self.calibration_azimuth_list, self.gamma, kx=1, ky=1)
    # kx=ky=1 -> bilinear interpolation of sigma0 over the full image grid.
    sigma0_interp = RectBivariateSpline(self.calibration_range_list, self.calibration_azimuth_list, self.sigma0, kx=1, ky=1)
    x_new = np.arange(0, self.X, 1, dtype=np.int16)
    y_new = np.arange(0, self.Y, 1, dtype=np.int16)
    # self.calibration = gamma_interp(x_new, y_new).astype(np.float32)
    self.calibration = sigma0_interp(x_new, y_new).astype(np.float32)
def subtract_noise(self, in_place=True):
""" Calibrated and denoised data is equal to
(data**2 - Noise) / Calibration**2
"""
if not hasattr(self, 'data'):
print('Read data first.')
return False
elif not hasattr(self, 'noise'):
print('Read noise first.')
return False
elif not hasattr(self, 'calibration'):
print('Read calibration first.')
return False
if not self.denoised:
data = self.data**2 - self.noise
data = data / self.calibration**2
threshold = 1 / self.calibration.max()
data[data < threshold] = threshold
data = np.log10(data) * 10
else:
print('Product is already denoised.')
if in_place:
self.data = data
self.denoised = True
return True
else:
return data
def normalize(self, output_range=[0, 1], extend=True):
""" Scale data to output_range.
"""
""" Normalize """
if extend:
self.data -= self.data.min()
self.data /= self.data.max()
else:
self.data -= self.img_min
self.data /= self.img_max - self.img_min
""" Scale to output_range """
self.data = self.data * (output_range[1] - output_range[0]) + output_range[0]
def clip_normalize(self, output_range=[0, 1], extend=True):
""" Clip data and normalize it
"""
self.clip()
self.normalize(output_range=output_range, extend=extend)
def clip(self):
self.data[self.data > self.img_max] = self.img_max
self.data[self.data < self.img_min] = self.img_min
def extend(self):
""" Return normalized band data to clipped or original
"""
self.data *= (self.img_max - self.img_min)
self.data += self.img_min
def incidence_angle_correction(self, elevation_angle):
self.data = self.data + self.incidence_angle_correction_coefficient * (elevation_angle - elevation_angle.min())
def remove_useless_data(self):
self.calibration = None
self.noise = None
self.elevation_angle = None
def fill_nodata(self, offset=3):
    """ Fill nodata margins above/below each sub-swath by mirroring valid rows.

    Uses the scalloping LUT patch extents (self.scalloping_lut, produced by
    _read_azimuth_noise) to find where each sub-swath starts/ends in azimuth.
    offset -- number of rows to step inside the valid area before mirroring.
    """
    """ extend lower part """
    swath_list = sorted(list(set([item['swath'] for item in self.scalloping_lut])), reverse=True)
    for swath in swath_list:
        patches = sorted([item for item in self.scalloping_lut if item['swath'] == swath], key=lambda x: x['line_min'])
        patch = patches[-1]  # bottom-most patch of this sub-swath
        if patch['line_max'] < self.X - 1 - offset:
            edge_line = patch['line_max'] - 1 - offset
            min_sample = patch['sample_min']
            # Reflect rows around edge_line into the empty region below the swath.
            self.data[edge_line + 1:, min_sample:patch['sample_max'] + 1 + offset] = np.flip(self.data[edge_line * 2 + 1 - self.X:edge_line, min_sample:patch['sample_max'] + 1 + offset], axis=0)
    """ extend upper part """
    swath_list = sorted(list(set([item['swath'] for item in self.scalloping_lut])), reverse=False)
    for swath in swath_list:
        patches = sorted([item for item in self.scalloping_lut if item['swath'] == swath], key=lambda x: x['line_min'])
        patch = patches[0]  # top-most patch of this sub-swath
        if patch['line_min'] > 0:
            edge_line = patch['line_min'] + offset
            min_sample = patch['sample_min'] - offset
            min_sample = 0 if min_sample < 0 else min_sample
            # Reflect rows just below edge_line into the empty region above the swath.
            self.data[:edge_line, min_sample:patch['sample_max'] + 1] = np.flip(self.data[edge_line + 1:edge_line * 2 + 1, min_sample:patch['sample_max'] + 1], axis=0)
def detect_borders(self):
data = self.subtract_noise(in_place=False)
if self.des.lower() == 'hh':
self.left_lim, _ = self._find_border_coordinates(data, -32, quantile_value=0.24)
_, self.right_lim = self._find_border_coordinates(data, -27, quantile_value=0.24)
elif self.des.lower() == 'hv':
self.left_lim, self.right_lim = self._find_border_coordinates(data, -32, quantile_value=0.74)
else:
pass
def _find_border_coordinates(self, data, threshold_value, quantile_value=0.24, offset=5):
vertical_quantile = np.quantile(data, quantile_value, axis=0)
try:
left_lim = 250 - np.flip(np.where(vertical_quantile[:250] < threshold_value, True, False)).argmin() + offset
except Exception:
left_lim = offset
try:
right_lim = vertical_quantile.shape[0] - (200 - np.where(vertical_quantile[-200:] < threshold_value, True, False).argmax()) - (1 + offset)
except Exception:
right_lim = vertical_quantile.shape[0] - (1 + offset)
return left_lim, right_lim
class Sentinel1Product(object):
    """ The main class that represents a Sentinel-1 EW product.
    It contains information about the scene and band data in Sentinel1Band objects (one object per band).
    Input is expected to be a path to a Sentinel-1 scene (both *.SAFE and *.zip are supported).
    """
    def __init__(self, product_path, scale_noise=False):
        """ Set paths to auxiliary data, create a Sentinel1Band object for each
        band in the product and parse the scene timestamp into self.timestamp.

        product_path -- path to a *.SAFE folder or a *.zip archive
        scale_noise -- forwarded to every Sentinel1Band (scalloping-noise scaling)

        Raises FileNotFoundError if *product_path* does not exist, ValueError
        if it is not a zip file, IOError if the archive cannot be read.
        (The original printed a message and executed `return False`, which in
        __init__ itself raises an opaque TypeError; it also contained
        `print(self.product_path)` before that attribute existed, an
        AttributeError silently swallowed by `except: pass` — both removed.)
        """
        self.product_name = os.path.basename(product_path).split('.')[0]
        self.scale_noise = scale_noise

        def _band_number(x):
            """ Function expects a .xml filename from Sentinel-1 product folder.
            It returns the band number (the last character before the file extention, *00<band_num>.xml or *00<band_num>.tiff)
            """
            return int(os.path.split(x)[1].split('.')[0][-1])

        """ If *product_path* is a folder, set path to data and auxiliary data,
            otherwise open the zip archive. """
        if os.path.isdir(product_path):
            self.zip = False
            self.product_path = os.path.abspath(product_path)
            self.data_files = sorted([os.path.join(self.product_path, 'measurement', item) for item in os.listdir(os.path.join(self.product_path, 'measurement'))], key=_band_number)
            self.annotation_files = sorted([os.path.join(self.product_path, 'annotation', item) for item in os.listdir(os.path.join(self.product_path, 'annotation')) if '.xml' in item], key=_band_number)
            self.noise_files = sorted([os.path.join(self.product_path, 'annotation', 'calibration', item) for item in os.listdir(os.path.join(self.product_path, 'annotation', 'calibration')) if 'noise' in item], key=_band_number)
            self.calibration_files = sorted([os.path.join(self.product_path, 'annotation', 'calibration', item) for item in os.listdir(os.path.join(self.product_path, 'annotation', 'calibration')) if 'calibration' in item], key=_band_number)
        elif not os.path.isfile(product_path):
            raise FileNotFoundError('File {0} does not exist.'.format(product_path))
        else:
            if not zipfile.is_zipfile(product_path):
                raise ValueError('File {0} is not a zip file.'.format(product_path))
            self.zip = True
            try:
                zipdata = zipfile.ZipFile(product_path)
                data_files = sorted([item for item in zipdata.namelist() if 'measurement' in item and '.tif' in item], key=_band_number)
                xml_files = [item for item in zipdata.namelist() if '.xml' in item]
                annotation_files = sorted([item for item in xml_files if 'annotation' in item and 'calibration' not in item and 'rfi' not in item], key=_band_number)
                noise_files = sorted([item for item in xml_files if 'noise' in item], key=_band_number)
                calibration_files = sorted([item for item in xml_files if 'calibration' in item and 'noise' not in item], key=_band_number)
                self.data_files = [zipdata.open(item) for item in data_files]
                self.annotation_files = [zipdata.open(item) for item in annotation_files]
                self.noise_files = [zipdata.open(item) for item in noise_files]
                self.calibration_files = [zipdata.open(item) for item in calibration_files]
            except Exception as err:
                raise IOError('Zip file reading error.') from err
        """ Create a Sentinel1Band object for each band in the product. """
        for d, a, c, n in zip(self.data_files, self.annotation_files, self.calibration_files, self.noise_files):
            name_string = a if isinstance(a, str) else a.name
            # Band designator ('hh' / 'hv') is the 4th dash-separated token of the filename.
            band_name = os.path.split(name_string)[1].split('-')[3].upper()
            setattr(self, band_name, Sentinel1Band(d, a, c, n, band_name, self.scale_noise))
        """ Create datetime object """
        self.timestamp = scene_time(product_path)
        # Optionally collect raster size and GCPs through GDAL (best effort:
        # GDAL may be missing, hence the broad except).
        try:
            from osgeo import gdal
            if self.zip:
                p_gdal = gdal.Open('/vsizip/' + os.path.join(product_path, data_files[0]))
            else:
                p_gdal = gdal.Open(self.data_files[0])
            self.gdal_data = {'X': p_gdal.GetRasterBand(1).XSize,
                              'Y': p_gdal.GetRasterBand(1).YSize,
                              'GCPs': p_gdal.GetGCPs(),
                              'GCP_proj': p_gdal.GetGCPProjection()}
        except Exception:
            pass
def detect_borders(self):
""" Detect noise next to the vertical borders of a given image.
Set different thresholds for HH and HV bands since amplitude of measurements is different.
Return border coordinates, that can be used for slising: img[min_lim:max_lim] returns
image without border noise.
"""
""" Set thresholds to -32dB for HH left, -27dB for HH right and -32db for HV band, check 200 columns from edges """
if hasattr(self.HH, 'data') and hasattr(self.HV, 'data'):
self.x_min = max(self.HH.left_lim, self.HV.left_lim)
self.x_max = min(self.HH.right_lim, self.HV.right_lim)
else:
if hasattr(self.HV, 'data'):
self.x_min, self.x_max = self.HV.left_lim, self.HV.right_lim
if hasattr(self.HH, 'data'):
self.x_min, self.x_max = self.HH.left_lim, self.HH.right_lim
def read_GCPs(self):
    """ Parse Ground Control Points (GCPs) from the annotation file.
    Since GCPs should be same for all the bands within a product, only one band is used (default - HH, if available).
    Fills self.GCPs with one dict per grid point. Returns True on success,
    False (after a message) when neither band has data read yet.
    """
    if hasattr(self.HH, 'data'):
        band = self.HH
    elif hasattr(self.HV, 'data'):
        band = self.HV
    else:
        print('Read HH or HV band data before reading GCPs.')
        return False
    annotation_file = ElementTree.parse(band.annotation_path).getroot()
    # annotation_file[7][0] is the geolocation grid point list; the child
    # order below is fixed by the Sentinel-1 annotation schema.
    self.GCPs = [{'azimuth_time': datetime.strptime(i[0].text, "%Y-%m-%dT%H:%M:%S.%f"),
                  'slant_range_time': float(i[1].text),
                  'line': int(i[2].text),
                  'pixel': int(i[3].text),
                  'latitude': float(i[4].text),
                  'longitude': float(i[5].text),
                  'height': float(i[6].text),
                  'incidence_angle': float(i[7].text),
                  'elevation_angle': float(i[8].text),
                  } for i in annotation_file[7][0]]
    return True
def interpolate_GCP_parameter(self, parameter, gcps_per_line=21):
    """ Calculate one of the following parameters for every pixel:
        'azimuth_time'
        'slant_range_time'
        'line'
        'pixel'
        'latitude'
        'longitude'
        'height'
        'incidence_angle'
        'elevation_angle'
        Geolocation grid is interpolated linearly.
        Normally *gcps_per_line* should not be modified.
        Here it is assumed that there are always 21 geopoints per azimuth line.
    The result is stored as an attribute named after *parameter*.
    NOTE(review): 'azimuth_time' entries are datetime objects and would fail
    the float32 cast below — confirm intended use for that parameter.
    """
    if not hasattr(self, 'GCPs'):
        self.read_GCPs()
    if len(self.GCPs) % gcps_per_line != 0:
        print("Number of GCPs is not multiple of {0}.".format(gcps_per_line))
        print("Please, specify a different value for *gcps_per_line*.")
        return False
    # Number of azimuth rows in the GCP grid.
    ran = int(len(self.GCPs) / gcps_per_line)
    # GCPs are stored row-major: every gcps_per_line-th 'line' gives the row
    # coordinates, the first row's 'pixel's give the column coordinates.
    xs = np.array([i['line'] for i in self.GCPs])[::gcps_per_line]
    ys = np.array([i['pixel'] for i in self.GCPs])[:gcps_per_line]
    gcp_data = np.array([i[parameter] for i in self.GCPs], dtype=np.float32)
    # kx=ky=1 -> bilinear interpolation over the regular GCP grid.
    gcp_data_spline = RectBivariateSpline(xs, ys, gcp_data.reshape(ran, gcps_per_line), kx=1, ky=1)
    x_new = np.arange(0, self.HH.X, 1, dtype=np.int16)
    y_new = np.arange(0, self.HH.Y, 1, dtype=np.int16)
    result = gcp_data_spline(x_new, y_new).astype(np.float32)
    # If borders were already cropped, crop the interpolated grid the same way.
    if 'x_min' in self.gdal_data:
        result = result[:, self.gdal_data['x_min']:self.gdal_data['x_max']]
    setattr(self, parameter, result)
    return True
def interpolate_latitude(self, gcps_per_line=21):
if hasattr(self, 'latitude'):
print('Latitudes have already been interpolated.')
return True
self.interpolate_GCP_parameter('latitude', gcps_per_line=gcps_per_line)
def interpolate_longitude(self, gcps_per_line=21):
if hasattr(self, 'longitude'):
print('Longitudes have already been interpolated.')
return True
self.interpolate_GCP_parameter('longitude', gcps_per_line=gcps_per_line)
def interpolate_height(self, gcps_per_line=21):
if hasattr(self, 'height'):
print('Heights have already been interpolated.')
return True
self.interpolate_GCP_parameter('height', gcps_per_line=gcps_per_line)
def interpolate_elevation_angle(self, gcps_per_line=21):
if hasattr(self, 'elevation_angle'):
print('Elevation angles have already been interpolated.')
return True
self.interpolate_GCP_parameter('elevation_angle', gcps_per_line=gcps_per_line)
def interpolate_incidence_angle(self, gcps_per_line=21):
if hasattr(self, 'incidence_angle'):
print('Incidence angles have already been interpolated.')
return True
self.interpolate_GCP_parameter('incidence_angle', gcps_per_line=gcps_per_line)
def is_shifted(self):
""" Check if first lines of swaths ara shifted relative to each other (black steps on top or at the bottom of the image)
"""
if hasattr(self.HH, 'data'):
self.shifted = True if self.HH.data[:400, -1500:].mean() < 100 else False
elif hasattr(self.HV, 'data'):
self.shifted = True if self.HV.data[:400, -1500:].mean() < 40 else False
else:
print('Read data first.')
return False
self.HH.shifted = True if self.shifted else False
self.HV.shifted = True if self.shifted else False
return True
def read_data(self, band='both', incidence_angle_correction=True, keep_calibration_data=True, parallel=False, crop_borders=True, correct_hv=False):
""" Shortcut for reading data, noise, calibration, and noise subtraction)
"""
if band.lower() == 'both':
band_list = [self.HH, self.HV]
elif band.lower() == 'hh':
band_list = [self.HH]
elif band.lower() == 'hv':
band_list = [self.HV]
_rsb = partial(_read_single_band, keep_calibration_data=keep_calibration_data)
if not parallel:
list(map(_rsb, band_list))
else:
pool = ThreadPool(2)
pool.map(_rsb, band_list)
pool.close()
pool.join()
""" Incidence angle correction. """
if incidence_angle_correction:
self.read_GCPs()
self.interpolate_elevation_angle()
if hasattr(self, 'HH'):
self.HH.incidence_angle_correction(self.elevation_angle)
if hasattr(self, 'HV') and (correct_hv):
self.HV.incidence_angle_correction(self.elevation_angle)
""" Replace infinits (that appears after noise subtraction and calibration) with nofinite_data_val. """
nofinite_data_val = -32
for band in band_list:
band.nofinite_data_mask = np.where(np.isfinite(band.data), False, True)
band.data[band.nofinite_data_mask] = nofinite_data_val
""" Save nodata mask to self.gdal_data for further data writing. """
self.gdal_data['nodata_mask'] = self.HH.nodata_mask
if crop_borders:
self.crop_borders()
return True
def crop_borders(self):
""" Remove "dirty" pixels on the left and the right sides of the project.
The pixels are cut on all of the following (if exists):
* band data
* latitude
* longitude
* elevation angle
* incidence angle
* height
Therefore this function should be called after all the needed parameters are read and interpolated.
"""
self.detect_borders()
self.gdal_data['x_min'] = self.x_min
self.gdal_data['x_max'] = self.x_max
for item in ['latitude', 'longitude', 'elevation_angle', 'incidence_angle', 'height']:
if hasattr(self, item):
setattr(self, item, getattr(self, item)[:, self.x_min:self.x_max])
for band in [self.HH, self.HV]:
for item in ['data', 'noise', 'calibration']:
if hasattr(band, item):
attr = getattr(band, item)
if type(attr) != np.ndarray:
continue
setattr(band, item, getattr(band, item)[:, self.x_min:self.x_max])
def orbit_direction(self):
annotation_file = ElementTree.parse(self.annotation_files[0]).getroot()
return annotation_file[2][0][0].text.lower()
def fill_nodata(self):
try:
self.HH.fill_nodata()
except Exception:
print('Could not fill nodata for HH band')
pass
try:
self.HV.fill_nodata()
except Exception:
print('Could not fill nodata for HV band')
pass
def mask_land(self, custom_library=False):
""" Function creates a boolean array under self.landmask with land mask.
It relies on the *basemap* library.
read_data() must be called before mask_land().
set custom_library=True if the *landmask* library from IUP repo should be used
instead of *basemap* """
if not hasattr(self, 'latitude'):
self.interpolate_latitude()
if not hasattr(self, 'longitude'):
self.interpolate_longitude()
try:
from landmask import maskoceans
except ImportError:
try:
from mpl_toolkits.basemap import maskoceans
except ImportError:
print("Neither the landmask nor the basemap library found")
masked_array = maskoceans(self.longitude, self.latitude,
np.zeros_like(self.longitude, dtype=np.uint8),
resolution='f', grid=0.75, inlands=False)
self.landmask = ~masked_array.mask
def _read_single_band(band, keep_calibration_data=True):
band.read_data()
band.read_calibration()
band.read_noise(azimuth_noise=False)
try:
band.detect_borders()
except Exception:
pass
band.read_noise()
band.subtract_noise()
if not keep_calibration_data:
band.noise = False
band.calibration = False
if hasattr(band, 'azimuth_noise'):
band.azimuth_noise = False
return True
if __name__ == '__main__':
    # Library module: no command-line entry point.
    pass