Skip to content

H2 write stream #635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .builder/actions/aws_crt_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def run(self, env):

# Enable S3 tests
env.shell.setenv('AWS_TEST_S3', '1')
env.shell.setenv('AWS_TEST_LOCALHOST', '1')

actions = [
[self.python, '--version'],
Expand Down
112 changes: 98 additions & 14 deletions awscrt/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ def new(cls,
If successful, the Future will contain a new :class:`HttpClientConnection`.
Otherwise, it will contain an exception.
"""
return HttpClientConnection._generic_new(
host_name,
port,
bootstrap,
socket_options,
tls_connection_options,
proxy_options)

@staticmethod
def _generic_new(
host_name,
port,
bootstrap=None,
socket_options=None,
tls_connection_options=None,
proxy_options=None,
expected_version=None):
"""
Initialize the generic part of the HttpClientConnection class.
"""
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(host_name, str)
assert isinstance(port, int)
Expand All @@ -126,24 +146,34 @@ def new(cls,

if not bootstrap:
bootstrap = ClientBootstrap.get_or_create_static_default()

connection = cls()
connection._host_name = host_name
connection._port = port
shutdown_future = None

def on_connection_setup(binding, error_code, http_version):
if error_code == 0:
connection._binding = binding
connection._version = HttpVersion(http_version)
future.set_result(connection)
else:
if expected_version and expected_version != http_version:
# unexpected protocol version
# AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL
future.set_exception(awscrt.exceptions.from_code(2060))
return
if error_code != 0:
future.set_exception(awscrt.exceptions.from_code(error_code))
return
if http_version == HttpVersion.Http2:
connection = Http2ClientConnection()
else:
connection = HttpClientConnection()
connection._host_name = host_name
connection._port = port

# on_shutdown MUST NOT reference the connection itself, just the shutdown_future within it.
# Otherwise we create a circular reference that prevents the connection from getting GC'd.
shutdown_future = connection.shutdown_future
connection._binding = binding
connection._version = HttpVersion(http_version)
nonlocal shutdown_future
shutdown_future = connection.shutdown_future
future.set_result(connection)

def on_shutdown(error_code):
if shutdown_future is None:
# connection failed, ignore shutdown
return
if error_code:
shutdown_future.set_exception(awscrt.exceptions.from_code(error_code))
else:
Expand Down Expand Up @@ -219,6 +249,33 @@ def request(self, request, on_response=None, on_body=None):
return HttpClientStream(self, request, on_response, on_body)


class Http2ClientConnection(HttpClientConnection):
"""
HTTP/2 client connection.

This class extends HttpClientConnection with HTTP/2 specific functionality.
"""
@classmethod
def new(cls,
Copy link
Contributor

@graebm graebm Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debatable: not sure it's worth giving this an explicit constructor? It's not doing anything to actually cause the user to get an HTTP/2 connection, like setting alpn=h2 or prior_knowledge_http2. It's just forcing an error if the user didn't end up getting an HTTP/2 connection.

For better or worse, our API in C takes all possible options, because it's possible to kick off a connection and not know what you're going to get at the end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will add more to the connection, like settings, in the near future.

host_name,
port,
bootstrap=None,
socket_options=None,
tls_connection_options=None,
proxy_options=None):
return HttpClientConnection._generic_new(
host_name,
port,
bootstrap,
socket_options,
tls_connection_options,
proxy_options,
HttpVersion.Http2)

def request(self, request, on_response=None, on_body=None, manual_write=False):
return Http2ClientStream(self, request, on_response, on_body, manual_write)


class HttpStreamBase(NativeResource):
"""Base for HTTP stream classes"""
__slots__ = ('_connection', '_completion_future', '_on_body_cb')
Expand Down Expand Up @@ -258,9 +315,12 @@ class HttpClientStream(HttpStreamBase):
completes. If the exchange fails to complete, the Future will
contain an exception indicating why it failed.
"""
__slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request')
__slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request', '_version')

def __init__(self, connection, request, on_response=None, on_body=None):
self._generic_init(connection, request, on_response, on_body)

def _generic_init(self, connection, request, on_response=None, on_body=None, http2_manual_write=False):
assert isinstance(connection, HttpClientConnection)
assert isinstance(request, HttpRequest)
assert callable(on_response) or on_response is None
Expand All @@ -273,8 +333,14 @@ def __init__(self, connection, request, on_response=None, on_body=None):

# keep HttpRequest alive until stream completes
self._request = request
self._version = connection.version

self._binding = _awscrt.http_client_stream_new(self, connection, request, http2_manual_write)

self._binding = _awscrt.http_client_stream_new(self, connection, request)
@property
def version(self):
"""HttpVersion: Protocol used by this stream"""
return self._version

@property
def response_status_code(self):
Expand Down Expand Up @@ -307,6 +373,24 @@ def _on_complete(self, error_code):
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))


class Http2ClientStream(HttpClientStream):
def __init__(self, connection, request, on_response=None, on_body=None, manual_write=False):
super()._generic_init(connection, request, on_response, on_body, manual_write)

def write_data(self, data_stream, end_stream=False):
future = Future()
body_stream = InputStream.wrap(data_stream, allow_none=True)

def on_write_complete(error_code):
if error_code:
future.set_exception(awscrt.exceptions.from_code(error_code))
else:
future.set_result(None)

