Skip to content

Commit fec29f3

Browse files
committed
kvclient: ensure locking Get requests are always sent to KV
A Get request to a key that's been previously buffered on the client must always be served locally. However, if the Get request is locking in nature, we must also acquire a lock at the leaseholder. This patch addresses a TODO to do exactly this, while ensuring the response from KV is discarded and served locally instead. Informs #139054 Release note: None
1 parent fe860f6 commit fec29f3

File tree

2 files changed

+155
-21
lines changed

2 files changed

+155
-21
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1213
"github.com/cockroachdb/cockroach/pkg/roachpb"
1314
"github.com/cockroachdb/cockroach/pkg/settings"
1415
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -289,9 +290,10 @@ func (twb *txnWriteBuffer) applyTransformations(
289290
// TODO(yuzefovich): ensure that we elide the lock acquisition
290291
// whenever possible (e.g. blind UPSERT in an implicit txn).
291292
ts = append(ts, transformation{
292-
stripped: true,
293-
index: i,
294-
resp: ru,
293+
stripped: true,
294+
index: i,
295+
origRequest: req,
296+
resp: ru,
295297
})
296298
twb.addToBuffer(t.Key, t.Value, t.Sequence)
297299

@@ -304,9 +306,10 @@ func (twb *txnWriteBuffer) applyTransformations(
304306
FoundKey: false,
305307
})
306308
ts = append(ts, transformation{
307-
stripped: true,
308-
index: i,
309-
resp: ru,
309+
stripped: true,
310+
index: i,
311+
origRequest: req,
312+
resp: ru,
310313
})
311314
twb.addToBuffer(t.Key, roachpb.Value{}, t.Sequence)
312315

