-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflow.h
166 lines (156 loc) · 4.59 KB
/
flow.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#include "time.h"
#include "udp.h"
#include <boost/operators.hpp>
// Shorthand for the Boost.Asio TCP socket type used by both flow halves.
using tcp_socket = boost::asio::ip::tcp::socket;
// Timestamps are kept as plain 32-bit unsigned values for now.
using timestamp_t = uint32_t;
// 32-bit sequence number whose arithmetic and ordering wrap modulo 2^32.
//
// Only operator+= and operator< are written by hand; the Boost.Operators
// bases derive the rest from them: operator+ (addable), >, <=, >=
// (less_than_comparable), operator== (equivalent, built from operator<)
// and operator!= (equality_comparable).
class seq_t
    : boost::addable<seq_t, size_t>
    , boost::less_than_comparable<seq_t>
    , boost::equality_comparable<seq_t>
    , boost::equivalent<seq_t>
{
public:
    // Wraps a raw value; explicit, so plain integers never convert silently.
    explicit seq_t(uint32_t x)
        : m_value(x)
    {
    }

    // Raw 32-bit value, reachable only through an explicit cast.
    explicit operator uint32_t() const
    {
        return m_value;
    }

    // Advances by x; the sum is stored back into 32 bits, so it wraps.
    seq_t& operator+=(size_t x)
    {
        m_value += x;
        return *this;
    }

    // Distance from rhs forward to *this, taken from the unsigned difference
    // of the two raw values.
    size_t operator-(const seq_t& rhs) const
    {
        return static_cast<size_t>(m_value - rhs.m_value);
    }

    // Wrap-aware ordering: *this precedes rhs when rhs lies between 1 and
    // 2^31 steps ahead of it (mod 2^32). Two values exactly 2^31 apart each
    // compare less than the other, so the ordering is only meaningful for
    // values that stay within half the sequence space of one another.
    bool operator<(const seq_t rhs) const
    {
        const uint32_t steps_ahead_minus_one = uint32_t(rhs.m_value - m_value - 1);
        return steps_ahead_minus_one <= 0x7fffffff;
    }

private:
    uint32_t m_value;
};
// Tuning constants for the flow classes declared below. `duration` and the
// `_ms` literal are not defined in this header (presumably they come from "time.h").
// ACK delay — presumably how long flow_recv may hold back an ACK; TODO confirm in the implementation
static const duration ACK_DELAY = 20_ms;
// Lower and upper bounds for the retransmission timeout (RTO) — presumably
// applied to flow_send's m_rto; TODO confirm in the implementation
static const duration MIN_RTO = 50_ms;
static const duration MAX_RTO = 1000_ms;
// Keep-alive interval — presumably drives udp_flow_mgr's keep-alive timer; TODO confirm
static const duration KEEP_ALIVE = 5000_ms;
// Window size, 1024*1024 — NOTE(review): looks like a byte count (1 MiB); confirm against the implementation
static const size_t WINDOW = 1024*1024;
// Segment size; also the size of flow_send's m_read_buf and m_resend_buf arrays
static const size_t MSS = 1024;
// Receiving half of a flow: takes sequenced packets, buffers them, writes
// them to a TCP sink and emits ACKs through a caller-supplied callback.
// Only the interface is declared here; the implementation is elsewhere.
class flow_recv
{
public:
    // Callback used to emit an ACK: sequence number being acknowledged,
    // window to advertise, and a timestamp.
    typedef std::function<void (seq_t ack, size_t window, timestamp_t stamp)> send_func_t;
    // Callback invoked with an error concerning the sink socket.
    typedef std::function<void (const error_code& err)> on_err_func_t;

    // Constructor and destructor.
    // tm and sink are stored by reference, so both must outlive this object.
    flow_recv(timer_mgr& tm, tcp_socket& sink, const send_func_t& do_send, const on_err_func_t& on_err);
    ~flow_recv();

    // Non-copyable. The class declares a destructor and holds a timer handle
    // plus references to the timer manager and socket, so a memberwise copy
    // would share all of them. Copy assignment was already unusable because
    // of the reference members; the copy constructor is now rejected too.
    flow_recv(const flow_recv&) = delete;
    flow_recv& operator=(const flow_recv&) = delete;

    // Called when a data packet arrives: seq is its sequence number, stamp
    // its timestamp, and data/len the payload. Returns nothing; any ACK is
    // produced through the send callback.
    void on_packet(seq_t seq, timestamp_t stamp, const char* data, size_t len);

    // Called to 'stop' the socket; returns true if stopped, otherwise the
    // outcome is reported later through the on_err callback.
    bool stop();

private:
    // Start a write
    void start_write();
    // Write complete
    void write_complete(const error_code& err, size_t len);
    // Send an ACK now
    void send_now(timestamp_t stamp);
    // Set ACK timer
    void set_ack_timer();

