Skip to content

Commit ec5e7fb

Browse files
tmontgomerylehecka
authored andcommitted
initial resume machinery (#113)
* initial resume machinery * add placeholder calls to retransmit from position. * use std::array for token storage. * use unique_ptr to hold ResumeTracker and ResumeCache so they can be moved. * add server side resume listener callback for handling RESUME association * add client side ReactiveSocket tryClientResume logic * initial framework for resume TCP tests * fix bad function call from confusion over 'this' * add token to SETUP * send ERROR frames when can not resume. Minor fixes for position tracking. Avoid closing ReactiveSocket upon close to allow for resume. * added token to fromClientConnection params. Reworked resume listener to pass position and added methods on ReactiveSocket to query cache position.
1 parent 6cc7a9e commit ec5e7fb

13 files changed

+805
-10
lines changed

CMakeLists.txt

+47-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ add_library(
132132
src/folly/FollyKeepaliveTimer.cpp
133133
src/folly/FollyKeepaliveTimer.h
134134
src/ConnectionSetupPayload.h
135-
src/ConnectionSetupPayload.cpp)
135+
src/ConnectionSetupPayload.cpp
136+
src/ResumeTracker.h
137+
src/ResumeCache.h
138+
src/ResumeCache.cpp
139+
)
136140

137141
target_link_libraries(
138142
ReactiveSocket
@@ -162,7 +166,8 @@ add_executable(
162166
test/ReactiveSocketTest.cpp
163167
test/ReactiveStreamsMocksCompat.h
164168
test/Test.cpp
165-
test/MockStats.h)
169+
test/MockStats.h
170+
)
166171

