Skip to content

Commit 3eb1521

Browse files
tmontgomeryyschimke
authored andcommitted
added x86_64 version of SPSC ring buffer (#118)
* added x86_64 version of SPSC ring buffer * add missing atomic include * formatting * update to FB coding on constants.
1 parent 39fd881 commit 3eb1521

19 files changed

+977
-282
lines changed

CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ add_library(
140140
src/ResumeTracker.h
141141
src/ResumeCache.h
142142
src/ResumeCache.cpp
143+
src/concurrent/atomic64.h
144+
src/concurrent/atomic64_gcc_x86_64.h
145+
src/concurrent/OneToOneRingBuffer.h
143146
)
144147

145148
target_link_libraries(
@@ -171,6 +174,7 @@ add_executable(
171174
test/ReactiveStreamsMocksCompat.h
172175
test/Test.cpp
173176
test/MockStats.h
177+
test/concurrent/OneToOneRingBufferTest.cpp
174178
)
175179

176180
target_link_libraries(

src/ConnectionAutomaton.cpp

+28-22
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,11 @@ void ConnectionAutomaton::disconnect() {
7171
}
7272
}
7373

74-
void ConnectionAutomaton::reconnect(std::unique_ptr<DuplexConnection> newConnection) {
75-
disconnect();
76-
connection_ = std::move(newConnection);
77-
connect();
74+
void ConnectionAutomaton::reconnect(
75+
std::unique_ptr<DuplexConnection> newConnection) {
76+
disconnect();
77+
connection_ = std::move(newConnection);
78+
connect();
7879
}
7980

8081
ConnectionAutomaton::~ConnectionAutomaton() {
@@ -138,7 +139,8 @@ void ConnectionAutomaton::onNext(std::unique_ptr<folly::IOBuf> frame) {
138139

139140
stats_.frameRead(ss.str());
140141

141-
// TODO(tmont): If a frame is invalid, it will still be tracked. However, we actually want that. We want to keep
142+
// TODO(tmont): If a frame is invalid, it will still be tracked. However, we
143+
// actually want that. We want to keep
142144
// each side in sync, even if a frame is invalid.
143145
resumeTracker_->trackReceivedFrame(*frame);
144146

@@ -218,17 +220,20 @@ void ConnectionAutomaton::onConnectionFrame(
218220
bool canResume = false;
219221

220222
if (isServer_ && isResumable_) {
221-
// find old ConnectionAutmaton via calling listener.
222-
// Application will call resumeFromAutomaton to setup streams and resume information
223-
canResume = resumeListener_(frame.token_, frame.position_);
223+
// find old ConnectionAutmaton via calling listener.
224+
// Application will call resumeFromAutomaton to setup streams and
225+
// resume information
226+
canResume = resumeListener_(frame.token_, frame.position_);
224227
}
225228

226229
if (canResume) {
227230
outputFrameOrEnqueue(
228-
Frame_RESUME_OK(resumeTracker_->impliedPosition()).serializeOut());
231+
Frame_RESUME_OK(resumeTracker_->impliedPosition())
232+
.serializeOut());
229233
resumeCache_->retransmitFromPosition(frame.position_, *this);
230234
} else {
231-
outputFrameOrEnqueue(Frame_ERROR::canNotResume("can not resume").serializeOut());
235+
outputFrameOrEnqueue(
236+
Frame_ERROR::canNotResume("can not resume").serializeOut());
232237
disconnect();
233238
}
234239
} else {
@@ -240,11 +245,13 @@ void ConnectionAutomaton::onConnectionFrame(
240245
case FrameType::RESUME_OK: {
241246
Frame_RESUME_OK frame;
242247
if (frame.deserializeFrom(std::move(payload))) {
243-
if (!isServer_ && isResumable_ && resumeCache_->isPositionAvailable(frame.position_)) {
248+
if (!isServer_ && isResumable_ &&
249+
resumeCache_->isPositionAvailable(frame.position_)) {
244250
resumeCache_->retransmitFromPosition(frame.position_, *this);
245251
} else {
246-
outputFrameOrEnqueue(Frame_ERROR::canNotResume("can not resume").serializeOut());
247-
disconnect();
252+
outputFrameOrEnqueue(
253+
Frame_ERROR::canNotResume("can not resume").serializeOut());
254+
disconnect();
248255
}
249256
} else {
250257
outputFrameOrEnqueue(Frame_ERROR::unexpectedFrame().serializeOut());
@@ -321,17 +328,18 @@ void ConnectionAutomaton::sendKeepalive() {
321328
outputFrameOrEnqueue(pingFrame.serializeOut());
322329
}
323330

324-
void ConnectionAutomaton::sendResume(const ResumeIdentificationToken &token) {
331+
void ConnectionAutomaton::sendResume(const ResumeIdentificationToken& token) {
325332
Frame_RESUME resumeFrame(token, resumeTracker_->impliedPosition());
326333
outputFrameOrEnqueue(resumeFrame.serializeOut());
327334
}
328335

329336
bool ConnectionAutomaton::isPositionAvailable(ResumePosition position) {
330-
return resumeCache_->isPositionAvailable(position);
337+
return resumeCache_->isPositionAvailable(position);
331338
}
332339

333-
ResumePosition ConnectionAutomaton::positionDifference(ResumePosition position) {
334-
return resumeCache_->position() - position;
340+
ResumePosition ConnectionAutomaton::positionDifference(
341+
ResumePosition position) {
342+
return resumeCache_->position() - position;
335343
}
336344

337345
void ConnectionAutomaton::onClose(ConnectionCloseListener listener) {
@@ -374,16 +382,14 @@ void ConnectionAutomaton::outputFrame(
374382
connectionOutput_.onNext(std::move(outputFrame));
375383
}
376384

377-
void ConnectionAutomaton::resumeFromAutomaton(ConnectionAutomaton& oldAutomaton)
378-
{
379-
if (isServer_ && isResumable_)
380-
{
385+
void ConnectionAutomaton::resumeFromAutomaton(
386+
ConnectionAutomaton& oldAutomaton) {
387+
if (isServer_ && isResumable_) {
381388
streams_ = std::move(oldAutomaton.streams_);
382389
factory_ = oldAutomaton.factory_;
383390

384391
resumeTracker_ = std::move(oldAutomaton.resumeTracker_);
385392
resumeCache_ = std::move(oldAutomaton.resumeCache_);
386393
}
387394
}
388-
389395
}

src/ConnectionAutomaton.h

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
#include "ReactiveSocket.h"
1212
#include "src/Payload.h"
1313
#include "src/ReactiveStreamsCompat.h"
14-
#include "src/Stats.h"
15-
#include "src/ResumeTracker.h"
1614
#include "src/ResumeCache.h"
15+
#include "src/ResumeTracker.h"
16+
#include "src/Stats.h"
1717

1818
namespace reactivesocket {
1919

@@ -34,8 +34,8 @@ using StreamId = uint32_t;
3434
using StreamAutomatonFactory =
3535
std::function<bool(StreamId, std::unique_ptr<folly::IOBuf>)>;
3636

37-
using ResumeListener =
38-
std::function<bool(const ResumeIdentificationToken &token, ResumePosition position)>;
37+
using ResumeListener = std::function<
38+
bool(const ResumeIdentificationToken& token, ResumePosition position)>;
3939

4040
using ConnectionCloseListener = std::function<void()>;
4141

@@ -117,12 +117,13 @@ class ConnectionAutomaton :
117117
/// ConnectionAutomaton.
118118
void endStream(StreamId streamId, StreamCompletionSignal signal);
119119

120-
/// Copy the streams and resumption information from a previous ConnectionAutomaton
120+
/// Copy the streams and resumption information from a previous
121+
/// ConnectionAutomaton
121122
void resumeFromAutomaton(ConnectionAutomaton& oldAutomaton);
122123
/// @}
123124

124125
void sendKeepalive();
125-
void sendResume(const ResumeIdentificationToken &token);
126+
void sendResume(const ResumeIdentificationToken& token);
126127

127128
bool isPositionAvailable(ResumePosition position);
128129
ResumePosition positionDifference(ResumePosition position);

src/Frame.cpp

+7-7
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ Frame_ERROR Frame_ERROR::applicationError(
356356
}
357357

358358
Frame_ERROR Frame_ERROR::canNotResume(const std::string& message) {
359-
return Frame_ERROR(0, ErrorCode::CONNECTION_ERROR, Payload(message));
359+
return Frame_ERROR(0, ErrorCode::CONNECTION_ERROR, Payload(message));
360360
}
361361

362362
std::unique_ptr<folly::IOBuf> Frame_ERROR::serializeOut() {
@@ -432,7 +432,7 @@ std::unique_ptr<folly::IOBuf> Frame_SETUP::serializeOut() {
432432
appender.writeBE(static_cast<uint32_t>(maxLifetime_));
433433
appender.push((const uint8_t*)token_.data(), token_.size());
434434

435-
CHECK(metadataMimeType_.length() <= std::numeric_limits<uint8_t>::max());
435+
CHECK(metadataMimeType_.length() <= std::numeric_limits<uint8_t>::max());
436436
appender.writeBE(static_cast<uint8_t>(metadataMimeType_.length()));
437437
appender.push(
438438
(const uint8_t*)metadataMimeType_.data(), metadataMimeType_.length());
@@ -509,7 +509,8 @@ std::ostream& operator<<(std::ostream& os, const Frame_LEASE& frame) {
509509
/// @{
510510
std::unique_ptr<folly::IOBuf> Frame_RESUME::serializeOut() {
511511
auto queue = createBufferQueue(
512-
FrameHeader::kSize + sizeof(ResumeIdentificationToken) + sizeof(ResumePosition));
512+
FrameHeader::kSize + sizeof(ResumeIdentificationToken) +
513+
sizeof(ResumePosition));
513514
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
514515

515516
header_.serializeInto(appender);
@@ -533,14 +534,14 @@ bool Frame_RESUME::deserializeFrom(std::unique_ptr<folly::IOBuf> in) {
533534

534535
std::ostream& operator<<(std::ostream& os, const Frame_RESUME& frame) {
535536
return os << frame.header_ << ", ("
536-
<< "token" << ", @" << frame.position_ << ")";
537+
<< "token"
538+
<< ", @" << frame.position_ << ")";
537539
}
538540
/// @}
539541

540542
/// @{
541543
std::unique_ptr<folly::IOBuf> Frame_RESUME_OK::serializeOut() {
542-
auto queue = createBufferQueue(
543-
FrameHeader::kSize + sizeof(ResumePosition));
544+
auto queue = createBufferQueue(FrameHeader::kSize + sizeof(ResumePosition));
544545
folly::io::QueueAppender appender(&queue, /* do not grow */ 0);
545546

546547
header_.serializeInto(appender);
@@ -564,5 +565,4 @@ std::ostream& operator<<(std::ostream& os, const Frame_RESUME_OK& frame) {
564565
return os << frame.header_ << ", (@" << frame.position_ << ")";
565566
}
566567
/// @}
567-
568568
}

src/Frame.h

+22-29
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
#pragma once
44

5+
#include <array>
56
#include <iosfwd>
67
#include <limits>
7-
#include <array>
88

99
/// Needed for inline d'tors of frames.
1010
#include <folly/io/IOBuf.h>
@@ -397,7 +397,7 @@ class Frame_SETUP {
397397
uint32_t version,
398398
uint32_t keepaliveTime,
399399
uint32_t maxLifetime,
400-
const ResumeIdentificationToken &token,
400+
const ResumeIdentificationToken& token,
401401
std::string metadataMimeType,
402402
std::string dataMimeType,
403403
Payload payload)
@@ -454,44 +454,37 @@ std::ostream& operator<<(std::ostream&, const Frame_SETUP&);
454454
/// @}
455455

456456
class Frame_RESUME {
457-
public:
458-
static constexpr bool Trait_CarriesAllowance = false;
459-
460-
Frame_RESUME() = default;
461-
Frame_RESUME(
462-
const ResumeIdentificationToken &token,
463-
ResumePosition position)
464-
: header_(FrameType::RESUME, 0, 0),
465-
token_(token),
466-
position_(position) {}
457+
public:
458+
static constexpr bool Trait_CarriesAllowance = false;
459+
460+
Frame_RESUME() = default;
461+
Frame_RESUME(const ResumeIdentificationToken& token, ResumePosition position)
462+
: header_(FrameType::RESUME, 0, 0), token_(token), position_(position) {}
467463

468-
std::unique_ptr<folly::IOBuf> serializeOut();
469-
bool deserializeFrom(std::unique_ptr<folly::IOBuf> in);
464+
std::unique_ptr<folly::IOBuf> serializeOut();
465+
bool deserializeFrom(std::unique_ptr<folly::IOBuf> in);
470466

471-
FrameHeader header_;
472-
ResumeIdentificationToken token_;
473-
ResumePosition position_;
467+
FrameHeader header_;
468+
ResumeIdentificationToken token_;
469+
ResumePosition position_;
474470
};
475471
std::ostream& operator<<(std::ostream&, const Frame_RESUME&);
476472
/// @}
477473

478474
class Frame_RESUME_OK {
479-
public:
480-
static constexpr bool Trait_CarriesAllowance = false;
475+
public:
476+
static constexpr bool Trait_CarriesAllowance = false;
481477

482-
Frame_RESUME_OK() = default;
483-
Frame_RESUME_OK(
484-
ResumePosition position)
485-
: header_(FrameType::RESUME_OK, 0, 0),
486-
position_(position) {}
478+
Frame_RESUME_OK() = default;
479+
Frame_RESUME_OK(ResumePosition position)
480+
: header_(FrameType::RESUME_OK, 0, 0), position_(position) {}
487481

488-
std::unique_ptr<folly::IOBuf> serializeOut();
489-
bool deserializeFrom(std::unique_ptr<folly::IOBuf> in);
482+
std::unique_ptr<folly::IOBuf> serializeOut();
483+
bool deserializeFrom(std::unique_ptr<folly::IOBuf> in);
490484

491-
FrameHeader header_;
492-
ResumePosition position_;
485+
FrameHeader header_;
486+
ResumePosition position_;
493487
};
494488
std::ostream& operator<<(std::ostream&, const Frame_RESUME_OK&);
495489
/// @}
496-
497490
}

src/ReactiveSocket.cpp

+11-7
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,14 @@ std::unique_ptr<ReactiveSocket> ReactiveSocket::fromClientConnection(
6565
ConnectionSetupPayload setupPayload,
6666
Stats& stats,
6767
std::unique_ptr<KeepaliveTimer> keepaliveTimer,
68-
const ResumeIdentificationToken &token) {
68+
const ResumeIdentificationToken& token) {
6969
std::unique_ptr<ReactiveSocket> socket(new ReactiveSocket(
7070
false,
7171
std::move(connection),
7272
std::move(handler),
73-
[](ReactiveSocket&, const ResumeIdentificationToken&, ResumePosition) { return false; },
73+
[](ReactiveSocket&, const ResumeIdentificationToken&, ResumePosition) {
74+
return false;
75+
},
7476
stats,
7577
std::move(keepaliveTimer)));
7678
socket->connection_->connect();
@@ -301,8 +303,10 @@ bool ReactiveSocket::createResponder(
301303
return true;
302304
}
303305

304-
bool ReactiveSocket::resumeListener(const ResumeIdentificationToken& token, ResumePosition position) {
305-
return resumeSocketListener_(*this, token, position);
306+
bool ReactiveSocket::resumeListener(
307+
const ResumeIdentificationToken& token,
308+
ResumePosition position) {
309+
return resumeSocketListener_(*this, token, position);
306310
}
307311

308312
void ReactiveSocket::close() {
@@ -313,12 +317,13 @@ void ReactiveSocket::onClose(CloseListener listener) {
313317
connection_->onClose([listener, this]() { listener(*this); });
314318
}
315319

316-
void ReactiveSocket::resumeFromSocket(ReactiveSocket &socket) {
320+
void ReactiveSocket::resumeFromSocket(ReactiveSocket& socket) {
317321
connection_->resumeFromAutomaton(*socket.connection_);
318322
}
319323

320324
void ReactiveSocket::tryClientResume(
321-
std::unique_ptr<DuplexConnection> newConnection, const ResumeIdentificationToken& token) {
325+
std::unique_ptr<DuplexConnection> newConnection,
326+
const ResumeIdentificationToken& token) {
322327
connection_->reconnect(std::move(newConnection));
323328
connection_->sendResume(token);
324329
}
@@ -330,5 +335,4 @@ bool ReactiveSocket::isPositionAvailable(ResumePosition position) {
330335
ResumePosition ReactiveSocket::positionDifference(ResumePosition position) {
331336
return connection_->positionDifference(position);
332337
}
333-
334338
}

0 commit comments

Comments
 (0)