-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconn.cpp
476 lines (448 loc) · 14.4 KB
/
conn.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
#include "conn.h"
#include "log.h"
#include <cstring>
#include <stdexcept>
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#include <openssl/sha.h>
#include <openssl/rand.h>
// Log topic for this translation unit (presumably picked up by the LOG_* macros
// from log.h -- confirm there).
#define LOG_TOPIC LT_CONN
// Lifetime of a locally issued token, counted from its s_time: a conn that is
// still outbound expires at s_time + REMOTE_CONN_TIME, and a closed conn sits in
// time_wait until that same instant.
static const duration REMOTE_CONN_TIME = 30_sec;
// Timeout armed in start_connect() while the loopback TCP connect is pending.
static const duration LOCAL_CONN_TIME = 5_sec;
// A keepalive is sent once this long has passed since the last data/ack/keepalive send.
static const duration SEND_KEEPALIVE_TIME = 5_sec;
// Receive-side timeout, re-armed on every packet handed to conn::on_packet():
// 7/2 = 3.5 send-keepalive intervals.
static const duration RECV_KEEPALIVE_TIME = 7 * SEND_KEEPALIVE_TIME / 2;
// Creates the per-peer record for `who` in the outbound state.
//   mgr      owning manager (timers, send buffer, state map)
//   who      remote UDP endpoint this record is keyed by
//   s_time   second-resolution time at which the local token was issued
//   s_token  local token that the peer must echo back
// The remote token fields start zeroed and no keepalive timer is scheduled yet.
conn::conn(conn_mgr& mgr, const udp_endpoint& who, uint32_t s_time, uint32_t s_token)
    : m_mgr(mgr),
      m_who(who),
      m_state(state::outbound),
      m_r_time(0),
      m_r_token(0),
      m_s_time(s_time),
      m_s_token(s_token),
      m_send_keep_alive(0)
{
    // The record expires REMOTE_CONN_TIME after the token's issue time.
    const auto expires_at = time_from_sec(s_time) + REMOTE_CONN_TIME;
    m_timer = m_mgr.m_tm.add(expires_at, [this]() { on_timeout(); });
}
void conn::start_connect(uint8_t time, uint32_t token)
{
assert(m_state == state::outbound);
LOG_INFO("Connecting to localhost");
// Disarm remote connect, arm local connect
m_mgr.m_tm.cancel(m_timer);
m_timer = m_mgr.m_tm.add(now() + LOCAL_CONN_TIME, [this]() { on_timeout(); });
// Update state
m_state = state::starting;
m_r_time = time;
m_r_token = token;
// Kick off socket connect
m_socket = std::make_unique<tcp_socket>(m_mgr.m_tm.get_ios());
m_socket->async_connect(tcp_endpoint(ip_address_v4::loopback(), m_mgr.m_tcp_port),
[this](const error_code& error) { on_connect(error); });
}
// Entry point for a validated packet addressed to this connection.
//   hdr   connection header of the packet
//   data  bytes following the connection header
//   len   number of bytes at `data`
// Every packet re-arms the receive keepalive timer. Only data / data_ack packets
// go further: while the local socket is still connecting they are queued (at
// most 5 are kept, oldest dropped first), otherwise they are processed at once.
void conn::on_packet(const conn_hdr &hdr, const char* data, size_t len)
{
    // Callers are expected to deliver packets only in these two states.
    assert(m_state == state::starting || m_state == state::running);

    // Any traffic from the peer counts as a sign of life.
    m_mgr.m_tm.cancel(m_timer);
    m_timer = m_mgr.m_tm.add(now() + RECV_KEEPALIVE_TIME, [this]() { on_timeout(); });

    const bool carries_flow = (hdr.type == ptype::data || hdr.type == ptype::data_ack);
    if (!carries_flow) {
        // final_ack doubles as the keepalive, so it is silently accepted.
        if (hdr.type != ptype::final_ack) {
            LOG_WARN("Ignore strange packet from %s", to_string(m_who).c_str());
        }
        return;
    }

    if (m_state != state::starting) {
        process_packet(hdr, data, len);
        return;
    }

    // No local connection yet: hold on to the packet for on_connect().
    LOG_INFO("Still connecting, queueing packet from %s", to_string(m_who).c_str());
    m_queue.emplace(hdr, data, len);
    if (m_queue.size() > 5) {
        m_queue.pop();
    }
}
// Header that send_seq() writes directly after conn_hdr in a ptype::data packet;
// the payload bytes follow it. Both fields are written with htonl() and read
// back with ntohl(), i.e. they travel in network byte order.
struct data_hdr {
// uint32_t(seq) of the segment being sent
uint32_t seq;
// sender's now_us_wrap() at send time; send_seq() substitutes 1 for a zero reading
uint32_t timestamp;
};
// Header that send_ack() writes directly after conn_hdr in a ptype::data_ack
// packet (no payload follows; process_packet() requires an exact size match).
// All fields are written with htonl() and read back with ntohl().
struct data_ack_hdr {
// acknowledged position, passed to flow_send::on_ack() as a seq_t
uint32_t ack;
// window value, passed to flow_send::on_ack() as a size_t
uint32_t window;
// process_packet() subtracts this from its own now_us_wrap() to get a duration;
// a value of 0 means "no timing sample" (a null duration pointer is passed on).
// NOTE(review): presumably an echo of a data_hdr timestamp -- confirm in flow_recv.
uint32_t timestamp;
};
// Stamps this connection's token pair onto an outgoing connection header.
// The packet type (and the magic byte, set by send_packet) are left to the caller.
void conn::setup_chdr(conn_hdr& hdr)
{
    // Peer's token, exactly as stored.
    hdr.r_token = m_r_token;
    hdr.r_time = m_r_time;

    // Our own token; only the low 8 bits of the issue time go on the wire.
    hdr.s_token = m_s_token;
    hdr.s_time = m_s_time & 0xff;
}
// Sends one data segment to the peer.
//   seq  sequence value of the segment
//   buf  payload bytes
//   len  number of payload bytes
// The datagram is laid out in the manager's shared send buffer as
// conn_hdr | data_hdr | payload. Sending also pushes back the keepalive timer.
// A segment that would not fit the size limit is logged and dropped.
void conn::send_seq(seq_t seq, const char* buf, size_t len)
{
    // Same bound the original assert expressed (presumably the size of
    // m_send_buf -- confirm in conn.h). It is enforced at runtime as well,
    // because assert() disappears under NDEBUG and the memcpy below would then
    // write past the buffer.
    static const size_t SEND_LIMIT = 2048;
    const size_t total = sizeof(conn_hdr) + sizeof(data_hdr) + len;
    assert(total < SEND_LIMIT);
    if (total >= SEND_LIMIT) {
        LOG_WARN("Dropping oversized segment of %u bytes for %s",
                 uint32_t(len), to_string(m_who).c_str());
        return;
    }

    // Push back keep alive send
    m_mgr.m_tm.cancel(m_send_keep_alive);
    m_send_keep_alive = m_mgr.m_tm.add(now() + SEND_KEEPALIVE_TIME, [this]() { send_keep_alive(); });

    // Carve the headers and payload area out of the shared send buffer.
    char* const base = m_mgr.m_send_buf;
    conn_hdr& chdr = *reinterpret_cast<conn_hdr*>(base);
    data_hdr& dhdr = *reinterpret_cast<data_hdr*>(base + sizeof(conn_hdr));
    char* payload = base + sizeof(conn_hdr) + sizeof(data_hdr);

    // Setup chdr
    chdr.type = ptype::data;
    setup_chdr(chdr);

    // Setup dhdr. A timestamp of 0 is reserved to mean "no sample" on the ack
    // path, so a zero clock reading is sent as 1 instead.
    uint32_t stamp = uint32_t(now_us_wrap());
    if (stamp == 0) {
        stamp = 1;
    }
    dhdr.seq = htonl(uint32_t(seq));
    dhdr.timestamp = htonl(stamp);

    // Copy actual data, maybe remove memcpy someday
    memcpy(payload, buf, len);

    // Send away (the log shows the host-order timestamp actually sent)
    LOG_DEBUG("send SEQ (%u, %u, %u)", uint32_t(seq), uint32_t(len), stamp);
    m_mgr.send_packet(m_who, total);
}
// Sends a data_ack packet to the peer.
//   ack     acknowledged sequence position
//   window  window value to advertise
//   stamp   timestamp value to place in the ack
// The datagram is conn_hdr | data_ack_hdr, built in the manager's shared send
// buffer. Sending also pushes back the keepalive timer.
void conn::send_ack(seq_t ack, size_t window, timestamp_t stamp)
{
    // We are about to transmit, so the next keepalive can wait a full interval.
    auto& timers = m_mgr.m_tm;
    timers.cancel(m_send_keep_alive);
    m_send_keep_alive = timers.add(now() + SEND_KEEPALIVE_TIME, [this]() { send_keep_alive(); });

    // Locate both headers inside the send buffer.
    char* const base = m_mgr.m_send_buf;
    auto* chdr = reinterpret_cast<conn_hdr*>(base);
    auto* ahdr = reinterpret_cast<data_ack_hdr*>(base + sizeof(conn_hdr));

    chdr->type = ptype::data_ack;
    setup_chdr(*chdr);

    // Host-order values, converted to network order on the way into the header.
    const uint32_t ack_h = uint32_t(ack);
    const uint32_t window_h = uint32_t(window);
    const uint32_t stamp_h = uint32_t(stamp);
    ahdr->ack = htonl(ack_h);
    ahdr->window = htonl(window_h);
    ahdr->timestamp = htonl(stamp_h);

    LOG_DEBUG("send ACK (%u, %u, %u)", ack_h, window_h, stamp_h);
    m_mgr.send_packet(m_who, sizeof(conn_hdr) + sizeof(data_ack_hdr));
}
void conn::send_keep_alive()
{
// Make space for headers
conn_hdr& chdr = *((conn_hdr*) (m_mgr.m_send_buf));
// Setup chdr
chdr.type = ptype::final_ack;
setup_chdr(chdr);
// Send packet + reschedule
LOG_INFO("Sending keepalive to %s", to_string(m_who).c_str());
m_mgr.send_packet(m_who, sizeof(conn_hdr));
m_send_keep_alive = m_mgr.m_tm.add(now() + SEND_KEEPALIVE_TIME, [this]() { send_keep_alive(); });
}
void conn::on_timeout()
{
LOG_INFO("Timeout with state = %d", m_state);
if (m_state == state::outbound || m_state == state::time_wait) {
// Delete myself
m_mgr.m_state.erase(m_who);
} else {
// Kick off destruction
m_socket->close();
}
}
// Error callback shared by the receive and send flows.
// Asks both flows to stop; while either reports that it has not stopped yet the
// function returns and relies on being invoked again. Once both have stopped it
// cancels the timers, releases the flows and the socket, and enters time_wait.
void conn::socket_error(const error_code& error)
{
    LOG_INFO("Socket error on %s: %s", to_string(m_who).c_str(), error.message().c_str());
    assert(m_state == state::running);

    // Short-circuit on purpose: the send flow is only asked to stop once the
    // receive flow has reported that it stopped.
    if (!m_recv->stop() || !m_send->stop()) {
        return; // Will be called again soon
    }

    LOG_INFO("Changing state to time_wait");

    // Neither the keepalive sender nor the per-state timer is needed any more.
    auto& timers = m_mgr.m_tm;
    timers.cancel(m_send_keep_alive);
    timers.cancel(m_timer);

    // Drop the flows first, then the socket.
    m_recv.reset();
    m_send.reset();
    m_socket.reset();

    go_time_wait();
}
bool conn::process_packet(const conn_hdr &hdr, const char* data, size_t size)
{
assert(m_state == state::running);
if (hdr.type == ptype::data_ack) {
if (size != sizeof(data_ack_hdr)) {
return false;
}
const data_ack_hdr& hdr = *((const data_ack_hdr*) data);
duration d = std::chrono::microseconds(uint32_t(now_us_wrap() - ntohl(hdr.timestamp)));
m_send->on_ack(
seq_t(ntohl(hdr.ack)),
size_t(ntohl(hdr.window)),
(hdr.timestamp == 0 ? NULL : &d));
return true;
} else if (hdr.type == ptype::data) {
if (size < sizeof(data_hdr)) {
return false;
}
if (size > sizeof(data_hdr) + MSS) {
return false;
}
size_t len = size - sizeof(data_hdr);
const data_hdr& hdr = *((const data_hdr*) data);
m_recv->on_packet(
seq_t(ntohl(hdr.seq)),
ntohl(hdr.timestamp),
data + sizeof(data_hdr),
len);
return true;
}
return false;
}
void conn::on_connect(const error_code& error)
{
m_mgr.m_tm.cancel(m_timer);
if (error) {
LOG_INFO("Connection to localhost failed: %s", error.message().c_str());
go_time_wait();
return;
}
LOG_INFO("Connection to localhost up");
m_state = state::running;
m_recv = std::make_unique<flow_recv>(m_mgr.m_tm, *m_socket,
[this](seq_t ack, size_t window, timestamp_t stamp) {
send_ack(ack, window, stamp);
},
[this](const error_code& err) { socket_error(err); });
m_send = std::make_unique<flow_send>(m_mgr.m_tm, *m_socket,
[this](seq_t seq, const char* buf, size_t len) {
send_seq(seq, buf, len);
},
[this](const error_code& err) { socket_error(err); });
m_timer = m_mgr.m_tm.add(now() + RECV_KEEPALIVE_TIME, [this]() { on_timeout(); });
m_send_keep_alive = m_mgr.m_tm.add(now() + SEND_KEEPALIVE_TIME, [this]() { send_keep_alive(); });
while(!m_queue.empty()) {
const pkt_queue_entry& entry = m_queue.front();
process_packet(entry.hdr, entry.data, entry.len);
m_queue.pop();
}
}
void conn::go_time_wait()
{
// If I need to wait, do the time wait thing
time_point tp_wait = time_from_sec(m_s_time) + REMOTE_CONN_TIME;
if (now() <= tp_wait) {
m_state = state::time_wait;
m_timer = m_mgr.m_tm.add(tp_wait, [this]() { on_timeout(); });
} else {
// Delete myself
m_mgr.m_state.erase(m_who);
}
}
// Builds the connection manager.
//   tm          timer manager used for all connection timers
//   udp         UDP port on which the protocol handler is registered
//   tcp_port    loopback TCP port that accepted connections are bridged to
//   goal_conns  desired connection count; values below 4 are raised to 4
// Throws std::runtime_error if the random token secret cannot be generated.
conn_mgr::conn_mgr(timer_mgr& tm, udp_port& udp, uint16_t tcp_port, size_t goal_conns)
    : m_tm(tm)
    , m_udp(udp)
    , m_tcp_port(tcp_port)
    , m_goal_conns(std::max(goal_conns, size_t(4)))
    // Computed from the argument, not from m_goal_conns, so the value does not
    // depend on the order in which the two members are declared.
    , m_max_conns(2 * std::max(goal_conns, size_t(4)))
{
    // Seed the token secret before any packet handler exists. RAND_bytes
    // returns 1 on success; continuing after a failure would leave the secret
    // (and therefore every token) unseeded.
    if (RAND_bytes(m_secret, 16) != 1) {
        throw std::runtime_error("conn_mgr: RAND_bytes failed to generate the token secret");
    }

    // Claim datagrams that are at least one conn_hdr long and start with 'M'.
    m_udp.add_protocol([this](const udp_endpoint& src, const char* buf, size_t len) -> bool {
        if (len >= sizeof(conn_hdr) && buf[0] == 'M') {
            on_packet(src, buf, len);
            return true;
        }
        return false;
    });
}
// Reports whether a record exists for `remote` that is not in time_wait.
// Note that records still in the outbound or starting state count as well.
bool conn_mgr::has_conn(const udp_endpoint& remote)
{
    // TODO: Who calls this, does this mean what we think?
    const auto entry = m_state.find(remote);
    if (entry == m_state.end()) {
        return false;
    }
    return entry->second.m_state != conn::state::time_wait;
}
void conn_mgr::send_probe(const udp_endpoint& remote)
{
auto it = m_state.find(remote);
if (it == m_state.end()) {
// Make a new 'outgoing' state if needed
uint32_t s_time = now_sec();
uint32_t s_token = make_token(remote, s_time);
it = m_state.emplace(std::piecewise_construct,
std::forward_as_tuple(remote),
std::forward_as_tuple(*this, remote, s_time, s_token)).first;
}
if (it->second.m_state != conn::state::outbound) {
LOG_INFO("Not in outbound state, ignoring send_probe");
return;
}
// Make space for headers
conn_hdr& chdr = *((conn_hdr*) (m_send_buf));
// Setup chdr
chdr.type = ptype::probe;
it->second.setup_chdr(chdr);
LOG_INFO("Sending probe to %s", to_string(remote).c_str());
send_packet(remote, sizeof(conn_hdr));
}
uint32_t conn_mgr::make_token(const udp_endpoint& who, uint32_t time) {
// This doesn't need to be globally agreed on, it's a secret
// hash, so I can change it, also, no need to correct endian
unsigned char buf[20];
std::string who_str = to_string(who);
SHA_CTX ctx;
SHA1_Init(&ctx);
SHA1_Update(&ctx, m_secret, 16);
SHA1_Update(&ctx, (const unsigned char *) who_str.c_str(), who_str.size());
SHA1_Update(&ctx, (const unsigned char *) &time, sizeof(uint32_t));
SHA1_Update(&ctx, m_secret, 16);
SHA1_Final(buf, &ctx);
return *((uint32_t*) buf);
}
void conn_mgr::send_packet(const udp_endpoint& dest, size_t len)
{
conn_hdr& hdr = *((conn_hdr*) m_send_buf);
hdr.magic = 'M';
m_udp.send(dest, m_send_buf, len);
LOG_DEBUG("Sending packet, type = %d, s_time = %u, s_token = %x, r_time = %u, r_token = %x",
int(hdr.type), hdr.s_time, hdr.s_token, hdr.r_time, hdr.r_token);
}
void conn_mgr::on_packet(const udp_endpoint& src, const char* buf, size_t len)
{
const conn_hdr* hdr = (const conn_hdr*) buf;
uint32_t r_time = now_sec();
auto it = m_state.find(src);
LOG_DEBUG("Receiving packet, type = %d, s_time = %u, s_token = %x, r_time = %u, r_token = %x",
int(hdr->type), hdr->s_time, hdr->s_token, hdr->r_time, hdr->r_token);
// Handle probe case
if (hdr->type == ptype::probe) {
if (it == m_state.end() || it->second.m_state == conn::state::outbound) {
// Respond with a valid token
LOG_INFO("Probe from %s", to_string(src).c_str());
conn_hdr& rhdr = *((conn_hdr*) m_send_buf);
rhdr.type = ptype::probe_ack;
rhdr.r_time = hdr->s_time;
rhdr.r_token = hdr->s_token;
if (it == m_state.end()) {
LOG_INFO("Acking blind");
rhdr.s_time = r_time & 0xff;
rhdr.s_token = make_token(src, r_time);
send_packet(src, sizeof(conn_hdr));
return;
}
LOG_INFO("Acking half blind");
rhdr.s_time = it->second.m_s_time & 0xff;
rhdr.s_token = it->second.m_s_token;
send_packet(src, sizeof(conn_hdr));
return;
} else {
LOG_INFO("Probe from %s, Ignoring", to_string(src).c_str());
}
return;
}
// Handle final-ack construction case
if (it == m_state.end() && hdr->type == ptype::final_ack) {
// No current state, if valid, start connection
if (hdr->r_time > (r_time & 0xff)) r_time -= 256;
r_time = (r_time & 0xffffff00) | hdr->r_time;
uint32_t token = make_token(src, r_time);
if (token != hdr->r_token) {
LOG_INFO("From %s: Invalid token for probe_ack", to_string(src).c_str());
return;
}
LOG_INFO("From %s: Valid final_ack for forgotten probe_ack", to_string(src).c_str());
// Brings us to 'outgoing' state
it = m_state.emplace(std::piecewise_construct,
std::forward_as_tuple(src),
std::forward_as_tuple(*this, src, r_time, hdr->r_token)).first;
}
if (it == m_state.end()) {
LOG_INFO("From %s: Ignoring everything but probe/final-ack for empty state", to_string(src).c_str());
return;
}
// If not a valid response based on outgoing, bail right away
if (hdr->r_time != (it->second.m_s_time & 0xff) ||
hdr->r_token != it->second.m_s_token) {
LOG_INFO("From %s: Invalid local token for probe_ack", to_string(src).c_str());
return;
}
// Make sure we are not in time wait
if (it->second.m_state == conn::state::time_wait) {
LOG_INFO("From %s: Ignoring packet while in time_wait", to_string(src).c_str());
return;
}
if (it->second.m_state == conn::state::outbound) {
if (hdr->type != ptype::probe_ack && hdr->type != ptype::final_ack) {
LOG_INFO("Getting a probe-ack while connected, ignoring");
return;
}
// Ok, move outgoing connections forward to starting
if (it->second.m_state == conn::state::outbound) {
it->second.start_connect(hdr->s_time, hdr->s_token);
}
}
// If not valid remote tokens, bail
if (hdr->s_time != it->second.m_r_time ||
hdr->s_token != it->second.m_r_token) {
LOG_INFO("From %s: Invalid remote token for probe_ack", to_string(src).c_str());
return;
}
if (hdr->type == ptype::probe_ack) {
// Maybe Send final ack
LOG_INFO("From %s: Got probe ack, sending final_ack", to_string(src).c_str());
conn_hdr& rhdr = *((conn_hdr*) m_send_buf);
rhdr.type = ptype::final_ack;
it->second.setup_chdr(rhdr);
send_packet(src, sizeof(conn_hdr));
return;
}
// Pass packet on to connection
it->second.on_packet(*hdr, buf + sizeof(conn_hdr), len - sizeof(conn_hdr));
}
/*
int main()
{
g_log_level[LT_FLOW] = LL_DEBUG;
g_log_level[LT_CONN] = LL_DEBUG;
io_service ios;
int listeners = 2;
boost::asio::ip::tcp::acceptor l1 = {ios, tcp_endpoint(ip_address_v4::loopback(), 2000)};
boost::asio::ip::tcp::acceptor l2 = {ios, tcp_endpoint(ip_address_v4::loopback(), 2001)};
l1.listen();
l2.listen();
tcp_socket s1(ios);
tcp_socket s2(ios);
l1.async_accept(s1, [&](const error_code& error) {
if (error) {
LOG_DEBUG("Accept error");
exit(1);
}
LOG_DEBUG("GOT ACCEPT #1");
listeners--;
s1.send(boost::asio::buffer("Hello", 5));
});
l2.async_accept(s2, [&](const error_code& error) {
if (error) {
LOG_DEBUG("Accept error");
exit(1);
}
LOG_DEBUG("GOT ACCEPT #2");
listeners--;
s2.send(boost::asio::buffer("World", 5));
});
LOG_DEBUG("Listeners running");
timer_mgr tm(ios);
udp_port up1(ios, 5000);
udp_port up2(ios, 5001);
conn_mgr cm1(tm, up1, 2000);
conn_mgr cm2(tm, up2, 2001);
LOG_DEBUG("Connection managers running");
for(size_t i = 0; i < 3; i++) {
tm.add(now() + 5_sec*i, [&]() {
cm1.send_probe(udp_endpoint(ip_address_v4::loopback(), 5001));
});
}
LOG_DEBUG("Probe sent");
ios.run();
}
*/