Skip to content

Commit 60d65a8

Browse files
committed
Fire when_available callbacks via async
QUIC typically acks multiple packets at once, which was resulting in multiple immediately sequential calls to the unblock callbacks; this commit schedules it via an async on the event loop instead to batch them.
1 parent f888063 commit 60d65a8

File tree

5 files changed

+39
-19
lines changed

5 files changed

+39
-19
lines changed

connection.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,8 @@ io_result Connection::send() {
308308
}
309309

310310

311-
std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks> Connection::init(Endpoint& ep) {
312-
Debug("loop: ", ep.loop);
313-
io_trigger = ep.loop->resource<uvw::AsyncHandle>();
311+
std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks> Connection::init() {
312+
io_trigger = endpoint.loop->resource<uvw::AsyncHandle>();
314313
io_trigger->on<uvw::AsyncEvent>([this] (auto&, auto&) { on_io_ready(); });
315314

316315
auto result = std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks>{};
@@ -362,7 +361,7 @@ std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks> Connectio
362361
Connection::Connection(Server& s, const ConnectionID& base_cid_, ngtcp2_pkt_hd& header, const Path& path)
363362
: endpoint{s}, base_cid{base_cid_}, dest_cid{header.scid}, path{path} {
364363

365-
auto [settings, tparams, cb] = init(s);
364+
auto [settings, tparams, cb] = init();
366365

367366
cb.recv_client_initial = recv_client_initial;
368367

@@ -417,7 +416,7 @@ Connection::Connection(Server& s, const ConnectionID& base_cid_, ngtcp2_pkt_hd&
417416
Connection::Connection(Client& c, const ConnectionID& scid, const Path& path, uint16_t tunnel_port)
418417
: endpoint{c}, base_cid{scid}, dest_cid{ConnectionID::random(c.rng)}, path{path}, tunnel_port{tunnel_port} {
419418

420-
auto [settings, tparams, cb] = init(c);
419+
auto [settings, tparams, cb] = init();
421420

422421
assert(tunnel_port != 0);
423422

connection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
103103

104104
// Internal base method called invoked during construction to set up common client/server
105105
// settings. dest_cid and path must already be set.
106-
std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks> init(Endpoint& ep);
106+
std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks> init();
107107

108108
// Event trigger used to queue packet processing for this connection
109109
std::shared_ptr<uvw::AsyncHandle> io_trigger;

endpoint.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ class Endpoint {
205205

206206
// Default stream buffer size for streams opened through this endpoint.
207207
size_t default_stream_buffer_size = 64*1024;
208+
209+
uvw::Loop& get_loop() { return *loop; }
208210
};
209211

210212
}

stream.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "stream.h"
22
#include "connection.h"
3+
#include "endpoint.h"
34
#include "log.h"
5+
#include "uvw/async.h"
46

57
#include <cassert>
68
#include <iostream>
@@ -46,13 +48,19 @@ std::ostream& operator<<(std::ostream& o, const StreamID& s) {
4648
return o << u8"Str❰" << s.id << u8"";
4749
}
4850

49-
Stream::Stream(Connection& conn, data_callback_t data_cb, close_callback_t close_cb, size_t buffer_size)
50-
: conn{conn}, data_callback{std::move(data_cb)}, close_callback{std::move(close_cb)}, buffer{buffer_size}
51+
Stream::Stream(Connection& conn, data_callback_t data_cb, close_callback_t close_cb, size_t buffer_size, StreamID id) :
52+
conn{conn},
53+
stream_id{std::move(id)},
54+
data_callback{std::move(data_cb)},
55+
close_callback{std::move(close_cb)},
56+
buffer{buffer_size},
57+
avail_trigger{conn.endpoint.get_loop().resource<uvw::AsyncHandle>()}
5158
{
59+
avail_trigger->on<uvw::AsyncEvent>([this] (auto&, auto&) { handle_unblocked(); });
5260
}
5361

5462
Stream::Stream(Connection& conn, StreamID id, size_t buffer_size)
55-
: conn{conn}, stream_id{id}, buffer{buffer_size}
63+
: Stream{conn, nullptr, nullptr, buffer_size, std::move(id)}
5664
{
5765
}
5866

@@ -122,7 +130,7 @@ void Stream::acknowledge(size_t bytes) {
122130
size -= bytes;
123131
start = size == 0 ? 0 : (start + bytes) % buffer.size(); // reset start to 0 (to reduce wrapping buffers) if empty
124132
if (!unblocked_callbacks.empty())
125-
handle_unblocked();
133+
available_ready();
126134
}
127135

128136
std::pair<bstring_view, bstring_view> Stream::pending() {
@@ -140,26 +148,24 @@ std::pair<bstring_view, bstring_view> Stream::pending() {
140148
}
141149

142150
void Stream::when_available(unblocked_callback_t unblocked_cb) {
151+
assert(available() == 0);
143152
unblocked_callbacks.push(std::move(unblocked_cb));
144-
handle_unblocked();
145153
}
146154

147155
void Stream::handle_unblocked() {
148156
while (!unblocked_callbacks.empty() && available() > 0) {
149-
#ifndef NDEBUG
150-
size_t pre_avail = available();
151-
#endif
152-
bool done = unblocked_callbacks.front()(*this);
153-
if (done)
157+
if (unblocked_callbacks.front()(*this))
154158
unblocked_callbacks.pop();
155159
else
156-
assert(available() < pre_avail);
160+
assert(available() == 0);
157161
}
158162
conn.io_ready();
159163
}
160164

161165
void Stream::io_ready() { conn.io_ready(); }
162166

167+
void Stream::available_ready() { avail_trigger->send(); }
168+
163169
void Stream::wrote(size_t bytes) {
164170
// Called to tell us we sent some bytes off, e.g. wrote(3) changes:
165171
// [ áaarrrrrr ] or [rr áaar]

stream.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <variant>
1111
#include <vector>
1212

13+
#include <uvw/async.h>
14+
1315
namespace quic {
1416

1517
class Connection;
@@ -136,11 +138,18 @@ class Stream : public std::enable_shared_from_this<Stream> {
136138
// callbacks are queued they are invoked in order, space permitting. The stored std::function
137139
// will not be moved or copied after being invoked (i.e. if invoked multiple times it will
138140
// always be invoked on the same instance).
141+
//
142+
// Available callbacks should only be used when the buffer is full, typically immediately after
143+
// an `append_any` call that returns less than the full write. Similarly a false return from an
144+
// unblock function (which keeps the callback alive) should satisfy the same condition.
139145
void when_available(unblocked_callback_t unblocked_cb);
140146

141-
// Calls io_ready() on the stream's connection to trigger sending data
147+
// Calls io_ready() on the stream's connection to scheduling sending outbound data
142148
void io_ready();
143149

150+
// Schedules processing of the "when_available" callbacks
151+
void available_ready();
152+
144153
// Lets you stash some arbitrary data in a shared_ptr; this is not used internally.
145154
void data(std::shared_ptr<void> data);
146155

@@ -160,7 +169,7 @@ class Stream : public std::enable_shared_from_this<Stream> {
160169
private:
161170
friend class Connection;
162171

163-
Stream(Connection& conn, data_callback_t data_cb, close_callback_t close_cb, size_t buffer_size);
172+
Stream(Connection& conn, data_callback_t data_cb, close_callback_t close_cb, size_t buffer_size, StreamID id = {-1});
164173
Stream(Connection& conn, StreamID id, size_t buffer_size);
165174

166175
// Non-copyable, non-movable; we manage it via a unique_ptr held by its Connection
@@ -216,6 +225,10 @@ class Stream : public std::enable_shared_from_this<Stream> {
216225
bool sent_fin{false};
217226
bool is_shutdown{false};
218227

228+
// Async trigger we use to schedule when_available callbacks (so that we can make them happen in
229+
// batches rather than after each and every packet ack).
230+
std::shared_ptr<uvw::AsyncHandle> avail_trigger;
231+
219232
std::variant<std::shared_ptr<void>, std::weak_ptr<void>> user_data;
220233
};
221234

0 commit comments

Comments
 (0)