Skip to content

Commit a2882ae

Browse files
craig[bot]kyle-a-wongarulajmanifqazi
committed
140673: sqlstats: add new MaybeFlushWithDrainer method to persistedSqlStats r=kyle-a-wong a=kyle-a-wong Adds a new SSDrainer interface that PersistedSQLStats uses to flush sql stats. Instead of calling ConsumeStats on sslocal.SQLStats, persistedsqlstats will use a provided SSDrainer to either DrainStats or Reset. The ConsumeStats method on sslocal.SQLStats has been removed and its logic has been redistributed to persistedsqlstats, as that was the only consumer of the method. Since all the logic now lives in persistedsqlstats, the flush-related functions can be called directly instead of relying on nested callbacks. Resolves: #139143 Epic: CRDB-45771 Release note: None Note: This is a stacked commit; only the last commit in the chain needs to be reviewed. 141724: kvclient: ensure locking Get requests are always sent to KV r=yuzefovich a=arulajmani First commit from #141672 ---- A Get request to a key that's been previously buffered on the client must always be served locally. However, if the Get request is locking in nature, we must also acquire a lock at the leaseholder. This patch addresses a TODO to do exactly this, while ensuring the response from KV is discarded and served locally instead. Informs #139054 Release note: None 141928: catalog/lease: populate SessionID for historical leases r=fqazi a=fqazi Previously, we were seeing intermittent panics in scenarios where historical descriptors were accessed followed by reads of newer descriptor versions. This would happen because the SessionID on historical versions would be blank, causing our checks for detecting changes to fail. To address this, this patch always populates the session ID inside the descriptor versions. Fixes: #141877 Fixes: #141876 Fixes: #141855 Release note: None Co-authored-by: Kyle Wong <[email protected]> Co-authored-by: Arul Ajmani <[email protected]> Co-authored-by: Faizan Qazi <[email protected]>
4 parents 3ad8fc5 + f10f239 + fec29f3 + a65df49 commit a2882ae

22 files changed

+591
-415
lines changed

