Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-20267: Reduce heap pressure when initializing CMS #3856

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ Upgrading
However, nodes still UP and running the old version will. This will eventually cause the migration to fail, as the
cluster will not be in agreement.
- > nodetool cms initialize
Got mismatching cluster metadatas from [/x.x.x.x:7000] aborting migration
Got mismatching cluster metadatas. Check logs on peers ([/x.x.x.x:7000]) for details of mismatches.
Aborting migration.
See 'nodetool help' or 'nodetool help <command>'.
If the cms initialize command fails, it will indicate which nodes’ current metadata does not agree with the node
where the command was executed. To mitigate this situation, bring any mismatching nodes DOWN and rerun the
Expand Down
8 changes: 5 additions & 3 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.FetchCMSLog;
import org.apache.cassandra.tcm.FetchPeerLog;
import org.apache.cassandra.tcm.migration.CMSInitializationResponse;
import org.apache.cassandra.tcm.migration.Election;
import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
import org.apache.cassandra.tcm.sequences.DataMovements;
import org.apache.cassandra.tcm.serialization.MessageSerializers;
import org.apache.cassandra.utils.BooleanSerializer;
Expand Down Expand Up @@ -232,9 +234,9 @@ public enum Verb
TCM_NOTIFY_RSP (806, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> ResponseVerbHandler.instance ),
TCM_NOTIFY_REQ (807, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::logStateSerializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ),
TCM_CURRENT_EPOCH_REQ (808, P0, rpcTimeout, INTERNAL_METADATA, () -> Epoch.messageSerializer, () -> currentEpochRequestHandler(), TCM_NOTIFY_RSP ),
TCM_INIT_MIG_RSP (809, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::metadataHolderSerializer, () -> ResponseVerbHandler.instance ),
TCM_INIT_MIG_REQ (810, P0, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ),
TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ),
TCM_INIT_MIG_RSP (809, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationResponse.serializer, () -> ResponseVerbHandler.instance ),
TCM_INIT_MIG_REQ (810, P0, rpcTimeout, INTERNAL_METADATA, MessageSerializers::initRequestSerializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ),
TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ),
TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ),
TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ),
TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_LOG, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ),
Expand Down
28 changes: 28 additions & 0 deletions src/java/org/apache/cassandra/schema/SchemaKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,34 @@ private static void flush()
ALL.forEach(table -> FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush(ColumnFamilyStore.FlushReason.INTERNALLY_FORCED)));
}

/**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*
* Only used when initializing the CMS
*/
public static UUID calculateSchemaDigest()
{
Digest digest = Digest.forSchema();
for (String table : ALL)
{
ReadCommand cmd = getReadCommandForTableSchema(table);
try (ReadExecutionController executionController = cmd.executionController();
PartitionIterator schema = cmd.executeInternal(executionController))
{
while (schema.hasNext())
{
try (RowIterator partition = schema.next())
{
if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
RowIterators.digest(partition, digest);
}
}
}
}
return UUID.nameUUIDFromBytes(digest.digest());
}

