Skip to content

Commit e797f78

Browse files
authored
Send client setup data (#108)
1 parent 6c48fc6 commit e797f78

13 files changed

+122
-55
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ add_library(
130130
src/Stats.h
131131
src/Stats.cpp
132132
src/folly/FollyKeepaliveTimer.cpp
133-
src/folly/FollyKeepaliveTimer.h)
133+
src/folly/FollyKeepaliveTimer.h
134+
src/ConnectionSetupPayload.h)
134135

135136
target_link_libraries(
136137
ReactiveSocket

src/ConnectionAutomaton.cpp

Lines changed: 8 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@ ConnectionAutomaton::ConnectionAutomaton(
2222
std::unique_ptr<DuplexConnection> connection,
2323
StreamAutomatonFactory factory,
2424
Stats& stats,
25-
bool isServer,
26-
std::unique_ptr<KeepaliveTimer> keepAliveTimer)
25+
bool isServer)
2726
: connection_(std::move(connection)),
2827
factory_(std::move(factory)),
2928
stats_(stats),
30-
isServer_(isServer),
31-
keepaliveTimer_(std::move(keepAliveTimer)) {
29+
isServer_(isServer) {
3230
// We deliberately do not "open" input or output to avoid having c'tor on the
3331
// stack when processing any signals from the connection. See ::connect and
3432
// ::onSubscribe.
@@ -41,41 +39,18 @@ void ConnectionAutomaton::connect() {
4139
// subscription, which might deliver frames in-line.
4240
connection_->setInput(*this);
4341

44-
if (!isServer_) {
45-
uint32_t keepaliveTime = keepaliveTimer_
46-
? keepaliveTimer_->keepaliveTime().count()
47-
: std::numeric_limits<uint32_t>::max();
48-
49-
// TODO set correct version and allow clients to configure
50-
Frame_SETUP frame(
51-
FrameFlags_EMPTY,
52-
0,
53-
keepaliveTime,
54-
std::numeric_limits<uint32_t>::max(),
55-
"",
56-
"",
57-
Payload());
58-
outputFrameOrEnqueue(frame.serializeOut());
59-
}
6042
stats_.socketCreated();
61-
62-
if (keepaliveTimer_) {
63-
keepaliveTimer_->start(this);
64-
}
6543
}
6644

6745
void ConnectionAutomaton::disconnect() {
6846
VLOG(6) << "disconnect";
6947

70-
if (keepaliveTimer_) {
71-
keepaliveTimer_->stop();
72-
}
73-
7448
if (!connectionOutput_) {
7549
return;
7650
}
7751

78-
LOG_IF(WARNING, !pendingWrites_.empty()) << "disconnecting with pending writes (" << pendingWrites_.size() << ")";
52+
LOG_IF(WARNING, !pendingWrites_.empty())
53+
<< "disconnecting with pending writes (" << pendingWrites_.size() << ")";
7954

8055
// Send terminal signals to the DuplexConnection's input and output before
8156
// tearing it down. We must do this per DuplexConnection specification (see
@@ -307,7 +282,8 @@ void ConnectionAutomaton::onClose(ConnectionCloseListener listener) {
307282
closeListeners_.push_back(listener);
308283
}
309284

310-
void ConnectionAutomaton::outputFrameOrEnqueue(std::unique_ptr<folly::IOBuf> frame) {
285+
void ConnectionAutomaton::outputFrameOrEnqueue(
286+
std::unique_ptr<folly::IOBuf> frame) {
311287
if (!connectionOutput_) {
312288
return; // RS destructor has disconnected us from the DuplexConnection
313289
}
@@ -323,7 +299,7 @@ void ConnectionAutomaton::outputFrameOrEnqueue(std::unique_ptr<folly::IOBuf> fra
323299
}
324300

325301
void ConnectionAutomaton::drainOutputFramesQueue() {
326-
// Drain the queue or the allowance.
302+
// Drain the queue or the allowance.
327303
while (!pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
328304
auto frame = std::move(pendingWrites_.front());
329305
pendingWrites_.pop_front();
@@ -332,13 +308,12 @@ void ConnectionAutomaton::drainOutputFramesQueue() {
332308
}
333309

334310
void ConnectionAutomaton::outputFrame(
335-
std::unique_ptr<folly::IOBuf> outputFrame) {
311+
std::unique_ptr<folly::IOBuf> outputFrame) {
336312
std::stringstream ss;
337313
ss << FrameHeader::peekType(*outputFrame);
338314

339315
stats_.frameWritten(ss.str());
340316

341317
connectionOutput_.onNext(std::move(outputFrame));
342318
}
343-
344319
}

src/ConnectionAutomaton.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ class ConnectionAutomaton :
5252
// TODO(stupaq): for testing only, can devirtualise if necessary
5353
StreamAutomatonFactory factory,
5454
Stats& stats,
55-
bool client,
56-
std::unique_ptr<KeepaliveTimer> keepaliveTimer);
55+
bool client);
5756

5857
/// Kicks off connection procedure.
5958
///
@@ -165,7 +164,6 @@ class ConnectionAutomaton :
165164
pendingWrites_; // TODO(stupaq): two vectors?
166165
Stats& stats_;
167166
bool isServer_;
168-
std::unique_ptr<KeepaliveTimer> keepaliveTimer_;
169167
std::vector<ConnectionCloseListener> closeListeners_;
170168
};
171169
}

src/ConnectionSetupPayload.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#pragma once
4+
5+
#include <string>
6+
#include "Payload.h"
7+
8+
namespace reactivesocket {
9+
class ConnectionSetupPayload {
10+
public:
11+
ConnectionSetupPayload(
12+
std::string _metadataMimeType = "",
13+
std::string _dataMimeType = "",
14+
Payload _payload = Payload())
15+
: metadataMimeType(std::move(_metadataMimeType)),
16+
dataMimeType(std::move(_dataMimeType)),
17+
payload(std::move(_payload)){};
18+
19+
std::string metadataMimeType;
20+
std::string dataMimeType;
21+
Payload payload;
22+
};
23+
}

src/ReactiveSocket.cpp

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <folly/MoveWrapper.h>
1212

1313
#include "src/ConnectionAutomaton.h"
14+
#include "src/ConnectionSetupPayload.h"
1415
#include "src/DuplexConnection.h"
1516
#include "src/Frame.h"
1617
#include "src/Payload.h"
@@ -44,14 +45,15 @@ ReactiveSocket::ReactiveSocket(
4445
std::placeholders::_1,
4546
std::placeholders::_2),
4647
stats,
47-
isServer,
48-
std::move(keepaliveTimer))),
48+
isServer)),
4949
handler_(std::move(handler)),
50-
nextStreamId_(isServer ? 1 : 2) {}
50+
nextStreamId_(isServer ? 1 : 2),
51+
keepaliveTimer_(std::move(keepaliveTimer)) {}
5152

