Skip to content

Commit e50b101

Browse files
committed
Add support for stdin/stdout streams for CRT client
This also included: * Refactoring the s3transfer crt layer to better organize the branching introduced in supporting both file objects and file name. * Introduce additional helper test methods to reduce verbosity of s3transfer crt test and improve reusability.
1 parent 580ceb0 commit e50b101

File tree

11 files changed

+575
-100
lines changed

11 files changed

+575
-100
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"type": "enhancement",
3+
"category": "``s3 cp``",
4+
"description": "Support streaming uploads from stdin and streaming downloads to stdout for CRT transfer client"
5+
}

awscli/customizations/s3/factory.py

-2
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,6 @@ def create_transfer_manager(self, params, runtime_config,
7070
def _compute_transfer_client_type(self, params, runtime_config):
7171
if params.get('paths_type') == 's3s3':
7272
return constants.DEFAULT_TRANSFER_CLIENT
73-
if params.get('is_stream'):
74-
return constants.DEFAULT_TRANSFER_CLIENT
7573
return runtime_config.get(
7674
'preferred_transfer_client', constants.DEFAULT_TRANSFER_CLIENT)
7775

awscli/s3transfer/crt.py

+130-38
Original file line numberDiff line numberDiff line change
@@ -428,19 +428,12 @@ def _crt_request_from_aws_request(self, aws_request):
428428
headers_list.append((name, str(value, 'utf-8')))
429429

430430
crt_headers = awscrt.http.HttpHeaders(headers_list)
431-
# CRT requires body (if it exists) to be an I/O stream.
432-
crt_body_stream = None
433-
if aws_request.body:
434-
if hasattr(aws_request.body, 'seek'):
435-
crt_body_stream = aws_request.body
436-
else:
437-
crt_body_stream = BytesIO(aws_request.body)
438431

439432
crt_request = awscrt.http.HttpRequest(
440433
method=aws_request.method,
441434
path=crt_path,
442435
headers=crt_headers,
443-
body_stream=crt_body_stream,
436+
body_stream=aws_request.body,
444437
)
445438
return crt_request
446439

@@ -453,6 +446,25 @@ def _convert_to_crt_http_request(self, botocore_http_request):
453446
crt_request.headers.set("host", url_parts.netloc)
454447
if crt_request.headers.get('Content-MD5') is not None:
455448
crt_request.headers.remove("Content-MD5")
449+
450+
# In general, the CRT S3 client expects a content length header. It
451+
# only expects a missing content length header if the body is not
452+
# seekable. However, botocore does not set the content length header
453+
# for GetObject API requests and so we set the content length to zero
454+
# to meet the CRT S3 client's expectation that the content length
455+
# header is set even if there is no body.
456+
if crt_request.headers.get('Content-Length') is None:
457+
if botocore_http_request.body is None:
458+
crt_request.headers.add('Content-Length', "0")
459+
460+
# Botocore sets the Transfer-Encoding header when it cannot determine
461+
# the content length of the request body (e.g. it's not seekable).
462+
# However, CRT does not support this header, but it supports
463+
# non-seekable bodies. So we remove this header to not cause issues
464+
# in the downstream CRT S3 request.
465+
if crt_request.headers.get('Transfer-Encoding') is not None:
466+
crt_request.headers.remove('Transfer-Encoding')
467+
456468
return crt_request
457469

458470
def _capture_http_request(self, request, **kwargs):
@@ -555,39 +567,20 @@ def __init__(self, crt_request_serializer, os_utils):
555567
def get_make_request_args(
556568
self, request_type, call_args, coordinator, future, on_done_after_calls
557569
):
558-
recv_filepath = None
559-
send_filepath = None
560-
s3_meta_request_type = getattr(
561-
S3RequestType, request_type.upper(), S3RequestType.DEFAULT
570+
request_args_handler = getattr(
571+
self,
572+
f'_get_make_request_args_{request_type}',
573+
self._default_get_make_request_args,
562574
)
563-
on_done_before_calls = []
564-
if s3_meta_request_type == S3RequestType.GET_OBJECT:
565-
final_filepath = call_args.fileobj
566-
recv_filepath = self._os_utils.get_temp_filename(final_filepath)
567-
file_ondone_call = RenameTempFileHandler(
568-
coordinator, final_filepath, recv_filepath, self._os_utils
569-
)
570-
on_done_before_calls.append(file_ondone_call)
571-
elif s3_meta_request_type == S3RequestType.PUT_OBJECT:
572-
send_filepath = call_args.fileobj
573-
data_len = self._os_utils.get_file_size(send_filepath)
574-
call_args.extra_args["ContentLength"] = data_len
575-
576-
crt_request = self._request_serializer.serialize_http_request(
577-
request_type, future
575+
return request_args_handler(
576+
request_type=request_type,
577+
call_args=call_args,
578+
coordinator=coordinator,
579+
future=future,
580+
on_done_before_calls=[],
581+
on_done_after_calls=on_done_after_calls,
578582
)
579583

580-
return {
581-
'request': crt_request,
582-
'type': s3_meta_request_type,
583-
'recv_filepath': recv_filepath,
584-
'send_filepath': send_filepath,
585-
'on_done': self.get_crt_callback(
586-
future, 'done', on_done_before_calls, on_done_after_calls
587-
),
588-
'on_progress': self.get_crt_callback(future, 'progress'),
589-
}
590-
591584
def get_crt_callback(
592585
self,
593586
future,
@@ -613,6 +606,97 @@ def invoke_all_callbacks(*args, **kwargs):
613606

614607
return invoke_all_callbacks
615608

609+
def _get_make_request_args_put_object(
610+
self,
611+
request_type,
612+
call_args,
613+
coordinator,
614+
future,
615+
on_done_before_calls,
616+
on_done_after_calls,
617+
):
618+
send_filepath = None
619+
if isinstance(call_args.fileobj, str):
620+
send_filepath = call_args.fileobj
621+
data_len = self._os_utils.get_file_size(send_filepath)
622+
call_args.extra_args["ContentLength"] = data_len
623+
else:
624+
call_args.extra_args["Body"] = call_args.fileobj
625+
626+
# Suppress botocore's automatic MD5 calculation by setting an override
627+
# value that will get deleted in the BotocoreCRTRequestSerializer.
628+
# The CRT S3 client is able automatically compute checksums as part of
629+
# requests it makes, and the intention is to configure automatic
630+
# checksums in a future update.
631+
call_args.extra_args["ContentMD5"] = "override-to-be-removed"
632+
633+
make_request_args = self._default_get_make_request_args(
634+
request_type=request_type,
635+
call_args=call_args,
636+
coordinator=coordinator,
637+
future=future,
638+
on_done_before_calls=on_done_before_calls,
639+
on_done_after_calls=on_done_after_calls,
640+
)
641+
make_request_args['send_filepath'] = send_filepath
642+
return make_request_args
643+
644+
def _get_make_request_args_get_object(
645+
self,
646+
request_type,
647+
call_args,
648+
coordinator,
649+
future,
650+
on_done_before_calls,
651+
on_done_after_calls,
652+
):
653+
recv_filepath = None
654+
on_body = None
655+
if isinstance(call_args.fileobj, str):
656+
final_filepath = call_args.fileobj
657+
recv_filepath = self._os_utils.get_temp_filename(final_filepath)
658+
on_done_before_calls.append(
659+
RenameTempFileHandler(
660+
coordinator, final_filepath, recv_filepath, self._os_utils
661+
)
662+
)
663+
else:
664+
on_body = OnBodyFileObjWriter(call_args.fileobj)
665+
666+
make_request_args = self._default_get_make_request_args(
667+
request_type=request_type,
668+
call_args=call_args,
669+
coordinator=coordinator,
670+
future=future,
671+
on_done_before_calls=on_done_before_calls,
672+
on_done_after_calls=on_done_after_calls,
673+
)
674+
make_request_args['recv_filepath'] = recv_filepath
675+
make_request_args['on_body'] = on_body
676+
return make_request_args
677+
678+
def _default_get_make_request_args(
679+
self,
680+
request_type,
681+
call_args,
682+
coordinator,
683+
future,
684+
on_done_before_calls,
685+
on_done_after_calls,
686+
):
687+
return {
688+
'request': self._request_serializer.serialize_http_request(
689+
request_type, future
690+
),
691+
'type': getattr(
692+
S3RequestType, request_type.upper(), S3RequestType.DEFAULT
693+
),
694+
'on_done': self.get_crt_callback(
695+
future, 'done', on_done_before_calls, on_done_after_calls
696+
),
697+
'on_progress': self.get_crt_callback(future, 'progress'),
698+
}
699+
616700

617701
class RenameTempFileHandler:
618702
def __init__(self, coordinator, final_filename, temp_filename, osutil):
@@ -642,3 +726,11 @@ def __init__(self, coordinator):
642726

643727
def __call__(self, **kwargs):
644728
self._coordinator.set_done_callbacks_complete()
729+
730+
731+
class OnBodyFileObjWriter:
732+
def __init__(self, fileobj):
733+
self._fileobj = fileobj
734+
735+
def __call__(self, chunk, **kwargs):
736+
self._fileobj.write(chunk)

awscli/topics/s3-config.rst

-3
Original file line numberDiff line numberDiff line change
@@ -297,9 +297,6 @@ files to and from S3. Valid choices are:
297297

298298
* S3 to S3 copies - Falls back to using the ``default`` transfer client
299299

300-
* Streaming uploads from standard input and downloads to standard output -
301-
Falls back to using ``default`` transfer client.
302-
303300
* Region redirects - Transfers fail for requests sent to a region that does
304301
not match the region of the targeted S3 bucket.
305302

tests/functional/s3/__init__.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ def setUp(self):
436436
self.mock_crt_client.return_value.make_request.side_effect = \
437437
self.simulate_make_request_side_effect
438438
self.files = FileCreator()
439+
self.expected_download_content = b'content'
439440

440441
def tearDown(self):
441442
super(BaseCRTTransferClientTest, self).tearDown()
@@ -456,6 +457,8 @@ def get_config_file_contents(self):
456457
def simulate_make_request_side_effect(self, *args, **kwargs):
457458
if kwargs.get('recv_filepath'):
458459
self.simulate_file_download(kwargs['recv_filepath'])
460+
elif kwargs.get('on_body'):
461+
self.simulate_on_body(kwargs['on_body'])
459462
s3_request = FakeCRTS3Request(
460463
future=FakeCRTFuture(kwargs.get('on_done'))
461464
)
@@ -465,11 +468,14 @@ def simulate_file_download(self, recv_filepath):
465468
parent_dir = os.path.dirname(recv_filepath)
466469
if not os.path.isdir(parent_dir):
467470
os.makedirs(parent_dir)
468-
with open(recv_filepath, 'w') as f:
471+
with open(recv_filepath, 'wb') as f:
469472
# The content is arbitrary as most functional tests are just going
470473
# to assert the file exists since it is the CRT writing the
471474
# data to the file.
472-
f.write('content')
475+
f.write(self.expected_download_content)
476+
477+
def simulate_on_body(self, on_body_callback):
478+
on_body_callback(chunk=self.expected_download_content, offset=0)
473479

474480
def get_crt_make_request_calls(self):
475481
return self.mock_crt_client.return_value.make_request.call_args_list
@@ -489,7 +495,8 @@ def assert_crt_make_request_call(
489495
self, make_request_call, expected_type, expected_host,
490496
expected_path, expected_http_method=None,
491497
expected_send_filepath=None,
492-
expected_recv_startswith=None):
498+
expected_recv_startswith=None,
499+
expected_body_content=None):
493500
make_request_kwargs = make_request_call[1]
494501
self.assertEqual(
495502
make_request_kwargs['type'], expected_type)
@@ -522,6 +529,15 @@ def assert_crt_make_request_call(
522529
f"start with {expected_recv_startswith}"
523530
)
524531
)
532+
if expected_body_content is not None:
533+
# Note: The underlying CRT awscrt.io.InputStream does not expose
534+
# a public read method so we have to reach into the private,
535+
# underlying stream to determine the content. We should update
536+
# to use a public interface if a public interface is ever exposed.
537+
self.assertEqual(
538+
make_request_kwargs['request'].body_stream._stream.read(),
539+
expected_body_content
540+
)
525541

526542

527543
class FakeCRTS3Request:

tests/functional/s3/test_cp_command.py

+23-10
Original file line numberDiff line numberDiff line change
@@ -2116,25 +2116,38 @@ def test_does_not_use_crt_client_for_copies(self):
21162116
self.assertEqual(self.get_crt_make_request_calls(), [])
21172117
self.assert_no_remaining_botocore_responses()
21182118

2119-
def test_does_not_use_crt_client_for_streaming_upload(self):
2119+
def test_streaming_upload_using_crt_client(self):
21202120
cmdline = [
21212121
's3', 'cp', '-', 's3://bucket/key'
21222122
]
2123-
self.add_botocore_put_object_response()
21242123
with mock.patch('sys.stdin', BufferedBytesIO(b'foo')):
21252124
self.run_command(cmdline)
2126-
self.assertEqual(self.get_crt_make_request_calls(), [])
2127-
self.assert_no_remaining_botocore_responses()
2125+
crt_requests = self.get_crt_make_request_calls()
2126+
self.assertEqual(len(crt_requests), 1)
2127+
self.assert_crt_make_request_call(
2128+
crt_requests[0],
2129+
expected_type=S3RequestType.PUT_OBJECT,
2130+
expected_host=self.get_virtual_s3_host('bucket'),
2131+
expected_path='/key',
2132+
expected_body_content=b'foo',
2133+
)
21282134

2129-
def test_does_not_use_crt_client_for_streaming_download(self):
2135+
def test_streaming_download_using_crt_client(self):
21302136
cmdline = [
21312137
's3', 'cp', 's3://bucket/key', '-'
21322138
]
2133-
self.add_botocore_head_object_response()
2134-
self.add_botocore_get_object_response()
2135-
self.run_command(cmdline)
2136-
self.assertEqual(self.get_crt_make_request_calls(), [])
2137-
self.assert_no_remaining_botocore_responses()
2139+
result = self.run_command(cmdline)
2140+
crt_requests = self.get_crt_make_request_calls()
2141+
self.assertEqual(len(crt_requests), 1)
2142+
self.assert_crt_make_request_call(
2143+
crt_requests[0],
2144+
expected_type=S3RequestType.GET_OBJECT,
2145+
expected_host=self.get_virtual_s3_host('bucket'),
2146+
expected_path='/key',
2147+
)
2148+
self.assertEqual(
2149+
result.stdout, self.expected_download_content.decode('utf-8')
2150+
)
21382151

21392152
def test_respects_region_parameter(self):
21402153
filename = self.files.create_file('myfile', 'mycontent')

0 commit comments

Comments
 (0)