/**
* @param schemaTableName The name of the table responsible for part of the schema
* @return CFS responsible to hold low-level serialized schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public void upgradeFromGossip(List<String> ignoredEndpoints)
!ignored.contains(ep))
.collect(toImmutableSet());

Election.instance.nominateSelf(candidates, ignored, metadata::equals, metadata);
Election.instance.nominateSelf(candidates, ignored, metadata, true);
ClusterMetadataService.instance().triggerSnapshot();
}
else
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/tcm/Startup.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.migration.Election;
import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
Expand Down Expand Up @@ -229,8 +230,8 @@ public static void initializeForDiscovery(Runnable initMessaging)
{
Election.instance.nominateSelf(candidates.nodes(),
Collections.singleton(FBUtilities.getBroadcastAddressAndPort()),
(cm) -> true,
null);
ClusterMetadata.current(),
false);
}
}

Expand All @@ -243,7 +244,7 @@ public static void initializeForDiscovery(Runnable initMessaging)
}
else
{
Election.Initiator initiator = Election.instance.initiator();
CMSInitializationRequest.Initiator initiator = Election.instance.initiator();
candidates = Discovery.instance.discoverOnce(initiator == null ? null : initiator.initiator);
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.tcm.migration;

import java.io.IOException;
import java.util.Objects;
import java.util.UUID;

import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.TokenMap;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.utils.UUIDSerializer;

public class CMSInitializationRequest
{
public static final IVersionedSerializer<CMSInitializationRequest> defaultMessageSerializer = new Serializer(NodeVersion.CURRENT.serializationVersion());

private static volatile Serializer serializerCache;

public static IVersionedSerializer<CMSInitializationRequest> messageSerializer(Version version)
{
Serializer cached = serializerCache;
if (cached != null && cached.serializationVersion.equals(version))
return cached;
cached = new Serializer(version);
serializerCache = cached;
return cached;
}

public final Initiator initiator;
public final Directory directory;
public final TokenMap tokenMap;
public final UUID schemaVersion;

public CMSInitializationRequest(InetAddressAndPort initiator, UUID initToken, ClusterMetadata metadata)
{
this(new Initiator(initiator, initToken), metadata.directory, metadata.tokenMap, SchemaKeyspace.calculateSchemaDigest());
}

public CMSInitializationRequest(Initiator initiator, Directory directory, TokenMap tokenMap, UUID schemaVersion)
{
this.initiator = initiator;
this.directory = directory;
this.tokenMap = tokenMap;
this.schemaVersion = schemaVersion;
}

public static class Serializer implements IVersionedSerializer<CMSInitializationRequest>
{
private final Version serializationVersion;

public Serializer(Version serializationVersion)
{
this.serializationVersion = serializationVersion;
}

@Override
public void serialize(CMSInitializationRequest t, DataOutputPlus out, int version) throws IOException
{
Initiator.serializer.serialize(t.initiator, out, version);
Directory.serializer.serialize(t.directory, out, serializationVersion);
TokenMap.serializer.serialize(t.tokenMap, out, serializationVersion);
UUIDSerializer.serializer.serialize(t.schemaVersion, out, version);
}

@Override
public CMSInitializationRequest deserialize(DataInputPlus in, int version) throws IOException
{
Initiator initiator = Initiator.serializer.deserialize(in, version);
Directory directory = Directory.serializer.deserialize(in, serializationVersion);
TokenMap tokenMap = TokenMap.serializer.deserialize(in, serializationVersion);
UUID schemaVersion = UUIDSerializer.serializer.deserialize(in, version);
return new CMSInitializationRequest(initiator, directory, tokenMap, schemaVersion);
}

@Override
public long serializedSize(CMSInitializationRequest t, int version)
{
return Initiator.serializer.serializedSize(t.initiator, version) +
Directory.serializer.serializedSize(t.directory, serializationVersion) +
TokenMap.serializer.serializedSize(t.tokenMap, serializationVersion) +
UUIDSerializer.serializer.serializedSize(t.schemaVersion, version);
}
}

public static class Initiator
{
public static final Serializer serializer = new Serializer();
public final InetAddressAndPort initiator;
public final UUID initToken;

public Initiator(InetAddressAndPort initiator, UUID initToken)
{
this.initiator = initiator;
this.initToken = initToken;
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (!(o instanceof Initiator)) return false;
Initiator other = (Initiator) o;
return Objects.equals(initiator, other.initiator) && Objects.equals(initToken, other.initToken);
}

@Override
public int hashCode()
{
return Objects.hash(initiator, initToken);
}

@Override
public String toString()
{
return "Initiator{" +
"initiator=" + initiator +
", initToken=" + initToken +
'}';
}

public static class Serializer implements IVersionedSerializer<Initiator>
{
@Override
public void serialize(Initiator t, DataOutputPlus out, int version) throws IOException
{
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator, out, version);
UUIDSerializer.serializer.serialize(t.initToken, out, version);
}

@Override
public Initiator deserialize(DataInputPlus in, int version) throws IOException
{
return new Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, version),
UUIDSerializer.serializer.deserialize(in, version));
}

@Override
public long serializedSize(Initiator t, int version)
{
return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator, version) +
UUIDSerializer.serializer.serializedSize(t.initToken, version);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.tcm.migration;

import java.io.IOException;

import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;

public class CMSInitializationResponse
{
public static final IVersionedSerializer<CMSInitializationResponse> serializer = new Serializer();

public final CMSInitializationRequest.Initiator initiator;
public final boolean metadataMatches;

public CMSInitializationResponse(CMSInitializationRequest.Initiator initiator, boolean metadataMatches)
{
this.initiator = initiator;
this.metadataMatches = metadataMatches;
}

@Override
public String toString()
{
return "CMSInitializationResponse{" +
"initiator=" + initiator +
", metadataMatches=" + metadataMatches +
'}';
}

private static class Serializer implements IVersionedSerializer<CMSInitializationResponse>
{
@Override
public void serialize(CMSInitializationResponse t, DataOutputPlus out, int version) throws IOException
{
CMSInitializationRequest.Initiator.serializer.serialize(t.initiator, out, version);
out.writeBoolean(t.metadataMatches);
}

@Override
public CMSInitializationResponse deserialize(DataInputPlus in, int version) throws IOException
{
CMSInitializationRequest.Initiator coordinator = CMSInitializationRequest.Initiator.serializer.deserialize(in, version);
boolean metadataMatches = in.readBoolean();
return new CMSInitializationResponse(coordinator, metadataMatches);
}

@Override
public long serializedSize(CMSInitializationResponse t, int version)
{
return CMSInitializationRequest.Initiator.serializer.serializedSize(t.initiator, version) +
TypeSizes.sizeof(t.metadataMatches);
}
}
}
Loading