@@ -2232,7 +2232,7 @@ where
22322232 new_desc : RelationDesc ,
22332233 expected_version : RelationVersion ,
22342234 ) -> Result < ( ) , StorageError < Self :: Timestamp > > {
2235- let ( data_shard, existing_write_frontier , existing_read_capabilities ) = {
2235+ let data_shard = {
22362236 let self_collections = self . collections . lock ( ) . expect ( "lock poisoned" ) ;
22372237 let existing = self_collections
22382238 . get ( & existing_collection)
@@ -2243,11 +2243,7 @@ where
22432243 return Err ( StorageError :: IdentifierInvalid ( existing_collection) ) ;
22442244 }
22452245
2246- (
2247- existing. collection_metadata . data_shard ,
2248- existing. write_frontier . clone ( ) ,
2249- existing. read_capabilities . clone ( ) ,
2250- )
2246+ existing. collection_metadata . data_shard
22512247 } ;
22522248
22532249 let persist_client = self
@@ -2319,46 +2315,11 @@ where
23192315 // TODO(alter_table): Do we need to advance the since of the table to match the time this
23202316 // new version was registered with txn-wal?
23212317
2322- // Because this is a Table, we know it's managed by txn_wal, and thus it's logical write
2323- // frontier is probably in advance of the write_handle's upper. So we fast forward the
2324- // write frontier to match that of the existing collection.
2325- let write_frontier =
2326- if PartialOrder :: less_equal ( write_handle. upper ( ) , & existing_write_frontier) {
2327- existing_write_frontier
2328- } else {
2329- mz_ore:: soft_panic_or_log!(
2330- "WriteHandle frontier in advance of logical write frontier, {:?} vs {:?}" ,
2331- write_handle. upper( ) ,
2332- existing_write_frontier
2333- ) ;
2334- write_handle. upper ( ) . clone ( )
2335- } ;
2336-
2337- // Note: The new collection is now the "primary collection" so we specify `None` here.
2338- let collection_desc = CollectionDescription :: < T > :: for_table ( new_desc. clone ( ) , None ) ;
2339- let collection_meta = CollectionMetadata {
2340- persist_location : self . persist_location . clone ( ) ,
2341- relation_desc : collection_desc. desc . clone ( ) ,
2342- data_shard,
2343- // TODO(alter_table): Support changes to sources.
2344- remap_shard : None ,
2345- txns_shard : Some ( self . txns_read . txns_id ( ) . clone ( ) ) ,
2346- } ;
2347- let collection_state = CollectionState :: new (
2348- collection_desc,
2349- since_handle. since ( ) . clone ( ) ,
2350- write_frontier,
2351- Vec :: new ( ) ,
2352- collection_meta,
2353- ) ;
2354-
23552318 // Great! Our new schema is registered with Persist, now we need to update our internal
23562319 // data structures.
23572320 {
23582321 let mut self_collections = self . collections . lock ( ) . expect ( "lock poisoned" ) ;
23592322
2360- // Add a record of the new collection.
2361- self_collections. insert ( new_collection, collection_state) ;
23622323 // Update the existing collection so we know it's a "projection" of this new one.
23632324 let existing = self_collections
23642325 . get_mut ( & existing_collection)
@@ -2376,19 +2337,42 @@ where
23762337 } ;
23772338 existing. storage_dependencies . push ( new_collection) ;
23782339
2379- // Install the relevant read capabilities on the new collection.
2340+ // Copy over the frontiers from the previous version.
2341+ // The new table starts with two holds - the implied capability, and the hold from
2342+ // the previous version - both at the previous version's read frontier.
2343+ let implied_capability = existing. read_capabilities . frontier ( ) . to_owned ( ) ;
2344+ let write_frontier = existing. write_frontier . clone ( ) ;
2345+
2346+ // Determine the relevant read capabilities on the new collection.
23802347 //
23812348 // Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
23822349 // here, but that only installed a ReadHold on the new collection for the implied
23832350 // capability of the existing collection. This would cause runtime panics because it
23842351 // would eventually result in negative read capabilities.
23852352 let mut changes = ChangeBatch :: new ( ) ;
2386- changes. extend (
2387- existing_read_capabilities
2388- . frontier ( )
2389- . iter ( )
2390- . map ( |t| ( t. clone ( ) , 1 ) ) ,
2353+ changes. extend ( implied_capability. iter ( ) . map ( |t| ( t. clone ( ) , 1 ) ) ) ;
2354+
2355+ // Note: The new collection is now the "primary collection" so we specify `None` here.
2356+ let collection_desc = CollectionDescription :: for_table ( new_desc. clone ( ) , None ) ;
2357+ let collection_meta = CollectionMetadata {
2358+ persist_location : self . persist_location . clone ( ) ,
2359+ relation_desc : collection_desc. desc . clone ( ) ,
2360+ data_shard,
2361+ // TODO(alter_table): Support changes to sources.
2362+ remap_shard : None ,
2363+ txns_shard : Some ( self . txns_read . txns_id ( ) . clone ( ) ) ,
2364+ } ;
2365+ let collection_state = CollectionState :: new (
2366+ collection_desc,
2367+ implied_capability,
2368+ write_frontier,
2369+ Vec :: new ( ) ,
2370+ collection_meta,
23912371 ) ;
2372+
2373+ // Add a record of the new collection.
2374+ self_collections. insert ( new_collection, collection_state) ;
2375+
23922376 let mut updates = BTreeMap :: from ( [ ( new_collection, changes) ] ) ;
23932377 StorageCollectionsImpl :: update_read_capabilities_inner (
23942378 & self . cmd_tx ,
0 commit comments