Skip to content

Commit 15eef57

Browse files
author
wuxianrong
committed
Enhance asynchronous performance optimization
1 parent 66b19c7 commit 15eef57

File tree

10 files changed

+223
-67
lines changed

10 files changed

+223
-67
lines changed

src/pika_kv.cc

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
#include "include/pika_cache.h"
1313
#include "include/pika_conf.h"
1414
#include "include/pika_slot_command.h"
15+
#include "include/pika_server.h"
16+
#include "praft/praft.h"
1517

1618
extern std::unique_ptr<PikaConf> g_pika_conf;
19+
extern PikaServer* g_pika_server;
1720
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
1821
void SetCmd::DoInitial() {
1922
if (!CheckArg(argv_.size())) {
@@ -67,24 +70,74 @@ void SetCmd::DoInitial() {
6770
void SetCmd::Do() {
6871
int32_t res = 1;
6972
STAGE_TIMER_GUARD(storage_duration_ms, true);
73+
74+
bool is_raft_leader = false;
75+
if (g_pika_server && g_pika_server->GetRaftManager()) {
76+
auto node = g_pika_server->GetRaftManager()->GetRaftNode(db_->GetDBName());
77+
is_raft_leader = (node && node->IsLeader());
78+
}
79+
80+
storage::CommitCallback callback = nullptr;
81+
82+
if (is_raft_leader && db_->storage() && db_->storage()->IsRaftEnabled()) {
83+
auto self = std::static_pointer_cast<SetCmd>(shared_from_this());
84+
auto resp_ptr = std::make_shared<std::string>();
85+
86+
auto pika_conn = std::dynamic_pointer_cast<PikaClientConn>(GetConn());
87+
if (!pika_conn) {
88+
res_.SetRes(CmdRes::kErrOther, "Invalid connection");
89+
return;
90+
}
91+
92+
callback = [self, resp_ptr, pika_conn](rocksdb::Status status) {
93+
int32_t result = (status.ok() || status.IsNotFound()) ? 1 : 0;
94+
95+
if (status.ok() || status.IsNotFound()) {
96+
if (self->condition_ == SetCmd::kVX) {
97+
self->res_.AppendInteger(self->success_);
98+
} else {
99+
if (result == 1) {
100+
self->res_.SetRes(CmdRes::kOk);
101+
AddSlotKey("k", self->key_, self->db_);
102+
} else {
103+
self->res_.AppendStringLen(-1);
104+
}
105+
}
106+
} else {
107+
self->res_.SetRes(CmdRes::kErrOther, status.ToString());
108+
}
109+
110+
*resp_ptr = std::move(self->res_.message());
111+
pika_conn->WriteResp(*resp_ptr);
112+
pika_conn->NotifyEpoll(true);
113+
};
114+
}
115+
116+
// Call storage layer with optional callback
70117
switch (condition_) {
71118
case SetCmd::kXX:
72-
s_ = db_->storage()->Setxx(key_, value_, &res, static_cast<int32_t>(sec_));
119+
s_ = db_->storage()->Setxx(key_, value_, &res, static_cast<int32_t>(sec_), callback);
73120
break;
74121
case SetCmd::kNX:
75-
s_ = db_->storage()->Setnx(key_, value_, &res, static_cast<int32_t>(sec_));
122+
s_ = db_->storage()->Setnx(key_, value_, &res, static_cast<int32_t>(sec_), callback);
76123
break;
77124
case SetCmd::kVX:
78125
s_ = db_->storage()->Setvx(key_, target_, value_, &success_, static_cast<int32_t>(sec_));
79126
break;
80127
case SetCmd::kEXORPX:
81-
s_ = db_->storage()->Setex(key_, value_, static_cast<int32_t>(sec_));
128+
s_ = db_->storage()->Setex(key_, value_, static_cast<int32_t>(sec_), callback);
82129
break;
83130
default:
84-
s_ = db_->storage()->Set(key_, value_);
131+
s_ = db_->storage()->Set(key_, value_, callback);
85132
break;
86133
}
87134

135+
// For async mode (Leader), response is handled by callback
136+
if (is_raft_leader && db_->storage() && db_->storage()->IsRaftEnabled()) {
137+
return;
138+
}
139+
140+
// For sync mode (non-Leader or non-Raft), set response immediately
88141
if (s_.ok() || s_.IsNotFound()) {
89142
if (condition_ == SetCmd::kVX) {
90143
res_.AppendInteger(success_);

src/pika_server.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,11 @@ PikaServer::PikaServer()
117117

118118
std::lock_guard rwl(storage_options_rw_);
119119
storage_options_.append_log_function =
120-
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise) {
120+
[this](const ::pikiwidb::Binlog& binlog, std::promise<rocksdb::Status>&& promise,
121+
storage::CommitCallback callback) {
121122
std::string db_name = "db0";
122123

123-
raft_manager_->AppendLog(db_name, binlog, std::move(promise));
124+
raft_manager_->AppendLog(db_name, binlog, std::move(promise), callback);
124125
};
125126
LOG(INFO) << "Raft append_log_function registered in storage_options";
126127
}

src/praft/include/praft/praft.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
#include "pstd/include/pstd_mutex.h"
2222
#include "pstd/include/pstd_status.h"
2323
#include "rocksdb/status.h"
24+
#include "storage/batch.h"
2425

2526
class PikaServer;
27+
class Cmd;
2628

2729
// Forward declarations
2830
namespace storage {
@@ -33,6 +35,10 @@ namespace pikiwidb {
3335
class Binlog;
3436
}
3537

38+
namespace net {
39+
class NetConn;
40+
}
41+
3642
namespace pika_raft {
3743

3844
// Raft log entry data structure
@@ -44,13 +50,22 @@ class WriteDoneClosure : public braft::Closure {
4450

4551
void Run() override;
4652

47-
// Set promise for synchronous Raft apply
53+
// Set promise for synchronous Raft apply (used in Follower on_apply)
4854
void SetPromise(std::shared_ptr<std::promise<rocksdb::Status>> p) {
4955
promise_ = p;
5056
}
57+
58+
// Set callback for async response (used in Leader)
59+
void SetCallback(storage::CommitCallback callback) {
60+
callback_ = callback;
61+
}
5162

5263
private:
64+
// For synchronous mode (Follower)
5365
std::shared_ptr<std::promise<rocksdb::Status>> promise_;
66+
67+
// For asynchronous mode (Leader)
68+
storage::CommitCallback callback_;
5469
};
5570

5671
// Pika state machine implementation
@@ -162,10 +177,12 @@ class RaftManager {
162177
// Get cluster information
163178
pstd::Status GetClusterInfo(const std::string& db_name, std::string* info);
164179

165-
// Append binlog
180+
// Append binlog (supports both sync and async modes)
181+
// Sync mode: pass promise, async mode: pass callback
166182
void AppendLog(const std::string& db_name,
167183
const ::pikiwidb::Binlog& log,
168-
std::promise<rocksdb::Status>&& promise);
184+
std::promise<rocksdb::Status>&& promise,
185+
storage::CommitCallback callback = nullptr);
169186

170187
// Get Raft node for a specific DB
171188
std::shared_ptr<PikaRaftNode> GetRaftNode(const std::string& db_name);

src/praft/src/praft.cc

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include "brpc/closure_guard.h"
1717
#include "include/pika_conf.h"
1818
#include "include/pika_server.h"
19+
#include "include/pika_command.h"
20+
#include "include/pika_client_conn.h"
1921
#include "binlog.pb.h"
2022
#include "storage/storage.h"
2123
#include "storage/batch.h"
@@ -568,7 +570,7 @@ braft::PeerId RaftManager::ParsePeerId(const std::string& peer_str) {
568570
void WriteDoneClosure::Run() {
569571
std::unique_ptr<WriteDoneClosure> self_guard(this);
570572

571-
// If promise is set, notify the waiting thread
573+
// If promise is set, notify the waiting thread (synchronous mode)
572574
if (promise_) {
573575
if (status().ok()) {
574576
promise_->set_value(rocksdb::Status::OK());
@@ -578,35 +580,60 @@ void WriteDoneClosure::Run() {
578580
return;
579581
}
580582

581-
// Legacy path for non-promise closures
582-
if (!status().ok()) {
583+
// If callback is set, call it (asynchronous mode for Leader)
584+
if (callback_) {
585+
rocksdb::Status s;
586+
if (status().ok()) {
587+
s = rocksdb::Status::OK();
588+
} else {
589+
s = rocksdb::Status::IOError(status().error_str());
590+
}
591+
// Call callback with status only (result is captured in lambda)
592+
callback_(s);
583593
return;
584594
}
595+
596+
// Legacy path for closures without promise or callback
597+
if (!status().ok()) {
598+
LOG(WARNING) << "Raft operation failed: " << status().error_str();
599+
}
585600
}
586601

587602
void RaftManager::AppendLog(const std::string& db_name,
588603
const ::pikiwidb::Binlog& log,
589-
std::promise<rocksdb::Status>&& promise) {
604+
std::promise<rocksdb::Status>&& promise,
605+
storage::CommitCallback callback) {
590606
auto node = GetRaftNode(db_name);
591607
if (!node) {
592608
LOG(ERROR) << "Raft node not found for DB: " << db_name;
593-
promise.set_value(rocksdb::Status::NotFound("Raft node not found"));
609+
if (callback) {
610+
callback(rocksdb::Status::NotFound("Raft node not found"));
611+
} else {
612+
promise.set_value(rocksdb::Status::NotFound("Raft node not found"));
613+
}
594614
return;
595615
}
596616

597617
if (!node->IsLeader()) {
598618
braft::PeerId leader = node->GetLeaderId();
599619
LOG(WARNING) << "Current node is not leader for DB: " << db_name
600620
<< ", leader: " << leader.to_string();
601-
promise.set_value(rocksdb::Status::Incomplete("Not leader"));
621+
if (callback) {
622+
callback(rocksdb::Status::Incomplete("Not leader, leader is: " + leader.to_string()));
623+
} else {
624+
promise.set_value(rocksdb::Status::Incomplete("Not leader"));
625+
}
602626
return;
603627
}
604628

605-
// 创建 WriteDoneClosure 并传递 promise
606629
auto* done = new WriteDoneClosure();
607-
done->SetPromise(std::make_shared<std::promise<rocksdb::Status>>(std::move(promise)));
608630

609-
// 序列化 binlog
631+
if (callback) {
632+
done->SetCallback(callback);
633+
} else {
634+
done->SetPromise(std::make_shared<std::promise<rocksdb::Status>>(std::move(promise)));
635+
}
636+
610637
butil::IOBuf data;
611638
butil::IOBufAsZeroCopyOutputStream wrapper(&data);
612639
if (!log.SerializeToZeroCopyStream(&wrapper)) {
@@ -615,12 +642,10 @@ void RaftManager::AppendLog(const std::string& db_name,
615642
return;
616643
}
617644

618-
// 创建 Raft 任务
619645
braft::Task task;
620646
task.data = &data;
621647
task.done = done;
622648

623-
// 提交到 Raft
624649
node->GetRaftNode()->apply(task);
625650
}
626651

@@ -649,7 +674,6 @@ rocksdb::Status RaftManager::ApplyBinlogEntry(const std::string& binlog_data) {
649674
}
650675

651676
// 调用 Storage::OnBinlogWrite() 应用 binlog
652-
// 注意:log_index 暂时传 0,后续可以从外部传入
653677
auto status = storage->OnBinlogWrite(binlog, 0);
654678

655679
if (!status.ok()) {

src/storage/include/storage/batch.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,24 @@ namespace pikiwidb {
1717
class Binlog;
1818
}
1919

20+
class Cmd;
21+
22+
namespace net {
23+
class NetConn;
24+
}
25+
2026
namespace storage {
2127

2228
class Storage;
2329
class Redis;
2430

25-
using AppendLogFunction = std::function<void(const ::pikiwidb::Binlog&, std::promise<rocksdb::Status>&&)>;
31+
// Callback for async commit result
32+
// Parameters: status (result should be captured in the lambda from storage layer)
33+
// Note: Connection and result should be captured in the lambda, not passed as parameters
34+
using CommitCallback = std::function<void(rocksdb::Status)>;
35+
36+
using AppendLogFunction = std::function<void(const ::pikiwidb::Binlog&, std::promise<rocksdb::Status>&&,
37+
CommitCallback)>;
2638

2739
using ColumnFamilyIndex = uint32_t;
2840

@@ -34,7 +46,7 @@ class Batch {
3446

3547
virtual void Delete(ColumnFamilyIndex cf_idx, const rocksdb::Slice& key) = 0;
3648

37-
virtual rocksdb::Status Commit() = 0;
49+
virtual rocksdb::Status Commit(CommitCallback callback = nullptr) = 0;
3850

3951
int32_t Count() const { return count_; }
4052

@@ -52,7 +64,7 @@ class RocksBatch : public Batch {
5264

5365
void Put(ColumnFamilyIndex cf_idx, const rocksdb::Slice& key, const rocksdb::Slice& value) override;
5466
void Delete(ColumnFamilyIndex cf_idx, const rocksdb::Slice& key) override;
55-
rocksdb::Status Commit() override;
67+
rocksdb::Status Commit(CommitCallback callback = nullptr) override;
5668

5769
private:
5870
rocksdb::WriteBatch batch_;
@@ -69,8 +81,8 @@ class BinlogBatch : public Batch {
6981
void Put(ColumnFamilyIndex cf_idx, const rocksdb::Slice& key, const rocksdb::Slice& value) override;
7082
void Delete(ColumnFamilyIndex cf_idx, const rocksdb::Slice& key) override;
7183

72-
// 同步等待 Raft 应用完成(使用 promise/future)
73-
rocksdb::Status Commit() override;
84+
// Commit to Raft (sync mode if callback is null, async mode otherwise)
85+
rocksdb::Status Commit(CommitCallback callback = nullptr) override;
7486

7587
private:
7688
AppendLogFunction append_log_func_;

0 commit comments

Comments
 (0)