Skip to content

Commit e11463b

Browse files
committed
Publisher responds with one single RESPONSE frame that has the COMPLETE flag set
1 parent ca0a173 commit e11463b

File tree

7 files changed

+123
-12
lines changed

7 files changed

+123
-12
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ add_library(
8383
src/automata/ChannelResponder.cpp
8484
src/automata/ChannelResponder.h
8585
src/automata/RequestResponseRequester.cpp
86+
src/automata/RequestResponseRequester.h
87+
src/automata/RequestResponseResponder.cpp
88+
src/automata/RequestResponseResponder.h
8689
src/automata/StreamSubscriptionRequesterBase.cpp
8790
src/automata/StreamSubscriptionRequesterBase.h
8891
src/automata/StreamSubscriptionResponderBase.cpp

TARGETS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ cxx_library(
3939
'src/automata/ChannelRequester.cpp',
4040
'src/automata/ChannelResponder.cpp',
4141
'src/automata/RequestResponseRequester.cpp',
42+
'src/automata/RequestResponseResponder.cpp',
4243
'src/automata/StreamSubscriptionRequesterBase.cpp',
4344
'src/automata/StreamSubscriptionResponderBase.cpp',
4445
'src/automata/StreamRequester.cpp',
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#include "RequestResponseResponder.h"
4+
5+
#include "src/ConnectionAutomaton.h"
6+
#include "src/Frame.h"
7+
#include "src/Payload.h"
8+
#include "src/ReactiveStreamsCompat.h"
9+
10+
namespace reactivesocket {
11+
12+
void RequestResponseResponderBase::onNext(Payload response) {
13+
switch (state_) {
14+
case State::RESPONDING: {
15+
state_ = State::CLOSED;
16+
Base::onNext(std::move(response), FrameFlags_COMPLETE);
17+
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
18+
break;
19+
}
20+
case State::CLOSED:
21+
break;
22+
}
23+
}
24+
25+
void RequestResponseResponderBase::onComplete() {
26+
switch (state_) {
27+
case State::RESPONDING: {
28+
state_ = State::CLOSED;
29+
connection_->outputFrameOrEnqueue(
30+
Frame_RESPONSE::complete(streamId_).serializeOut());
31+
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
32+
} break;
33+
case State::CLOSED:
34+
break;
35+
}
36+
}
37+
38+
39+
void RequestResponseResponderBase::onError(folly::exception_wrapper ex) {
40+
switch (state_) {
41+
case State::RESPONDING: {
42+
state_ = State::CLOSED;
43+
auto msg = ex.what().toStdString();
44+
connection_->outputFrameOrEnqueue(
45+
Frame_ERROR::applicationError(streamId_, msg).serializeOut());
46+
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
47+
} break;
48+
case State::CLOSED:
49+
break;
50+
}
51+
}
52+
53+
void RequestResponseResponderBase::endStream(StreamCompletionSignal signal) {
54+
switch (state_) {
55+
case State::RESPONDING:
56+
// Spontaneous ::endStream signal means an error.
57+
DCHECK(StreamCompletionSignal::GRACEFUL != signal);
58+
state_ = State::CLOSED;
59+
break;
60+
case State::CLOSED:
61+
break;
62+
}
63+
Base::endStream(signal);
64+
}
65+
66+
void RequestResponseResponderBase::onNextFrame(Frame_CANCEL&& frame) {
67+
switch (state_) {
68+
case State::RESPONDING:
69+
state_ = State::CLOSED;
70+
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
71+
break;
72+
case State::CLOSED:
73+
break;
74+
}
75+
}
76+
77+
std::ostream& RequestResponseResponderBase::logPrefix(std::ostream& os) {
78+
return os << "RequestResponseResponder(" << &connection_ << ", "
79+
<< streamId_ << "): ";
80+
}
81+
82+
}

src/automata/RequestResponseResponder.h

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
#include "src/Frame.h"
99
#include "src/Payload.h"
1010
#include "src/ReactiveStreamsCompat.h"
11-
#include "src/automata/StreamSubscriptionResponderBase.h"
1211
#include "src/mixins/ExecutorMixin.h"
1312
#include "src/mixins/LoggingMixin.h"
1413
#include "src/mixins/MemoryMixin.h"
14+
#include "src/mixins/MixinTerminator.h"
15+
#include "src/mixins/PublisherMixin.h"
1516
#include "src/mixins/SinkIfMixin.h"
1617
#include "src/mixins/StreamIfMixin.h"
1718

@@ -25,24 +26,41 @@ enum class StreamCompletionSignal;
2526

2627
/// Implementation of stream automaton that represents a RequestResponse
2728
/// responder
28-
class RequestResponseResponderBase : public StreamSubscriptionResponderBase {
29-
using Base = StreamSubscriptionResponderBase;
29+
class RequestResponseResponderBase
30+
: public LoggingMixin<PublisherMixin<Frame_RESPONSE, MixinTerminator>> {
31+
using Base = LoggingMixin<PublisherMixin<Frame_RESPONSE, MixinTerminator>>;
3032

3133
public:
3234
using Base::Base;
3335

36+
/// @{
37+
/// A Subscriber implementation exposed to the user of ReactiveSocket to
38+
/// receive "response" payloads.
39+
void onNext(Payload);
40+
41+
void onComplete();
42+
43+
void onError(folly::exception_wrapper);
44+
/// @}
45+
3446
protected:
3547
/// @{
48+
void endStream(StreamCompletionSignal);
3649

37-
/// Don't need to intercept any frames, let them pass through.
50+
/// Not all frames are intercepted, some just pass through.
3851
using Base::onNextFrame;
3952

40-
std::ostream& logPrefix(std::ostream& os) {
41-
return os << "RequestResponseResponder(" << &connection_ << ", "
42-
<< streamId_ << "): ";
43-
}
53+
void onNextFrame(Frame_CANCEL&&);
54+
55+
std::ostream& logPrefix(std::ostream& os);
4456
/// @}
45-
};
57+
58+
/// State of the Subscription responder.
59+
enum class State : uint8_t {
60+
RESPONDING,
61+
CLOSED,
62+
} state_{State::RESPONDING};
63+
};
4664

4765
using RequestResponseResponder =
4866
SinkIfMixin<StreamIfMixin<LoggingMixin<ExecutorMixin<LoggingMixin<

src/mixins/LoggingMixin.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ class LoggingMixin : public Base {
6969
Base::onNext(std::move(payload));
7070
}
7171

72+
void onNext(Payload payload, FrameFlags flags) {
73+
Base::logPrefix(LOG(INFO)) << "onNext(" << payload << ", " << flags << ")";
74+
Base::onNext(std::move(payload), flags);
75+
}
76+
7277
void onComplete() {
7378
Base::logPrefix(LOG(INFO)) << "onComplete()";
7479
Base::onComplete();

src/mixins/PublisherMixin.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ class PublisherMixin : public Base {
2929
producingSubscription_.reset(&subscription);
3030
}
3131

32-
void onNext(Payload payload) {
33-
ProducedFrame frame(Base::streamId_, FrameFlags_EMPTY, std::move(payload));
32+
void onNext(Payload payload, FrameFlags flags = FrameFlags_EMPTY) {
33+
ProducedFrame frame(Base::streamId_, flags, std::move(payload));
3434
Base::connection_->outputFrameOrEnqueue(frame.serializeOut());
3535
}
3636
/// @}

test/ReactiveSocketTest.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,11 +447,13 @@ TEST(ReactiveSocketTest, RequestResponse) {
447447
EXPECT_CALL(clientInput, onNext_(Equals(&originalPayload)))
448448
.InSequence(s)
449449
.WillOnce(Invoke([&](Payload&) { clientInputSub->cancel(); }));
450+
// Client also receives onComplete() call since the response frame received
451+
// had COMPELTE flag set
452+
EXPECT_CALL(clientInput, onComplete_()).InSequence(s);
450453

451454
EXPECT_CALL(serverOutputSub, cancel_()).InSequence(s).WillOnce(Invoke([&]() {
452455
serverOutput->onComplete();
453456
}));
454-
EXPECT_CALL(clientInput, onComplete_()).InSequence(s);
455457

456458
// Kick off the magic.
457459
clientSock->requestResponse(Payload(originalPayload->clone()), clientInput);

0 commit comments

Comments
 (0)