Skip to content

Commit 5bcbb6f

Browse files
authored
Merge pull request #11401 from hfu94/fix_main
Fix globalconfig refresh hang issue
2 parents 1791d07 + 94e4e30 commit 5bcbb6f

File tree

7 files changed, with 69 additions and 63 deletions.

bindings/java/src/integration/com/apple/foundationdb/MultiClientHelper.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121

2222
import java.util.ArrayList;
2323
import java.util.Collection;
24-
import org.junit.jupiter.api.extension.AfterEachCallback;
25-
import org.junit.jupiter.api.extension.BeforeAllCallback;
2624
import org.junit.jupiter.api.extension.ExtensionContext;
27-
25+
import org.junit.jupiter.api.extension.BeforeAllCallback;
2826
/**
2927
* Callback to help define a multi-client scenario and ensure that
3028
* the clients can be configured properly.
3129
*/
32-
public class MultiClientHelper implements BeforeAllCallback,AfterEachCallback{
30+
public class MultiClientHelper implements BeforeAllCallback {
3331
private String[] clusterFiles;
3432
private Collection<Database> openDatabases;
3533

@@ -67,16 +65,4 @@ Collection<Database> openDatabases(FDB fdb){
6765
public void beforeAll(ExtensionContext arg0) throws Exception {
6866
clusterFiles = readClusterFromEnv();
6967
}
70-
71-
@Override
72-
public void afterEach(ExtensionContext arg0) throws Exception {
73-
//close any databases that have been opened
74-
if(openDatabases!=null){
75-
for(Database db : openDatabases){
76-
db.close();
77-
}
78-
}
79-
openDatabases = null;
80-
}
81-
8268
}

fdbclient/GlobalConfig.actor.cpp

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,7 @@ void GlobalConfig::erase(KeyRangeRef range) {
153153
}
154154
}
155155

156-
// Updates local copy of global configuration by reading the entire key-range
157-
// from storage (proxied through the GrvProxies).
158-
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown) {
156+
ACTOR Future<Version> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown, Version largestSeen) {
159157
// TraceEvent trace(SevInfo, "GlobalConfigRefresh");
160158
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
161159

@@ -171,7 +169,11 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown)
171169
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
172170
self->insert(systemKey, kv.value);
173171
}
174-
return Void();
172+
if (reply.version >= largestSeen || largestSeen == std::numeric_limits<Version>::max()) {
173+
return reply.version;
174+
} else {
175+
wait(delay(0.25));
176+
}
175177
} catch (Error& e) {
176178
wait(backoff.onError());
177179
}
@@ -186,52 +188,61 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo*
186188
if (self->initialized.canBeSet()) {
187189
wait(self->cx->onConnected());
188190

189-
wait(self->refresh(self, -1));
191+
Version version = wait(self->refresh(self, -1, 0));
192+
self->lastUpdate = version;
193+
194+
self->cx->addref();
190195
self->initialized.send(Void());
196+
self->cx->delref();
191197
}
192198

