Skip to content

Commit 12333e9

Browse files
committed
Add a user-provided-buffers mode, and use it.
This is slightly cleaner when dealing with uvw's buffers. It seems to make no appreciable performance difference, though.
1 parent af93246 commit 12333e9

8 files changed

+142
-61
lines changed

connection.cpp

+19-10
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "uvw/poll.h"
1313
#include "uvw/timer.h"
1414

15+
#include <iterator>
1516
#include <oxenmq/hex.h>
1617
#include <oxenmq/bt_serialize.h>
1718

@@ -516,19 +517,27 @@ void Connection::flush_streams() {
516517
while (!strs.empty() && stream_packets < 10) {
517518
for (auto it = strs.begin(); it != strs.end(); ) {
518519
auto& stream = **it;
519-
auto [first, second] = stream.pending();
520+
auto bufs = stream.pending();
520521
if (stream.is_shutdown ||
521-
(first.empty() && !stream.is_new && !(stream.is_closing && !stream.sent_fin))) {
522+
(bufs.empty() && !stream.is_new && !(stream.is_closing && !stream.sent_fin))) {
522523
it = strs.erase(it);
523524
continue;
524525
}
525-
std::array<ngtcp2_vec, 2> vecs;
526-
vecs[0].base = const_cast<uint8_t*>(u8data(first));
527-
vecs[0].len = first.size();
528-
vecs[1].base = const_cast<uint8_t*>(u8data(second));
529-
vecs[1].len = second.size();
530-
size_t vecs_size = first.empty() ? 0 : second.empty() ? 1 : 2;
531-
Debug("Sending ", vecs[0].len, "+", vecs[1].len, " data for ", stream.id());
526+
std::vector<ngtcp2_vec> vecs;
527+
vecs.reserve(bufs.size());
528+
std::transform(bufs.begin(), bufs.end(), std::back_inserter(vecs),
529+
[](const auto& buf) { return ngtcp2_vec{const_cast<uint8_t*>(u8data(buf)), buf.size()}; });
530+
531+
#ifndef NDEBUG
532+
{
533+
std::string buf_sizes;
534+
for (auto& b : bufs) {
535+
if (!buf_sizes.empty()) buf_sizes += '+';
536+
buf_sizes += std::to_string(b.size());
537+
}
538+
Debug("Sending ", buf_sizes.empty() ? "no" : buf_sizes, " data for ", stream.id());
539+
}
540+
#endif
532541

533542
uint32_t extra_flags = 0;
534543
if (stream.is_closing && !stream.sent_fin) {
@@ -539,7 +548,7 @@ void Connection::flush_streams() {
539548
stream.is_new = false;
540549
}
541550

542-
auto [nwrite, consumed] = add_stream_data(stream.id(), vecs.data(), vecs_size, extra_flags);
551+
auto [nwrite, consumed] = add_stream_data(stream.id(), vecs.data(), vecs.size(), extra_flags);
543552
Debug("add_stream_data for stream ", stream.id(), " returned [", nwrite, ",", consumed, "]");
544553

545554
if (nwrite > 0) {

endpoint.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
#include <uvw/poll.h>
2121
#include <uvw/timer.h>
2222

23-
#if defined(__linux__) && !defined(NO_RECVMMSG)
23+
// Defined if we support recvmmsg/sendmmsg
24+
#if defined(__linux__) && !defined(LOKINET_NO_RECVMMSG)
2425
# define LOKINET_HAVE_RECVMMSG
2526
#endif
2627

stream.cpp

+70-14
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,24 @@ Stream::Stream(Connection& conn, StreamID id, size_t buffer_size)
6767
void Stream::set_buffer_size(size_t size) {
6868
if (used() != 0)
6969
throw std::runtime_error{"Cannot update buffer size while buffer is in use"};
70-
if (size == 0)
71-
size = 64*1024;
72-
else if (size < 2048)
70+
if (size > 0 && size < 2048)
7371
size = 2048;
7472

7573
buffer.resize(size);
7674
buffer.shrink_to_fit();
7775
start = size = unacked_size = 0;
7876
}
7977

78+
size_t Stream::buffer_size() const {
79+
return buffer.empty()
80+
? size + start // start is the acked amount of the first buffer
81+
: buffer.size();
82+
}
83+
8084
bool Stream::append(bstring_view data) {
81-
size_t avail = available();
82-
if (avail < data.size())
85+
assert(!buffer.empty());
86+
87+
if (data.size() > available())
8388
return false;
8489

8590
// When we are appending we have three cases:
@@ -107,14 +112,20 @@ bool Stream::append(bstring_view data) {
107112
return true;
108113
}
109114
size_t Stream::append_any(bstring_view data) {
110-
size_t avail = available();
111-
if (data.size() > avail)
115+
if (size_t avail = available(); data.size() > avail)
112116
data.remove_suffix(data.size() - avail);
113117
[[maybe_unused]] bool appended = append(data);
114118
assert(appended);
115119
return data.size();
116120
}
117121

122+
void Stream::append_buffer(const std::byte* buffer, size_t length) {
123+
assert(this->buffer.empty());
124+
user_buffers.emplace_back(buffer, length);
125+
size += length;
126+
conn.io_ready();
127+
}
128+
118129
void Stream::acknowledge(size_t bytes) {
119130
// Frees bytes; e.g. acknowledge(3) changes:
120131
// [ áaaaaarr ] to [ áaarr ]
@@ -128,21 +139,61 @@ void Stream::acknowledge(size_t bytes) {
128139

129140
unacked_size -= bytes;
130141
size -= bytes;
131-
start = size == 0 ? 0 : (start + bytes) % buffer.size(); // reset start to 0 (to reduce wrapping buffers) if empty
142+
if (!buffer.empty())
143+
start = size == 0 ? 0 : (start + bytes) % buffer.size(); // reset start to 0 (to reduce wrapping buffers) if empty
144+
else if (size == 0) {
145+
user_buffers.clear();
146+
start = 0;
147+
} else {
148+
while (bytes) {
149+
assert(!user_buffers.empty());
150+
assert(start < user_buffers.front().second);
151+
if (size_t remaining = user_buffers.front().second - start;
152+
bytes >= remaining) {
153+
user_buffers.pop_front();
154+
start = 0;
155+
bytes -= remaining;
156+
} else {
157+
start += bytes;
158+
bytes = 0;
159+
}
160+
}
161+
}
162+
132163
if (!unblocked_callbacks.empty())
133164
available_ready();
134165
}
135166

136-
std::pair<bstring_view, bstring_view> Stream::pending() {
137-
std::pair<bstring_view, bstring_view> bufs;
138-
if (size_t rsize = unsent(); rsize > 0) {
167+
auto get_buffer_it(std::deque<std::pair<std::unique_ptr<const std::byte[]>, size_t>>& bufs, size_t offset) {
168+
auto it = bufs.begin();
169+
while (offset >= it->second) {
170+
offset -= it->second;
171+
it++;
172+
}
173+
return std::make_pair(std::move(it), offset);
174+
}
175+
176+
std::vector<bstring_view> Stream::pending() {
177+
std::vector<bstring_view> bufs;
178+
size_t rsize = unsent();
179+
if (!rsize) return bufs;
180+
if (!buffer.empty()) {
139181
size_t rpos = (start + unacked_size) % buffer.size();
140182
if (size_t rend = rpos + rsize; rend <= buffer.size()) {
141-
bufs.first = {buffer.data() + rpos, rsize};
183+
bufs.emplace_back(buffer.data() + rpos, rsize);
142184
} else { // wrapping
143-
bufs.first = {buffer.data() + rpos, buffer.size() - rpos};
144-
bufs.second = {buffer.data(), rend % buffer.size()};
185+
bufs.reserve(2);
186+
bufs.emplace_back(buffer.data() + rpos, buffer.size() - rpos);
187+
bufs.emplace_back(buffer.data(), rend % buffer.size());
145188
}
189+
} else {
190+
assert(!user_buffers.empty()); // If empty then unsent() should have been 0
191+
auto [it, offset] = get_buffer_it(user_buffers, start + unacked_size);
192+
bufs.reserve(std::distance(it, user_buffers.end()));
193+
assert(it != user_buffers.end());
194+
bufs.emplace_back(it->first.get() + offset, it->second - offset);
195+
for (++it; it != user_buffers.end(); ++it)
196+
bufs.emplace_back(it->first.get(), it->second);
146197
}
147198
return bufs;
148199
}
@@ -153,6 +204,10 @@ void Stream::when_available(unblocked_callback_t unblocked_cb) {
153204
}
154205

155206
void Stream::handle_unblocked() {
207+
if (buffer.empty()) {
208+
while (!unblocked_callbacks.empty() && unblocked_callbacks.front()(*this))
209+
unblocked_callbacks.pop();
210+
}
156211
while (!unblocked_callbacks.empty() && available() > 0) {
157212
if (unblocked_callbacks.front()(*this))
158213
unblocked_callbacks.pop();
@@ -171,6 +226,7 @@ void Stream::wrote(size_t bytes) {
171226
// [ áaarrrrrr ] or [rr áaar]
172227
// to:
173228
// [ áaaaaarrr ] or [aa áaaa]
229+
Debug("wrote ", bytes, ", unsent=",unsent());
174230
assert(bytes <= unsent());
175231
unacked_size += bytes;
176232
}

stream.h

+30-12
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,17 @@ class Stream : public std::enable_shared_from_this<Stream> {
6767

6868
// Sets the size of the outgoing data buffer. This may *only* be used if the buffer is
6969
// currently entirely empty; otherwise a runtime_error is thrown. The minimum buffer size is
70-
// 2048, the default is 64kiB.
70+
// 2048, the default is 64kiB. A value of 0 puts the Stream into user-provided buffer mode
71+
// where only `append_buffer` (which takes ownership of the given buffer pointer) is permitted.
7172
void set_buffer_size(size_t size);
7273

73-
// Returns the size of the buffer (including both pending and free space).
74-
size_t buffer_size() const { return buffer.size(); }
74+
// Returns the size of the buffer (including both pending and free space). If using
75+
// user-provided buffer mode then this is the sum of all held buffers.
76+
size_t buffer_size() const;
7577

76-
// Returns the number of free bytes available in the outgoing stream data buffer
77-
size_t available() const { return is_closing ? 0 : buffer.size() - size; }
78+
// Returns the number of free bytes available in the outgoing stream data buffer. Always 0 in
79+
// user-provided buffer mode.
80+
size_t available() const { return is_closing || buffer.empty() ? 0 : buffer.size() - size; }
7881

7982
// Returns the number of bytes currently referenced in the buffer (i.e. pending or
8083
// sent-but-unacknowledged).
@@ -104,6 +107,11 @@ class Stream : public std::enable_shared_from_this<Stream> {
104107
return append_any(bstring_view{reinterpret_cast<const std::byte*>(data.data()), data.size()});
105108
}
106109

110+
// Takes ownership of the given buffer pointer, queuing it to be sent after any existing buffers
111+
// and freed once fully acked. You *must* have called `set_buffer_size(0)` (or set the
112+
// endpoint's `default_stream_buffer_size` to 0) in order to use this.
113+
void append_buffer(const std::byte* buf, size_t length);
114+
107115
// Start closing the stream and prevent any more outgoing data from being appended. If
108116
// `error_code` is provided then we close immediately with the given code; if std::nullopt (the
109117
// default) we close gracefully by sending a FIN bit.
@@ -142,6 +150,11 @@ class Stream : public std::enable_shared_from_this<Stream> {
142150
// Available callbacks should only be used when the buffer is full, typically immediately after
143151
// an `append_any` call that returns less than the full write. Similarly a false return from an
144152
// unblock function (which keeps the callback alive) should satisfy the same condition.
153+
//
154+
// In user-provided buffer mode the callback will be invoked after any data has been acked: it
155+
// is up to the caller to look at used()/buffer_size()/etc. to decide what to do. As described
156+
// above, return true to remove this callback, false to keep it and try again after the next
157+
// ack.
145158
void when_available(unblocked_callback_t unblocked_cb);
146159

147160
// Calls io_ready() on the stream's connection to schedule sending outbound data
@@ -188,12 +201,10 @@ class Stream : public std::enable_shared_from_this<Stream> {
188201
// appending data).
189202
void acknowledge(size_t bytes);
190203

191-
// Returns a view into unwritten stream data. This returns two string_views: if there is no
192-
// pending data to write then both are empty; if the data to write does not wrap the buffer then
193-
// .second will be empty and .first contains the data; if it wraps then both are non-empty and
194-
// the .second buffer data immediately follows the .first buffer data. After writing any of the
195-
// provided data you should call `wrote()` to signal how much data you consumed.
196-
std::pair<bstring_view, bstring_view> pending();
204+
// Returns a view into unwritten stream data. This returns a vector of string_views of the data
205+
// to write, in order. After writing any of the provided data you must call `wrote()` to signal
206+
// how much of the given data was consumed (to advance the next pending() call).
207+
std::vector<bstring_view> pending();
197208

198209
// Called to signal that bytes have been written and should now be considered sent (but still
199210
// unacknowledged), thereby advancing the initial data position returned by the next `pending()`
@@ -205,10 +216,17 @@ class Stream : public std::enable_shared_from_this<Stream> {
205216
StreamID stream_id{-1};
206217

207218
// ring buffer of outgoing stream data that has not yet been acknowledged. This cannot be
208-
// resized once used as ngtcp2 will have pointers into the data.
219+
// resized once used as ngtcp2 will have pointers into the data. If this is empty then we are
220+
// in user-provided buffer mode.
209221
std::vector<std::byte> buffer{65536};
210222

223+
// user-provided buffers; only used when `buffer` is empty (via a `set_buffer_size(0)` or a 0
224+
// size given in the constructor).
225+
std::deque<std::pair<std::unique_ptr<const std::byte[]>, size_t>> user_buffers;
226+
211227
// Offset of the first used byte in the circular buffer, will always be in [0, buffer.size()).
228+
// For user-provided buffers this is the starting offset in the currently sending user-provided
229+
// buffer.
212230
size_t start{0};
213231

214232
// Number of sent-but-unacked bytes in the buffer (i.e. [start, start+unacked_size) are sent but

tunnel-client.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ int main(int argc, char *argv[]) {
9191
loop,
9292
dest_port // tunnel destination port
9393
);
94+
tunnel_client->default_stream_buffer_size = 0; // We steal uvw's provided buffers
9495
quic::Debug("Initialized client");
9596

9697
// Start listening for TCP connections:

tunnel-server.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ int main(int argc, char *argv[]) {
9191
tcp.erase(error_handler);
9292
tunnel::install_stream_forwarding(tcp, *stream);
9393
assert(stream->used() == 0);
94-
stream->append(quic::bstring_view{&tunnel::CONNECT_INIT, 1});
94+
95+
stream->append_buffer(new std::byte[1]{tunnel::CONNECT_INIT}, 1);
9596
tcp.read();
9697
});
9798

@@ -100,6 +101,7 @@ int main(int argc, char *argv[]) {
100101
return true;
101102
}
102103
};
104+
s.default_stream_buffer_size = 0; // We steal uvw's provided buffers
103105
quic::Debug("Initialized server");
104106
std::cout << "Listening on localhost:" << listen_port << " with tunnel(s) to localhost port(s):";
105107
if (allowed_ports.empty())

tunnel.cpp

+13-23
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "tunnel.h"
2+
#include "log.h"
23
#include "stream.h"
34
#include "uvw/tcp.h"
45

@@ -11,33 +12,22 @@ void on_outgoing_data(uvw::DataEvent& event, uvw::TCPHandle& client) {
1112
std::string_view data{event.data.get(), event.length};
1213
auto peer = client.peer();
1314
quic::Debug(peer.ip, ":", peer.port, " → lokinet ", quic::buffer_printer{data});
14-
if (auto wrote = stream->append_any(data); wrote < data.size()) {
15-
// This gets complicated: we've received some data to forward but the stream's
16-
// internal buffer is full of unacknowledged data. We have to basically pause the
17-
// local socket until the situation improves, and keep a sort of "overflow" buffer
18-
// here to be reinserted once the stream space frees up.
19-
quic::Debug("quic tunnel is congested (wrote ", wrote, " of ", data.size(), " bytes; pausing local tcp connection reads");
15+
// Steal the buffer from the DataEvent's unique_ptr<char[]>:
16+
stream->append_buffer(reinterpret_cast<const std::byte*>(event.data.release()), event.length);
17+
if (stream->used() >= PAUSE_SIZE) {
18+
quic::Debug("quic tunnel is congested (have ", stream->used(), " bytes in flight); pausing local tcp connection reads");
2019
client.stop();
21-
data.remove_prefix(wrote);
22-
stream->when_available([
23-
client=client.shared_from_this(),
24-
// Steal the unique_ptr<char[]> from DataEvent (but have to use a shared_ptr because
25-
// of std::function).
26-
buffer=std::shared_ptr<char[]>(event.data.release()),
27-
data=std::move(data)
28-
](quic::Stream& s) mutable {
29-
if (auto wrote = s.append_any(data); wrote < data.size()) {
30-
quic::Debug("quic tunnel is partially unstuck (wrote ", wrote, " of ", data.size(), " remaining bytes)");
31-
data.remove_prefix(wrote);
32-
return false; // Not done.
20+
stream->when_available([](quic::Stream& s) {
21+
auto client = s.data<uvw::TCPHandle>();
22+
if (s.used() < PAUSE_SIZE) {
23+
quic::Debug("quic tunnel is no longer congested; resuming tcp connection reading");
24+
client->read();
25+
return true;
3326
}
34-
35-
quic::Debug("quic tunnel is no longer congested; resuming tcp connection reading");
36-
client->read();
37-
return true;
27+
return false;
3828
});
3929
} else {
40-
quic::Debug("Sent ", wrote, " bytes");
30+
quic::Debug("Queued ", event.length, " bytes");
4131
}
4232
}
4333

tunnel.h

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ inline constexpr uint64_t ERROR_BAD_INIT{0x5471908};
2626
// failure)
2727
inline constexpr uint64_t ERROR_TCP{0x5471909};
2828

29+
// We pause reading from the local TCP socket if we have at least this amount of outstanding
30+
// unacked data in the quic tunnel, then resume once it drops below this.
31+
inline constexpr size_t PAUSE_SIZE = 64*1024;
32+
2933
// Callbacks for network events. The uvw::TCPHandle client must contain a shared pointer to the
3034
// associated quic::Stream in its data, and the quic::Stream must contain a weak pointer to the
3135
// uvw::TCPHandle.

0 commit comments

Comments
 (0)