Skip to content

[storage] Atomic alter table in the storage controller #31408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 31 additions & 47 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2232,7 +2232,7 @@ where
new_desc: RelationDesc,
expected_version: RelationVersion,
) -> Result<(), StorageError<Self::Timestamp>> {
let (data_shard, existing_write_frontier, existing_read_capabilities) = {
let data_shard = {
let self_collections = self.collections.lock().expect("lock poisoned");
let existing = self_collections
.get(&existing_collection)
Expand All @@ -2243,11 +2243,7 @@ where
return Err(StorageError::IdentifierInvalid(existing_collection));
}

(
existing.collection_metadata.data_shard,
existing.write_frontier.clone(),
existing.read_capabilities.clone(),
)
existing.collection_metadata.data_shard
};

let persist_client = self
Expand Down Expand Up @@ -2319,46 +2315,11 @@ where
// TODO(alter_table): Do we need to advance the since of the table to match the time this
// new version was registered with txn-wal?

// Because this is a Table, we know it's managed by txn_wal, and thus its logical write
// frontier is probably in advance of the write_handle's upper. So we fast forward the
// write frontier to match that of the existing collection.
let write_frontier =
if PartialOrder::less_equal(write_handle.upper(), &existing_write_frontier) {
existing_write_frontier
} else {
mz_ore::soft_panic_or_log!(
"WriteHandle frontier in advance of logical write frontier, {:?} vs {:?}",
write_handle.upper(),
existing_write_frontier
);
write_handle.upper().clone()
};

// Note: The new collection is now the "primary collection" so we specify `None` here.
let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
let collection_meta = CollectionMetadata {
persist_location: self.persist_location.clone(),
relation_desc: collection_desc.desc.clone(),
data_shard,
// TODO(alter_table): Support changes to sources.
remap_shard: None,
txns_shard: Some(self.txns_read.txns_id().clone()),
};
let collection_state = CollectionState::new(
collection_desc,
since_handle.since().clone(),
write_frontier,
Vec::new(),
collection_meta,
);

// Great! Our new schema is registered with Persist, now we need to update our internal
// data structures.
{
let mut self_collections = self.collections.lock().expect("lock poisoned");

// Add a record of the new collection.
self_collections.insert(new_collection, collection_state);
// Update the existing collection so we know it's a "projection" of this new one.
let existing = self_collections
.get_mut(&existing_collection)
Expand All @@ -2376,19 +2337,42 @@ where
};
existing.storage_dependencies.push(new_collection);

// Install the relevant read capabilities on the new collection.
// Copy over the frontiers from the previous version.
// The new table starts with two holds - the implied capability, and the hold from
// the previous version - both at the previous version's read frontier.
let implied_capability = existing.read_capabilities.frontier().to_owned();
let write_frontier = existing.write_frontier.clone();

// Determine the relevant read capabilities on the new collection.
//
// Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
// here, but that only installed a ReadHold on the new collection for the implied
// capability of the existing collection. This would cause runtime panics because it
// would eventually result in negative read capabilities.
let mut changes = ChangeBatch::new();
changes.extend(
existing_read_capabilities
.frontier()
.iter()
.map(|t| (t.clone(), 1)),
changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

// Note: The new collection is now the "primary collection" so we specify `None` here.
let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
let collection_meta = CollectionMetadata {
persist_location: self.persist_location.clone(),
relation_desc: collection_desc.desc.clone(),
data_shard,
// TODO(alter_table): Support changes to sources.
remap_shard: None,
txns_shard: Some(self.txns_read.txns_id().clone()),
};
let collection_state = CollectionState::new(
collection_desc,
implied_capability,
write_frontier,
Vec::new(),
collection_meta,
);

// Add a record of the new collection.
self_collections.insert(new_collection, collection_state);

let mut updates = BTreeMap::from([(new_collection, changes)]);
StorageCollectionsImpl::update_read_capabilities_inner(
&self.cmd_tx,
Expand Down