Skip to content

Commit f228c7c

Browse files
committed
catalog: add API for replicating catalogs across tenants for PCR
To support replicating catalogs between tenants for PCR, this patch introduces a new public API, SetupOrAdvanceStandbyReaderCatalog, which copies descriptors and sets external row data as needed. These schema objects can then be used for read-only operations. Additionally, tables are turned into materialized views by this change, so that DML cannot be executed on them. Fixes: #129439 Release note: None
1 parent 7467861 commit f228c7c

File tree

5 files changed

+435
-0
lines changed

5 files changed

+435
-0
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ ALL_TESTS = [
393393
"//pkg/sql/catalog/nstree:nstree_test",
394394
"//pkg/sql/catalog/randgen:randgen_test",
395395
"//pkg/sql/catalog/redact:redact_test",
396+
"//pkg/sql/catalog/replication:replication_test",
396397
"//pkg/sql/catalog/resolver:resolver_test",
397398
"//pkg/sql/catalog/schemadesc:schemadesc_test",
398399
"//pkg/sql/catalog/schemaexpr:schemaexpr_test",
@@ -1815,6 +1816,8 @@ GO_TARGETS = [
18151816
"//pkg/sql/catalog/randgen:randgen_test",
18161817
"//pkg/sql/catalog/redact:redact",
18171818
"//pkg/sql/catalog/redact:redact_test",
1819+
"//pkg/sql/catalog/replication:replication",
1820+
"//pkg/sql/catalog/replication:replication_test",
18181821
"//pkg/sql/catalog/resolver:resolver",
18191822
"//pkg/sql/catalog/resolver:resolver_test",
18201823
"//pkg/sql/catalog/rewrite:rewrite",
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
# Library target for the Go package
# github.com/cockroachdb/cockroach/pkg/sql/catalog/replication. It is built
# from the single source file reader_catalog.go, is publicly visible, and its
# deps list mirrors the packages that file imports.
go_library(
4+
name = "replication",
5+
srcs = ["reader_catalog.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/replication",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/keys",
10+
"//pkg/kv",
11+
"//pkg/roachpb",
12+
"//pkg/settings/cluster",
13+
"//pkg/sql/catalog",
14+
"//pkg/sql/catalog/dbdesc",
15+
"//pkg/sql/catalog/descpb",
16+
"//pkg/sql/catalog/descs",
17+
"//pkg/sql/catalog/funcdesc",
18+
"//pkg/sql/catalog/nstree",
19+
"//pkg/sql/catalog/schemadesc",
20+
"//pkg/sql/catalog/tabledesc",
21+
"//pkg/sql/catalog/typedesc",
22+
"//pkg/util/hlc",
23+
"//pkg/util/protoutil",
24+
"@com_github_cockroachdb_errors//:errors",
25+
],
26+
)
27+
28+
# Test target for the replication package. It compiles reader_catalog_test.go
# and depends on the :replication library defined in this same BUILD file plus
# the server and test-utility packages listed below.
go_test(
29+
name = "replication_test",
30+
srcs = ["reader_catalog_test.go"],
31+
deps = [
32+
":replication",
33+
"//pkg/base",
34+
"//pkg/security/securityassets",
35+
"//pkg/security/securitytest",
36+
"//pkg/server",
37+
"//pkg/sql",
38+
"//pkg/testutils/serverutils",
39+
"//pkg/testutils/sqlutils",
40+
"//pkg/testutils/testcluster",
41+
"//pkg/util/randutil",
42+
"@com_github_stretchr_testify//require",
43+
],
44+
)
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
// Copyright 2024 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package replication
12+
13+
import (
14+
"context"
15+
16+
"github.com/cockroachdb/cockroach/pkg/keys"
17+
"github.com/cockroachdb/cockroach/pkg/kv"
18+
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
20+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
21+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
22+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
23+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
24+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
25+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
26+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
27+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
28+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
29+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
30+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
31+
"github.com/cockroachdb/errors"
32+
)
33+
34+
// SetupOrAdvanceStandbyReaderCatalog when invoked inside the reader
35+
// tenant will replicate the descriptors from the tenant specified
36+
// by fromID. The replicated descriptors will be setup such that they
37+
// will access data from fromID. If the descriptors are already replicated
38+
// then this function will advance the timestamp.
39+
func SetupOrAdvanceStandbyReaderCatalog(
40+
ctx context.Context,
41+
fromID roachpb.TenantID,
42+
asOf hlc.Timestamp,
43+
descsCol descs.DB,
44+
st *cluster.Settings,
45+
) error {
46+
extracted, err := getCatalogForTenantAsOf(ctx, st, descsCol.KV(), fromID, asOf)
47+
if err != nil {
48+
return err
49+
}
50+
return descsCol.DescsTxn(
51+
ctx, func(ctx context.Context, txn descs.Txn) error {
52+
// Track which descriptors / namespaces that have been updated,
53+
// the difference between any existing tenant in the reader
54+
// catalog will be deleted (i.e. these are descriptors that exist
55+
// in the reader tenant, but not in the from tenant which we are
56+
// replicating).
57+
descriptorsUpdated := catalog.DescriptorIDSet{}
58+
namespaceUpdated := catalog.DescriptorIDSet{}
59+
allExistingDescs, err := txn.Descriptors().GetAll(ctx, txn.KV())
60+
if err != nil {
61+
return err
62+
}
63+
// Resolve any existing descriptors within the tenant, which
64+
// will be use to compute old values for writing.
65+
b := txn.KV().NewBatch()
66+
if err := extracted.ForEachDescriptor(func(fromDesc catalog.Descriptor) error {
67+
if !shouldSetupForReader(fromDesc.GetID(), fromDesc.GetParentID()) {
68+
return nil
69+
}
70+
// Track this descriptor was updated.
71+
descriptorsUpdated.Add(fromDesc.GetID())
72+
// If there is an existing descriptor with the same ID, we should
73+
// determine the old bytes in storage for the upsert.
74+
var existingRawBytes []byte
75+
existingDesc, err := txn.Descriptors().MutableByID(txn.KV()).Desc(ctx, fromDesc.GetID())
76+
if err == nil {
77+
existingRawBytes = existingDesc.GetRawBytesInStorage()
78+
} else if errors.Is(err, catalog.ErrDescriptorNotFound) {
79+
err = nil
80+
} else {
81+
return err
82+
}
83+
// Existing descriptor should never be a system descriptor.
84+
if existingDesc != nil &&
85+
existingDesc.GetParentID() != fromDesc.GetParentID() {
86+
return errors.AssertionFailedf("existing descriptor in the reader catalog "+
87+
"collides with a descriptor in the from tenant, with differring parent databases.\n"+
88+
"existing descriptor %s (id: %d, parentID: %d)\n "+
89+
"from descriptor: %s (id: %d, parentID: %d)\n",
90+
existingDesc.GetName(), existingDesc.GetID(), existingDesc.GetParentID(),
91+
fromDesc.GetName(), fromDesc.GetID(), fromDesc.GetParentID())
92+
}
93+
var mut catalog.MutableDescriptor
94+
switch t := fromDesc.DescriptorProto().GetUnion().(type) {
95+
case *descpb.Descriptor_Table:
96+
t.Table.Version = 1
97+
var mutBuilder tabledesc.TableDescriptorBuilder
98+
var mutTbl *tabledesc.Mutable
99+
if existingRawBytes != nil {
100+
t.Table.Version = existingDesc.GetVersion()
101+
mutBuilder = existingDesc.NewBuilder().(tabledesc.TableDescriptorBuilder)
102+
mutTbl = mutBuilder.BuildExistingMutableTable()
103+
mutTbl.TableDescriptor = *protoutil.Clone(t.Table).(*descpb.TableDescriptor)
104+
} else {
105+
mutBuilder = tabledesc.NewBuilder(t.Table)
106+
mutTbl = mutBuilder.BuildCreatedMutableTable()
107+
}
108+
mut = mutTbl
109+
// Convert any physical tables into external row tables.
110+
// Note: Materialized views will be converted, but their
111+
// view definition will be wiped.
112+
if mutTbl.IsPhysicalTable() {
113+
mutTbl.ViewQuery = ""
114+
mutTbl.SetExternalRowData(&descpb.ExternalRowData{TenantID: fromID, TableID: fromDesc.GetID(), AsOf: asOf})
115+
}
116+
case *descpb.Descriptor_Database:
117+
t.Database.Version = 1
118+
var mutBuilder dbdesc.DatabaseDescriptorBuilder
119+
if existingRawBytes != nil {
120+
t.Database.Version = existingDesc.GetVersion()
121+
mutBuilder = existingDesc.NewBuilder().(dbdesc.DatabaseDescriptorBuilder)
122+
mutDB := mutBuilder.BuildExistingMutableDatabase()
123+
mutDB.DatabaseDescriptor = *protoutil.Clone(t.Database).(*descpb.DatabaseDescriptor)
124+
mut = mutDB
125+
} else {
126+
mutBuilder = dbdesc.NewBuilder(t.Database)
127+
mut = mutBuilder.BuildCreatedMutable()
128+
}
129+
case *descpb.Descriptor_Schema:
130+
t.Schema.Version = 1
131+
var mutBuilder schemadesc.SchemaDescriptorBuilder
132+
if existingRawBytes != nil {
133+
t.Schema.Version = existingDesc.GetVersion()
134+
mutBuilder = existingDesc.NewBuilder().(schemadesc.SchemaDescriptorBuilder)
135+
mutSchema := mutBuilder.BuildExistingMutableSchema()
136+
mutSchema.SchemaDescriptor = *protoutil.Clone(t.Schema).(*descpb.SchemaDescriptor)
137+
mut = mutSchema
138+
} else {
139+
mutBuilder = schemadesc.NewBuilder(t.Schema)
140+
mut = mutBuilder.BuildCreatedMutable()
141+
}
142+
case *descpb.Descriptor_Function:
143+
t.Function.Version = 1
144+
var mutBuilder funcdesc.FunctionDescriptorBuilder
145+
if existingRawBytes != nil {
146+
t.Function.Version = existingDesc.GetVersion()
147+
mutBuilder = existingDesc.NewBuilder().(funcdesc.FunctionDescriptorBuilder)
148+
mutFunction := mutBuilder.BuildExistingMutableFunction()
149+
mutFunction.FunctionDescriptor = *protoutil.Clone(t.Function).(*descpb.FunctionDescriptor)
150+
mut = mutFunction
151+
} else {
152+
mutBuilder = funcdesc.NewBuilder(t.Function)
153+
mut = mutBuilder.BuildCreatedMutable()
154+
}
155+
case *descpb.Descriptor_Type:
156+
t.Type.Version = 1
157+
var mutBuilder typedesc.TypeDescriptorBuilder
158+
if existingRawBytes != nil {
159+
t.Type.Version = existingDesc.GetVersion()
160+
mutBuilder = existingDesc.NewBuilder().(typedesc.TypeDescriptorBuilder)
161+
mutType := mutBuilder.BuildExistingMutableType()
162+
mutType.TypeDescriptor = *protoutil.Clone(t.Type).(*descpb.TypeDescriptor)
163+
mut = mutType
164+
} else {
165+
mutBuilder = typedesc.NewBuilder(t.Type)
166+
mut = mutBuilder.BuildCreatedMutable()
167+
}
168+
default:
169+
return errors.AssertionFailedf("unknown descriptor type: %T", t)
170+
}
171+
return errors.Wrapf(txn.Descriptors().WriteDescToBatch(ctx, true, mut, b),
172+
"unable to create replicated descriptor: %d %T", mut.GetID(), mut)
173+
}); err != nil {
174+
return err
175+
}
176+
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
177+
if !shouldSetupForReader(e.GetID(), e.GetParentID()) {
178+
return nil
179+
}
180+
namespaceUpdated.Add(e.GetID())
181+
return errors.Wrapf(txn.Descriptors().UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
182+
}); err != nil {
183+
return err
184+
}
185+
// Figure out which descriptors should be deleted.
186+
if err := allExistingDescs.ForEachDescriptor(func(desc catalog.Descriptor) error {
187+
// Skip descriptors that were updated above
188+
if !shouldSetupForReader(desc.GetID(), desc.GetParentID()) ||
189+
descriptorsUpdated.Contains(desc.GetID()) {
190+
return nil
191+
}
192+
// Delete the descriptor from the batch
193+
return errors.Wrapf(txn.Descriptors().DeleteDescToBatch(ctx, true, desc.GetID(), b),
194+
"deleting descriptor")
195+
}); err != nil {
196+
return err
197+
}
198+
// Figure out which namespaces should be deleted.
199+
if err := allExistingDescs.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
200+
// Skip descriptors that were updated above
201+
if !shouldSetupForReader(e.GetID(), e.GetParentID()) ||
202+
descriptorsUpdated.Contains(e.GetID()) {
203+
return nil
204+
}
205+
return errors.Wrapf(txn.Descriptors().DeleteNamespaceEntryToBatch(ctx, true, e, b),
206+
"deleting namespace")
207+
}); err != nil {
208+
return err
209+
}
210+
return errors.Wrap(txn.KV().Run(ctx, b), "executing bach for updating catalog")
211+
})
212+
}
213+
214+
// shouldSetupForReader determines if a descriptor should be setup
215+
// access via external row data.
216+
func shouldSetupForReader(id descpb.ID, parentID descpb.ID) bool {
217+
switch id {
218+
case keys.UsersTableID, keys.RoleMembersTableID, keys.RoleOptionsTableID,
219+
keys.DatabaseRoleSettingsTableID, keys.TableStatisticsTableID:
220+
return true
221+
default:
222+
return parentID != keys.SystemDatabaseID &&
223+
id != keys.SystemDatabaseID
224+
}
225+
}
226+
227+
// getCatalogForTenantAsOf reads the descriptors from a given tenant
228+
// at the given timestamp.
229+
func getCatalogForTenantAsOf(
230+
ctx context.Context,
231+
st *cluster.Settings,
232+
db *kv.DB,
233+
tenantID roachpb.TenantID,
234+
asOf hlc.Timestamp,
235+
) (all nstree.Catalog, _ error) {
236+
cf := descs.NewBareBonesCollectionFactory(st, keys.MakeSQLCodec(tenantID))
237+
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
238+
err := txn.SetFixedTimestamp(ctx, asOf)
239+
if err != nil {
240+
return err
241+
}
242+
descsCol := cf.NewCollection(ctx)
243+
defer descsCol.ReleaseAll(ctx)
244+
all, err = descsCol.GetAllFromStorageUnvalidated(ctx, txn)
245+
if err != nil {
246+
return err
247+
}
248+
249+
return nil
250+
})
251+
return all, err
252+
}

0 commit comments

Comments
 (0)