3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -397,6 +397,7 @@ ALL_TESTS = [
     "//pkg/sql/auditlogging:auditlogging_test",
     "//pkg/sql/backfill:backfill_test",
     "//pkg/sql/bulkingest:bulkingest_test",
+    "//pkg/sql/bulkmerge:bulkmerge_test",
     "//pkg/sql/bulksst:bulksst_test",
     "//pkg/sql/bulkutil:bulkutil_test",
     "//pkg/sql/cacheutil:cacheutil_test",
@@ -1885,6 +1886,8 @@ GO_TARGETS = [
     "//pkg/sql/backfill:backfill_test",
     "//pkg/sql/bulkingest:bulkingest",
     "//pkg/sql/bulkingest:bulkingest_test",
+    "//pkg/sql/bulkmerge:bulkmerge",
+    "//pkg/sql/bulkmerge:bulkmerge_test",
     "//pkg/sql/bulksst:bulksst",
     "//pkg/sql/bulksst:bulksst_test",
     "//pkg/sql/bulkutil:bulkutil",
1 change: 0 additions & 1 deletion pkg/backup/BUILD.bazel
@@ -83,7 +83,6 @@ go_library(
         "//pkg/sql",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/catalogkeys",
-        "//pkg/sql/catalog/catenumpb",
         "//pkg/sql/catalog/catpb",
         "//pkg/sql/catalog/colinfo",
         "//pkg/sql/catalog/dbdesc",
32 changes: 2 additions & 30 deletions pkg/backup/generative_split_and_scatter_processor.go
@@ -7,7 +7,6 @@ package backup

 import (
     "context"
-    "fmt"
     "hash/fnv"
     "math/rand"
     "strings"
@@ -22,9 +21,9 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/sql"
-    "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+    "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
     "github.com/cockroachdb/cockroach/pkg/sql/rowenc"
     "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -249,15 +248,6 @@ func (s dbSplitAndScatterer) findDestination(res *kvpb.AdminScatterResponse) roachpb.NodeID {
     return roachpb.NodeID(0)
 }

-func routingDatumsForSQLInstance(
-    sqlInstanceID base.SQLInstanceID,
-) (rowenc.EncDatum, rowenc.EncDatum) {
-    routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
-    startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
-    endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
-    return startDatum, endDatum
-}
-
 type entryNode struct {
     entry execinfrapb.RestoreSpanEntry
     node  roachpb.NodeID
@@ -393,7 +383,7 @@ func (gssp *generativeSplitAndScatterProcessor) Next() (
         // The routing datum informs the router which output stream should be used.
         routingDatum, ok := gssp.routingDatumCache.getRoutingDatum(scatteredEntry.node)
         if !ok {
-            routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
+            routingDatum, _ = physicalplan.RoutingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node))
             gssp.routingDatumCache.putRoutingDatum(scatteredEntry.node, routingDatum)
         }

@@ -745,24 +735,6 @@ var splitAndScatterOutputTypes = []*types.T{
     types.Bytes, // RestoreDataEntry bytes
 }

-// routingSpanForSQLInstance provides the mapping to be used during distsql planning
-// when setting up the output router.
-func routingSpanForSQLInstance(sqlInstanceID base.SQLInstanceID) ([]byte, []byte, error) {
-    var alloc tree.DatumAlloc
-    startDatum, endDatum := routingDatumsForSQLInstance(sqlInstanceID)
-
-    startBytes, endBytes := make([]byte, 0), make([]byte, 0)
-    startBytes, err := startDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, startBytes)
-    if err != nil {
-        return nil, nil, err
-    }
-    endBytes, err = endDatum.Encode(splitAndScatterOutputTypes[0], &alloc, catenumpb.DatumEncoding_ASCENDING_KEY, endBytes)
-    if err != nil {
-        return nil, nil, err
-    }
-    return startBytes, endBytes, nil
-}
-
 func init() {
     rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor
 }
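
Note: the call site above confirms the datum helper now lives in pkg/sql/physicalplan as RoutingDatumsForSQLInstance, but that file is not part of this excerpt. A minimal sketch of the relocated helper, reconstructed from the local copy deleted above; the actual physicalplan version may differ:

// RoutingDatumsForSQLInstance returns a [start, end) pair of EncDatums
// bounding the routing key for one SQL instance. Reconstructed from the
// helper removed from the backup package above; sketch only.
func RoutingDatumsForSQLInstance(
    sqlInstanceID base.SQLInstanceID,
) (rowenc.EncDatum, rowenc.EncDatum) {
    // Each instance gets a unique synthetic key, e.g. "node5"; the end of
    // its routing span is that key's immediate successor.
    routingBytes := roachpb.Key(fmt.Sprintf("node%d", sqlInstanceID))
    startDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes)))
    endDatum := rowenc.DatumToEncDatumUnsafe(types.Bytes, tree.NewDBytes(tree.DBytes(routingBytes.Next())))
    return startDatum, endDatum
}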
33 changes: 3 additions & 30 deletions pkg/backup/restore_processor_planning.go
@@ -6,10 +6,8 @@
 package backup

 import (
-    "bytes"
     "context"
     "math"
-    "slices"

     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/cloud"
@@ -19,7 +17,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/sql"
-    "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
     "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -123,34 +120,10 @@ func distRestore(
     // Plan SplitAndScatter on the coordinator node.
     splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs)

-    defaultStream := int32(0)
-    rangeRouterSpec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
-        Spans:       nil,
-        DefaultDest: &defaultStream,
-        Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{
-            {
-                Column:   0,
-                Encoding: catenumpb.DatumEncoding_ASCENDING_KEY,
-            },
-        },
-    }
-    for stream, sqlInstanceID := range sqlInstanceIDs {
-        startBytes, endBytes, err := routingSpanForSQLInstance(sqlInstanceID)
-        if err != nil {
-            return nil, nil, err
-        }
-
-        span := execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
-            Start:  startBytes,
-            End:    endBytes,
-            Stream: int32(stream),
-        }
-        rangeRouterSpec.Spans = append(rangeRouterSpec.Spans, span)
+    rangeRouterSpec, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
+    if err != nil {
+        return nil, nil, err
     }
-    // The router expects the spans to be sorted.
-    slices.SortFunc(rangeRouterSpec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int {
-        return bytes.Compare(a.Start, b.Start)
-    })

     // TODO(pbardea): This is not super principled. I just wanted something that
     // wasn't a constant and grew slower than linear with the length of
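
Similarly, physicalplan.MakeInstanceRouter itself is not shown in this excerpt. A sketch of what it presumably does, assembled from the inline router setup deleted above; the exact signature and the name of the relocated span helper are assumptions:

// MakeInstanceRouter builds a RangeRouterSpec that sends each row to the
// output stream of the SQL instance named in its routing column. Sketch only.
func MakeInstanceRouter(
    sqlInstanceIDs []base.SQLInstanceID,
) (execinfrapb.OutputRouterSpec_RangeRouterSpec, error) {
    defaultStream := int32(0)
    spec := execinfrapb.OutputRouterSpec_RangeRouterSpec{
        DefaultDest: &defaultStream,
        Encodings: []execinfrapb.OutputRouterSpec_RangeRouterSpec_ColumnEncoding{
            {Column: 0, Encoding: catenumpb.DatumEncoding_ASCENDING_KEY},
        },
    }
    for stream, sqlInstanceID := range sqlInstanceIDs {
        // RoutingSpanForSQLInstance is assumed to be the relocated version of
        // the routingSpanForSQLInstance helper deleted from pkg/backup above.
        startBytes, endBytes, err := RoutingSpanForSQLInstance(sqlInstanceID)
        if err != nil {
            return execinfrapb.OutputRouterSpec_RangeRouterSpec{}, err
        }
        spec.Spans = append(spec.Spans, execinfrapb.OutputRouterSpec_RangeRouterSpec_Span{
            Start:  startBytes,
            End:    endBytes,
            Stream: int32(stream),
        })
    }
    // The range router requires its spans to be sorted by start key.
    slices.SortFunc(spec.Spans, func(a, b execinfrapb.OutputRouterSpec_RangeRouterSpec_Span) int {
        return bytes.Compare(a.Start, b.Start)
    })
    return spec, nil
}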
57 changes: 57 additions & 0 deletions pkg/sql/bulkmerge/BUILD.bazel
@@ -0,0 +1,57 @@
+# Copyright 2025 The Cockroach Authors.
+#
+# Use of this software is governed by the CockroachDB Software License
+# included in the /LICENSE file.
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "bulkmerge",
+    srcs = [
+        "merge_coordinator.go",
+        "merge_loopback.go",
+        "merge_planning.go",
+        "merge_processor.go",
+    ],
+    importpath = "github.com/cockroachdb/cockroach/pkg/sql/bulkmerge",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/base",
+        "//pkg/sql",
+        "//pkg/sql/execinfra",
+        "//pkg/sql/execinfrapb",
+        "//pkg/sql/physicalplan",
+        "//pkg/sql/rowenc",
+        "//pkg/sql/rowexec",
+        "//pkg/sql/sem/tree",
+        "//pkg/sql/types",
+        "@com_github_cockroachdb_errors//:errors",
+    ],
+)
+
+go_test(
+    name = "bulkmerge_test",
+    srcs = [
+        "main_test.go",
+        "merge_processor_test.go",
+        "merge_test.go",
+    ],
+    embed = [":bulkmerge"],
+    deps = [
+        "//pkg/base",
+        "//pkg/kv/kvclient/kvtenant",
+        "//pkg/security/securityassets",
+        "//pkg/security/securitytest",
+        "//pkg/security/username",
+        "//pkg/server",
+        "//pkg/sql",
+        "//pkg/sql/sem/tree",
+        "//pkg/testutils/serverutils",
+        "//pkg/testutils/sqlutils",
+        "//pkg/testutils/testcluster",
+        "//pkg/util/leaktest",
+        "//pkg/util/log",
+        "//pkg/util/randutil",
+        "@com_github_stretchr_testify//require",
+    ],
+)
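
With these targets registered (together with the pkg/BUILD.bazel entries above), the new suite should be runnable in the usual way, e.g. bazel test //pkg/sql/bulkmerge:bulkmerge_test.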
30 changes: 30 additions & 0 deletions pkg/sql/bulkmerge/main_test.go
@@ -0,0 +1,30 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package bulkmerge
+
+import (
+    "os"
+    "testing"
+
+    "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
+    "github.com/cockroachdb/cockroach/pkg/security/securityassets"
+    "github.com/cockroachdb/cockroach/pkg/security/securitytest"
+    "github.com/cockroachdb/cockroach/pkg/server"
+    "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+    "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+    "github.com/cockroachdb/cockroach/pkg/util/randutil"
+)
+
+//go:generate ../util/leaktest/add-leaktest.sh *_test.go
+
+func TestMain(m *testing.M) {
+    securityassets.SetLoader(securitytest.EmbeddedAssets)
+    randutil.SeedForTests()
+    serverutils.InitTestServerFactory(server.TestServerFactory)
+    serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+    kvtenant.InitTestConnectorFactory()
+    os.Exit(m.Run())
+}
87 changes: 87 additions & 0 deletions pkg/sql/bulkmerge/merge_coordinator.go
@@ -0,0 +1,87 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package bulkmerge
+
+import (
+    "context"
+
+    "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+    "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+    "github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+    "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
+    "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+    "github.com/cockroachdb/cockroach/pkg/sql/types"
+    "github.com/cockroachdb/errors"
+)
+
+var (
+    _ execinfra.Processor = &mergeCoordinator{}
+    _ execinfra.RowSource = &mergeCoordinator{}
+)
+
+// Emits a single row on completion which is a protobuf containing the details
+// of the merged SSTs. The protobuf is BulkMergeSpec_Output, which contains the
+// list of output SSTs with their URIs and key ranges.
+var mergeCoordinatorOutputTypes = []*types.T{
+    types.Bytes,
+}
+
+type mergeCoordinator struct {
+    execinfra.ProcessorBase
+    input execinfra.RowSource
+}
+
+// Next implements execinfra.RowSource.
+func (m *mergeCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+    for m.State == execinfra.StateRunning {
+        row, meta := m.input.Next()
+        switch {
+        case row == nil && meta == nil:
+            m.MoveToDraining(nil /* err */)
+        case meta != nil && meta.Err != nil:
+            m.MoveToDraining(meta.Err)
+        case meta != nil:
+            m.MoveToDraining(errors.Newf("unexpected meta: %v", meta))
+        case row != nil:
+            base := *row[2].Datum.(*tree.DBytes)
+            return rowenc.EncDatumRow{
+                rowenc.EncDatum{Datum: tree.NewDBytes(base + "->coordinator")},
+            }, nil
+        }
+    }
+    return nil, m.DrainHelper()
+}
+
+// Start implements execinfra.RowSource.
+func (m *mergeCoordinator) Start(ctx context.Context) {
+    m.StartInternal(ctx, "mergeCoordinator")
+    m.input.Start(ctx)
+}
+
+func init() {
+    rowexec.NewMergeCoordinatorProcessor = func(
+        ctx context.Context,
+        flow *execinfra.FlowCtx,
+        flowID int32,
+        spec execinfrapb.MergeCoordinatorSpec,
+        postSpec *execinfrapb.PostProcessSpec,
+        input execinfra.RowSource,
+    ) (execinfra.Processor, error) {
+        mc := &mergeCoordinator{
+            input: input,
+        }
+        err := mc.Init(
+            ctx, mc, postSpec, mergeCoordinatorOutputTypes, flow, flowID, nil,
+            execinfra.ProcStateOpts{
+                InputsToDrain: []execinfra.RowSource{input},
+            },
+        )
+        if err != nil {
+            return nil, err
+        }
+        return mc, nil
+    }
+}
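
Per the comment above, the coordinator's contract is a single bytes row holding a marshaled BulkMergeSpec_Output (the Next body here is still placeholder plumbing). A hedged sketch of how a downstream consumer might decode that row; the proto name comes from the comment and may not match the final message, and protoutil is CockroachDB's standard proto helper package:

// decodeMergeOutput turns the coordinator's single output row back into the
// merge-result proto. Sketch only: assumes the row's datum is already decoded
// and that execinfrapb.BulkMergeSpec_Output is the message referenced above.
func decodeMergeOutput(row rowenc.EncDatumRow) (*execinfrapb.BulkMergeSpec_Output, error) {
    raw, ok := row[0].Datum.(*tree.DBytes)
    if !ok {
        return nil, errors.Newf("expected DBytes, got %T", row[0].Datum)
    }
    var out execinfrapb.BulkMergeSpec_Output
    if err := protoutil.Unmarshal([]byte(*raw), &out); err != nil {
        return nil, err
    }
    return &out, nil
}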