Skip to content

H2 write stream #635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .builder/actions/aws_crt_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def run(self, env):

# Enable S3 tests
env.shell.setenv('AWS_TEST_S3', '1')
env.shell.setenv('AWS_TEST_LOCALHOST', '1')

actions = [
[self.python, '--version'],
Expand Down
167 changes: 141 additions & 26 deletions awscrt/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ def new(cls,
If successful, the Future will contain a new :class:`HttpClientConnection`.
Otherwise, it will contain an exception.
"""
return HttpClientConnection._new_common(
host_name,
port,
bootstrap,
socket_options,
tls_connection_options,
proxy_options)

@staticmethod
def _new_common(
host_name,
port,
bootstrap=None,
socket_options=None,
tls_connection_options=None,
proxy_options=None,
expected_version=None):
"""
Initialize the generic part of the HttpClientConnection class.
"""
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(host_name, str)
assert isinstance(port, int)
Expand All @@ -120,44 +140,30 @@ def new(cls,
assert isinstance(proxy_options, HttpProxyOptions) or proxy_options is None

future = Future()

try:
if not socket_options:
socket_options = SocketOptions()

if not bootstrap:
bootstrap = ClientBootstrap.get_or_create_static_default()

connection = cls()
connection._host_name = host_name
connection._port = port

def on_connection_setup(binding, error_code, http_version):
if error_code == 0:
connection._binding = binding
connection._version = HttpVersion(http_version)
future.set_result(connection)
else:
future.set_exception(awscrt.exceptions.from_code(error_code))

# on_shutdown MUST NOT reference the connection itself, just the shutdown_future within it.
# Otherwise we create a circular reference that prevents the connection from getting GC'd.
shutdown_future = connection.shutdown_future

def on_shutdown(error_code):
if error_code:
shutdown_future.set_exception(awscrt.exceptions.from_code(error_code))
else:
shutdown_future.set_result(None)
connection_core = _HttpClientConnectionCore(
host_name,
port,
bootstrap=bootstrap,
tls_connection_options=tls_connection_options,
connect_future=future,
expected_version=expected_version)

_awscrt.http_client_connection_new(
bootstrap,
on_connection_setup,
on_shutdown,
host_name,
port,
socket_options,
tls_connection_options,
proxy_options)
proxy_options,
connection_core)

except Exception as e:
future.set_exception(e)
Expand Down Expand Up @@ -219,6 +225,33 @@ def request(self, request, on_response=None, on_body=None):
return HttpClientStream(self, request, on_response, on_body)


class Http2ClientConnection(HttpClientConnection):
"""
HTTP/2 client connection.

This class extends HttpClientConnection with HTTP/2 specific functionality.
"""
@classmethod
def new(cls,
Copy link
Contributor

@graebm graebm Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debatable: not sure it's worth giving this an explicit constructor? It's not doing anything to actually cause the user to get an HTTP/2 connection, like setting alpn=h2 or prior_knowledge_http2. It's just forcing an error if the user didn't end up getting an HTTP/2 connection.

For better or worse, our API in C takes all possible options, because it's possible to kick off a connection and not know what you're going to get at the end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will add more to the connection, like settings, in the near future.

host_name,
port,
bootstrap=None,
socket_options=None,
tls_connection_options=None,
proxy_options=None):
return HttpClientConnection._new_common(
host_name,
port,
bootstrap,
socket_options,
tls_connection_options,
proxy_options,
HttpVersion.Http2)

def request(self, request, on_response=None, on_body=None, manual_write=False):
return Http2ClientStream(self, request, on_response, on_body, manual_write)


class HttpStreamBase(NativeResource):
"""Base for HTTP stream classes"""
__slots__ = ('_connection', '_completion_future', '_on_body_cb')
Expand Down Expand Up @@ -258,9 +291,12 @@ class HttpClientStream(HttpStreamBase):
completes. If the exchange fails to complete, the Future will
contain an exception indicating why it failed.
"""
__slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request')
__slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request', '_version')

def __init__(self, connection, request, on_response=None, on_body=None):
self._generic_init(connection, request, on_response, on_body)

def _generic_init(self, connection, request, on_response=None, on_body=None, http2_manual_write=False):
assert isinstance(connection, HttpClientConnection)
assert isinstance(request, HttpRequest)
assert callable(on_response) or on_response is None
Expand All @@ -273,8 +309,14 @@ def __init__(self, connection, request, on_response=None, on_body=None):

# keep HttpRequest alive until stream completes
self._request = request
self._version = connection.version

self._binding = _awscrt.http_client_stream_new(self, connection, request)
self._binding = _awscrt.http_client_stream_new(self, connection, request, http2_manual_write)

@property
def version(self):
"""HttpVersion: Protocol used by this stream"""
return self._version

@property
def response_status_code(self):
Expand Down Expand Up @@ -307,6 +349,24 @@ def _on_complete(self, error_code):
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))