5253
std::unique_ptr<ReactiveSocket> ReactiveSocket::fromClientConnection(
5354
std::unique_ptr<DuplexConnection> connection,
5455
std::unique_ptr<RequestHandler> handler,
56+
ConnectionSetupPayload setupPayload,
5557
Stats& stats,
5658
std::unique_ptr<KeepaliveTimer> keepaliveTimer) {
5759
std::unique_ptr<ReactiveSocket> socket(new ReactiveSocket(
@@ -61,6 +63,27 @@ std::unique_ptr<ReactiveSocket> ReactiveSocket::fromClientConnection(
6163
stats,
6264
std::move(keepaliveTimer)));
6365
socket->connection_->connect();
66+
67+
uint32_t keepaliveTime = socket->keepaliveTimer_
68+
? socket->keepaliveTimer_->keepaliveTime().count()
69+
: std::numeric_limits<uint32_t>::max();
70+
71+
// TODO set correct version
72+
Frame_SETUP frame(
73+
FrameFlags_EMPTY,
74+
0,
75+
keepaliveTime,
76+
std::numeric_limits<uint32_t>::max(),
77+
std::move(setupPayload.metadataMimeType),
78+
std::move(setupPayload.dataMimeType),
79+
std::move(setupPayload.payload));
80+
81+
socket->connection_->outputFrameOrEnqueue(frame.serializeOut());
82+
83+
if (socket->keepaliveTimer_) {
84+
socket->keepaliveTimer_->start(socket->connection_.get());
85+
}
86+
6487
return socket;
6588
}
6689

@@ -132,7 +155,8 @@ void ReactiveSocket::requestFireAndForget(Payload request) {
132155
}
133156

134157
void ReactiveSocket::metadataPush(std::unique_ptr<folly::IOBuf> metadata) {
135-
connection_->outputFrameOrEnqueue(Frame_METADATA_PUSH(std::move(metadata)).serializeOut());
158+
connection_->outputFrameOrEnqueue(
159+
Frame_METADATA_PUSH(std::move(metadata)).serializeOut());
136160
}
137161

138162
bool ReactiveSocket::createResponder(

src/ReactiveSocket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <functional>
88
#include <memory>
99

10+
#include "src/ConnectionSetupPayload.h"
1011
#include "src/Payload.h"
1112
#include "src/ReactiveStreamsCompat.h"
1213
#include "src/Stats.h"
@@ -54,6 +55,7 @@ class ReactiveSocket {
5455
static std::unique_ptr<ReactiveSocket> fromClientConnection(
5556
std::unique_ptr<DuplexConnection> connection,
5657
std::unique_ptr<RequestHandler> handler,
58+
ConnectionSetupPayload setupPayload = ConnectionSetupPayload(),
5759
Stats& stats = Stats::noop(),
5860
std::unique_ptr<KeepaliveTimer> keepaliveTimer =
5961
std::unique_ptr<KeepaliveTimer>(nullptr));
@@ -90,5 +92,6 @@ class ReactiveSocket {
9092
const std::shared_ptr<ConnectionAutomaton> connection_;
9193
std::unique_ptr<RequestHandler> handler_;
9294
StreamId nextStreamId_;
95+
std::unique_ptr<KeepaliveTimer> keepaliveTimer_;
9396
};
9497
}

src/automata/ChannelRequester.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ void ChannelRequesterBase::onComplete() {
6868
case State::REQUESTED: {
6969
state_ = State::CLOSED;
7070
connection_->outputFrameOrEnqueue(
71-
Frame_REQUEST_CHANNEL(streamId_, FrameFlags_COMPLETE, 0, Payload()).serializeOut());
71+
Frame_REQUEST_CHANNEL(streamId_, FrameFlags_COMPLETE, 0, Payload())
72+
.serializeOut());
7273
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
7374
} break;
7475
case State::CLOSED:

src/automata/ChannelResponder.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ void ChannelResponderBase::onComplete() {
2727
switch (state_) {
2828
case State::RESPONDING: {
2929
state_ = State::CLOSED;
30-
connection_->outputFrameOrEnqueue(Frame_RESPONSE::complete(streamId_).serializeOut());
30+
connection_->outputFrameOrEnqueue(
31+
Frame_RESPONSE::complete(streamId_).serializeOut());
3132
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
3233
} break;
3334
case State::CLOSED:
@@ -40,7 +41,8 @@ void ChannelResponderBase::onError(folly::exception_wrapper ex) {
4041
case State::RESPONDING: {
4142
state_ = State::CLOSED;
4243
auto msg = ex.what().toStdString();
43-
connection_->outputFrameOrEnqueue(Frame_ERROR::applicationError(streamId_, msg).serializeOut());
44+
connection_->outputFrameOrEnqueue(
45+
Frame_ERROR::applicationError(streamId_, msg).serializeOut());
4446
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
4547
} break;
4648
case State::CLOSED:
@@ -61,7 +63,8 @@ void ChannelResponderBase::cancel() {
6163
switch (state_) {
6264
case State::RESPONDING: {
6365
state_ = State::CLOSED;
64-
connection_->outputFrameOrEnqueue(Frame_RESPONSE::complete(streamId_).serializeOut());
66+
connection_->outputFrameOrEnqueue(
67+
Frame_RESPONSE::complete(streamId_).serializeOut());
6568
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
6669
} break;
6770
case State::CLOSED:

src/automata/StreamSubscriptionResponderBase.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ void StreamSubscriptionResponderBase::onComplete() {
2727
switch (state_) {
2828
case State::RESPONDING: {
2929
state_ = State::CLOSED;
30-
connection_->outputFrameOrEnqueue(Frame_RESPONSE::complete(streamId_).serializeOut());
30+
connection_->outputFrameOrEnqueue(
31+
Frame_RESPONSE::complete(streamId_).serializeOut());
3132
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
3233
} break;
3334
case State::CLOSED:
@@ -40,7 +41,8 @@ void StreamSubscriptionResponderBase::onError(folly::exception_wrapper ex) {
4041
case State::RESPONDING: {
4142
state_ = State::CLOSED;
4243
auto msg = ex.what().toStdString();
43-
connection_->outputFrameOrEnqueue(Frame_ERROR::applicationError(streamId_, msg).serializeOut());
44+
connection_->outputFrameOrEnqueue(
45+
Frame_ERROR::applicationError(streamId_, msg).serializeOut());
4446
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
4547
} break;
4648
case State::CLOSED:

src/mixins/ConsumerMixin-inl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ void ConsumerMixin<Frame, Base>::sendRequests() {
4444
toSync = pendingAllowance_.drainWithLimit(toSync);
4545
if (toSync > 0) {
4646
Base::connection_->outputFrameOrEnqueue(
47-
Frame_REQUEST_N(Base::streamId_, static_cast<uint32_t>(toSync)).serializeOut());
47+
Frame_REQUEST_N(Base::streamId_, static_cast<uint32_t>(toSync))
48+
.serializeOut());
4849
}
4950
}
5051

0 commit comments

Comments
 (0)