_awscrt.http2_client_stream_write_data(self, body_stream, end_stream, on_write_complete)
return future


class HttpMessageBase(NativeResource):
"""
Base for HttpRequest and HttpResponse classes.
Expand Down
2 changes: 1 addition & 1 deletion crt/aws-c-cal
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ dev = [
"build>=1.2.2", # for building wheels
"sphinx>=7.2.6,<7.3; python_version >= '3.9'", # for building docs
"websockets>=13.1", # for tests
"h2", # for tests
]
3 changes: 2 additions & 1 deletion source/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args);
PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args);

PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args);

PyObject *aws_py_http_client_stream_activate(PyObject *self, PyObject *args);

PyObject *aws_py_http2_client_stream_write_data(PyObject *self, PyObject *args);

/* Create capsule around new request-style aws_http_message struct */
PyObject *aws_py_http_message_new_request(PyObject *self, PyObject *args);

Expand Down
70 changes: 69 additions & 1 deletion source/http_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0.
*/
#include "http.h"
#include "io.h"

#include <aws/http/request_response.h>

Expand Down Expand Up @@ -235,7 +236,8 @@ PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args) {
PyObject *py_stream = NULL;
PyObject *py_connection = NULL;
PyObject *py_request = NULL;
if (!PyArg_ParseTuple(args, "OOO", &py_stream, &py_connection, &py_request)) {
int h2_manual_write = 0;
if (!PyArg_ParseTuple(args, "OOOp", &py_stream, &py_connection, &py_request, &h2_manual_write)) {
return NULL;
}

Expand Down Expand Up @@ -282,6 +284,7 @@ PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args) {
.on_response_body = s_on_incoming_body,
.on_complete = s_on_stream_complete,
.user_data = stream,
.http2_use_manual_data_writes = h2_manual_write,
};

stream->native = aws_http_connection_make_request(native_connection, &request_options);
Expand Down Expand Up @@ -323,3 +326,68 @@ PyObject *aws_py_http_client_stream_activate(PyObject *self, PyObject *args) {

Py_RETURN_NONE;
}

static void s_on_http2_write_data_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
(void)stream;
PyObject *py_on_write_complete = (PyObject *)user_data;
AWS_FATAL_ASSERT(py_on_write_complete);
PyGILState_STATE state;
if (aws_py_gilstate_ensure(&state)) {
return; /* Python has shut down. Nothing matters anymore, but don't crash */
}

/* Invoke on_setup, then clear our reference to it */
PyObject *result = PyObject_CallFunction(py_on_write_complete, "(i)", error_code);
if (result) {
Py_DECREF(result);
} else {
/* Callback might fail during application shutdown */
PyErr_WriteUnraisable(PyErr_Occurred());
}
Py_DECREF(py_on_write_complete);
PyGILState_Release(state);
}

PyObject *aws_py_http2_client_stream_write_data(PyObject *self, PyObject *args) {
(void)self;

PyObject *py_stream = NULL;
PyObject *py_body_stream = NULL;
int end_stream = false;
PyObject *py_on_write_complete = NULL;
if (!PyArg_ParseTuple(args, "OOpO", &py_stream, &py_body_stream, &end_stream, &py_on_write_complete)) {
return NULL;
}

struct aws_http_stream *http_stream = aws_py_get_http_stream(py_stream);
if (!http_stream) {
return NULL;
}

struct aws_input_stream *body_stream = NULL;
// Write an empty stream is allowed.
if (py_body_stream != Py_None) {
/* The py_body_stream has the same lifetime as the C stream, no need to keep it alive from this binding. */
body_stream = aws_py_get_input_stream(py_body_stream);
if (!body_stream) {
return PyErr_AwsLastError();
}
}

/* Make sure the python callback live long enough for C to call. */
Py_INCREF(py_on_write_complete);

struct aws_http2_stream_write_data_options write_options = {
.data = body_stream,
.end_stream = end_stream,
.on_complete = s_on_http2_write_data_complete,
.user_data = py_on_write_complete,
};

int error = aws_http2_stream_write_data(http_stream, &write_options);
if (error) {
Py_DECREF(py_on_write_complete);
return PyErr_AwsLastError();
}
Py_RETURN_NONE;
}
4 changes: 3 additions & 1 deletion source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ int aws_py_translate_py_error(void) {
}

/* Print standard traceback to sys.stderr and clear the error indicator. */
PyErr_Print();
/* Handles the exception in C, do not set the last vars for python. */
PyErr_PrintEx(0 /*set_sys_last_vars*/);
fprintf(stderr, "Treating Python exception as error %d(%s)\n", aws_error_code, aws_error_name(aws_error_code));

return aws_error_code;
Expand Down Expand Up @@ -818,6 +819,7 @@ static PyMethodDef s_module_methods[] = {
AWS_PY_METHOD_DEF(http_client_connection_new, METH_VARARGS),
AWS_PY_METHOD_DEF(http_client_stream_new, METH_VARARGS),
AWS_PY_METHOD_DEF(http_client_stream_activate, METH_VARARGS),
AWS_PY_METHOD_DEF(http2_client_stream_write_data, METH_VARARGS),
AWS_PY_METHOD_DEF(http_message_new_request, METH_VARARGS),
AWS_PY_METHOD_DEF(http_message_get_request_method, METH_VARARGS),
AWS_PY_METHOD_DEF(http_message_set_request_method, METH_VARARGS),
Expand Down
Loading