Skip to content

Commit a9657ef

Browse files
authored
[fix](compaction) fix the issue of schema loss caused by cloud disable auto compaction (#47495)
1 parent c68dce1 commit a9657ef

File tree

3 files changed

+225
-5
lines changed

3 files changed

+225
-5
lines changed

Diff for: cloud/src/meta-service/meta_service.cpp

+32-4
Original file line numberDiff line numberDiff line change
@@ -685,10 +685,14 @@ void internal_get_tablet(MetaServiceCode& code, std::string& msg, const std::str
685685
return;
686686
}
687687

688-
if (tablet_meta->has_schema()) { // tablet meta saved before detach schema kv
688+
if (tablet_meta->has_schema() &&
689+
tablet_meta->schema().column_size() > 0) { // tablet meta saved before detach schema kv
689690
tablet_meta->set_schema_version(tablet_meta->schema().schema_version());
690691
}
691-
if (!tablet_meta->has_schema() && !skip_schema) {
692+
693+
if ((!tablet_meta->has_schema() ||
694+
(tablet_meta->has_schema() && tablet_meta->schema().column_size() <= 0)) &&
695+
!skip_schema) {
692696
if (!tablet_meta->has_schema_version()) {
693697
code = MetaServiceCode::INVALID_ARGUMENT;
694698
msg = "tablet_meta must have either schema or schema_version";
@@ -768,8 +772,32 @@ void MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
768772
tablet_meta.set_time_series_compaction_level_threshold(
769773
tablet_meta_info.time_series_compaction_level_threshold());
770774
} else if (tablet_meta_info.has_disable_auto_compaction()) {
771-
tablet_meta.mutable_schema()->set_disable_auto_compaction(
772-
tablet_meta_info.disable_auto_compaction());
775+
if (tablet_meta.has_schema() && tablet_meta.schema().column_size() > 0) {
776+
tablet_meta.mutable_schema()->set_disable_auto_compaction(
777+
tablet_meta_info.disable_auto_compaction());
778+
} else {
779+
auto key = meta_schema_key(
780+
{instance_id, tablet_meta.index_id(), tablet_meta.schema_version()});
781+
ValueBuf val_buf;
782+
err = cloud::get(txn.get(), key, &val_buf);
783+
if (err != TxnErrorCode::TXN_OK) {
784+
code = cast_as<ErrCategory::READ>(err);
785+
msg = fmt::format("failed to get schema, err={}",
786+
err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "not found"
787+
: "internal error");
788+
return;
789+
}
790+
doris::TabletSchemaCloudPB schema_pb;
791+
if (!parse_schema_value(val_buf, &schema_pb)) {
792+
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
793+
msg = fmt::format("malformed schema value, key={}", key);
794+
return;
795+
}
796+
797+
schema_pb.set_disable_auto_compaction(tablet_meta_info.disable_auto_compaction());
798+
put_schema_kv(code, msg, txn.get(), key, schema_pb);
799+
if (code != MetaServiceCode::OK) return;
800+
}
773801
}
774802
int64_t table_id = tablet_meta.table_id();
775803
int64_t index_id = tablet_meta.index_id();

Diff for: cloud/test/schema_kv_test.cpp

+130-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ static std::string next_rowset_id() {
4040
return std::to_string(++cnt);
4141
}
4242

43+
static void fill_schema(doris::TabletSchemaCloudPB* schema, int32_t schema_version) {
44+
schema->set_schema_version(schema_version);
45+
for (int i = 0; i < 10; ++i) {
46+
auto column = schema->add_column();
47+
column->set_unique_id(20000 + i);
48+
column->set_type("INT");
49+
}
50+
}
51+
4352
static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id,
4453
int64_t partition_id, int64_t tablet_id, const std::string& rowset_id,
4554
int32_t schema_version) {
@@ -49,7 +58,7 @@ static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t inde
4958
tablet->set_partition_id(partition_id);
5059
tablet->set_tablet_id(tablet_id);
5160
auto schema = tablet->mutable_schema();
52-
schema->set_schema_version(schema_version);
61+
fill_schema(schema, schema_version);
5362
auto first_rowset = tablet->add_rs_metas();
5463
first_rowset->set_rowset_id(0); // required
5564
first_rowset->set_rowset_id_v2(rowset_id);
@@ -148,6 +157,9 @@ TEST(DetachSchemaKVTest, TabletTest) {
148157
saved_tablet.set_partition_id(partition_id);
149158
saved_tablet.set_tablet_id(tablet_id);
150159
saved_tablet.mutable_schema()->set_schema_version(1);
160+
auto column = saved_tablet.mutable_schema()->add_column();
161+
column->set_unique_id(30001);
162+
column->set_type("INT");
151163
std::string tablet_key, tablet_val;
152164
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}, &tablet_key);
153165
ASSERT_TRUE(saved_tablet.SerializeToString(&tablet_val));
@@ -672,4 +684,121 @@ TEST(SchemaKVTest, InsertExistedRowsetTest) {
672684
check_get_tablet(meta_service.get(), 10005, 2);
673685
}
674686

687+
static void check_schema(MetaServiceProxy* meta_service, int64_t tablet_id,
688+
int32_t schema_version) {
689+
brpc::Controller cntl;
690+
GetTabletRequest req;
691+
GetTabletResponse res;
692+
req.set_tablet_id(tablet_id);
693+
meta_service->get_tablet(&cntl, &req, &res, nullptr);
694+
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
695+
ASSERT_TRUE(res.has_tablet_meta()) << tablet_id;
696+
EXPECT_TRUE(res.tablet_meta().has_schema()) << tablet_id;
697+
EXPECT_EQ(res.tablet_meta().schema_version(), schema_version) << tablet_id;
698+
EXPECT_EQ(res.tablet_meta().schema().column_size(), 10) << tablet_id;
699+
};
700+
701+
static void update_tablet(MetaServiceProxy* meta_service, int64_t tablet_id) {
702+
brpc::Controller cntl;
703+
UpdateTabletRequest req;
704+
UpdateTabletResponse res;
705+
706+
auto meta_info = req.add_tablet_meta_infos();
707+
meta_info->set_disable_auto_compaction(true);
708+
meta_info->set_tablet_id(tablet_id);
709+
710+
meta_service->update_tablet(&cntl, &req, &res, nullptr);
711+
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
712+
}
713+
714+
TEST(AlterSchemaKVTest, AlterDisableAutoCompactionTest) {
715+
//case 1 config::write_schema_kv = true;
716+
{
717+
auto meta_service = get_meta_service();
718+
config::write_schema_kv = true;
719+
//config::meta_schema_value_version = 0;
720+
ASSERT_NO_FATAL_FAILURE(
721+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
722+
check_get_tablet(meta_service.get(), 10004, 0);
723+
check_schema(meta_service.get(), 10004, 0);
724+
725+
//config::meta_schema_value_version = 1;
726+
ASSERT_NO_FATAL_FAILURE(
727+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
728+
check_get_tablet(meta_service.get(), 10005, 2);
729+
730+
update_tablet(meta_service.get(), 10005);
731+
check_schema(meta_service.get(), 10005, 2);
732+
}
733+
734+
//case 2 config::write_schema_kv = false;
735+
{
736+
auto meta_service = get_meta_service();
737+
config::write_schema_kv = false;
738+
auto defer1 =
739+
std::make_unique<std::function<void()>>([]() { config::write_schema_kv = true; });
740+
741+
//config::meta_schema_value_version = 0;
742+
ASSERT_NO_FATAL_FAILURE(
743+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
744+
check_get_tablet(meta_service.get(), 10004, 0);
745+
check_schema(meta_service.get(), 10004, 0);
746+
747+
//config::meta_schema_value_version = 1;
748+
ASSERT_NO_FATAL_FAILURE(
749+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
750+
check_get_tablet(meta_service.get(), 10005, 2);
751+
752+
update_tablet(meta_service.get(), 10005);
753+
check_schema(meta_service.get(), 10005, 2);
754+
}
755+
756+
//case 3 config::write_schema_kv = false, create tablet, config::write_schema_kv = true;
757+
{
758+
auto meta_service = get_meta_service();
759+
config::write_schema_kv = false;
760+
auto defer1 =
761+
std::make_unique<std::function<void()>>([]() { config::write_schema_kv = true; });
762+
763+
//config::meta_schema_value_version = 0;
764+
ASSERT_NO_FATAL_FAILURE(
765+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
766+
check_get_tablet(meta_service.get(), 10004, 0);
767+
check_schema(meta_service.get(), 10004, 0);
768+
769+
//config::meta_schema_value_version = 1;
770+
ASSERT_NO_FATAL_FAILURE(
771+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
772+
check_get_tablet(meta_service.get(), 10005, 2);
773+
config::write_schema_kv = true;
774+
update_tablet(meta_service.get(), 10005);
775+
check_schema(meta_service.get(), 10005, 2);
776+
}
777+
778+
//case 4 config::write_schema_kv = false, create tablet, config::write_schema_kv = true;
779+
// meta_schema_value_version = 0, meta_schema_value_version = 1
780+
{
781+
auto meta_service = get_meta_service();
782+
config::write_schema_kv = false;
783+
auto defer1 = std::make_unique<std::function<void()>>([]() {
784+
config::write_schema_kv = true;
785+
config::meta_schema_value_version = 1;
786+
});
787+
788+
config::meta_schema_value_version = 0;
789+
ASSERT_NO_FATAL_FAILURE(
790+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10004, next_rowset_id(), 0));
791+
check_get_tablet(meta_service.get(), 10004, 0);
792+
check_schema(meta_service.get(), 10004, 0);
793+
794+
config::meta_schema_value_version = 1;
795+
ASSERT_NO_FATAL_FAILURE(
796+
create_tablet(meta_service.get(), 10001, 10002, 10003, 10005, next_rowset_id(), 2));
797+
check_get_tablet(meta_service.get(), 10005, 2);
798+
config::write_schema_kv = true;
799+
update_tablet(meta_service.get(), 10005);
800+
check_schema(meta_service.get(), 10005, 2);
801+
}
802+
}
803+
675804
} // namespace doris::cloud
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_alter_property_mow") {
19+
def tableName = "test_alter_property_mow"
20+
21+
sql """ DROP TABLE IF EXISTS ${tableName} """
22+
23+
sql """
24+
CREATE TABLE IF NOT EXISTS ${tableName}
25+
(
26+
k1 INT,
27+
k2 int,
28+
v1 bigint,
29+
v2 INT,
30+
v3 varchar(32) null,
31+
)
32+
UNIQUE KEY (k1, k2)
33+
DISTRIBUTED BY HASH(k2) BUCKETS 1
34+
PROPERTIES (
35+
'replication_num' = '1'
36+
)
37+
"""
38+
39+
sql """
40+
ALTER TABLE ${tableName} SET ('disable_auto_compaction'='true');
41+
"""
42+
43+
sql """
44+
ALTER TABLE ${tableName} SET ('disable_auto_compaction'='false');
45+
"""
46+
47+
sql """
48+
insert into ${tableName} (k1, k2, v1, v2, v3) values(2, 2, 3, 4, 'a')
49+
"""
50+
51+
sql """
52+
insert into ${tableName} (k1, k2, v1, v2, v3) values(1, 2, 3, 4, 'a')
53+
"""
54+
55+
sql """
56+
insert into ${tableName} (k1, k2, v1, v2, v3) values(1, 2, 3, 4, 'a')
57+
"""
58+
59+
sql """
60+
select * from ${tableName}
61+
"""
62+
}
63+

0 commit comments

Comments
 (0)