pkg/ccl/serverccl/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ go_library(
1616
"//pkg/server/serverpb",
1717
"//pkg/sql",
1818
"//pkg/sql/contention",
19-
"//pkg/sql/sqlstats/persistedsqlstats",
2019
"//pkg/testutils/serverutils",
2120
"//pkg/testutils/sqlutils",
2221
"//pkg/util/httputil",

pkg/ccl/serverccl/statusccl/tenant_status_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,16 @@ func testResetSQLStatsRPCForTenant(
591591
}
592592

593593
if flushed {
594-
testTenant.TenantSQLStats().MaybeFlush(ctx, testTenant.GetTenant().AppStopper())
595-
controlCluster.TenantSQLStats(serverccl.RandomServer).MaybeFlush(ctx, controlCluster.Tenant(0).GetTenant().AppStopper())
594+
testTenantServer := testTenant.TenantSQLServer()
595+
testTenantServer.GetSQLStatsProvider().MaybeFlush(
596+
ctx,
597+
testTenant.GetTenant().AppStopper(),
598+
)
599+
randomTenantServer := controlCluster.TenantSQLServer(serverccl.RandomServer)
600+
randomTenantServer.GetSQLStatsProvider().MaybeFlush(
601+
ctx,
602+
controlCluster.Tenant(0).GetTenant().AppStopper(),
603+
)
596604
}
597605

598606
status := testTenant.TenantStatusSrv()

pkg/ccl/serverccl/tenant_test_utils.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
2020
"github.com/cockroachdb/cockroach/pkg/sql"
2121
"github.com/cockroachdb/cockroach/pkg/sql/contention"
22-
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
2322
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2423
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2524
"github.com/cockroachdb/cockroach/pkg/util/httputil"
@@ -41,7 +40,7 @@ type testTenant struct {
4140
tenantConn *gosql.DB
4241
tenantDB *sqlutils.SQLRunner
4342
tenantStatus serverpb.SQLStatusServer
44-
tenantSQLStats *persistedsqlstats.PersistedSQLStats
43+
tenantSQLServer *sql.Server
4544
tenantContentionRegistry *contention.Registry
4645
}
4746

@@ -53,8 +52,8 @@ func (h *testTenant) GetTenantConn() *sqlutils.SQLRunner {
5352
return h.tenantDB
5453
}
5554

56-
func (h *testTenant) TenantSQLStats() *persistedsqlstats.PersistedSQLStats {
57-
return h.tenantSQLStats
55+
func (h *testTenant) TenantSQLServer() *sql.Server {
56+
return h.tenantSQLServer
5857
}
5958

6059
func (h *testTenant) TenantStatusSrv() serverpb.SQLStatusServer {
@@ -78,7 +77,7 @@ type TestTenant interface {
7877
GetTenant() serverutils.ApplicationLayerInterface
7978
GetTenantDB() *gosql.DB
8079
GetTenantConn() *sqlutils.SQLRunner
81-
TenantSQLStats() *persistedsqlstats.PersistedSQLStats
80+
TenantSQLServer() *sql.Server
8281
TenantStatusSrv() serverpb.SQLStatusServer
8382
TenantContentionRegistry() *contention.Registry
8483
GetRPCContext() *rpc.Context
@@ -95,15 +94,15 @@ func newTestTenant(
9594
tenant, tenantConn := serverutils.StartTenant(t, server, args)
9695
sqlDB := sqlutils.MakeSQLRunner(tenantConn)
9796
status := tenant.StatusServer().(serverpb.SQLStatusServer)
98-
sqlStats := tenant.SQLServer().(*sql.Server).GetSQLStatsProvider()
97+
sqlServer := tenant.SQLServer().(*sql.Server)
9998
contentionRegistry := tenant.ExecutorConfig().(sql.ExecutorConfig).ContentionRegistry
10099

101100
return &testTenant{
102101
tenant: tenant,
103102
tenantConn: tenantConn,
104103
tenantDB: sqlDB,
105104
tenantStatus: status,
106-
tenantSQLStats: sqlStats,
105+
tenantSQLServer: sqlServer,
107106
tenantContentionRegistry: contentionRegistry,
108107
}
109108
}
@@ -197,7 +196,7 @@ type TenantClusterHelper interface {
197196
TenantDB(idx serverIdx) *gosql.DB
198197
TenantHTTPClient(t *testing.T, idx serverIdx, isAdmin bool) *httpClient
199198
TenantAdminHTTPClient(t *testing.T, idx serverIdx) *httpClient
200-
TenantSQLStats(idx serverIdx) *persistedsqlstats.PersistedSQLStats
199+
TenantSQLServer(idx serverIdx) *sql.Server
201200
TenantStatusSrv(idx serverIdx) serverpb.SQLStatusServer
202201
TenantContentionRegistry(idx serverIdx) *contention.Registry
203202
Cleanup(t *testing.T)
@@ -249,8 +248,8 @@ func (c tenantCluster) TenantAdminHTTPClient(t *testing.T, idx serverIdx) *httpC
249248
return c.TenantHTTPClient(t, idx, true /* isAdmin */)
250249
}
251250

252-
func (c tenantCluster) TenantSQLStats(idx serverIdx) *persistedsqlstats.PersistedSQLStats {
253-
return c.Tenant(idx).TenantSQLStats()
251+
func (c tenantCluster) TenantSQLServer(idx serverIdx) *sql.Server {
252+
return c.Tenant(idx).TenantSQLServer()
254253
}
255254

256255
func (c tenantCluster) TenantStatusSrv(idx serverIdx) serverpb.SQLStatusServer {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1213
"github.com/cockroachdb/cockroach/pkg/roachpb"
1314
"github.com/cockroachdb/cockroach/pkg/settings"
1415
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -289,9 +290,10 @@ func (twb *txnWriteBuffer) applyTransformations(
289290
// TODO(yuzefovich): ensure that we elide the lock acquisition
290291
// whenever possible (e.g. blind UPSERT in an implicit txn).
291292
ts = append(ts, transformation{
292-
stripped: true,
293-
index: i,
294-
resp: ru,
293+
stripped: true,
294+
index: i,
295+
origRequest: req,
296+
resp: ru,
295297
})
296298
twb.addToBuffer(t.Key, t.Value, t.Sequence)
297299

@@ -304,9 +306,10 @@ func (twb *txnWriteBuffer) applyTransformations(
304306
FoundKey: false,
305307
})
306308
ts = append(ts, transformation{
307-
stripped: true,
308-
index: i,
309-
resp: ru,
309+
stripped: true,
310+
index: i,
311+
origRequest: req,
312+
resp: ru,
310313
})
311314
twb.addToBuffer(t.Key, roachpb.Value{}, t.Sequence)
312315

@@ -315,25 +318,36 @@ func (twb *txnWriteBuffer) applyTransformations(
315318
val, served := twb.maybeServeRead(t.Key, t.Sequence)
316319
if served {
317320
log.VEventf(ctx, 2, "serving %s on key %s from the buffer", t.Method(), t.Key)
318-
var ru kvpb.ResponseUnion
321+
var resp kvpb.ResponseUnion
319322
getResp := &kvpb.GetResponse{}
320323
if val.IsPresent() {
321324
getResp.Value = val
322325
}
323-
ru.MustSetInner(getResp)
326+
resp.MustSetInner(getResp)
327+
328+
stripped := true
329+
if t.KeyLockingStrength != lock.None {
330+
// Even though the Get request must be served from the buffer, as the
331+
// transaction performed a previous write to the key, we still need to
332+
// acquire a lock at the leaseholder. As a result, we can't strip the
333+
// request from the batch.
334+
//
335+
// TODO(arul): we could eschew sending this request if we knew there
336+
// was a sufficiently strong lock already present on the key.
337+
stripped = false
338+
baRemote.Requests = append(baRemote.Requests, ru)
339+
}
340+
324341
ts = append(ts, transformation{
325-
stripped: true,
326-
index: i,
327-
resp: ru,
342+
stripped: stripped,
343+
index: i,
344+
origRequest: req,
345+
resp: resp,
328346
})
329347
// We've constructed a response that we'll stitch together with the
330348
// result on the response path; eschew sending the request to the KV
331349
// layer.
332350
//
333-
// TODO(arul): if this is a locking Get request, we can't omit the KV
334-
// request. At least, not unless we know a lock is already present on
335-
// the key.
336-
//
337351
// TODO(arul): if the ReturnRawMVCCValues flag is set, we'll need to
338352
// flush the buffer.
339353
continue
@@ -579,14 +593,14 @@ type transformation struct {
579593
// index of the request in the original batch to which the transformation
580594
// applies.
581595
index int
596+
// origRequest is the original request that was transformed.
597+
origRequest kvpb.Request
582598
// resp is locally produced response that needs to be merged with any
583599
// responses returned by the KV layer. This is set for requests that can be
584600
// evaluated locally (e.g. blind writes, reads that can be served entirely
585-
// from the buffer). If non-empty, stripped must also be true.
601+
// from the buffer). Must be set if stripped is true, but the converse doesn't
602+
// hold.
586603
resp kvpb.ResponseUnion
587-
// origRequest is the original request that was transformed. Set iff stripped
588-
// is false.
589-
origRequest kvpb.Request
590604
}
591605

592606
// toResp returns the response that should be added to the batch response as
@@ -599,8 +613,21 @@ func (t transformation) toResp(
599613
}
600614

601615
var ru kvpb.ResponseUnion
602-
switch br.GetInner().(type) {
603-
case *kvpb.ScanResponse:
616+
switch t.origRequest.(type) {
617+
case *kvpb.PutRequest:
618+
ru = t.resp
619+
case *kvpb.DeleteRequest:
620+
ru = t.resp
621+
case *kvpb.GetRequest:
622+
// Get requests must be served from the local buffer if a transaction
623+
// performed a previous write to the key being read. However, Get requests
624+
// must be sent to the KV layer (i.e. not be stripped) iff they are locking
625+
// in nature.
626+
req := t.origRequest.(*kvpb.GetRequest)
627+
assertTrue(t.stripped == (req.KeyLockingStrength == lock.None),
628+
"Get requests should either be stripped or be locking")
629+
ru = t.resp
630+
case *kvpb.ScanRequest:
604631
scanResp, err := twb.mergeWithScanResp(
605632
t.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse),
606633
)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
"testing"
1212

1313
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
1516
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
17+
"github.com/cockroachdb/cockroach/pkg/testutils"
1618
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1719
"github.com/cockroachdb/cockroach/pkg/util/log"
1820
"github.com/cockroachdb/errors"
@@ -825,3 +827,108 @@ func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) {
825827
require.Len(t, br.Responses, 1)
826828
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
827829
}
830+
831+
// TestTxnWriteBufferLockingGetRequests ensures that locking get requests are
832+
// handled appropriately -- they're sent to KV, to acquire a lock, but the
833+
// read is served from the buffer (upholding read-your-own-write semantics).
834+
func TestTxnWriteBufferLockingGetRequests(t *testing.T) {
835+
defer leaktest.AfterTest(t)()
836+
defer log.Scope(t).Close(t)
837+
ctx := context.Background()
838+
twb, mockSender := makeMockTxnWriteBuffer()
839+
840+
txn := makeTxnProto()
841+
txn.Sequence = 10
842+
keyA := roachpb.Key("a")
843+
valA := "val"
844+
845+
// Blindly write to key A.
846+
ba := &kvpb.BatchRequest{}
847+
ba.Header = kvpb.Header{Txn: &txn}
848+
putA := putArgs(keyA, valA, txn.Sequence)
849+
ba.Add(putA)
850+
851+
numCalled := mockSender.NumCalled()
852+
br, pErr := twb.SendLocked(ctx, ba)
853+
require.Nil(t, pErr)
854+
require.NotNil(t, br)
855+
// All the requests should be buffered and not make it past the
856+
// txnWriteBuffer.
857+
require.Equal(t, numCalled, mockSender.NumCalled())
858+
// Even though the txnWriteBuffer did not send any Put requests to the KV
859+
// layer above, the responses should still be populated.
860+
require.Len(t, br.Responses, 1)
861+
require.Equal(t, br.Responses[0].GetInner(), &kvpb.PutResponse{})
862+
863+
// Verify the writes were buffered correctly.
864+
expBufferedWrites := []bufferedWrite{
865+
makeBufferedWrite(keyA, makeBufferedValue(valA, 10)),
866+
}
867+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
868+
869+
// Perform a locking read on keyA. Ensure a request is sent to the KV layer,
870+
// but the response is served from the buffer.
871+
testutils.RunValues(t, "str", []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Update}, func(t *testing.T, strength lock.Strength) {
872+
txn.Sequence = 11
873+
ba = &kvpb.BatchRequest{}
874+
ba.Header = kvpb.Header{Txn: &txn}
875+
getA := &kvpb.GetRequest{
876+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
877+
KeyLockingStrength: strength,
878+
}
879+
ba.Add(getA)
880+
numCalled = mockSender.NumCalled()
881+
882+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
883+
require.Len(t, ba.Requests, 1)
884+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
885+
require.Equal(t, strength, ba.Requests[0].GetInner().(*kvpb.GetRequest).KeyLockingStrength)
886+
require.True(t, strength != lock.None) // non-locking Gets aren't sent to KV
887+
888+
br = ba.CreateReply()
889+
br.Txn = ba.Txn
890+
return br, nil
891+
})
892+
893+
br, pErr = twb.SendLocked(ctx, ba)
894+
require.Nil(t, pErr)
895+
require.NotNil(t, br)
896+
require.Len(t, br.Responses, 1)
897+
require.Equal(t, roachpb.MakeValueFromString(valA), *br.Responses[0].GetInner().(*kvpb.GetResponse).Value)
898+
899+
var expNumCalled int
900+
if strength == lock.None {
901+
expNumCalled = numCalled // nothing should be sent to KV
902+
} else {
903+
expNumCalled = numCalled + 1 // a locking request should still be sent to KV
904+
}
905+
require.Equal(t, expNumCalled, mockSender.NumCalled())
906+
})
907+
908+
// Lastly, for completeness, commit the transaction and ensure that the buffer
909+
// is correctly flushed.
910+
ba = &kvpb.BatchRequest{}
911+
ba.Header = kvpb.Header{Txn: &txn}
912+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
913+
914+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
915+
require.Len(t, ba.Requests, 2)
916+
917+
// We now expect the buffer to be flushed along with the commit.
918+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
919+
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
920+
921+
br = ba.CreateReply()
922+
br.Txn = ba.Txn
923+
return br, nil
924+
})
925+
926+
br, pErr = twb.SendLocked(ctx, ba)
927+
require.Nil(t, pErr)
928+
require.NotNil(t, br)
929+
930+
// Even though we flushed the buffer, responses from the blind writes should
931+
// not be returned.
932+
require.Len(t, br.Responses, 1)
933+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
934+
}

pkg/sql/catalog/lease/descriptor_state.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,12 @@ func newDescriptorVersionState(
181181
version: int(desc.GetVersion()),
182182
}
183183
descState.mu.lease.sessionID = session.ID().UnsafeBytes()
184-
descState.mu.session = session
185-
186184
if buildutil.CrdbTestBuild && !expiration.IsEmpty() {
187185
panic(errors.AssertionFailedf("expiration should always be empty for "+
188186
"session based leases (got: %s on Desc: %s(%d))", expiration.String(), desc.GetName(), desc.GetID()))
189187
}
190188
}
189+
descState.mu.session = session
191190
descState.mu.expiration = expiration
192191

193192
return descState

pkg/sql/catalog/lease/descriptor_version_state.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ type descriptorVersionState struct {
6060
// when the version isn't associated with a lease.
6161
expiration hlc.Timestamp
6262

63-
// The session that was used to acquire this descriptor version, which is
64-
// only populated when the session based leasing mode is *at least* dual
65-
// write.
63+
// The session that was used to acquire this descriptor version.
6664
session sqlliveness.Session
6765

6866
refcount int

0 commit comments

Comments
 (0)