Skip to content

Commit 23f84a8

Browse files
leheckayschimke
authored andcommitted
ConnectionAutomaton: Output frame (#106)
1 parent 255ccf2 commit 23f84a8

11 files changed

+59
-95
lines changed

src/ConnectionAutomaton.cpp

Lines changed: 37 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ void ConnectionAutomaton::connect() {
5555
"",
5656
"",
5757
Payload());
58-
onNextFrame(frame.serializeOut());
58+
outputFrameOrEnqueue(frame.serializeOut());
5959
}
6060
stats_.socketCreated();
6161

@@ -75,6 +75,8 @@ void ConnectionAutomaton::disconnect() {
7575
return;
7676
}
7777

78+
LOG_IF(WARNING, !pendingWrites_.empty()) << "disconnecting with pending writes (" << pendingWrites_.size() << ")";
79+
7880
// Send terminal signals to the DuplexConnection's input and output before
7981
// tearing it down. We must do this per DuplexConnection specification (see
8082
// interface definition).
@@ -103,55 +105,6 @@ void ConnectionAutomaton::addStream(
103105
assert(result.second);
104106
}
105107

106-
void ConnectionAutomaton::onNextFrame(Frame_REQUEST_STREAM&& frame) {
107-
onNextFrame(frame.serializeOut());
108-
}
109-
110-
void ConnectionAutomaton::onNextFrame(Frame_REQUEST_SUB&& frame) {
111-
onNextFrame(frame.serializeOut());
112-
}
113-
114-
void ConnectionAutomaton::onNextFrame(Frame_REQUEST_CHANNEL&& frame) {
115-
onNextFrame(frame.serializeOut());
116-
}
117-
118-
void ConnectionAutomaton::onNextFrame(Frame_REQUEST_N&& frame) {
119-
onNextFrame(frame.serializeOut());
120-
}
121-
122-
void ConnectionAutomaton::onNextFrame(Frame_REQUEST_FNF&& frame) {
123-
onNextFrame(frame.serializeOut());
124-
}
125-
126-
void ConnectionAutomaton::onNextFrame(Frame_METADATA_PUSH&& frame) {
127-
onNextFrame(frame.serializeOut());
128-
}
129-
130-
void ConnectionAutomaton::onNextFrame(Frame_CANCEL&& frame) {
131-
onNextFrame(frame.serializeOut());
132-
}
133-
134-
void ConnectionAutomaton::onNextFrame(Frame_RESPONSE&& frame) {
135-
onNextFrame(frame.serializeOut());
136-
}
137-
138-
void ConnectionAutomaton::onNextFrame(Frame_ERROR&& frame) {
139-
onNextFrame(frame.serializeOut());
140-
}
141-
142-
void ConnectionAutomaton::onNextFrame(std::unique_ptr<folly::IOBuf> frame) {
143-
if (!connectionOutput_) {
144-
return;
145-
}
146-
if (pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
147-
writeFrame(std::move(frame));
148-
} else {
149-
// We either have no allowance to perform the operation, or the queue has
150-
// not been drained (e.g. we're looping in ::request).
151-
pendingWrites_.emplace_back(std::move(frame));
152-
}
153-
}
154-
155108
void ConnectionAutomaton::endStream(
156109
StreamId streamId,
157110
StreamCompletionSignal signal) {
@@ -240,16 +193,16 @@ void ConnectionAutomaton::onConnectionFrame(
240193
if (isServer_) {
241194
if (frame.header_.flags_ & FrameFlags_KEEPALIVE_RESPOND) {
242195
frame.header_.flags_ &= ~(FrameFlags_KEEPALIVE_RESPOND);
243-
writeFrame(frame.serializeOut());
196+
outputFrameOrEnqueue(frame.serializeOut());
244197
} else {
245-
writeFrame(
198+
outputFrameOrEnqueue(
246199
Frame_ERROR::invalid("keepalive without flag").serializeOut());
247200
disconnect();
248201
}
249202
}
250203
// TODO(yschimke) client *should* check the respond flag
251204
} else {
252-
writeFrame(Frame_ERROR::unexpectedFrame().serializeOut());
205+
outputFrameOrEnqueue(Frame_ERROR::unexpectedFrame().serializeOut());
253206
disconnect();
254207
}
255208
}
@@ -283,7 +236,7 @@ void ConnectionAutomaton::onConnectionFrame(
283236
return;
284237
}
285238
default:
286-
writeFrame(Frame_ERROR::unexpectedFrame().serializeOut());
239+
outputFrameOrEnqueue(Frame_ERROR::unexpectedFrame().serializeOut());
287240
disconnect();
288241
return;
289242
}
@@ -317,12 +270,7 @@ void ConnectionAutomaton::request(size_t n) {
317270
// stack.
318271
return;
319272
}
320-
// Drain the queue or the allowance.
321-
while (!pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
322-
auto frame = std::move(pendingWrites_.front());
323-
pendingWrites_.pop_front();
324-
writeFrame(std::move(frame));
325-
}
273+
drainOutputFramesQueue();
326274
}
327275