@@ -315,25 +318,36 @@ func (twb *txnWriteBuffer) applyTransformations(
315318
val, served := twb.maybeServeRead(t.Key, t.Sequence)
316319
if served {
317320
log.VEventf(ctx, 2, "serving %s on key %s from the buffer", t.Method(), t.Key)
318-
var ru kvpb.ResponseUnion
321+
var resp kvpb.ResponseUnion
319322
getResp := &kvpb.GetResponse{}
320323
if val.IsPresent() {
321324
getResp.Value = val
322325
}
323-
ru.MustSetInner(getResp)
326+
resp.MustSetInner(getResp)
327+
328+
stripped := true
329+
if t.KeyLockingStrength != lock.None {
330+
// Even though the Get request must be served from the buffer, as the
331+
// transaction performed a previous write to the key, we still need to
332+
// acquire a lock at the leaseholder. As a result, we can't strip the
333+
// request from the batch.
334+
//
335+
// TODO(arul): we could eschew sending this request if we knew there
336+
// was a sufficiently strong lock already present on the key.
337+
stripped = false
338+
baRemote.Requests = append(baRemote.Requests, ru)
339+
}
340+
324341
ts = append(ts, transformation{
325-
stripped: true,
326-
index: i,
327-
resp: ru,
342+
stripped: stripped,
343+
index: i,
344+
origRequest: req,
345+
resp: resp,
328346
})
329347
// We've constructed a response that we'll stitch together with the
330348
// result on the response path; eschew sending the request to the KV
331349
// layer.
332350
//
333-
// TODO(arul): if this is a locking Get request, we can't omit the KV
334-
// request. At least, not unless we know a lock is already present on
335-
// the key.
336-
//
337351
// TODO(arul): if the ReturnRawMVCCValues flag is set, we'll need to
338352
// flush the buffer.
339353
continue
@@ -579,14 +593,14 @@ type transformation struct {
579593
// index of the request in the original batch to which the transformation
580594
// applies.
581595
index int
596+
// origRequest is the original request that was transformed.
597+
origRequest kvpb.Request
582598
// resp is locally produced response that needs to be merged with any
583599
// responses returned by the KV layer. This is set for requests that can be
584600
// evaluated locally (e.g. blind writes, reads that can be served entirely
585-
// from the buffer). If non-empty, stripped must also be true.
601+
// from the buffer). Must be set if stripped is true, but the converse doesn't
602+
// hold.
586603
resp kvpb.ResponseUnion
587-
// origRequest is the original request that was transformed. Set iff stripped
588-
// is false.
589-
origRequest kvpb.Request
590604
}
591605

592606
// toResp returns the response that should be added to the batch response as
@@ -599,8 +613,21 @@ func (t transformation) toResp(
599613
}
600614

601615
var ru kvpb.ResponseUnion
602-
switch br.GetInner().(type) {
603-
case *kvpb.ScanResponse:
616+
switch t.origRequest.(type) {
617+
case *kvpb.PutRequest:
618+
ru = t.resp
619+
case *kvpb.DeleteRequest:
620+
ru = t.resp
621+
case *kvpb.GetRequest:
622+
// Get requests must be served from the local buffer if a transaction
623+
// performed a previous write to the key being read. However, Get requests
624+
// must be sent to the KV layer (i.e. not be stripped) iff they are locking
625+
// in nature.
626+
req := t.origRequest.(*kvpb.GetRequest)
627+
assertTrue(t.stripped == (req.KeyLockingStrength == lock.None),
628+
"Get requests should either be stripped or be locking")
629+
ru = t.resp
630+
case *kvpb.ScanRequest:
604631
scanResp, err := twb.mergeWithScanResp(
605632
t.origRequest.(*kvpb.ScanRequest), br.GetInner().(*kvpb.ScanResponse),
606633
)

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
"testing"
1212

1313
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1415
"github.com/cockroachdb/cockroach/pkg/roachpb"
1516
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
17+
"github.com/cockroachdb/cockroach/pkg/testutils"
1618
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1719
"github.com/cockroachdb/cockroach/pkg/util/log"
1820
"github.com/cockroachdb/errors"
@@ -825,3 +827,108 @@ func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) {
825827
require.Len(t, br.Responses, 1)
826828
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
827829
}
830+
831+
// TestTxnWriteBufferLockingGetRequests ensures that locking get requests are
832+
// handled appropriately -- they're sent to KV, to acquire a lock, but the
833+
// read is served from the buffer (upholding read-your-own-write semantics).
834+
func TestTxnWriteBufferLockingGetRequests(t *testing.T) {
835+
defer leaktest.AfterTest(t)()
836+
defer log.Scope(t).Close(t)
837+
ctx := context.Background()
838+
twb, mockSender := makeMockTxnWriteBuffer()
839+
840+
txn := makeTxnProto()
841+
txn.Sequence = 10
842+
keyA := roachpb.Key("a")
843+
valA := "val"
844+
845+
// Blindly write to keys A.
846+
ba := &kvpb.BatchRequest{}
847+
ba.Header = kvpb.Header{Txn: &txn}
848+
putA := putArgs(keyA, valA, txn.Sequence)
849+
ba.Add(putA)
850+
851+
numCalled := mockSender.NumCalled()
852+
br, pErr := twb.SendLocked(ctx, ba)
853+
require.Nil(t, pErr)
854+
require.NotNil(t, br)
855+
// All the requests should be buffered and not make it past the
856+
// txnWriteBuffer.
857+
require.Equal(t, numCalled, mockSender.NumCalled())
858+
// Even though the txnWriteBuffer did not send any Put requests to the KV
859+
// layer above, the responses should still be populated.
860+
require.Len(t, br.Responses, 1)
861+
require.Equal(t, br.Responses[0].GetInner(), &kvpb.PutResponse{})
862+
863+
// Verify the writes were buffered correctly.
864+
expBufferedWrites := []bufferedWrite{
865+
makeBufferedWrite(keyA, makeBufferedValue(valA, 10)),
866+
}
867+
require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
868+
869+
// Perform a locking read on keyA. Ensure a request is sent to the KV layer,
870+
// but the response is served from the buffer.
871+
testutils.RunValues(t, "str", []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Update}, func(t *testing.T, strength lock.Strength) {
872+
txn.Sequence = 11
873+
ba = &kvpb.BatchRequest{}
874+
ba.Header = kvpb.Header{Txn: &txn}
875+
getA := &kvpb.GetRequest{
876+
RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
877+
KeyLockingStrength: strength,
878+
}
879+
ba.Add(getA)
880+
numCalled = mockSender.NumCalled()
881+
882+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
883+
require.Len(t, ba.Requests, 1)
884+
require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
885+
require.Equal(t, strength, ba.Requests[0].GetInner().(*kvpb.GetRequest).KeyLockingStrength)
886+
require.True(t, strength != lock.None) // non-locking Gets aren't sent to KV
887+
888+
br = ba.CreateReply()
889+
br.Txn = ba.Txn
890+
return br, nil
891+
})
892+
893+
br, pErr = twb.SendLocked(ctx, ba)
894+
require.Nil(t, pErr)
895+
require.NotNil(t, br)
896+
require.Len(t, br.Responses, 1)
897+
require.Equal(t, roachpb.MakeValueFromString(valA), *br.Responses[0].GetInner().(*kvpb.GetResponse).Value)
898+
899+
var expNumCalled int
900+
if strength == lock.None {
901+
expNumCalled = numCalled // nothing should be sent to KV
902+
} else {
903+
expNumCalled = numCalled + 1 // a locking request should still be sent to KV
904+
}
905+
require.Equal(t, expNumCalled, mockSender.NumCalled())
906+
})
907+
908+
// Lastly, for completeness, commit the transaction and ensure that the buffer
909+
// is correctly flushed.
910+
ba = &kvpb.BatchRequest{}
911+
ba.Header = kvpb.Header{Txn: &txn}
912+
ba.Add(&kvpb.EndTxnRequest{Commit: true})
913+
914+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
915+
require.Len(t, ba.Requests, 2)
916+
917+
// We now expect the buffer to be flushed along with the commit.
918+
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
919+
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
920+
921+
br = ba.CreateReply()
922+
br.Txn = ba.Txn
923+
return br, nil
924+
})
925+
926+
br, pErr = twb.SendLocked(ctx, ba)
927+
require.Nil(t, pErr)
928+
require.NotNil(t, br)
929+
930+
// Even though we flushed the buffer, responses from the blind writes should
931+
// not be returned.
932+
require.Len(t, br.Responses, 1)
933+
require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
934+
}

0 commit comments

Comments
 (0)