193199
loop {
194200
try {
195-
wait(self->dbInfoChanged.onTrigger());
196-
197-
auto& history = dbInfo->history;
198-
if (history.size() == 0) {
199-
continue;
200-
}
201-
202-
if (self->lastUpdate < history[0].version) {
203-
// This process missed too many global configuration
204-
// history updates or the protocol version changed, so it
205-
// must re-read the entire configuration range.
206-
wait(self->refresh(self, history.back().version));
207-
if (dbInfo->history.size() > 0) {
208-
self->lastUpdate = dbInfo->history.back().version;
209-
}
210-
} else {
211-
// Apply history in order, from lowest version to highest
212-
// version. Mutation history should already be stored in
213-
// ascending version order.
214-
for (const auto& vh : history) {
215-
if (vh.version <= self->lastUpdate) {
216-
continue; // already applied this mutation
201+
// Run one iteration right away, before waiting for the first dbInfo change.
202+
wait(delay(0));
203+
if (dbInfo->history.size() > 0) {
204+
if (self->lastUpdate < dbInfo->history[0].version) {
205+
// This process missed too many global configuration
206+
// history updates or the protocol version changed, so it
207+
// must re-read the entire configuration range.
208+
Version version =
209+
wait(self->refresh(self, self->lastUpdate, dbInfo->history.back().version));
210+
self->lastUpdate = version;
211+
// DBInfo could have changed after the wait. If
212+
// changes are present, re-run the loop to make
213+
// sure they are applied.
214+
if (dbInfo->history.size() > 0 &&
215+
dbInfo->history[0].version != std::numeric_limits<Version>::max()) {
216+
continue;
217217
}
218+
} else {
219+
// Apply history in order, from lowest version to highest
220+
// version. Mutation history should already be stored in
221+
// ascending version order.
222+
for (const auto& vh : dbInfo->history) {
223+
if (vh.version <= self->lastUpdate) {
224+
continue; // already applied this mutation
225+
}
218226

219-
for (const auto& mutation : vh.mutations.contents()) {
220-
if (mutation.type == MutationRef::SetValue) {
221-
self->insert(mutation.param1, mutation.param2);
222-
} else if (mutation.type == MutationRef::ClearRange) {
223-
self->erase(KeyRangeRef(mutation.param1, mutation.param2));
224-
} else {
225-
ASSERT(false);
227+
for (const auto& mutation : vh.mutations.contents()) {
228+
if (mutation.type == MutationRef::SetValue) {
229+
self->insert(mutation.param1, mutation.param2);
230+
} else if (mutation.type == MutationRef::ClearRange) {
231+
self->erase(KeyRangeRef(mutation.param1, mutation.param2));
232+
} else {
233+
UNREACHABLE();
234+
}
226235
}
227-
}
228236

229-
ASSERT(vh.version > self->lastUpdate);
230-
self->lastUpdate = vh.version;
237+
ASSERT(vh.version > self->lastUpdate);
238+
self->lastUpdate = vh.version;
239+
}
231240
}
241+
self->configChanged.trigger();
232242
}
233-
234-
self->configChanged.trigger();
243+
// In case this actor was canceled in the destructor of GlobalConfig, we can exit here.
244+
wait(delay(0));
245+
wait(self->dbInfoChanged.onTrigger());
235246
} catch (Error& e) {
236247
throw;
237248
}

fdbclient/NativeAPI.actor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9075,8 +9075,9 @@ void Transaction::checkDeferredError() const {
90759075

90769076
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
90779077
if (!cx->isError()) {
9078-
double clientSamplingProbability =
9079-
cx->globalConfig->get<double>(fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
9078+
double sampleRate =
9079+
cx->globalConfig->get<double>(fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
9080+
double clientSamplingProbability = std::isinf(sampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate;
90809081
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
90819082
deterministicRandom()->random01() < clientSamplingProbability &&
90829083
(!g_network->isSimulated() || !g_simulator->speedUpSimulation)) {

fdbclient/include/fdbclient/CommitProxyInterface.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -727,14 +727,16 @@ struct ExclusionSafetyCheckRequest {
727727
struct GlobalConfigRefreshReply {
728728
constexpr static FileIdentifier file_identifier = 12680327;
729729
Arena arena;
730+
Version version;
730731
RangeResultRef result;
731732

732733
GlobalConfigRefreshReply() {}
733-
GlobalConfigRefreshReply(Arena const& arena, RangeResultRef result) : arena(arena), result(result) {}
734+
GlobalConfigRefreshReply(Arena const& arena, Version version, RangeResultRef result)
735+
: arena(arena), version(version), result(result) {}
734736

735737
template <class Ar>
736738
void serialize(Ar& ar) {
737-
serializer(ar, result, arena);
739+
serializer(ar, result, version, arena);
738740
}
739741
};
740742

fdbclient/include/fdbclient/GlobalConfig.actor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,10 @@ class GlobalConfig : NonCopyable {
163163
// of the global configuration keyspace.
164164
void erase(KeyRangeRef range);
165165

166-
ACTOR static Future<Void> refresh(GlobalConfig* self, Version lastKnown);
166+
// Updates local copy of global configuration by reading the entire key-range
167+
// from storage (proxied through the GrvProxies). Returns the version of the
168+
// refreshed data.
169+
ACTOR static Future<Version> refresh(GlobalConfig* self, Version lastKnown, Version largestSeen);
167170
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
168171

169172
DatabaseContext* cx;

fdbserver/ClusterController.actor.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
10771077
.detail("GrvProxies", db->clientInfo->get().grvProxies)
10781078
.detail("ReqGrvProxies", req.grvProxies)
10791079
.detail("CommitProxies", db->clientInfo->get().commitProxies)
1080+
.detail("GlobalConfigHistorySize", db->clientInfo->get().history.size())
10801081
.detail("ReqCPs", req.commitProxies)
10811082
.detail("TenantMode", db->clientInfo->get().tenantMode.toString())
10821083
.detail("ReqTenantMode", db->config.tenantMode.toString())
@@ -1093,6 +1094,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
10931094
clientInfo.id = deterministicRandom()->randomUniqueID();
10941095
clientInfo.commitProxies = req.commitProxies;
10951096
clientInfo.grvProxies = req.grvProxies;
1097+
clientInfo.history = db->clientInfo->get().history;
10961098
clientInfo.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode);
10971099
clientInfo.clusterId = db->serverInfo->get().client.clusterId;
10981100
clientInfo.clusterType = db->clusterType;
@@ -1742,12 +1744,12 @@ ACTOR Future<Void> monitorStorageMetadata(ClusterControllerData* self) {
17421744
ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
17431745
loop {
17441746
state ReadYourWritesTransaction tr(db->db);
1747+
state ClientDBInfo clientInfo;
17451748
loop {
17461749
try {
17471750
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
17481751
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
17491752
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
1750-
state ClientDBInfo clientInfo = db->serverInfo->get().client;
17511753

17521754
if (globalConfigVersion.present()) {
17531755
// Since the history keys end with versionstamps, they
@@ -1758,11 +1760,12 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
17581760
// If the global configuration version key has been set,
17591761
// the history should contain at least one item.
17601762
ASSERT(globalConfigHistory.size() > 0);
1763+
clientInfo = db->serverInfo->get().client;
17611764
clientInfo.history.clear();
17621765

17631766
for (const auto& kv : globalConfigHistory) {
17641767
ObjectReader reader(kv.value.begin(), IncludeVersion());
1765-
if (reader.protocolVersion() != g_network->protocolVersion()) {
1768+
if (reader.protocolVersion() != g_network->protocolVersion() || BUGGIFY_WITH_PROB(0.01)) {
17661769
// If the protocol version has changed, the
17671770
// GlobalConfig actor should refresh its view by
17681771
// reading the entire global configuration key

fdbserver/GrvProxyServer.actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ ACTOR Future<Void> globalConfigRequestServer(GrvProxyData* grvProxyData, GrvProx
366366
// point of view. The client learns the version through a
367367
// ClientDBInfo update).
368368
if (refresh.lastKnown <= cachedVersion) {
369-
refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedData });
369+
refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedVersion, cachedData });
370370
} else {
371371
refresh.reply.sendError(future_version());
372372
}

0 commit comments

Comments
 (0)