kvserver: auto-tune closed timestamp updates #141832

Draft · wants to merge 8 commits into master
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -78,6 +78,7 @@ feature.stats.enabled boolean true set to true to enable CREATE STATISTICS/ANALY
jobs.retention_time duration 336h0m0s the amount of time for which records for completed jobs are retained application
kv.bulk_sst.target_size byte size 16 MiB target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory system-visible
kv.closed_timestamp.follower_reads.enabled (alias: kv.closed_timestamp.follower_reads_enabled) boolean true allow (all) replicas to serve consistent historical reads based on closed timestamp information system-visible
kv.closed_timestamp.lead_for_global_reads_auto_tune boolean false if enabled, attempt to auto-tune the lead time that global_read ranges use to publish closed timestamps system-visible
kv.closed_timestamp.lead_for_global_reads_override duration 0s if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps system-visible
kv.closed_timestamp.side_transport_interval duration 200ms the interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport system-visible
kv.closed_timestamp.target_duration duration 3s if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration system-visible
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -94,6 +94,7 @@
<tr><td><div id="setting-kv-bulk-sst-max-allowed-overage" class="anchored"><code>kv.bulk_sst.max_allowed_overage</code></div></td><td>byte size</td><td><code>64 MiB</code></td><td>if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-bulk-sst-target-size" class="anchored"><code>kv.bulk_sst.target_size</code></div></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-follower-reads-enabled" class="anchored"><code>kv.closed_timestamp.follower_reads.enabled<br />(alias: kv.closed_timestamp.follower_reads_enabled)</code></div></td><td>boolean</td><td><code>true</code></td><td>allow (all) replicas to serve consistent historical reads based on closed timestamp information</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-auto-tune" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_auto_tune</code></div></td><td>boolean</td><td><code>false</code></td><td>if nonzero, attempt to auto-tune the lead time that global_read ranges use to publish closed timestamps</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-lead-for-global-reads-override" class="anchored"><code>kv.closed_timestamp.lead_for_global_reads_override</code></div></td><td>duration</td><td><code>0s</code></td><td>if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
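For context, the new boolean setting sits next to the existing kv.closed_timestamp.lead_for_global_reads_override duration. Below is a minimal, illustrative sketch (not part of the diff) of enabling it from Go; it assumes an open *gosql.DB named db inside a test, and mirrors what the test added further down does through sqlDB:

// Illustrative only: enable auto-tuning of the lead time for global reads.
// `db` is assumed to be an open *gosql.DB connected to the cluster.
if _, err := db.Exec(
	`SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_auto_tune = true`,
); err != nil {
	t.Fatal(err)
}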
5 changes: 5 additions & 0 deletions pkg/ccl/multiregionccl/multiregionccltestutils/BUILD.bazel
@@ -9,8 +9,13 @@ go_library(
deps = [
"//pkg/base",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/testutils/serverutils/regionlatency",
"//pkg/testutils/testcluster",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@org_golang_google_grpc//:grpc",
],
)
115 changes: 115 additions & 0 deletions pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go
@@ -9,14 +9,20 @@ import (
"context"
gosql "database/sql"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

type multiRegionTestClusterParams struct {
@@ -150,6 +156,115 @@ func TestingCreateMultiRegionClusterWithRegionList(
return tc, sqlDB, cleanup
}

// TestingCreateMultiRegionClusterWithDelay is similar to
// TestingCreateMultiRegionCluster, but it injects the given round-trip delay
// between every pair of regions. In addition to the usual return values, it
// returns a closure that enables the injected latency once the cluster has
// been fully set up.
func TestingCreateMultiRegionClusterWithDelay(
t testing.TB, numServers int, knobs base.TestingKnobs, delay time.Duration, opts ...MultiRegionTestClusterParamsOption,
) (*testcluster.TestCluster, *gosql.DB, func(), func()) {
regionNames := make([]string, numServers)
for i := 0; i < numServers; i++ {
// "us-east1", "us-east2"...
regionNames[i] = fmt.Sprintf("us-east%d", i+1)
}

pairs := regionlatency.RoundTripPairs{}
for i := 0; i < numServers; i++ {
for j := i + 1; j < numServers; j++ {
pair := regionlatency.Pair{
A: regionNames[i],
B: regionNames[j],
}
pairs[pair] = delay
}
}

regionLatencies := pairs.ToLatencyMap()
serverArgs := make(map[int]base.TestServerArgs)
params := &multiRegionTestClusterParams{}
for _, opt := range opts {
opt(params)
}

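// Startup coordination: each server signals on its signalAfter channel once
// it has its RPC address and then blocks on pauseAfter. A goroutine below
// waits for all of the signals, installs the region latency map, and closes
// pauseAfter so that every server can finish starting up.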
pauseAfter := make(chan struct{})
signalAfter := make([]chan struct{}, numServers)
var latencyEnabled atomic.Bool

totalServerCount := 0
for i, region := range regionNames {
signalAfter[i] = make(chan struct{})
serverKnobs := &server.TestingKnobs{
PauseAfterGettingRPCAddress: pauseAfter,
SignalAfterGettingRPCAddress: signalAfter[i],
ContextTestingKnobs: rpc.ContextTestingKnobs{
InjectedLatencyOracle: regionlatency.MakeAddrMap(),
InjectedLatencyEnabled: latencyEnabled.Load,
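// The unary interceptor below is a pass-through that forwards each call
// unchanged; the simulated delay comes from the latency oracle above.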
UnaryClientInterceptor: func(
target string, class rpc.ConnectionClass,
) grpc.UnaryClientInterceptor {
return func(
ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
},
},
}

args := base.TestServerArgs{
Settings: params.settings,
Knobs: knobs,
ExternalIODir: params.baseDir,
UseDatabase: params.useDatabase,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: region}},
},
ScanInterval: params.scanInterval,
}
args.Knobs.Server = serverKnobs
serverArgs[totalServerCount] = args
totalServerCount++
}

cs := cluster.MakeTestingClusterSettings()
tc := testcluster.NewTestCluster(t, totalServerCount, base.TestClusterArgs{
ParallelStart: true,
ReplicationMode: params.replicationMode,
ServerArgsPerNode: serverArgs,
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TODOTestTenantDisabled,
Settings: cs,
},
})

go func() {
for _, c := range signalAfter {
<-c
}
assert.NoError(t, regionLatencies.Apply(tc))
close(pauseAfter)
}()
tc.Start(t)

ctx := context.Background()
cleanup := func() {
tc.Stopper().Stop(ctx)
}

sqlDB := tc.ServerConn(0)

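// enableLatency flips the latency-injection switch on and resets the remote
// clock measurements gathered while the cluster was starting without the
// delay in place.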
enableLatency := func() {
latencyEnabled.Store(true)
for i := 0; i < numServers; i++ {
tc.Server(i).RPCContext().RemoteClocks.TestingResetLatencyInfos()
}
}

return tc, sqlDB, cleanup, enableLatency
}

// TestingEnsureCorrectPartitioning ensures that the table referenced by the
// supplied FQN has the expected indexes and that all of those indexes have the
// expected partitions.
132 changes: 132 additions & 0 deletions pkg/ccl/multiregionccl/roundtrips_test.go
@@ -150,3 +150,135 @@ func TestEnsureLocalReadsOnGlobalTables(t *testing.T) {
writeErr := <-errCh
require.NoError(t, writeErr)
}

func TestEnsureLocalReadsOnGlobalTablesWithDelay(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "https://github.com/cockroachdb/cockroach/issues/102798")
skip.UnderStress(t, "slow under stress")

// ensureOnlyLocalReads looks at a trace to ensure that reads were served
// locally. It returns true if the read was served as a follower read.
ensureOnlyLocalReads := func(t *testing.T, rec tracingpb.Recording) (servedUsingFollowerReads bool) {
for _, sp := range rec {
if sp.Operation == "dist sender send" {
require.True(t, tracing.LogsContainMsg(sp, kvbase.RoutingRequestLocallyMsg),
"query was not served locally: %s", rec)

// Check the child span to find out if the query was served using a
// follower read.
for _, span := range rec {
if span.ParentSpanID == sp.SpanID {
if tracing.LogsContainMsg(span, kvbase.FollowerReadServingMsg) {
servedUsingFollowerReads = true
}
}
}
}
}
return servedUsingFollowerReads
}

presentTimeRead := `SELECT * FROM t.test_table WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

knobs := base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == presentTimeRead {
recCh <- trace
}
},
},
}

numServers := 3
tc, sqlDB, cleanup, enableLatency := multiregionccltestutils.TestingCreateMultiRegionClusterWithDelay(
t, numServers, knobs, 500*time.Millisecond, multiregionccltestutils.WithReplicationMode(base.ReplicationManual),
)
defer cleanup()
_, err := sqlDB.Exec(`SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50ms'`)
require.NoError(t, err)
_, err = sqlDB.Exec(`SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_auto_tune = true`)
require.NoError(t, err)
_, err = sqlDB.Exec(`CREATE DATABASE t PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"`)
require.NoError(t, err)
_, err = sqlDB.Exec(`CREATE TABLE t.test_table (k INT PRIMARY KEY) LOCALITY GLOBAL`)
require.NoError(t, err)

var tableID uint32
err = sqlDB.QueryRow(`SELECT id FROM system.namespace WHERE name='test_table'`).Scan(&tableID)
require.NoError(t, err)
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
// Split the range at the start of the table and add a voter to all nodes in
// the cluster.
tc.SplitRangeOrFatal(t, tablePrefix.AsRawKey())
tc.AddVotersOrFatal(t, tablePrefix.AsRawKey(), tc.Target(1), tc.Target(2))

_, err = sqlDB.Exec("SET CLUSTER SETTING kv.allocator.load_based_rebalancing = off")
require.NoError(t, err)
_, err = sqlDB.Exec("SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '10ms'")
require.NoError(t, err)

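// Replication is configured; turn on the injected inter-region latency.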
enableLatency()

// Set up some write traffic in the background.
errCh := make(chan error)
stopWritesCh := make(chan struct{})
go func() {
i := 0
for {
select {
case <-stopWritesCh:
errCh <- nil
return
case <-time.After(10 * time.Millisecond):
_, err := sqlDB.Exec(`INSERT INTO t.test_table VALUES($1)`, i)
i++
if err != nil {
errCh <- err
return
}
}
}
}()

for i := 0; i < numServers; i++ {
conn := tc.ServerConn(i)
isLeaseHolder := false
testutils.SucceedsSoon(t, func() error {
// Run a query to populate its cache.
_, err = conn.Exec("SELECT * FROM t.test_table WHERE k=1")
require.NoError(t, err)

// Check that the cache was indeed populated.
cache := tc.Server(i).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry, err := cache.TestingGetCached(
context.Background(), tablePrefix, false /* inverted */, roachpb.LAG_BY_CLUSTER_SETTING,
)
require.NoError(t, err)
require.False(t, entry.Lease.Empty())

if expected, got := roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy; got != expected {
return errors.Newf("expected closedts policy %s, got %s", expected, got)
}

t.Logf("suceeded at populating closed ts policy")
isLeaseHolder = entry.Lease.Replica.NodeID == tc.Server(i).NodeID()
return nil
})

// Run the query to ensure local read.
_, err = conn.Exec(presentTimeRead)
require.NoError(t, err)

rec := <-recCh
followerRead := ensureOnlyLocalReads(t, rec)

// Expect every non-leaseholder to serve a (local) follower read. The
// leaseholder on the other hand won't serve a follower read.
require.Equal(t, !isLeaseHolder, followerRead, "%v", rec)
}

close(stopWritesCh)
writeErr := <-errCh
require.NoError(t, writeErr)
}