Skip to content

Commit cd48e17

Browse files
committed
tmp
1 parent 6577550 commit cd48e17

File tree

4 files changed

+235
-121
lines changed

4 files changed

+235
-121
lines changed

be/src/cloud/cloud_schema_change_job.cpp

+18
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,15 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
477477
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
478478
<< "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
479479
<< "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
480+
481+
tmp_rowsets_msg = "";
482+
for (const auto& [k, v] : tmp_tablet->rowset_map()) {
483+
tmp_rowsets_msg += fmt::format("version={},rowset={},txn_id={}\n", k.to_string(),
484+
v->rowset_id().to_string(), v->txn_id());
485+
}
486+
LOG_INFO("new_tablet={}, after without lock sync_rowsets, tmp_tablet's rowsets:\n{}",
487+
_new_tablet->tablet_id(), tmp_rowsets_msg);
488+
480489
if (max_version >= start_calc_delete_bitmap_version) {
481490
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
482491
{start_calc_delete_bitmap_version, max_version}, &incremental_rowsets));
@@ -498,6 +507,15 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
498507
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
499508
<< "incremental rowsets with lock, version: " << max_version + 1 << "-"
500509
<< new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
510+
511+
tmp_rowsets_msg = "";
512+
for (const auto& [k, v] : tmp_tablet->rowset_map()) {
513+
tmp_rowsets_msg += fmt::format("version={},rowset={},txn_id={}\n", k.to_string(),
514+
v->rowset_id().to_string(), v->txn_id());
515+
}
516+
LOG_INFO("new_tablet={}, after with lock sync_rowsets, tmp_tablet's rowsets:\n{}",
517+
_new_tablet->tablet_id(), tmp_rowsets_msg);
518+
501519
std::vector<RowsetSharedPtr> new_incremental_rowsets;
502520
if (new_max_version > max_version) {
503521
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql --
3+
1 1 1 1
4+
2 2 2 2
5+
3 3 3 2
6+
7+
-- !dup_key_count --
8+
9+
-- !sql --
10+
1 666 666 666
11+
10 10 10 10
12+
2 88 88 88
13+
3 30 30 30
14+
8 8 8 8
15+
9 9 9 9
16+

regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_convert_data_replace_on_new_tablet.groovy

-121
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import java.util.concurrent.TimeUnit
19+
import org.awaitility.Awaitility
20+
21+
// Regression test for cloud-mode MOW (merge-on-write) schema change:
// data already converted onto the new (shadow) tablet must not be replaced by
// stale rowsets synced later, otherwise the unique-key table exposes duplicate
// keys. Reproduces the race by blocking the alter job with debug points while
// loads and a cumulative compaction land on the new tablet.
suite("test_cloud_sc_convert_data_replaced_on_new_tablet", "nonConcurrent") {
    // The shadow-tablet conversion path under test only exists in cloud mode.
    if (!isCloudMode()) {
        return
    }

    // Start clean so debug points left over from other suites cannot interfere
    // with the injected fault sequence below.
    GetDebugPoint().clearDebugPointsForAllFEs()
    GetDebugPoint().clearDebugPointsForAllBEs()

    // Compaction must be allowed on the new tablet to reproduce the scenario.
    def customBeConfig = [
        enable_new_tablet_do_compaction : true
    ]

    setBeConfigTemporary(customBeConfig) {

        def table1 = "test_cloud_sc_convert_data_replaced_on_new_tablet"
        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
                `k1` int NOT NULL,
                `c1` int,
                `c2` int,
                `c3` int
            )UNIQUE KEY(k1)
            DISTRIBUTED BY HASH(k1) BUCKETS 1
            PROPERTIES (
                "enable_unique_key_merge_on_write" = "true",
                "disable_auto_compaction" = "true",
                "replication_num" = "1"); """

        // Each load commits one rowset; the trailing comment is its version.
        sql "insert into ${table1} values(1,1,1,1);" // 2
        sql "insert into ${table1} values(2,2,2,2);" // 3
        sql "insert into ${table1} values(3,3,3,2);" // 4
        sql "sync;"
        qt_sql "select * from ${table1} order by k1;"

        // Locate the single tablet and the BE hosting it so compaction can be
        // driven through that BE's HTTP interface.
        def backends = sql_return_maparray('show backends')
        def tabletStats = sql_return_maparray("show tablets from ${table1};")
        assert tabletStats.size() == 1
        def tabletId = tabletStats[0].TabletId
        def tabletBackendId = tabletStats[0].BackendId
        def tabletBackend
        for (def be : backends) {
            if (be.BackendId == tabletBackendId) {
                tabletBackend = be
                break
            }
        }
        logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");

        // Trigger a cumulative compaction over versions [start, end] on the
        // given tablet and block until the BE reports it has finished.
        def trigger_and_wait_for_compaction = { tablet_id, int start, int end ->
            // Pin the compaction's input rowsets so the cumulative point is
            // moved exactly past `end` (e.g. past the alter version).
            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
                    [tablet_id:"${tablet_id}", start_version: start, end_version: end]);

            logger.info("trigger cumu compaction on tablet=${tablet_id} BE.Host=${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
            def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, tablet_id)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            assert code == 0
            def compactJson = parseJson(out.trim())
            assert "success" == compactJson.status.toLowerCase()

            // Poll the BE until the compaction task is no longer running.
            // Distinct variable names avoid re-declaring the tuple above.
            boolean running
            do {
                Thread.sleep(1000)
                def (statusCode, statusOut, statusErr) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tablet_id)
                logger.info("Get compaction status: code=" + statusCode + ", out=" + statusOut + ", err=" + statusErr)
                // JUnit convention: expected value first, actual second.
                assertEquals(0, statusCode)
                def compactionStatus = parseJson(statusOut.trim())
                assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            } while (running)
        }

        // Compact the pre-alter rowsets so the base tablet has a non-trivial
        // cumulative point before the schema change starts.
        trigger_and_wait_for_compaction(tabletId, 3, 4)

        try {
            // Hold the schema change job inside process_alter_tablet so loads
            // land on both the base and the shadow tablet while the alter job
            // is still in flight.
            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob.process_alter_tablet.sleep")
            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob.process_alter_tablet.enter.block")
            sql "alter table ${table1} modify column c2 varchar(100);"

            Thread.sleep(1000)

            // The alter job creates a second (shadow) tablet; find its id.
            tabletStats = sql_return_maparray("show tablets from ${table1};")
            def newTabletId = "-1"
            for (def stat : tabletStats) {
                if (stat.TabletId != tabletId) {
                    newTabletId = stat.TabletId
                    break
                }
            }
            logger.info("new_tablet_id: ${newTabletId}")

            // Loads issued while process_alter_tablet is blocked (versions 5-8).
            sql "insert into ${table1} values(1,10,10,10);" // 5
            sql "insert into ${table1} values(2,20,20,20);" // 6
            sql "insert into ${table1} values(3,30,30,30);" // 7
            sql "insert into ${table1} values(1,666,666,666);" // 8

            Thread.sleep(2000)

            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob.process_alter_tablet.enter.block")

            // Loads issued after the job resumes but before it commits (9-13).
            sql "insert into ${table1} values(10,99,99,99);" // 9
            sql "insert into ${table1} values(2,88,88,88);" // 10
            sql "insert into ${table1} values(8,8,8,8);" // 11
            sql "insert into ${table1} values(9,9,9,9);" // 12
            sql "insert into ${table1} values(10,10,10,10);" // 13

            Thread.sleep(2000)

            // Compact rowsets [11,12] on the new tablet to advance its
            // cumulative point past the alter version while the schema change
            // is still sleeping.
            trigger_and_wait_for_compaction(newTabletId, 11, 12)

            Thread.sleep(1000)

            // Let the schema change job finish and wait for it to report done.
            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob.process_alter_tablet.sleep")

            waitForSchemaChangeDone {
                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
                time 1000
            }

            // MOW invariant: a unique-key table must never expose duplicate
            // keys; this query must return an empty result.
            qt_dup_key_count "select k1,count() as cnt from ${table1} group by k1 having cnt>1;"
            order_qt_sql "select * from ${table1};"

        } finally {
            // Always remove injected debug points, even on failure, so later
            // suites run against an unmodified cluster.
            GetDebugPoint().clearDebugPointsForAllBEs()
            GetDebugPoint().clearDebugPointsForAllFEs()
        }
    }
}

0 commit comments

Comments
 (0)