class Http2ClientStream(HttpClientStream):
def __init__(self, connection, request, on_response=None, on_body=None, manual_write=False):
super()._generic_init(connection, request, on_response, on_body, manual_write)

def write_data(self, data_stream, end_stream=False):
future = Future()
body_stream = InputStream.wrap(data_stream, allow_none=True)

def on_write_complete(error_code):
if error_code:
future.set_exception(awscrt.exceptions.from_code(error_code))
else:
future.set_result(None)

_awscrt.http2_client_stream_write_data(self, body_stream, end_stream, on_write_complete)
return future


class HttpMessageBase(NativeResource):
"""
Base for HttpRequest and HttpResponse classes.
Expand Down Expand Up @@ -625,3 +685,58 @@ def __init__(self,
self.auth_username = auth_username
self.auth_password = auth_password
self.connection_type = connection_type


class _HttpClientConnectionCore:
'''
Private class to keep all the related Python object alive until C land clean up for HttpClientConnection
'''

def __init__(
self,
host_name,
port,
bootstrap=None,
tls_connection_options=None,
connect_future=None,
expected_version=None):
self._shutdown_future = None
self._host_name = host_name
self._port = port
self._bootstrap = bootstrap
self._tls_connection_options = tls_connection_options
self._connect_future = connect_future
self._expected_version = expected_version

def _on_connection_setup(self, binding, error_code, http_version):
if self._expected_version and self._expected_version != http_version:
# unexpected protocol version
# AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL
self._connect_future.set_exception(awscrt.exceptions.from_code(2060))
return
if error_code != 0:
self._connect_future.set_exception(awscrt.exceptions.from_code(error_code))
return
if http_version == HttpVersion.Http2:
connection = Http2ClientConnection()
else:
connection = HttpClientConnection()

connection._host_name = self._host_name
connection._port = self._port

connection._binding = binding
connection._version = HttpVersion(http_version)
self._shutdown_future = connection.shutdown_future
self._connect_future.set_result(connection)
# release reference to the future, as it points to connection which creates a cycle reference.
self._connect_future = None

def _on_shutdown(self, error_code):
if self._shutdown_future is None:
# connection failed, ignore shutdown
return
if error_code:
self._shutdown_future.set_exception(awscrt.exceptions.from_code(error_code))
else:
self._shutdown_future.set_result(None)
2 changes: 1 addition & 1 deletion crt/aws-c-cal
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ dev = [
"build>=1.2.2", # for building wheels
"sphinx>=7.2.6,<7.3; python_version >= '3.9'", # for building docs
"websockets>=13.1", # for tests
"h2", # for tests
]
3 changes: 2 additions & 1 deletion source/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args);
PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args);

PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args);

PyObject *aws_py_http_client_stream_activate(PyObject *self, PyObject *args);

PyObject *aws_py_http2_client_stream_write_data(PyObject *self, PyObject *args);

/* Create capsule around new request-style aws_http_message struct */
PyObject *aws_py_http_message_new_request(PyObject *self, PyObject *args);

Expand Down
Loading