Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import static org.assertj.core.api.Assertions.assertThatCode;

import com.scalar.db.api.DistributedStorageAdminRepairTableIntegrationTestBase;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.AdminTestUtils;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

Expand All @@ -23,6 +26,22 @@ protected AdminTestUtils getAdminTestUtils(String testName) {
return new CassandraAdminTestUtils(getProperties(testName));
}

// Override setUp() to share the ClusterManager between admin and adminTestUtils so that schema
// changes made via one are visible to the other. Without sharing, schema metadata propagates
// asynchronously across driver sessions and the repair-then-tableExists test sequence becomes
// flaky.
@Override
@BeforeEach
protected void setUp() throws Exception {
Properties properties = getProperties(TEST_NAME);
ClusterManager clusterManager = new ClusterManager(new DatabaseConfig(properties));
admin = new CassandraAdmin(clusterManager, new DatabaseConfig(properties));
adminTestUtils = new CassandraAdminTestUtils(properties, clusterManager);
Map<String, String> options = getCreationOptions();
admin.createNamespace(getNamespace(), options);
admin.createTable(getNamespace(), getTable(), getTableMetadata(), options);
}

@Test
@Disabled("there is no metadata table for Cassandra")
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public CassandraAdminTestUtils(Properties properties) {
clusterManager = new ClusterManager(databaseConfig);
}

public CassandraAdminTestUtils(Properties properties, ClusterManager clusterManager) {
super(properties);
this.clusterManager = clusterManager;
}

