@@ -2232,7 +2232,7 @@ where
2232
2232
new_desc : RelationDesc ,
2233
2233
expected_version : RelationVersion ,
2234
2234
) -> Result < ( ) , StorageError < Self :: Timestamp > > {
2235
- let ( data_shard, existing_write_frontier , existing_read_capabilities ) = {
2235
+ let data_shard = {
2236
2236
let self_collections = self . collections . lock ( ) . expect ( "lock poisoned" ) ;
2237
2237
let existing = self_collections
2238
2238
. get ( & existing_collection)
@@ -2243,11 +2243,7 @@ where
2243
2243
return Err ( StorageError :: IdentifierInvalid ( existing_collection) ) ;
2244
2244
}
2245
2245
2246
- (
2247
- existing. collection_metadata . data_shard ,
2248
- existing. write_frontier . clone ( ) ,
2249
- existing. read_capabilities . clone ( ) ,
2250
- )
2246
+ existing. collection_metadata . data_shard
2251
2247
} ;
2252
2248
2253
2249
let persist_client = self
@@ -2319,46 +2315,11 @@ where
2319
2315
// TODO(alter_table): Do we need to advance the since of the table to match the time this
2320
2316
// new version was registered with txn-wal?
2321
2317
2322
- // Because this is a Table, we know it's managed by txn_wal, and thus it's logical write
2323
- // frontier is probably in advance of the write_handle's upper. So we fast forward the
2324
- // write frontier to match that of the existing collection.
2325
- let write_frontier =
2326
- if PartialOrder :: less_equal ( write_handle. upper ( ) , & existing_write_frontier) {
2327
- existing_write_frontier
2328
- } else {
2329
- mz_ore:: soft_panic_or_log!(
2330
- "WriteHandle frontier in advance of logical write frontier, {:?} vs {:?}" ,
2331
- write_handle. upper( ) ,
2332
- existing_write_frontier
2333
- ) ;
2334
- write_handle. upper ( ) . clone ( )
2335
- } ;
2336
-
2337
- // Note: The new collection is now the "primary collection" so we specify `None` here.
2338
- let collection_desc = CollectionDescription :: < T > :: for_table ( new_desc. clone ( ) , None ) ;
2339
- let collection_meta = CollectionMetadata {
2340
- persist_location : self . persist_location . clone ( ) ,
2341
- relation_desc : collection_desc. desc . clone ( ) ,
2342
- data_shard,
2343
- // TODO(alter_table): Support changes to sources.
2344
- remap_shard : None ,
2345
- txns_shard : Some ( self . txns_read . txns_id ( ) . clone ( ) ) ,
2346
- } ;
2347
- let collection_state = CollectionState :: new (
2348
- collection_desc,
2349
- since_handle. since ( ) . clone ( ) ,
2350
- write_frontier,
2351
- Vec :: new ( ) ,
2352
- collection_meta,
2353
- ) ;
2354
-
2355
2318
// Great! Our new schema is registered with Persist, now we need to update our internal
2356
2319
// data structures.
2357
2320
{
2358
2321
let mut self_collections = self . collections . lock ( ) . expect ( "lock poisoned" ) ;
2359
2322
2360
- // Add a record of the new collection.
2361
- self_collections. insert ( new_collection, collection_state) ;
2362
2323
// Update the existing collection so we know it's a "projection" of this new one.
2363
2324
let existing = self_collections
2364
2325
. get_mut ( & existing_collection)
@@ -2376,19 +2337,45 @@ where
2376
2337
} ;
2377
2338
existing. storage_dependencies . push ( new_collection) ;
2378
2339
2379
- // Install the relevant read capabilities on the new collection.
2340
+ let implied_capability = existing. implied_capability . clone ( ) ;
2341
+ let write_frontier = existing. write_frontier . clone ( ) ;
2342
+
2343
+ // Determine the relevant read capabilities on the new collection.
2380
2344
//
2381
2345
// Note(parkmycar): Originally we used `install_collection_dependency_read_holds_inner`
2382
2346
// here, but that only installed a ReadHold on the new collection for the implied
2383
2347
// capability of the existing collection. This would cause runtime panics because it
2384
2348
// would eventually result in negative read capabilities.
2385
2349
let mut changes = ChangeBatch :: new ( ) ;
2386
2350
changes. extend (
2387
- existing_read_capabilities
2351
+ existing
2352
+ . read_capabilities
2388
2353
. frontier ( )
2389
2354
. iter ( )
2390
2355
. map ( |t| ( t. clone ( ) , 1 ) ) ,
2391
2356
) ;
2357
+
2358
+ // Note: The new collection is now the "primary collection" so we specify `None` here.
2359
+ let collection_desc = CollectionDescription :: for_table ( new_desc. clone ( ) , None ) ;
2360
+ let collection_meta = CollectionMetadata {
2361
+ persist_location : self . persist_location . clone ( ) ,
2362
+ relation_desc : collection_desc. desc . clone ( ) ,
2363
+ data_shard,
2364
+ // TODO(alter_table): Support changes to sources.
2365
+ remap_shard : None ,
2366
+ txns_shard : Some ( self . txns_read . txns_id ( ) . clone ( ) ) ,
2367
+ } ;
2368
+ let collection_state = CollectionState :: new (
2369
+ collection_desc,
2370
+ implied_capability,
2371
+ write_frontier,
2372
+ Vec :: new ( ) ,
2373
+ collection_meta,
2374
+ ) ;
2375
+
2376
+ // Add a record of the new collection.
2377
+ self_collections. insert ( new_collection, collection_state) ;
2378
+
2392
2379
let mut updates = BTreeMap :: from ( [ ( new_collection, changes) ] ) ;
2393
2380
StorageCollectionsImpl :: update_read_capabilities_inner (
2394
2381
& self . cmd_tx ,
0 commit comments