Skip to content

Commit 69e113c

Browse files
authored
Add support for RequestResponse (#117)
* Add support for RequestResponse * rebase * Removed all questions from the code since they are mostly for my understanding, and the code should work properly due to copy-paste reasons Check the request payload is not sent twice RequestResponseRequesterBase * Publisher responds with one single RESPONSE frame that has the COMPLETE flag set
2 parents ec5e7fb + e11463b commit 69e113c

23 files changed

+588
-7
lines changed

CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ add_library(
8282
src/automata/ChannelRequester.h
8383
src/automata/ChannelResponder.cpp
8484
src/automata/ChannelResponder.h
85+
src/automata/RequestResponseRequester.cpp
86+
src/automata/RequestResponseRequester.h
87+
src/automata/RequestResponseResponder.cpp
88+
src/automata/RequestResponseResponder.h
8589
src/automata/StreamSubscriptionRequesterBase.cpp
8690
src/automata/StreamSubscriptionRequesterBase.h
8791
src/automata/StreamSubscriptionResponderBase.cpp

TARGETS

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ cxx_library(
3838
'src/AbstractStreamAutomaton.cpp',
3939
'src/automata/ChannelRequester.cpp',
4040
'src/automata/ChannelResponder.cpp',
41+
'src/automata/RequestResponseRequester.cpp',
42+
'src/automata/RequestResponseResponder.cpp',
4143
'src/automata/StreamSubscriptionRequesterBase.cpp',
4244
'src/automata/StreamSubscriptionResponderBase.cpp',
4345
'src/automata/StreamRequester.cpp',

src/AbstractStreamAutomaton.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ void AbstractStreamAutomaton::onNextFrame(
4747
case FrameType::REQUEST_N:
4848
deserializeAndDispatch<Frame_REQUEST_N>(std::move(payload));
4949
return;
50+
case FrameType::REQUEST_RESPONSE:
51+
deserializeAndDispatch<Frame_REQUEST_RESPONSE>(std::move(payload));
52+
return;
5053
case FrameType::CANCEL:
5154
deserializeAndDispatch<Frame_CANCEL>(std::move(payload));
5255
return;

src/AbstractStreamAutomaton.h

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class ConnectionAutomaton;
1717
class Frame_REQUEST_STREAM;
1818
class Frame_REQUEST_SUB;
1919
class Frame_REQUEST_CHANNEL;
20+
class Frame_REQUEST_RESPONSE;
2021
class Frame_REQUEST_N;
2122
class Frame_CANCEL;
2223
class Frame_RESPONSE;
@@ -96,6 +97,7 @@ class AbstractStreamAutomaton {
9697
virtual void onNextFrame(Frame_REQUEST_SUB&& frame) = 0;
9798
virtual void onNextFrame(Frame_REQUEST_CHANNEL&& frame) = 0;
9899
virtual void onNextFrame(Frame_REQUEST_N&& frame) = 0;
100+
virtual void onNextFrame(Frame_REQUEST_RESPONSE&& frame) = 0;
99101
virtual void onNextFrame(Frame_CANCEL&& frame) = 0;
100102
virtual void onNextFrame(Frame_RESPONSE&& frame) = 0;
101103
virtual void onNextFrame(Frame_ERROR&& frame) = 0;

src/Frame.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ std::ostream& operator<<(std::ostream& os, FrameType type) {
4747
return os << "REQUEST_CHANNEL";
4848
case FrameType::REQUEST_N:
4949
return os << "REQUEST_N";
50+
case FrameType::REQUEST_RESPONSE:
51+
return os << "REQUEST_RESPONSE";
5052
case FrameType::REQUEST_FNF:
5153
return os << "REQUEST_FNF";
5254
case FrameType::CANCEL:
@@ -192,6 +194,34 @@ std::ostream& operator<<(std::ostream& os, const Frame_REQUEST_N& frame) {
192194
}
193195
/// @}
194196

197+
/// @{
198+
std::unique_ptr<folly::IOBuf> Frame_REQUEST_RESPONSE::serializeOut() {
199+
auto queue = createBufferQueue(FrameHeader::kSize + payload_.framingSize());
200+
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
201+
202+
header_.serializeInto(appender);
203+
payload_.serializeInto(appender);
204+
return queue.move();
205+
}
206+
207+
bool Frame_REQUEST_RESPONSE::deserializeFrom(std::unique_ptr<folly::IOBuf> in) {
208+
folly::io::Cursor cur(in.get());
209+
try {
210+
header_.deserializeFrom(cur);
211+
payload_.deserializeFrom(cur, header_.flags_);
212+
} catch (...) {
213+
return false;
214+
}
215+
return true;
216+
}
217+
218+
std::ostream& operator<<(
219+
std::ostream& os,
220+
const Frame_REQUEST_RESPONSE& frame) {
221+
return os << frame.header_ << ", " << frame.payload_;
222+
}
223+
/// @}
224+
195225
/// @{
196226
std::unique_ptr<folly::IOBuf> Frame_REQUEST_FNF::serializeOut() {
197227
auto queue = createBufferQueue(FrameHeader::kSize + payload_.framingSize());

src/Frame.h

+24-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ enum class FrameType : uint16_t {
3838
SETUP = 0x0001,
3939
LEASE = 0x0002,
4040
KEEPALIVE = 0x0003,
41-
// REQUEST_RESPONSE = 0x0004,
41+
REQUEST_RESPONSE = 0x0004,
4242
REQUEST_FNF = 0x0005,
4343
REQUEST_STREAM = 0x0006,
4444
REQUEST_SUB = 0x0007,
@@ -218,6 +218,29 @@ class Frame_REQUEST_CHANNEL : public Frame_REQUEST_Base {
218218
: Frame_REQUEST_CHANNEL(streamId, flags, 0, std::move(payload)) {}
219219
};
220220

221+
class Frame_REQUEST_RESPONSE {
222+
public:
223+
static constexpr bool Trait_CarriesAllowance = false;
224+
225+
Frame_REQUEST_RESPONSE() = default;
226+
Frame_REQUEST_RESPONSE(StreamId streamId, FrameFlags flags, Payload payload)
227+
: header_(
228+
FrameType::REQUEST_RESPONSE,
229+
flags | payload.getFlags(),
230+
streamId),
231+
payload_(std::move(payload)) {
232+
payload_.checkFlags(header_.flags_); // to verify the client didn't set
233+
// METADATA and provided none
234+
}
235+
236+
std::unique_ptr<folly::IOBuf> serializeOut();
237+
bool deserializeFrom(std::unique_ptr<folly::IOBuf> in);
238+
239+
FrameHeader header_;
240+
Payload payload_;
241+
};
242+
std::ostream& operator<<(std::ostream&, const Frame_REQUEST_RESPONSE&);
243+
221244
class Frame_REQUEST_FNF {
222245
public:
223246
static constexpr bool Trait_CarriesAllowance = false;

src/NullRequestHandler.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ void NullRequestHandler::handleRequestSubscription(
4646
response.onError(std::runtime_error("NullRequestHandler"));
4747
}
4848

49+
void NullRequestHandler::handleRequestResponse(
50+
Payload /*request*/,
51+
Subscriber<Payload>& response) {
52+
response.onSubscribe(createManagedInstance<NullSubscription>());
53+
response.onError(std::runtime_error("NullRequestHandler"));
54+
}
55+
4956
void NullRequestHandler::handleFireAndForgetRequest(Payload /*request*/) {}
5057

5158
void NullRequestHandler::handleMetadataPush(

src/NullRequestHandler.h

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class NullRequestHandler : public RequestHandler {
3535
void handleRequestSubscription(Payload request, Subscriber<Payload>& response)
3636
override;
3737

38+
void handleRequestResponse(Payload request, Subscriber<Payload>& response)
39+
override;
40+
3841
void handleFireAndForgetRequest(Payload request) override;
3942

4043
void handleMetadataPush(std::unique_ptr<folly::IOBuf> request) override;

src/ReactiveSocket.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "src/RequestHandler.h"
1919
#include "src/automata/ChannelRequester.h"
2020
#include "src/automata/ChannelResponder.h"
21+
#include "src/automata/RequestResponseRequester.h"
22+
#include "src/automata/RequestResponseResponder.h"
2123
#include "src/automata/StreamRequester.h"
2224
#include "src/automata/StreamResponder.h"
2325
#include "src/automata/SubscriptionRequester.h"
@@ -166,6 +168,21 @@ void ReactiveSocket::requestFireAndForget(Payload request) {
166168
connection_->outputFrameOrEnqueue(frame.serializeOut());
167169
}
168170

171+
void ReactiveSocket::requestResponse(
172+
Payload payload,
173+
Subscriber<Payload>& responseSink) {
174+
// TODO(stupaq): handle any exceptions
175+
StreamId streamId = nextStreamId_;
176+
nextStreamId_ += 2;
177+
RequestResponseRequester::Parameters params = {connection_, streamId};
178+
auto automaton = new RequestResponseRequester(params);
179+
connection_->addStream(streamId, *automaton);
180+
automaton->subscribe(responseSink);
181+
responseSink.onSubscribe(*automaton);
182+
automaton->onNext(std::move(payload));
183+
automaton->start();
184+
}
185+
169186
void ReactiveSocket::metadataPush(std::unique_ptr<folly::IOBuf> metadata) {
170187
connection_->outputFrameOrEnqueue(
171188
Frame_METADATA_PUSH(std::move(metadata)).serializeOut());
@@ -247,6 +264,19 @@ bool ReactiveSocket::createResponder(
247264
automaton->start();
248265
break;
249266
}
267+
case FrameType::REQUEST_RESPONSE: {
268+
Frame_REQUEST_RESPONSE frame;
269+
if (!frame.deserializeFrom(std::move(serializedFrame))) {
270+
return false;
271+
}
272+
SubscriptionResponder::Parameters params = {connection_, streamId};
273+
auto automaton = new RequestResponseResponder(params);
274+
connection_->addStream(streamId, *automaton);
275+
handler_->handleRequestResponse(std::move(frame.payload_), *automaton);
276+
automaton->onNextFrame(std::move(frame));
277+
automaton->start();
278+
break;
279+
}
250280
case FrameType::REQUEST_FNF: {
251281
Frame_REQUEST_FNF frame;
252282
if (!frame.deserializeFrom(std::move(serializedFrame))) {

src/ReactiveSocket.h

+2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class ReactiveSocket {
7878

7979
void requestFireAndForget(Payload request);
8080

81+
void requestResponse(Payload payload, Subscriber<Payload>& responseSink);
82+
8183
void close();
8284

8385
void onClose(CloseListener listener);

src/RequestHandler.h

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ class RequestHandler {
3030
Payload request,
3131
Subscriber<Payload>& response) = 0;
3232

33+
/// Handles a new inbound RequestResponse requested by the other end.
34+
virtual void handleRequestResponse(
35+
Payload request,
36+
Subscriber<Payload>& response) = 0;
37+
3338
/// Handles a new fire-and-forget request sent by the other end.
3439
virtual void handleFireAndForgetRequest(Payload request) = 0;
3540

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#include "RequestResponseRequester.h"
4+
5+
#include <iostream>
6+
7+
namespace reactivesocket {
8+
9+
void RequestResponseRequesterBase::subscribe(Subscriber<Payload>& subscriber) {
10+
DCHECK(!consumingSubscriber_);
11+
consumingSubscriber_.reset(&subscriber);
12+
// FIXME
13+
// Subscriber::onSubscribe is delivered externally, as it may attempt to
14+
// synchronously deliver Subscriber::request.
15+
}
16+
17+
void RequestResponseRequesterBase::onNext(Payload request) {
18+
switch (state_) {
19+
case State::NEW: {
20+
state_ = State::REQUESTED;
21+
Frame_REQUEST_RESPONSE frame(
22+
streamId_, FrameFlags_EMPTY, std::move(std::move(request)));
23+
connection_->outputFrameOrEnqueue(frame.serializeOut());
24+
break;
25+
}
26+
case State::REQUESTED:
27+
// Cannot receive a request payload twice.
28+
CHECK(false);
29+
break;
30+
case State::CLOSED:
31+
break;
32+
}
33+
}
34+
35+
void RequestResponseRequesterBase::request(size_t n) {
36+
if (payload_) {
37+
consumingSubscriber_.onNext(std::move(payload_.value()));
38+
payload_.clear();
39+
} else {
40+
waitingForPayload_ = true;
41+
}
42+
}
43+
44+
void RequestResponseRequesterBase::cancel() {
45+
switch (state_) {
46+
case State::NEW:
47+
state_ = State::CLOSED;
48+
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
49+
break;
50+
case State::REQUESTED: {
51+
state_ = State::CLOSED;
52+
connection_->outputFrameOrEnqueue(Frame_CANCEL(streamId_).serializeOut());
53+
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
54+
} break;
55+
case State::CLOSED:
56+
break;
57+
}
58+
}
59+
60+
void RequestResponseRequesterBase::endStream(StreamCompletionSignal signal) {
61+
switch (state_) {
62+
case State::NEW:
63+
case State::REQUESTED:
64+
// Spontaneous ::endStream signal means an error.
65+
DCHECK(StreamCompletionSignal::GRACEFUL != signal);
66+
state_ = State::CLOSED;
67+
break;
68+
case State::CLOSED:
69+
break;
70+
}
71+
// FIXME: switch on signal and verify that callsites propagate stream errors
72+
consumingSubscriber_.onComplete();
73+
}
74+
75+
void RequestResponseRequesterBase::onNextFrame(Frame_ERROR&& frame) {
76+
switch (state_) {
77+
case State::NEW:
78+
// Cannot receive a frame before sending the initial request.
79+
CHECK(false);
80+
break;
81+
case State::REQUESTED:
82+
state_ = State::CLOSED;
83+
consumingSubscriber_.onError(
84+
std::runtime_error(frame.payload_.moveDataToString()));
85+
connection_->endStream(streamId_, StreamCompletionSignal::ERROR);
86+
break;
87+
case State::CLOSED:
88+
break;
89+
}
90+
}
91+
92+
void RequestResponseRequesterBase::onNextFrame(Frame_RESPONSE&& frame) {
93+
bool end = false;
94+
switch (state_) {
95+
case State::NEW:
96+
// Cannot receive a frame before sending the initial request.
97+
CHECK(false);
98+
break;
99+
case State::REQUESTED:
100+
if (frame.header_.flags_ & FrameFlags_COMPLETE) {
101+
state_ = State::CLOSED;
102+
end = true;
103+
}
104+
break;
105+
case State::CLOSED:
106+
break;
107+
}
108+
if (waitingForPayload_) {
109+
consumingSubscriber_.onNext(std::move(frame.payload_));
110+
} else {
111+
payload_.assign(std::move(frame.payload_));
112+
}
113+
114+
if (end) {
115+
connection_->endStream(streamId_, StreamCompletionSignal::GRACEFUL);
116+
}
117+
}
118+
119+
std::ostream& RequestResponseRequesterBase::logPrefix(std::ostream& os) {
120+
return os << " RequestResponseRequester(" << &connection_ << ", " << streamId_
121+
<< "): ";
122+
}
123+
}

0 commit comments

Comments
 (0)