167172
target_link_libraries(
168173
tests
@@ -240,4 +245,44 @@ target_link_libraries(
240245

241246
add_dependencies(tckclient gmock)
242247

248+
add_executable(
249+
tcpresumeclient
250+
test/resume/TcpResumeClient.cpp
251+
test/simple/PrintSubscriber.cpp
252+
test/simple/PrintSubscriber.h
253+
src/ReactiveSocket.cpp
254+
src/ReactiveSocket.h
255+
test/simple/StatsPrinter.cpp
256+
test/simple/StatsPrinter.h)
257+
258+
target_link_libraries(
259+
tcpresumeclient
260+
ReactiveSocket
261+
${FOLLY_LIBRARIES}
262+
gflags
263+
${GMOCK_LIBS}
264+
glog
265+
${CMAKE_THREAD_LIBS_INIT})
266+
267+
add_dependencies(tcpresumeclient gmock)
268+
269+
add_executable(
270+
tcpresumeserver
271+
test/resume/TcpResumeServer.cpp
272+
test/simple/PrintSubscriber.cpp
273+
test/simple/PrintSubscriber.h
274+
test/simple/StatsPrinter.cpp
275+
test/simple/StatsPrinter.h)
276+
277+
target_link_libraries(
278+
tcpresumeserver
279+
ReactiveSocket
280+
${FOLLY_LIBRARIES}
281+
gflags
282+
${GMOCK_LIBS}
283+
glog
284+
${CMAKE_THREAD_LIBS_INIT})
285+
286+
add_dependencies(tcpresumeserver gmock)
287+
243288
# EOF

src/ConnectionAutomaton.cpp

+86-1
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ namespace reactivesocket {
2121
ConnectionAutomaton::ConnectionAutomaton(
2222
std::unique_ptr<DuplexConnection> connection,
2323
StreamAutomatonFactory factory,
24+
ResumeListener resumeListener,
2425
Stats& stats,
2526
bool isServer)
2627
: connection_(std::move(connection)),
2728
factory_(std::move(factory)),
2829
stats_(stats),
29-
isServer_(isServer) {
30+
isServer_(isServer),
31+
isResumable_(true),
32+
resumeTracker_(new ResumeTracker()),
33+
resumeCache_(new ResumeCache()),
34+
resumeListener_(resumeListener) {
3035
// We deliberately do not "open" input or output to avoid having c'tor on the
3136
// stack when processing any signals from the connection. See ::connect and
3237
// ::onSubscribe.
@@ -66,6 +71,12 @@ void ConnectionAutomaton::disconnect() {
6671
}
6772
}
6873

74+
void ConnectionAutomaton::reconnect(std::unique_ptr<DuplexConnection> newConnection) {
75+
disconnect();
76+
connection_ = std::move(newConnection);
77+
connect();
78+
}
79+
6980
ConnectionAutomaton::~ConnectionAutomaton() {
7081
VLOG(6) << "~ConnectionAutomaton";
7182
// We rely on SubscriptionPtr and SubscriberPtr to dispatch appropriate
@@ -127,6 +138,10 @@ void ConnectionAutomaton::onNext(std::unique_ptr<folly::IOBuf> frame) {
127138

128139
stats_.frameRead(ss.str());
129140

141+
// TODO(tmont): If a frame is invalid, it will still be tracked. However, we actually want that. We want to keep
142+
// each side in sync, even if a frame is invalid.
143+
resumeTracker_->trackReceivedFrame(*frame);
144+
130145
auto streamIdPtr = FrameHeader::peekStreamId(*frame);
131146
if (!streamIdPtr) {
132147
// Failed to deserialize the frame.
@@ -183,6 +198,9 @@ void ConnectionAutomaton::onConnectionFrame(
183198
return;
184199
}
185200
case FrameType::SETUP: {
201+
// TODO(tmont): check for ENABLE_RESUME and make sure isResumable_ is true
202+
// TODO(tmont): figure out how best to pass in ResumeIdentificationToken
203+
186204
if (!factory_(0, std::move(payload))) {
187205
assert(false);
188206
}
@@ -194,6 +212,46 @@ void ConnectionAutomaton::onConnectionFrame(
194212
}
195213
return;
196214
}
215+
case FrameType::RESUME: {
216+
Frame_RESUME frame;
217+
if (frame.deserializeFrom(std::move(payload))) {
218+
bool canResume = false;
219+
220+
if (isServer_ && isResumable_) {
221+
// find old ConnectionAutmaton via calling listener.
222+
// Application will call resumeFromAutomaton to setup streams and resume information
223+
canResume = resumeListener_(frame.token_, frame.position_);
224+
}
225+
226+
if (canResume) {
227+
outputFrameOrEnqueue(
228+
Frame_RESUME_OK(resumeTracker_->impliedPosition()).serializeOut());
229+
resumeCache_->retransmitFromPosition(frame.position_, *this);
230+
} else {
231+
outputFrameOrEnqueue(Frame_ERROR::canNotResume("can not resume").serializeOut());
232+
disconnect();
233+
}
234+
} else {
235+
outputFrameOrEnqueue(Frame_ERROR::unexpectedFrame().serializeOut());
236+
disconnect();
237+
}
238+
return;
239+
}
240+
case FrameType::RESUME_OK: {
241+
Frame_RESUME_OK frame;
242+
if (frame.deserializeFrom(std::move(payload))) {
243+
if (!isServer_ && isResumable_ && resumeCache_->isPositionAvailable(frame.position_)) {
244+
resumeCache_->retransmitFromPosition(frame.position_, *this);
245+
} else {
246+
outputFrameOrEnqueue(Frame_ERROR::canNotResume("can not resume").serializeOut());
247+
disconnect();
248+
}
249+
} else {
250+
outputFrameOrEnqueue(Frame_ERROR::unexpectedFrame().serializeOut());
251+
disconnect();
252+
}
253+
return;
254+
}
197255
default:
198256
outputFrameOrEnqueue(Frame_ERROR::unexpectedFrame().serializeOut());
199257
disconnect();
@@ -263,6 +321,19 @@ void ConnectionAutomaton::sendKeepalive() {
263321
outputFrameOrEnqueue(pingFrame.serializeOut());
264322
}
265323

324+
void ConnectionAutomaton::sendResume(const ResumeIdentificationToken &token) {
325+
Frame_RESUME resumeFrame(token, resumeTracker_->impliedPosition());
326+
outputFrameOrEnqueue(resumeFrame.serializeOut());
327+
}
328+
329+
bool ConnectionAutomaton::isPositionAvailable(ResumePosition position) {
330+
return resumeCache_->isPositionAvailable(position);
331+
}
332+
333+
ResumePosition ConnectionAutomaton::positionDifference(ResumePosition position) {
334+
return resumeCache_->position() - position;
335+
}
336+
266337
void ConnectionAutomaton::onClose(ConnectionCloseListener listener) {
267338
closeListeners_.push_back(listener);
268339
}
@@ -299,6 +370,20 @@ void ConnectionAutomaton::outputFrame(
299370

300371
stats_.frameWritten(ss.str());
301372

373+
resumeCache_->trackAndCacheSentFrame(*outputFrame);
302374
connectionOutput_.onNext(std::move(outputFrame));
303375
}
376+
377+
void ConnectionAutomaton::resumeFromAutomaton(ConnectionAutomaton& oldAutomaton)
378+
{
379+
if (isServer_ && isResumable_)
380+
{
381+
streams_ = std::move(oldAutomaton.streams_);
382+
factory_ = oldAutomaton.factory_;
383+
384+
resumeTracker_ = std::move(oldAutomaton.resumeTracker_);
385+
resumeCache_ = std::move(oldAutomaton.resumeCache_);
386+
}
387+
}
388+
304389
}

src/ConnectionAutomaton.h

+20
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include "src/Payload.h"
1313
#include "src/ReactiveStreamsCompat.h"
1414
#include "src/Stats.h"
15+
#include "src/ResumeTracker.h"
16+
#include "src/ResumeCache.h"
1517

1618
namespace reactivesocket {
1719

@@ -32,6 +34,9 @@ using StreamId = uint32_t;
3234
using StreamAutomatonFactory =
3335
std::function<bool(StreamId, std::unique_ptr<folly::IOBuf>)>;
3436

37+
using ResumeListener =
38+
std::function<bool(const ResumeIdentificationToken &token, ResumePosition position)>;
39+
3540
using ConnectionCloseListener = std::function<void()>;
3641

3742
/// Handles connection-level frames and (de)multiplexes streams.
@@ -51,6 +56,7 @@ class ConnectionAutomaton :
5156
std::unique_ptr<DuplexConnection> connection,
5257
// TODO(stupaq): for testing only, can devirtualise if necessary
5358
StreamAutomatonFactory factory,
59+
ResumeListener resumeListener,
5460
Stats& stats,
5561
bool client);
5662

@@ -66,6 +72,9 @@ class ConnectionAutomaton :
6672
/// AbstractStreamAutomaton attached to this ConnectionAutomaton.
6773
void disconnect();
6874

75+
/// Terminate underlying connection and connect new connection
76+
void reconnect(std::unique_ptr<DuplexConnection> newConnection);
77+
6978
~ConnectionAutomaton();
7079

7180
/// @{
@@ -107,9 +116,16 @@ class ConnectionAutomaton :
107116
/// delivered multiple times as long as the caller holds shared_ptr to
108117
/// ConnectionAutomaton.
109118
void endStream(StreamId streamId, StreamCompletionSignal signal);
119+
120+
/// Copy the streams and resumption information from a previous ConnectionAutomaton
121+
void resumeFromAutomaton(ConnectionAutomaton& oldAutomaton);
110122
/// @}
111123

112124
void sendKeepalive();
125+
void sendResume(const ResumeIdentificationToken &token);
126+
127+
bool isPositionAvailable(ResumePosition position);
128+
ResumePosition positionDifference(ResumePosition position);
113129

114130
void onClose(ConnectionCloseListener listener);
115131

@@ -164,6 +180,10 @@ class ConnectionAutomaton :
164180
pendingWrites_; // TODO(stupaq): two vectors?
165181
Stats& stats_;
166182
bool isServer_;
183+
bool isResumable_;
167184
std::vector<ConnectionCloseListener> closeListeners_;
185+
std::unique_ptr<ResumeTracker> resumeTracker_;
186+
std::unique_ptr<ResumeCache> resumeCache_;
187+
ResumeListener resumeListener_;
168188
};
169189
}

src/Frame.cpp

+72-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ std::ostream& operator<<(std::ostream& os, FrameType type) {
6565
return os << "LEASE";
6666
case FrameType::METADATA_PUSH:
6767
return os << "METADATA_PUSH";
68+
case FrameType::RESUME:
69+
return os << "RESUME";
70+
case FrameType::RESUME_OK:
71+
return os << "RESUME_OK";
6872
}
6973
return os << "FrameType(" << static_cast<uint16_t>(type) << ")";
7074
}
@@ -321,6 +325,10 @@ Frame_ERROR Frame_ERROR::applicationError(
321325
return Frame_ERROR(streamId, ErrorCode::APPLICATION_ERROR, Payload(message));
322326
}
323327

328+
Frame_ERROR Frame_ERROR::canNotResume(const std::string& message) {
329+
return Frame_ERROR(0, ErrorCode::CONNECTION_ERROR, Payload(message));
330+
}
331+
324332
std::unique_ptr<folly::IOBuf> Frame_ERROR::serializeOut() {
325333
auto queue = createBufferQueue(
326334
FrameHeader::kSize + sizeof(uint32_t) + payload_.framingSize());
@@ -383,7 +391,7 @@ std::ostream& operator<<(std::ostream& os, const Frame_KEEPALIVE& frame) {
383391
/// @{
384392
std::unique_ptr<folly::IOBuf> Frame_SETUP::serializeOut() {
385393
auto queue = createBufferQueue(
386-
FrameHeader::kSize + 3 * sizeof(uint32_t) + 2 +
394+
FrameHeader::kSize + 3 * sizeof(uint32_t) + token_.size() + 2 +
387395
metadataMimeType_.length() + dataMimeType_.length() +
388396
payload_.framingSize());
389397
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
@@ -392,8 +400,9 @@ std::unique_ptr<folly::IOBuf> Frame_SETUP::serializeOut() {
392400
appender.writeBE(static_cast<uint32_t>(version_));
393401
appender.writeBE(static_cast<uint32_t>(keepaliveTime_));
394402
appender.writeBE(static_cast<uint32_t>(maxLifetime_));
403+
appender.push((const uint8_t*)token_.data(), token_.size());
395404

396-
CHECK(metadataMimeType_.length() <= std::numeric_limits<uint8_t>::max());
405+
CHECK(metadataMimeType_.length() <= std::numeric_limits<uint8_t>::max());
397406
appender.writeBE(static_cast<uint8_t>(metadataMimeType_.length()));
398407
appender.push(
399408
(const uint8_t*)metadataMimeType_.data(), metadataMimeType_.length());
@@ -413,6 +422,7 @@ bool Frame_SETUP::deserializeFrom(std::unique_ptr<folly::IOBuf> in) {
413422
version_ = cur.readBE<uint32_t>();
414423
keepaliveTime_ = cur.readBE<uint32_t>();
415424
maxLifetime_ = cur.readBE<uint32_t>();
425+
cur.pull(token_.data(), token_.size());
416426

417427
int mdmtLen = cur.readBE<uint8_t>();
418428
metadataMimeType_ = cur.readFixedString(mdmtLen);
@@ -465,4 +475,64 @@ std::ostream& operator<<(std::ostream& os, const Frame_LEASE& frame) {
465475
<< ")";
466476
}
467477
/// @}
478+
479+
/// @{
480+
std::unique_ptr<folly::IOBuf> Frame_RESUME::serializeOut() {
481+
auto queue = createBufferQueue(
482+
FrameHeader::kSize + sizeof(ResumeIdentificationToken) + sizeof(ResumePosition));
483+
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
484+
485+
header_.serializeInto(appender);
486+
appender.push(token_.data(), sizeof(token_));
487+
appender.writeBE(position_);
488+
489+
return queue.move();
490+
}
491+
492+
bool Frame_RESUME::deserializeFrom(std::unique_ptr<folly::IOBuf> in) {
493+
folly::io::Cursor cur(in.get());
494+
try {
495+
header_.deserializeFrom(cur);
496+
cur.pull(token_.data(), sizeof(token_));
497+
position_ = cur.readBE<ResumePosition>();
498+
} catch (...) {
499+
return false;
500+
}
501+
return true;
502+
}
503+
504+
std::ostream& operator<<(std::ostream& os, const Frame_RESUME& frame) {
505+
return os << frame.header_ << ", ("
506+
<< "token" << ", @" << frame.position_ << ")";
507+
}
508+
/// @}
509+
510+
/// @{
511+
std::unique_ptr<folly::IOBuf> Frame_RESUME_OK::serializeOut() {
512+
auto queue = createBufferQueue(
513+
FrameHeader::kSize + sizeof(ResumePosition));
514+
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
515+
516+
header_.serializeInto(appender);
517+
appender.writeBE(position_);
518+
519+
return queue.move();
520+
}
521+
522+
bool Frame_RESUME_OK::deserializeFrom(std::unique_ptr<folly::IOBuf> in) {
523+
folly::io::Cursor cur(in.get());
524+
try {
525+
header_.deserializeFrom(cur);
526+
position_ = cur.readBE<ResumePosition>();
527+
} catch (...) {
528+
return false;
529+
}
530+
return true;
531+
}
532+
533+
std::ostream& operator<<(std::ostream& os, const Frame_RESUME_OK& frame) {
534+
return os << frame.header_ << ", (@" << frame.position_ << ")";
535+
}
536+
/// @}
537+
468538
}

0 commit comments

Comments
 (0)