Skip to content

Commit da7807e

Browse files
authored
fix(bigtable): ReadRows retries from the last_scanned_row_key (googleapis#8423)
1 parent fd69a4c commit da7807e

File tree

4 files changed

+137
-0
lines changed

4 files changed

+137
-0
lines changed

google/cloud/bigtable/async_row_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,9 @@ class AsyncRowReader : public std::enable_shared_from_this<
374374
return parser_status;
375375
}
376376
}
377+
if (!response.last_scanned_row_key().empty()) {
378+
last_read_row_key_ = std::move(*response.mutable_last_scanned_row_key());
379+
}
377380
return Status();
378381
}
379382

google/cloud/bigtable/async_row_reader_test.cc

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,82 @@ TEST_F(TableAsyncReadRowsTest, TransientErrorIsRetried) {
662662
ASSERT_EQ(0U, cq_impl_->size());
663663
}
664664

665+
/// @test Verify that the last scanned row is respected.
666+
TEST_F(TableAsyncReadRowsTest, LastScannedRowKeyIsRespected) {
667+
auto& stream2 = AddReader([](btproto::ReadRowsRequest const& req) {
668+
// The server has told that "r2" has been scanned. Our second request
669+
// should use the range ("r2", ""]. This is what is under test.
670+
EXPECT_TRUE(req.has_rows());
671+
auto const& rows = req.rows();
672+
EXPECT_EQ(1, rows.row_ranges_size());
673+
auto const& range = rows.row_ranges(0);
674+
EXPECT_EQ("r2", range.start_key_open());
675+
});
676+
auto& stream1 = AddReader([](btproto::ReadRowsRequest const&) {});
677+
678+
EXPECT_CALL(stream1, Read)
679+
.WillOnce([](btproto::ReadRowsResponse* r, void*) {
680+
*r = bigtable::testing::ReadRowsResponseFromString(
681+
R"(
682+
chunks {
683+
row_key: "r1"
684+
family_name { value: "fam" }
685+
qualifier { value: "col" }
686+
timestamp_micros: 42000
687+
value: "value"
688+
commit_row: true
689+
})");
690+
})
691+
.WillOnce([](btproto::ReadRowsResponse* r, void*) {
692+
std::cout << "Second Read for stream1 " << std::endl;
693+
btproto::ReadRowsResponse resp;
694+
resp.set_last_scanned_row_key("r2");
695+
*r = resp;
696+
})
697+
.RetiresOnSaturation();
698+
EXPECT_CALL(stream1, Finish).WillOnce([](grpc::Status* status, void*) {
699+
*status = grpc::Status(grpc::StatusCode::UNAVAILABLE, "retry");
700+
});
701+
702+
EXPECT_CALL(stream2, Finish).WillOnce([](grpc::Status* status, void*) {
703+
*status = grpc::Status::OK;
704+
});
705+
706+
ExpectRows({"r1", "r2", "r3"});
707+
promises_from_user_cb_[0].set_value(true);
708+
ReadRows();
709+
710+
ASSERT_EQ(1U, cq_impl_->size());
711+
cq_impl_->SimulateCompletion(true); // Finish Start()
712+
ASSERT_EQ(1U, cq_impl_->size());
713+
cq_impl_->SimulateCompletion(true); // Return "r1"
714+
715+
row_futures_[0].get();
716+
717+
ASSERT_EQ(1U, cq_impl_->size());
718+
cq_impl_->SimulateCompletion(true); // Return last_scanned_row_key = "r2"
719+
720+
ASSERT_EQ(1U, cq_impl_->size());
721+
cq_impl_->SimulateCompletion(false); // Finish stream1 with failure
722+
ASSERT_EQ(1U, cq_impl_->size());
723+
cq_impl_->SimulateCompletion(true); // Finish Finish()
724+
725+
ASSERT_EQ(1U, cq_impl_->size());
726+
cq_impl_->SimulateCompletion(true); // Finish timer
727+
ASSERT_EQ(1U, cq_impl_->size());
728+
cq_impl_->SimulateCompletion(true); // Finish Start()
729+
730+
ASSERT_EQ(1U, cq_impl_->size());
731+
cq_impl_->SimulateCompletion(false); // Finish stream
732+
ASSERT_EQ(1U, cq_impl_->size());
733+
EXPECT_TRUE(Unsatisfied(stream_status_future_));
734+
cq_impl_->SimulateCompletion(true); // Finish Finish()
735+
736+
auto stream_status = stream_status_future_.get();
737+
ASSERT_STATUS_OK(stream_status);
738+
ASSERT_EQ(0U, cq_impl_->size());
739+
}
740+
665741
/// @test Verify proper handling of bogus responses from the service.
666742
TEST_F(TableAsyncReadRowsTest, ParserFailure) {
667743
auto& stream = AddReader([](btproto::ReadRowsRequest const&) {});

google/cloud/bigtable/row_reader.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ bool RowReader::NextChunk() {
140140
response_ = {};
141141
return false;
142142
}
143+
if (!response_.last_scanned_row_key().empty()) {
144+
last_read_row_key_ = std::move(*response_.mutable_last_scanned_row_key());
145+
}
143146
}
144147
return true;
145148
}