@Override
public void dropMetadataTable() {
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected List<String> getCommandArgsForTableReparation(
Path configFilePath, Path schemaFilePath) {
return ImmutableList.<String>builder()
.addAll(super.getCommandArgsForTableReparation(configFilePath, schemaFilePath))
.add("--no-scaling")
.add("--no-backup")
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.scalar.db.transaction.consensuscommit.ConsensusCommitAdminRepairTableIntegrationTestBase;
import com.scalar.db.util.AdminTestUtils;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;

public class ConsensusCommitAdminRepairTableIntegrationTestWithObjectStorage
extends ConsensusCommitAdminRepairTableIntegrationTestBase {
Expand All @@ -17,13 +16,4 @@ protected Properties getProps(String testName) {
protected AdminTestUtils getAdminTestUtils(String testName) {
return new ObjectStorageAdminTestUtils(getProperties(testName));
}

@Override
@Disabled("Object Storage recreates missing coordinator tables")
public void
repairTableAndCoordinatorTable_CoordinatorTablesDoNotExist_ShouldThrowIllegalArgumentException() {}

@Override
@Disabled("Object Storage recreates missing coordinator tables")
public void repairTable_ForNonExistingTable_ShouldThrowIllegalArgument() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.scalar.db.api.DistributedStorageAdminRepairTableIntegrationTestBase;
import com.scalar.db.util.AdminTestUtils;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;

public class ObjectStorageAdminRepairTableIntegrationTest
extends DistributedStorageAdminRepairTableIntegrationTestBase {
Expand All @@ -17,8 +16,4 @@ protected Properties getProperties(String testName) {
protected AdminTestUtils getAdminTestUtils(String testName) {
return new ObjectStorageAdminTestUtils(getProperties(testName));
}

@Override
@Disabled("Object Storage recreates missing coordinator tables")
public void repairTable_ForNonExistingTable_ShouldThrowIllegalArgument() {}
}
9 changes: 5 additions & 4 deletions core/src/main/java/com/scalar/db/api/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,14 @@ default boolean tableExists(String namespace, String table) throws ExecutionExce
}

/**
* Repairs a table which may be in an unknown state.
* Repairs a table that may be in an unknown state, such as the table exists in the underlying
* storage but not its ScalarDB metadata or vice versa. This will re-create the table, its
* secondary indexes, and their metadata if necessary.
*
* @param namespace an existing namespace
* @param table an existing table
* @param namespace a namespace
* @param table a table
* @param metadata the metadata associated to the table to repair
* @param options options to repair
* @throws IllegalArgumentException if the table does not exist
* @throws ExecutionException if the operation fails
*/
void repairTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.schemabuilder.Create;
import com.datastax.driver.core.schemabuilder.CreateIndex;
import com.datastax.driver.core.schemabuilder.CreateKeyspace;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.core.schemabuilder.SchemaBuilder.Direction;
Expand Down Expand Up @@ -92,8 +93,8 @@ public void createTable(
// Create the system namespace if it does not exist
createKeyspace(systemNamespace, true, options);

createTableInternal(namespace, table, metadata, options);
createSecondaryIndexes(namespace, table, metadata.getSecondaryIndexNames(), options);
createTableInternal(namespace, table, metadata, false, options);
createSecondaryIndexes(namespace, table, metadata.getSecondaryIndexNames(), false);
}

@Override
Expand Down Expand Up @@ -170,17 +171,29 @@ public void truncateTable(String namespace, String table) throws ExecutionExcept
public void createIndex(
String namespace, String table, String columnName, Map<String, String> options)
throws ExecutionException {
createIndexInternal(namespace, table, columnName, false);
}

public void createIndexInternal(
Comment thread
brfrn169 marked this conversation as resolved.
Comment thread
brfrn169 marked this conversation as resolved.
String namespace, String table, String columnName, boolean ifNotExists)
throws ExecutionException {
String indexName = getIndexName(table, columnName);
SchemaStatement createIndex =
SchemaBuilder.createIndex(indexName)
CreateIndex createIndex = SchemaBuilder.createIndex(indexName);
if (ifNotExists) {
createIndex = createIndex.ifNotExists();
}
SchemaStatement createIndexStatement =
createIndex
.onTable(quoteIfNecessary(namespace), quoteIfNecessary(table))
.andColumn(quoteIfNecessary(columnName));

try {
clusterManager.getSession().execute(createIndex.getQueryString());
clusterManager.getSession().execute(createIndexStatement.getQueryString());
} catch (RuntimeException e) {
throw new ExecutionException(
String.format(
"Creating the secondary index for %s.%s.%s failed", namespace, table, columnName),
"Creating the secondary index on the %s column of the %s table failed",
columnName, getFullTableName(namespace, table)),
e);
}
}
Expand Down Expand Up @@ -300,12 +313,13 @@ public boolean namespaceExists(String namespace) throws ExecutionException {
public void repairTable(
String namespace, String table, TableMetadata metadata, Map<String, String> options)
throws ExecutionException {
// We have this check to stay consistent with the behavior of the other admins classes
if (!tableExists(namespace, table)) {
throw new IllegalArgumentException(
"The table " + getFullTableName(namespace, table) + " does not exist");
try {
createTableInternal(namespace, table, metadata, true, options);
createSecondaryIndexes(namespace, table, metadata.getSecondaryIndexNames(), true);
} catch (ExecutionException e) {
throw new ExecutionException(
String.format("Repairing the %s table failed", getFullTableName(namespace, table)), e);
}
// The table metadata are not managed by ScalarDB, so we don't need to do anything here
}

@Override
Expand Down Expand Up @@ -431,10 +445,17 @@ public Set<String> getNamespaceNames() throws ExecutionException {

@VisibleForTesting
void createTableInternal(
String keyspace, String table, TableMetadata metadata, Map<String, String> options)
String keyspace,
String table,
TableMetadata metadata,
boolean ifNotExists,
Map<String, String> options)
throws ExecutionException {
Create createTable =
SchemaBuilder.createTable(quoteIfNecessary(keyspace), quoteIfNecessary(table));
if (ifNotExists) {
createTable = createTable.ifNotExists();
}
// Add columns
for (String pk : metadata.getPartitionKeyNames()) {
createTable =
Expand Down Expand Up @@ -487,16 +508,16 @@ void createTableInternal(
clusterManager.getSession().execute(createTableWithOptions.getQueryString());
} catch (RuntimeException e) {
throw new ExecutionException(
String.format("Creating the table %s.%s failed", keyspace, table), e);
String.format("Creating the %s table failed", getFullTableName(keyspace, table)), e);
}
}

@VisibleForTesting
void createSecondaryIndexes(
String keyspace, String table, Set<String> secondaryIndexNames, Map<String, String> options)
String keyspace, String table, Set<String> secondaryIndexNames, boolean ifNotExists)
throws ExecutionException {
for (String name : secondaryIndexNames) {
createIndex(keyspace, table, name, options);
createIndexInternal(keyspace, table, name, ifNotExists);
}
}

Expand Down
60 changes: 32 additions & 28 deletions core/src/main/java/com/scalar/db/storage/cosmos/CosmosAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,23 @@ public CosmosAdmin(DatabaseConfig databaseConfig) {
public void createTable(
String namespace, String table, TableMetadata metadata, Map<String, String> options)
throws ExecutionException {
checkMetadata(metadata);
try {
// Create the metadata database and container first if they do not exist
createMetadataDatabaseAndContainerIfNotExists();

createContainer(namespace, table, metadata);
putTableMetadata(namespace, table, metadata, false);
createTableInternal(namespace, table, metadata, false);
} catch (IllegalArgumentException e) {
throw e;
} catch (RuntimeException e) {
throw new ExecutionException("Creating the container failed", e);
}
}

private void createTableInternal(
String namespace, String table, TableMetadata metadata, boolean ifNotExists)
throws ExecutionException {
checkMetadata(metadata);
createContainer(namespace, table, metadata, ifNotExists);
putTableMetadata(namespace, table, metadata, true);
}

private void checkMetadata(TableMetadata metadata) {
for (String clusteringKeyName : metadata.getClusteringKeyNames()) {
if (metadata.getColumnDataType(clusteringKeyName) == DataType.BLOB) {
Expand All @@ -117,19 +122,28 @@ private void checkMetadata(TableMetadata metadata) {
}
}

private void createContainer(String database, String table, TableMetadata metadata)
private void createContainer(
String database, String table, TableMetadata metadata, boolean ifNotExists)
throws ExecutionException {
CosmosDatabase cosmosDatabase = client.getDatabase(database);
CosmosContainerProperties properties = computeContainerProperties(table, metadata);
cosmosDatabase.createContainer(properties);

addStoredProcedure(database, table);
if (ifNotExists) {
cosmosDatabase.createContainerIfNotExists(properties);
} else {
cosmosDatabase.createContainer(properties);
}
addStoredProcedure(database, table, ifNotExists);
}

private void addStoredProcedure(String namespace, String table) throws ExecutionException {
private void addStoredProcedure(String namespace, String table, boolean ifNotExists)
throws ExecutionException {
CosmosDatabase cosmosDatabase = client.getDatabase(namespace);
CosmosStoredProcedureProperties storedProcedureProperties =
computeContainerStoredProcedureProperties();

if (ifNotExists && storedProcedureExists(namespace, table)) {
return;
}
cosmosDatabase
.getContainer(table)
.getScripts()
Expand Down Expand Up @@ -510,24 +524,14 @@ public void repairTable(
String namespace, String table, TableMetadata metadata, Map<String, String> options)
throws ExecutionException {
try {
try {
// Since the metadata table may be missing, we cannot use CosmosAdmin.tableExists() as it
// queries the metadata table to verify if the given table exists
client.getDatabase(namespace).getContainer(table).read();
} catch (CosmosException e) {
if (e.getStatusCode() == 404) {
throw new IllegalArgumentException(
"The table " + getFullTableName(namespace, table) + " does not exist");
}
}
createMetadataDatabaseAndContainerIfNotExists();
putTableMetadata(namespace, table, metadata, true);
if (!storedProcedureExists(namespace, table)) {
addStoredProcedure(namespace, table);
}
} catch (ExecutionException | CosmosException e) {
createTableInternal(namespace, table, metadata, true);
updateIndexingPolicy(namespace, table, metadata);
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new ExecutionException(
String.format("Repairing the table %s.%s failed", namespace, table), e);
String.format("Repairing the %s container failed", getFullTableName(namespace, table)),
e);
}
}

Expand Down
Loading