328276
void ConnectionAutomaton::cancel() {
@@ -341,7 +289,7 @@ void ConnectionAutomaton::handleUnknownStream(
341289
// TODO(stupaq): there are some rules about monotonically increasing stream
342290
// IDs -- let's forget about them for a moment
343291
if (!factory_(streamId, std::move(payload))) {
344-
writeFrame(
292+
outputFrameOrEnqueue(
345293
Frame_ERROR::invalid("unknown stream " + std::to_string(streamId))
346294
.serializeOut());
347295
disconnect();
@@ -352,20 +300,45 @@ void ConnectionAutomaton::handleUnknownStream(
352300
void ConnectionAutomaton::sendKeepalive() {
353301
Frame_KEEPALIVE pingFrame(
354302
FrameFlags_KEEPALIVE_RESPOND, folly::IOBuf::create(0));
355-
writeFrame(pingFrame.serializeOut());
303+
outputFrameOrEnqueue(pingFrame.serializeOut());
356304
}
357305

358306
void ConnectionAutomaton::onClose(ConnectionCloseListener listener) {
359307
closeListeners_.push_back(listener);
360308
}
361309

362-
void ConnectionAutomaton::writeFrame(
363-
std::unique_ptr<folly::IOBuf> outputFrame) {
310+
void ConnectionAutomaton::outputFrameOrEnqueue(std::unique_ptr<folly::IOBuf> frame) {
311+
if (!connectionOutput_) {
312+
return; // RS destructor has disconnected us from the DuplexConnection
313+
}
314+
315+
drainOutputFramesQueue();
316+
if (pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
317+
outputFrame(std::move(frame));
318+
} else {
319+
// We either have no allowance to perform the operation, or the queue has
320+
// not been drained (e.g. we're looping in ::request).
321+
pendingWrites_.emplace_back(std::move(frame));
322+
}
323+
}
324+
325+
void ConnectionAutomaton::drainOutputFramesQueue() {
326+
// Drain the queue or the allowance.
327+
while (!pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
328+
auto frame = std::move(pendingWrites_.front());
329+
pendingWrites_.pop_front();
330+
outputFrame(std::move(frame));
331+
}
332+
}
333+
334+
void ConnectionAutomaton::outputFrame(
335+
std::unique_ptr<folly::IOBuf> outputFrame) {
364336
std::stringstream ss;
365337
ss << FrameHeader::peekType(*outputFrame);
366338

367339
stats_.frameWritten(ss.str());
368340

369341
connectionOutput_.onNext(std::move(outputFrame));
370342
}
343+
371344
}

src/ConnectionAutomaton.h

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,7 @@ class ConnectionAutomaton :
8686
/// Enqueuing a terminal frame does not end the stream.
8787
///
8888
/// This signal corresponds to Subscriber::onNext.
89-
void onNextFrame(Frame_REQUEST_STREAM&&);
90-
void onNextFrame(Frame_REQUEST_SUB&&);
91-
void onNextFrame(Frame_REQUEST_CHANNEL&&);
92-
void onNextFrame(Frame_REQUEST_N&&);
93-
void onNextFrame(Frame_REQUEST_FNF&&);
94-
void onNextFrame(Frame_METADATA_PUSH&&);
95-
void onNextFrame(Frame_CANCEL&&);
96-
void onNextFrame(Frame_RESPONSE&&);
97-
void onNextFrame(Frame_ERROR&&);
98-
99-
void onNextFrame(std::unique_ptr<folly::IOBuf> frame);
89+
void outputFrameOrEnqueue(std::unique_ptr<folly::IOBuf> frame);
10090

10191
/// Indicates that the stream should be removed from the connection.
10292
///
@@ -136,8 +126,6 @@ class ConnectionAutomaton :
136126

137127
void onNext(std::unique_ptr<folly::IOBuf>) override;
138128

139-
void writeFrame(std::unique_ptr<folly::IOBuf>);
140-
141129
void onComplete() override;
142130

143131
void onError(folly::exception_wrapper) override;
@@ -160,6 +148,9 @@ class ConnectionAutomaton :
160148
std::unique_ptr<folly::IOBuf> frame);
161149
/// @}
162150

151+
void drainOutputFramesQueue();
152+
void outputFrame(std::unique_ptr<folly::IOBuf>);
153+
163154
std::unique_ptr<DuplexConnection> connection_;
164155
StreamAutomatonFactory factory_;
165156
// TODO(stupaq): looks like a bug that I have to qualify this

src/ReactiveSocket.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ void ReactiveSocket::requestFireAndForget(Payload request) {
128128
nextStreamId_ += 2;
129129
Frame_REQUEST_FNF frame(
130130
streamId, FrameFlags_EMPTY, std::move(std::move(request)));
131-
connection_->onNextFrame(std::move(frame));
131+
connection_->outputFrameOrEnqueue(frame.serializeOut());
132132
}
133133

134134
void ReactiveSocket::metadataPush(std::unique_ptr<folly::IOBuf> metadata) {
135-
connection_->onNextFrame(Frame_METADATA_PUSH(std::move(metadata)));
135+
connection_->outputFrameOrEnqueue(Frame_METADATA_PUSH(std::move(metadata)).serializeOut());
136136
}
137137

138138
bool ReactiveSocket::createResponder(

src/automata/ChannelRequester.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ void ChannelRequesterBase::onNext(Payload request) {
4444
// We must inform ConsumerMixin about an implicit allowance we have
4545
// requested from the remote end.
4646
addImplicitAllowance(initialN);
47-
connection_->onNextFrame(std::move(frame));
47+
connection_->outputFrameOrEnqueue(frame.serializeOut());
4848
// Pump the remaining allowance into the ConsumerMixin _after_ sending the
4949
// initial request.
5050
if (remainingN) {
@@ -67,8 +67,8 @@ void ChannelRequesterBase::onComplete() {
6767
break;
6868
case State::REQUESTED: {
6969
state_ = State::CLOSED;
70-
connection_->onNextFrame(
71-
Frame_REQUEST_CHANNEL(streamId_, FrameFlags_COMPLETE, 0, Payload()));
70+
connection_->outputFrameOrEnqueue(
71+
Frame_REQUEST_CHANNEL(streamId_, FrameFlags_COMPLETE, 0, Payload()).serializeOut());
7272
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
7373
} break;
7474
case State::CLOSED:
@@ -84,7 +84,7 @@ void ChannelRequesterBase::onError(folly::exception_wrapper ex) {
8484
break;
8585
case State::REQUESTED: {
8686
state_ = State::CLOSED;
87-
connection_->onNextFrame(Frame_CANCEL(streamId_));
87+
connection_->outputFrameOrEnqueue(Frame_CANCEL(streamId_).serializeOut());
8888
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
8989
} break;
9090
case State::CLOSED:
@@ -117,7 +117,7 @@ void ChannelRequesterBase::cancel() {
117117
break;
118118
case State::REQUESTED: {
119119
state_ = State::CLOSED;
120-
connection_->onNextFrame(Frame_CANCEL(streamId_));
120+
connection_->outputFrameOrEnqueue(Frame_CANCEL(streamId_).serializeOut());
121121
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
122122
} break;
123123
case State::CLOSED:

src/automata/ChannelResponder.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ void ChannelResponderBase::onComplete() {
2727
switch (state_) {
2828
case State::RESPONDING: {
2929
state_ = State::CLOSED;
30-
connection_->onNextFrame(Frame_RESPONSE::complete(streamId_));
30+
connection_->outputFrameOrEnqueue(Frame_RESPONSE::complete(streamId_).serializeOut());
3131
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
3232
} break;
3333
case State::CLOSED:
@@ -40,7 +40,7 @@ void ChannelResponderBase::onError(folly::exception_wrapper ex) {
4040
case State::RESPONDING: {
4141
state_ = State::CLOSED;
4242
auto msg = ex.what().toStdString();
43-
connection_->onNextFrame(Frame_ERROR::applicationError(streamId_, msg));
43+
connection_->outputFrameOrEnqueue(Frame_ERROR::applicationError(streamId_, msg).serializeOut());
4444
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
4545
} break;
4646
case State::CLOSED:
@@ -61,7 +61,7 @@ void ChannelResponderBase::cancel() {
6161
switch (state_) {
6262
case State::RESPONDING: {
6363
state_ = State::CLOSED;
64-
connection_->onNextFrame(Frame_RESPONSE::complete(streamId_));
64+
connection_->outputFrameOrEnqueue(Frame_RESPONSE::complete(streamId_).serializeOut());
6565
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
6666
} break;
6767
case State::CLOSED:

src/automata/StreamRequester.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ void StreamRequesterBase::sendRequestFrame(
1313
Payload&& request) {
1414
Frame_REQUEST_STREAM frame(
1515
streamId_, flags, static_cast<uint32_t>(initialN), std::move(request));
16-
connection_->onNextFrame(std::move(frame));
16+
connection_->outputFrameOrEnqueue(frame.serializeOut());
1717
}
1818

1919
std::ostream& StreamRequesterBase::logPrefix(std::ostream& os) {

src/automata/StreamSubscriptionRequesterBase.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void StreamSubscriptionRequesterBase::cancel() {
6060
break;
6161
case State::REQUESTED: {
6262
state_ = State::CLOSED;
63-
connection_->onNextFrame(Frame_CANCEL(streamId_));
63+
connection_->outputFrameOrEnqueue(Frame_CANCEL(streamId_).serializeOut());
6464
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
6565
} break;
6666
case State::CLOSED:

src/automata/StreamSubscriptionResponderBase.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ void StreamSubscriptionResponderBase::onComplete() {
2727
switch (state_) {
2828
case State::RESPONDING: {
2929
state_ = State::CLOSED;
30-
connection_->onNextFrame(Frame_RESPONSE::complete(streamId_));
30+
connection_->outputFrameOrEnqueue(Frame_RESPONSE::complete(streamId_).serializeOut());
3131
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
3232
} break;
3333
case State::CLOSED:
@@ -40,7 +40,7 @@ void StreamSubscriptionResponderBase::onError(folly::exception_wrapper ex) {
4040
case State::RESPONDING: {
4141
state_ = State::CLOSED;
4242
auto msg = ex.what().toStdString();
43-
connection_->onNextFrame(Frame_ERROR::applicationError(streamId_, msg));
43+
connection_->outputFrameOrEnqueue(Frame_ERROR::applicationError(streamId_, msg).serializeOut());
4444
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
4545
} break;
4646
case State::CLOSED:

src/automata/SubscriptionRequester.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ void SubscriptionRequesterBase::sendRequestFrame(
1313
Payload&& request) {
1414
Frame_REQUEST_SUB frame(
1515
streamId_, flags, static_cast<uint32_t>(initialN), std::move(request));
16-
connection_->onNextFrame(std::move(frame));
16+
connection_->outputFrameOrEnqueue(frame.serializeOut());
1717
}
1818

1919
std::ostream& SubscriptionRequesterBase::logPrefix(std::ostream& os) {

src/mixins/ConsumerMixin-inl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ void ConsumerMixin<Frame, Base>::sendRequests() {
4343
size_t toSync = Frame_REQUEST_N::kMaxRequestN;
4444
toSync = pendingAllowance_.drainWithLimit(toSync);
4545
if (toSync > 0) {
46-
Base::connection_->onNextFrame(
47-
Frame_REQUEST_N(Base::streamId_, static_cast<uint32_t>(toSync)));
46+
Base::connection_->outputFrameOrEnqueue(
47+
Frame_REQUEST_N(Base::streamId_, static_cast<uint32_t>(toSync)).serializeOut());
4848
}
4949
}
5050

0 commit comments

Comments
 (0)