1010#include " log.h"
1111#include " uvw/async.h"
1212#include " uvw/poll.h"
13+ #include " uvw/timer.h"
1314
1415#include < oxenmq/hex.h>
1516#include < oxenmq/bt_serialize.h>
@@ -91,7 +92,6 @@ namespace {
9192 break ;
9293
9394 case NGTCP2_CRYPTO_LEVEL_HANDSHAKE:
94-
9595 if (!ngtcp2_conn_is_server (conn)) {
9696 if (auto rv = conn.recv_transport_params (data); rv != 0 )
9797 return rv;
@@ -176,6 +176,7 @@ namespace {
176176 uint64_t offset, uint64_t datalen, void * user_data,
177177 void * stream_user_data) {
178178 Debug (" ######################" , __func__);
179+ Debug (" Ack [" , offset, " ," , offset+datalen, " )" );
179180 return static_cast <Connection*>(user_data)->stream_ack ({stream_id}, datalen);
180181 }
181182
@@ -201,13 +202,6 @@ namespace {
201202 // FIXME
202203 return 0 ;
203204 }
204- int extend_max_local_streams_bidi (ngtcp2_conn* conn, uint64_t max_streams, void * user_data) {
205- Debug (" ######################" , __func__);
206- Error (" FIXME UNIMPLEMENTED " , __func__);
207- Warn (" new max streams: " , max_streams);
208- // FIXME
209- return 0 ;
210- }
211205 int rand (
212206 uint8_t * dest, size_t destlen,
213207 const ngtcp2_rand_ctx* rand_ctx,
@@ -312,6 +306,18 @@ std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks> Connectio
312306 io_trigger = endpoint.loop ->resource <uvw::AsyncHandle>();
313307 io_trigger->on <uvw::AsyncEvent>([this ] (auto &, auto &) { on_io_ready (); });
314308
309+ retransmit_timer = endpoint.loop ->resource <uvw::TimerHandle>();
310+ retransmit_timer->on <uvw::TimerEvent>([this ] (auto &, auto &) {
311+ Debug (" Retransmit timer fired!" );
312+ if (auto rv = ngtcp2_conn_handle_expiry (*this , get_timestamp ()); rv != 0 ) {
313+ Warn (" expiry handler invocation returned an error: " , ngtcp2_strerror (rv));
314+ endpoint.close_connection (*this , ngtcp2_err_infer_quic_transport_error_code (rv), false );
315+ } else {
316+ flush_streams ();
317+ }
318+ });
319+ retransmit_timer->start (0ms, 0ms);
320+
315321 auto result = std::tuple<ngtcp2_settings, ngtcp2_transport_params, ngtcp2_callbacks>{};
316322 auto & [settings, tparams, cb] = result;
317323 cb.recv_crypto_data = recv_crypto_data;
@@ -422,7 +428,6 @@ Connection::Connection(Client& c, const ConnectionID& scid, const Path& path, ui
422428
423429 cb.client_initial = client_initial;
424430 cb.recv_retry = recv_retry;
425- cb.extend_max_local_streams_bidi = extend_max_local_streams_bidi;
426431 // cb.extend_max_local_streams_bidi = extend_max_local_streams_bidi;
427432 // cb.recv_new_token = recv_new_token;
428433
@@ -454,11 +459,6 @@ void Connection::on_io_ready() {
454459 Debug (" done " , __func__);
455460}
456461
457- void Connection::on_read (bstring_view data) {
458- Debug (" FIXME UNIMPLEMENTED " , __func__, " , data size: " , data.size ());
459- // FIXME
460- }
461-
462462void Connection::flush_streams () {
463463 // conn, path, pi, dest, destlen, and ts
464464 std::optional<uint64_t > ts;
@@ -491,12 +491,9 @@ void Connection::flush_streams() {
491491
492492 // FIXME: update remote addr? ecn?
493493 auto sent = send ();
494- if (sent.blocked ()) {
495- // FIXME: somewhere (maybe here?) should be setting up a write poll so that, once
496- // writing becomes available again (and the pending packet gets sent), we get back here.
497- // FIXME 2: I think this is already done by send() itself.
498- return false ;
499- }
494+ if (sent.blocked ())
495+ return false ; // We'll get called again when the socket becomes writable
496+
500497 send_buffer_size = 0 ;
501498 if (!sent) {
502499 Warn (" I/O error while trying to send packet: " , sent.str ());
@@ -562,6 +559,7 @@ void Connection::flush_streams() {
562559 switch (nwrite) {
563560 case 0 :
564561 Debug (" Done stream writing to " , stream.id (), " (either stream is congested or we have nothing else to send right now)" );
562+ assert (consumed <= 0 );
565563 break ;
566564 case NGTCP2_ERR_WRITE_MORE:
567565 Debug (" consumed " , consumed, " bytes from stream " , stream.id (), " and have space left" );
@@ -587,8 +585,8 @@ void Connection::flush_streams() {
587585 }
588586 }
589587
590- // Now try more with stream id -1 and no data: this will take care of initial handshake packets,
591- // and should finish off any partially-filled packet from above.
588+ // Now try more with stream id -1 and no data: this takes care of things like initial handshake
589+ // packets, and also finishes off any partially-filled packet from above.
592590 for (;;) {
593591 auto [nwrite, consumed] = add_stream_data (StreamID{}, nullptr , 0 );
594592 Debug (" add_stream_data for non-stream returned [" , nwrite, " ," , consumed, " ]" );
@@ -598,18 +596,39 @@ void Connection::flush_streams() {
598596 continue ;
599597 } else if (nwrite < 0 ) {
600598 Warn (" Error writing non-stream data: " , ngtcp2_strerror (nwrite));
601- return ;
599+ break ;
602600 } else if (nwrite == 0 ) {
603601
604602 // FIXME: Check whether this is actually possible for the -1 streamid?
605- Warn (" Unable to continue non-stream writing: we are congested" );
606- return ;
603+ Debug (" Nothing else to write for non-stream data for now (or we are congested)" );
604+ ngtcp2_conn_stat cstat;
605+ ngtcp2_conn_get_conn_stat (*this , &cstat);
606+ Debug (" Current unacked bytes in flight: " , cstat.bytes_in_flight );
607+ break ;
607608 }
608609
609610 Debug (" Sending non-stream data packet" );
610611 if (!send_packet (nwrite))
611612 return ;
612613 }
614+
615+ schedule_retransmit ();
616+ }
617+
618+ void Connection::schedule_retransmit () {
619+ auto expiry = std::chrono::nanoseconds{ngtcp2_conn_get_expiry (*this )};
620+ Debug (" SCHEDULE RETRANSMIT exp " , expiry.count ());
621+ if (expiry < 0ns) {
622+ retransmit_timer->repeat (0ms);
623+ return ;
624+ }
625+ auto expires_in = std::chrono::duration_cast<std::chrono::milliseconds>(
626+ expiry - get_time ().time_since_epoch ());
627+ Debug (" Next retransmit in " , expires_in.count (), " ms" );
628+ if (expires_in < 1ms)
629+ expires_in = 1ms;
630+ retransmit_timer->repeat (expires_in);
631+ retransmit_timer->again ();
613632}
614633
615634int Connection::stream_opened (StreamID id) {
@@ -638,7 +657,7 @@ int Connection::stream_opened(StreamID id) {
638657 return 0 ;
639658}
640659
641- int Connection::stream_receive (StreamID id, bstring_view data, bool fin) {
660+ int Connection::stream_receive (StreamID id, const bstring_view data, bool fin) {
642661 auto str = get_stream (id);
643662 if (!str->data_callback )
644663 Debug (" Dropping incoming data on stream " , str->id (), " : stream has no data callback set" );
@@ -662,6 +681,9 @@ int Connection::stream_receive(StreamID id, bstring_view data, bool fin) {
662681 str->close_callback (*str, std::nullopt );
663682 streams.erase (id);
664683 io_ready ();
684+ } else {
685+ ngtcp2_conn_extend_max_stream_offset (*this , id.id , data.size ());
686+ ngtcp2_conn_extend_max_offset (*this , data.size ());
665687 }
666688 return 0 ;
667689}
0 commit comments