    // Timers and sink (not owned)
    timer_mgr& m_tm;
    tcp_socket& m_sink;
    // Where to send packets
    send_func_t m_do_send;
    // What to do with errors to socket
    on_err_func_t m_on_err;
    // Is there a write currently pending
    bool m_write_pending;
    // Current ack seq number
    seq_t m_ack_seq;
    // Current 'head' of unwritten data
    seq_t m_head_seq;
    // Buffer of packets, keyed by sequence number (ordered by seq_t's
    // wrap-aware operator<)
    std::map<seq_t, std::string> m_pkt_buf;
    // ACK timer
    timer_id m_ack_timer;
};
// Sending half of a flow: reads from a TCP source, hands sequenced packets
// to a caller-supplied callback, keeps unacknowledged bytes in an in-flight
// ring buffer and resends on timeout. Only the interface is declared here;
// the implementation is elsewhere.
class flow_send
{
public:
    // Callback used to emit a data packet: its sequence number and payload.
    typedef std::function<void (seq_t seq, const char* buf, size_t len)> send_func_t;
    // Callback invoked with an error concerning the source socket.
    typedef std::function<void (const error_code& err)> on_err_func_t;

    // Make a new flow_send.
    // tm and source are stored by reference, so both must outlive this object.
    flow_send(timer_mgr& tm, tcp_socket& source, const send_func_t& do_send, const on_err_func_t& on_err);
    // Destructor
    ~flow_send();

    // Non-copyable. The class holds a raw buffer pointer (m_in_flight) and a
    // timer handle and declares a destructor, so a memberwise copy would
    // alias both. Copy assignment was already unusable because of the
    // reference members; the copy constructor is now rejected too.
    flow_send(const flow_send&) = delete;
    flow_send& operator=(const flow_send&) = delete;

    // Called when an ACK packet arrives: ack is the acknowledged sequence
    // number and window the receiver's window.
    // NOTE(review): rtt is passed by pointer — presumably null when the ACK
    // carries no round-trip sample; confirm in the implementation.
    void on_ack(seq_t ack, size_t window, const duration* rtt);

    // Called to 'stop' the socket; returns true if stopped, otherwise the
    // outcome is reported later through the on_err callback.
    bool stop();

private:
    // Start a read
    void start_read();
    // Read complete
    void read_complete(const error_code& err, size_t len);
    // Call to handle a hard timeout
    void on_timeout();
    // Start a timer if needed
    void start_timer();
    // Resend head packet
    void resend_head();
    // Compute size of in-flight ring buffer
    size_t flight_size();
    // Pops some bytes from in-flight
    void dequeue_in_flight(size_t len);
    // Push some data to in-flight
    void enqueue_in_flight(const char* buf, size_t len);
    // Get the first len bytes of in-flight
    void get_flight_head(char* out, size_t len);

    // Timers + source (not owned)
    timer_mgr& m_tm;
    tcp_socket& m_source;
    // Where to send packets
    send_func_t m_do_send;
    // What to do on errors to socket
    on_err_func_t m_on_err;
    // Is a read from source pending
    bool m_read_pending;
    // Highest sequence sent + 1
    seq_t m_send_seq;
    // Highest sequence acked
    seq_t m_ack_seq;
    // Receiver window, based on ack_seq
    size_t m_window;
    // Congestion window, based on ack_seq
    size_t m_cwnd;
    // Slow start threshold
    size_t m_sst;
    // New-reno state
    size_t m_dup_acks;
    bool m_in_recover;
    seq_t m_recover_seq;
    // Round trip data in microseconds
    duration m_rtt_avg;
    duration m_rtt_dev;
    duration m_rto;
    // RTO timer
    timer_id m_send_timer;
    // Buffer of in flight packets: raw storage for the ring buffer plus its
    // head and tail positions. Allocation and release are not visible in
    // this header.
    char *m_in_flight;
    uint32_t m_fhead;
    uint32_t m_ftail;
    // Read + write buffers for packets, one segment (MSS) each
    char m_read_buf[MSS];
    char m_resend_buf[MSS];
};
// Pairs one flow_send and one flow_recv with a UDP port, a TCP socket and a
// single remote UDP endpoint. Only the interface is declared here.
class udp_flow_mgr
{
public:
// mgr, udp and tcp are stored by reference and must outlive this object;
// remote is stored by value.
udp_flow_mgr(timer_mgr& mgr, udp_port& udp, tcp_socket& tcp, udp_endpoint remote);
private:
// Handles a raw buffer of `size` bytes.
// NOTE(review): presumably invoked for datagrams arriving on m_udp, with the
// bool telling the caller whether the packet was accepted — confirm in the implementation.
bool on_packet(const char* buffer, size_t size);
// Signature matches flow_recv::send_func_t — presumably the callback handed to m_recv; TODO confirm
void send_ack(seq_t ack, size_t window, timestamp_t stamp);
// Signature matches flow_send::send_func_t — presumably the callback handed to m_send; TODO confirm
void send_seq(seq_t seq, const char* buf, size_t len);
// Keep-alive handler — presumably run from the m_keepalive timer; TODO confirm
void do_keepalive();
// Timer manager, UDP port and TCP socket (not owned)
timer_mgr& m_tm;
udp_port& m_udp;
tcp_socket& m_tcp;
// Remote UDP endpoint
udp_endpoint m_remote;
// The two flow halves. Members are initialised in declaration order, so the
// references and endpoint above are set before m_send and m_recv are constructed.
flow_send m_send;
flow_recv m_recv;
// Keep-alive timer handle
timer_id m_keepalive;
};