Skip to content

Commit 4ec4d3a

Browse files
committed
Use the freshest frontiers when altering table
1 parent 3eac567 commit 4ec4d3a

File tree

1 file changed

+31
-47
lines changed

1 file changed

+31
-47
lines changed

Diff for: src/storage-client/src/storage_collections.rs

+31-47
Original file line numberDiff line numberDiff line change
@@ -2232,7 +2232,7 @@ where
22322232
new_desc: RelationDesc,
22332233
expected_version: RelationVersion,
22342234
) -> Result<(), StorageError<Self::Timestamp>> {
2235-
let (data_shard, existing_write_frontier, existing_read_capabilities) = {
2235+
let data_shard = {
22362236
let self_collections = self.collections.lock().expect("lock poisoned");
22372237
let existing = self_collections
22382238
.get(&existing_collection)
@@ -2243,11 +2243,7 @@ where
22432243
return Err(StorageError::IdentifierInvalid(existing_collection));
22442244
}
22452245

2246-
(
2247-
existing.collection_metadata.data_shard,
2248-
existing.write_frontier.clone(),
2249-
existing.read_capabilities.clone(),
2250-
)
2246+
existing.collection_metadata.data_shard
22512247
};
22522248

22532249
let persist_client = self
@@ -2319,46 +2315,11 @@ where
23192315
// TODO(alter_table): Do we need to advance the since of the table to match the time this
23202316
// new version was registered with txn-wal?
23212317

2322-
// Because this is a Table, we know it's managed by txn_wal, and thus it's logical write
2323-
// frontier is probably in advance of the write_handle's upper. So we fast forward the
2324-
// write frontier to match that of the existing collection.
2325-
let write_frontier =
2326-
if PartialOrder::less_equal(write_handle.upper(), &existing_write_frontier) {
2327-
existing_write_frontier
2328-
} else {
2329-
mz_ore::soft_panic_or_log!(
2330-
"WriteHandle frontier in advance of logical write frontier, {:?} vs {:?}",
2331-
write_handle.upper(),
2332-
existing_write_frontier
2333-
);
2334-
write_handle.upper().clone()
2335-
};
2336-
2337-
// Note: The new collection is now the "primary collection" so we specify `None` here.
2338-
let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
2339-
let collection_meta = CollectionMetadata {
2340-
persist_location: self.persist_location.clone(),
2341-
relation_desc: collection_desc.desc.clone(),
2342-
data_shard,
2343-
// TODO(alter_table): Support changes to sources.
2344-
remap_shard: None,
2345-
txns_shard: Some(self.txns_read.txns_id().clone()),
2346-
};
2347-
let collection_state = CollectionState::new(
2348-
collection_desc,
2349-
since_handle.since().clone(),
2350-
write_frontier,
2351-
Vec::new(),
2352-
collection_meta,
2353-
);
2354-
23552318
// Great! Our new schema is registered with Persist, now we need to update our internal
23562319
// data structures.
23572320
{
23582321
let mut self_collections = self.collections.lock().expect("lock poisoned");
23592322

2360-
// Add a record of the new collection.
2361-
self_collections.insert(new_collection, collection_state);
23622323
// Update the existing collection so we know it's a "projection" of this new one.
23632324
let existing = self_collections
23642325
.get_mut(&existing_collection)
@@ -2376,19 +2337,42 @@ where
23762337
};
23772338
existing.storage_dependencies.push(new_collection);
23782339

2379-
// Install the relevant read capabilities on the new collection.
2340+
// Copy over the frontiers from the previous version.
2341+
// The new table starts with two holds - the implied capability, and the hold from
2342+
// the previous version - both at the previous version's read frontier.
2343+
let implied_capability = existing.read_capabilities.frontier().to_owned();
2344+
let write_frontier = existing.write_frontier.clone();
2345+
2346+
// Determine the relevant read capabilities on the new collection.
23802347
//
23812348
// Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
23822349
// here, but that only installed a ReadHold on the new collection for the implied
23832350
// capability of the existing collection. This would cause runtime panics because it
23842351
// would eventually result in negative read capabilities.
23852352
let mut changes = ChangeBatch::new();
2386-
changes.extend(
2387-
existing_read_capabilities
2388-
.frontier()
2389-
.iter()
2390-
.map(|t| (t.clone(), 1)),
2353+
changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
2354+
2355+
// Note: The new collection is now the "primary collection" so we specify `None` here.
2356+
let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
2357+
let collection_meta = CollectionMetadata {
2358+
persist_location: self.persist_location.clone(),
2359+
relation_desc: collection_desc.desc.clone(),
2360+
data_shard,
2361+
// TODO(alter_table): Support changes to sources.
2362+
remap_shard: None,
2363+
txns_shard: Some(self.txns_read.txns_id().clone()),
2364+
};
2365+
let collection_state = CollectionState::new(
2366+
collection_desc,
2367+
implied_capability,
2368+
write_frontier,
2369+
Vec::new(),
2370+
collection_meta,
23912371
);
2372+
2373+
// Add a record of the new collection.
2374+
self_collections.insert(new_collection, collection_state);
2375+
23922376
let mut updates = BTreeMap::from([(new_collection, changes)]);
23932377
StorageCollectionsImpl::update_read_capabilities_inner(
23942378
&self.cmd_tx,

0 commit comments

Comments
 (0)