diff --git a/requirements.txt b/requirements.txt index c1381f71..a3007afb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ scikit-learn==0.22.2.post1 scipy==1.4.1 threadpoolctl==2.1.0 urllib3==1.25.9 +s3fs==2021.4.0 diff --git a/setup.py b/setup.py index 3c4ed1a4..90f88127 100644 --- a/setup.py +++ b/setup.py @@ -72,7 +72,8 @@ 'scikit-learn>=0.18', 'scipy>=0.17.0', 'threadpoolctl>=1.0.0', - 'urllib3>=1.22' + 'urllib3>=1.22', + 's3fs>=2021.4.0' ], # List additional groups of dependencies here (e.g. development diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index f6a95ec6..5c9b79d9 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -2091,7 +2091,7 @@ def _infer_sig_len(file_name, fmt, n_sig, dir_name, pn_dir=None): file_size = os.path.getsize(os.path.join(dir_name, file_name)) else: file_size = download._remote_file_size(file_name=file_name, - pn_dir=pn_dir) + remote_dir=pn_dir) sig_len = int(file_size / (BYTES_PER_SAMPLE[fmt] * n_sig)) diff --git a/wfdb/io/annotation.py b/wfdb/io/annotation.py index 14855f0d..b58a738c 100644 --- a/wfdb/io/annotation.py +++ b/wfdb/io/annotation.py @@ -1620,7 +1620,7 @@ def rdann(record_name, extension, sampfrom=0, sampto=None, shift_samps=False, >>> ann = wfdb.rdann('sample-data/100', 'atr', sampto=300000) """ - if (pn_dir is not None) and ('.' not in pn_dir): + if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')): dir_list = pn_dir.split('/') pn_dir = posixpath.join(dir_list[0], record.get_version(dir_list[0]), @@ -2255,7 +2255,7 @@ def ann2rr(record_name, extension, pn_dir=None, start_time=None, >>> 257 """ - if (pn_dir is not None) and ('.' not in pn_dir): + if (pn_dir is not None) and ('.' 
not in pn_dir) and (not pn_dir.startswith('s3')): dir_list = pn_dir.split('/') pn_dir = posixpath.join(dir_list[0], record.get_version(dir_list[0]), *dir_list[1:]) diff --git a/wfdb/io/download.py b/wfdb/io/download.py index 8a5a6e77..3e1bbf10 100644 --- a/wfdb/io/download.py +++ b/wfdb/io/download.py @@ -49,7 +49,7 @@ def set_db_index_url(db_index_url=PN_INDEX_URL): config.db_index_url = db_index_url -def _remote_file_size(url=None, file_name=None, pn_dir=None): +def _remote_file_size(url=None, file_name=None, remote_dir=None): """ Get the remote file size in bytes. @@ -59,11 +59,16 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None): The full url of the file. Use this option to explicitly state the full url. file_name : str, optional - The base file name. Use this argument along with pn_dir if you + The base file name. Use this argument along with remote_dir if you want the full url to be constructed. - pn_dir : str, optional - The base file name. Use this argument along with file_name if - you want the full url to be constructed. + remote_dir : str, optional + The remote directory, which is one of two things: + (1) The S3 URI from which to find the required file. This + should always begin with 's3' in order to work correctly. An + example input would be: 's3://my-aws-bucket/' + (2) The PhysioNet database directory from which to find the + required file. eg. For file '100.hea' in + 'http://physionet.org/content/mitdb', remote_dir='mitdb'.
Returns ------- @@ -72,8 +77,17 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None): """ # Option to construct the url - if file_name and pn_dir: - url = posixpath.join(config.db_index_url, pn_dir, file_name) + if file_name and remote_dir: + if remote_dir.startswith('s3'): + # Set up the remote AWS S3 file system + import s3fs + fs = s3fs.S3FileSystem(anon=True) + file_dir = posixpath.join(remote_dir, file_name) + # Open the remote file and return its size + with fs.open(file_dir) as f: + return f.size + else: + url = posixpath.join(config.db_index_url, remote_dir, file_name) response = requests.head(url, headers={'Accept-Encoding': 'identity'}) # Raise HTTPError if invalid url @@ -85,7 +99,7 @@ def _remote_file_size(url=None, file_name=None, pn_dir=None): return remote_file_size -def _stream_header(file_name, pn_dir): +def _stream_header(file_name, remote_dir): """ Stream the lines of a remote header file. @@ -93,10 +107,14 @@ def _stream_header(file_name, pn_dir): ---------- file_name : str The name of the headerr file to be read. - pn_dir : str - The PhysioNet database directory from which to find the - required header file. eg. For file '100.hea' in - 'http://physionet.org/content/mitdb', pn_dir='mitdb'. + remote_dir : str + The remote directory, which is one of two things: + (1) The S3 URI from which to find the required header file. This + should always begin with 's3' in order to work correctly. An + example input would be: 's3://my-aws-bucket/' + (2) The PhysioNet database directory from which to find the + required header file. eg. For file '100.hea' in + 'http://physionet.org/content/mitdb', remote_dir='mitdb'. Returns ------- @@ -106,15 +124,23 @@ def _stream_header(file_name, pn_dir): All of the comment header lines.
""" - # Full url of header location - url = posixpath.join(config.db_index_url, pn_dir, file_name) - response = requests.get(url) - - # Raise HTTPError if invalid url - response.raise_for_status() - - # Get each line as a string - filelines = response.content.decode('iso-8859-1').splitlines() + if remote_dir.startswith('s3'): + # Set up the remote AWS S3 file system + import s3fs + fs = s3fs.S3FileSystem(anon=True) + file_dir = posixpath.join(remote_dir, file_name) + # Read and decode the lines + with fs.open(file_dir) as f: + filelines = f.readlines() + filelines = [l.decode('iso-8859-1') for l in filelines] + else: + # Full url of header location + url = posixpath.join(config.db_index_url, remote_dir, file_name) + response = requests.get(url) + # Raise HTTPError if invalid url + response.raise_for_status() + # Get each line as a string + filelines = response.content.decode('iso-8859-1').splitlines() # Separate content into header and comment lines header_lines = [] @@ -139,7 +165,7 @@ def _stream_header(file_name, pn_dir): return (header_lines, comment_lines) -def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): +def _stream_dat(file_name, remote_dir, byte_count, start_byte, dtype): """ Stream data from a remote dat file into a 1d numpy array. @@ -147,8 +173,14 @@ def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): ---------- file_name : str The name of the dat file to be read. - pn_dir : str - The PhysioNet directory where the dat file is located. + remote_dir : str + The remote directory, which is one of two things: + (1) The S3 URI from which to find the required dat file. This + should always begin with 's3' in order to work correctly. An + example input would be: 's3://my-aws-bucket/' + (2) The PhysioNet database directory from which to find the + required dat file. eg. For file '100.dat' in + 'http://physionet.org/content/mitdb', remote_dir='mitdb'. byte_count : int The number of bytes to be read.
start_byte : int @@ -162,32 +194,42 @@ def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype): The data read from the dat file. """ - # Full url of dat file - url = posixpath.join(config.db_index_url, pn_dir, file_name) + if remote_dir.startswith('s3'): + # Set up the remote AWS S3 file system + import s3fs + fs = s3fs.S3FileSystem(anon=True) + file_dir = posixpath.join(remote_dir, file_name) + # NOTE(review): reads the entire file; start_byte and byte_count are not applied on this path + with fs.open(file_dir) as f: + file_content = f.read() + else: + # Full url of dat file + url = posixpath.join(config.db_index_url, remote_dir, file_name) - # Specify the byte range - end_byte = start_byte + byte_count - 1 - headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte), - 'Accept-Encoding': '*'} + # Specify the byte range + end_byte = start_byte + byte_count - 1 + headers = {"Range":"bytes=%d-%d" % (start_byte, end_byte), + 'Accept-Encoding': '*'} - # Get the content - response = requests.get(url, headers=headers, stream=True) + # Get the content + response = requests.get(url, headers=headers, stream=True) - # Raise HTTPError if invalid url - response.raise_for_status() + # Raise HTTPError if invalid url + response.raise_for_status() + file_content = response.content # Convert to numpy array if type(dtype) == str: # Convert 24-bit to 16-bit then proceed - temp_data = np.frombuffer(response.content, 'b').reshape(-1,3)[:,1:].flatten().view('i2') + temp_data = np.frombuffer(file_content, 'b').reshape(-1,3)[:,1:].flatten().view('i2') sig_data = np.fromstring(temp_data, dtype='i2') else: - sig_data = np.fromstring(response.content, dtype=dtype) + sig_data = np.fromstring(file_content, dtype=dtype) return sig_data -def _stream_annotation(file_name, pn_dir): +def _stream_annotation(file_name, remote_dir): """ Stream an entire remote annotation file from Physionet. @@ -195,8 +237,14 @@ def _stream_annotation(file_name, pn_dir): ---------- file_name : str The name of the annotation file to be read.
- pn_dir : str - The PhysioNet directory where the annotation file is located. + remote_dir : str + The remote directory, which is one of two things: + (1) The S3 URI from which to find the required annotation file. This + should always begin with 's3' in order to work correctly. An + example input would be: 's3://my-aws-bucket/' + (2) The PhysioNet database directory from which to find the + required annotation file. eg. For file '100.atr' in + 'http://physionet.org/content/mitdb', remote_dir='mitdb'. Returns ------- @@ -204,16 +252,24 @@ def _stream_annotation(file_name, pn_dir): The resulting data stream in numpy array format. """ - # Full url of annotation file - url = posixpath.join(config.db_index_url, pn_dir, file_name) - - # Get the content - response = requests.get(url) - # Raise HTTPError if invalid url - response.raise_for_status() - + if remote_dir.startswith('s3'): + # Set up the remote AWS S3 file system + import s3fs + fs = s3fs.S3FileSystem(anon=True) + file_dir = posixpath.join(remote_dir, file_name) + # Read the raw file content + with fs.open(file_dir) as f: + file_content = f.read() + else: + # Full url of annotation file + url = posixpath.join(config.db_index_url, remote_dir, file_name) + # Get the content + response = requests.get(url) + # Raise HTTPError if invalid url + response.raise_for_status() + file_content = response.content # Convert to numpy array - ann_data = np.fromstring(response.content, dtype=np.dtype('4s', wave_file.read(4))]) if chunk_ID != 'RIFF': raise Exception('{} is not a .wav-format file'.format(record_name)) - correct_chunk_size = os.path.getsize(record_name) - 8 + try: + correct_chunk_size = wave_file.size - 8 + except AttributeError: + correct_chunk_size = os.path.getsize(record_name) - 8 chunk_size = struct.unpack('>> ECG 4 500 """ - if (pn_dir is not None) and ('.' not in pn_dir): + if (pn_dir is not None) and ('.'
not in pn_dir) and (not pn_dir.startswith('s3')): dir_list = pn_dir.split('/') pn_dir = posixpath.join(dir_list[0], get_version(dir_list[0]), *dir_list[1:]) @@ -3815,7 +3846,7 @@ def signame(record_name, pn_dir=None, sig_nums=[]): >>> ECG 4 """ - if (pn_dir is not None) and ('.' not in pn_dir): + if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')): dir_list = pn_dir.split('/') pn_dir = posixpath.join(dir_list[0], get_version(dir_list[0]), *dir_list[1:]) @@ -3894,7 +3925,7 @@ def wfdbdesc(record_name, pn_dir=None): Checksum: 20052 """ - if (pn_dir is not None) and ('.' not in pn_dir): + if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')): dir_list = pn_dir.split('/') pn_dir = posixpath.join(dir_list[0], get_version(dir_list[0]), *dir_list[1:]) @@ -3922,8 +3953,13 @@ def wfdbdesc(record_name, pn_dir=None): start_time = 'not specified' print(f'Starting time: {start_time}') - record_length = str(datetime.timedelta(seconds=record.sig_len/record.fs)) - print(f'Length: {record_length} ({record.sig_len} sample intervals)') + try: + record_length = str(datetime.timedelta(seconds=record.sig_len/record.fs)) + sig_len = record.sig_len + except TypeError: + record_length = 'Unknown' + sig_len = 'Unknown' + print(f'Length: {record_length} ({sig_len} sample intervals)') print(f'Sampling frequency: {record.fs} Hz') print(f'{record.n_sig} signal{"" if record.n_sig==1 else "s"}') @@ -4020,7 +4056,7 @@ def wfdbtime(record_name, input_times, pn_dir=None): s1153 00:00:09.224 [19:46:34.981000 01/01/0001] """ - if (pn_dir is not None) and ('.' not in pn_dir): + if (pn_dir is not None) and ('.' not in pn_dir) and (not pn_dir.startswith('s3')): dir_list = pn_dir.split('/') pn_dir = posixpath.join(dir_list[0], get_version(dir_list[0]), *dir_list[1:])