google/cloud/bigtable/row_reader_test.cc

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,61 @@ TEST_F(RowReaderTest, FailedStreamRetriesSkipAlreadyReadRows) {
394394
EXPECT_EQ(++it, reader.end());
395395
}
396396

397+
TEST_F(RowReaderTest, FailedStreamRetriesSkipToLastScannedRow) {
398+
auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
399+
auto parser = absl::make_unique<ReadRowsParserMock>();
400+
parser->SetRows({"r1"});
401+
google::bigtable::v2::ReadRowsResponse response;
402+
response.set_last_scanned_row_key("r2");
403+
{
404+
::testing::InSequence s;
405+
// We start our call with 3 rows in the set: "r1", "r2", "r3".
406+
EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(3)))
407+
.WillOnce(stream->MakeMockReturner());
408+
409+
// The mock `parser` will return "r1". Next, simulate the server returning
410+
// an empty chunk with `last_scanned_row_key` set to "r2".
411+
EXPECT_CALL(*stream, Read)
412+
.WillOnce(DoAll(SetArgPointee<0>(response), Return(true)));
413+
414+
// The stream fails with a retry-able error
415+
EXPECT_CALL(*stream, Read).WillOnce(Return(false));
416+
EXPECT_CALL(*stream, Finish())
417+
.WillOnce(Return(grpc::Status(grpc::StatusCode::INTERNAL, "retry")));
418+
419+
EXPECT_CALL(*retry_policy_, OnFailure(An<grpc::Status const&>()))
420+
.WillOnce(Return(true));
421+
EXPECT_CALL(*backoff_policy_, OnCompletion(An<grpc::Status const&>()))
422+
.WillOnce(Return(std::chrono::milliseconds(0)));
423+
424+
auto* stream_retry =
425+
new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");
426+
427+
// We retry the remaining rows. We have "r1" returned, but the service has
428+
// also told us that "r2" was scanned. This means there is only one row
429+
// remaining to read: "r3".
430+
EXPECT_CALL(*client_, ReadRows(_, RequestWithRowKeysCount(1)))
431+
.WillOnce(stream_retry->MakeMockReturner());
432+
433+
// End the stream to clean up the test
434+
EXPECT_CALL(*stream_retry, Read).WillOnce(Return(false));
435+
EXPECT_CALL(*stream_retry, Finish()).WillOnce(Return(grpc::Status::OK));
436+
}
437+
438+
parser_factory_->AddParser(std::move(parser));
439+
bigtable::RowReader reader(
440+
client_, "", bigtable::RowSet("r1", "r2", "r3"),
441+
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
442+
std::move(retry_policy_), std::move(backoff_policy_),
443+
metadata_update_policy_, std::move(parser_factory_));
444+
445+
auto it = reader.begin();
446+
EXPECT_NE(it, reader.end());
447+
ASSERT_STATUS_OK(*it);
448+
EXPECT_EQ((*it)->row_key(), "r1");
449+
EXPECT_EQ(++it, reader.end());
450+
}
451+
397452
TEST_F(RowReaderTest, FailedParseIsRetried) {
398453
// wrapped in unique_ptr by ReadRows
399454
auto* stream = new MockReadRowsReader("google.bigtable.v2.Bigtable.ReadRows");

0 commit comments

Comments
 (0)