
Commit 8d449b1

dantengsky and BohuTANG authored
chore(backport): Backport replace into refactoring to v1.2.636 (#17408)
* refactor: rename components of the `replace into` impl (#17355)

  Initially, `REPLACE INTO` was designed as a simplified implementation of
  `MERGE INTO`, with the expectation that it would gradually evolve into a
  full-fledged implementation of `MERGE INTO`. As a result, many `MERGE INTO`
  related terms exist in the components of the `REPLACE INTO` impl. However,
  `MERGE INTO` now has its own independent impl, and the remaining
  `MERGE INTO` related terms in the impl of `REPLACE INTO` can be misleading.
  Therefore, in this PR, we have renamed all `MERGE INTO` related terms in the
  impl of `REPLACE INTO` to avoid confusion.

* feat: introduce block meta cache (#17360)

  - introduce BlockMetaCache
  - tweak sys tbl `system.cache` and configs
  - cargo fmt
  - update ut golden file
  - fix: set cache cap of `memory_cache_block_meta`
  - tweak cargo audit.toml

* chore: refine copy into tests (#17413)

  test(stage): decouple `COPY INTO` tests from output data byte size.
  The tests should not assert the exact byte size of data outputs, as this
  value may change across different versions of `databend-query`: the
  metadata of a data block contains the `databend-query` version, which is
  variable-length. This change replaces byte size checks with content
  validation, ensuring test stability while maintaining functional
  correctness. Affected cases include 05_0001_set_var.sh,
  18_0002_ownership_cover, 18_0007_privilege_access,
  20_0015_compact_hook_stas_issue_13947, parquet_missing_uuid (ignore the
  result-set of `copy into` when preparing data), 00_0015_unload_output, and
  00_0012_stage_with_connection; scripts/ci/deploy/config/databend-query-node-1.toml
  was reverted.

* refactor: refine `replace into` by caching individual BlockMeta (#17368)

  - remove dead code
  - update crossbeam-channel to 0.5.13
    (Title: crossbeam-channel: double free on Drop; Date: 2025-04-08;
    ID: RUSTSEC-2025-0024; URL: https://rustsec.org/advisories/RUSTSEC-2025-0024)
  - update audit.toml
  - tweak logic test 05_0001_set_var: the bendsql used in CI has been
    upgraded and uses a different format to display timestamps

* chore: improve license validation log message (#17967)

  Change the license validation error log to ERROR level. Several
  enterprise features don't affect functional correctness but impact
  performance significantly. Users have experienced unexplained performance
  degradation due to expired licenses. This change helps users identify
  such issues faster. Also improves the license error message and tweaks
  the log msg.

---------

Co-authored-by: BohuTANG <[email protected]>
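The `replace into` refactor above (#17368) moves from caching a whole segment's `Vec<Arc<BlockMeta>>` to caching individual `BlockMeta` entries. A minimal sketch of such an item-count-bounded LRU cache, assuming simplified stand-ins for Databend's `BlockMeta` and `InMemoryLruCache` (the real types live in `src/query/storages/common/cache`; fields and names here are illustrative):

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

// Hypothetical stand-in for databend's BlockMeta; not the real definition.
#[derive(Clone, Debug)]
pub struct BlockMeta {
    pub location: String,
    pub row_count: u64,
}

/// A minimal item-count-bounded LRU cache keyed by block location.
pub struct BlockMetaCache {
    capacity: usize,
    map: HashMap<String, Arc<BlockMeta>>,
    order: VecDeque<String>, // front = least recently used
}

impl BlockMetaCache {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { capacity, map: HashMap::new(), order: VecDeque::new() }
    }

    /// Returns a clone of the cached Arc and marks the key most recently used.
    pub fn get(&mut self, key: &str) -> Option<Arc<BlockMeta>> {
        if let Some(v) = self.map.get(key).cloned() {
            self.order.retain(|k| k != key);
            self.order.push_back(key.to_string());
            Some(v)
        } else {
            None
        }
    }

    /// Inserts a BlockMeta, evicting the least recently used item when full.
    pub fn insert(&mut self, key: String, meta: BlockMeta) {
        if self.map.contains_key(&key) {
            self.order.retain(|k| k != &key);
        } else if self.map.len() >= self.capacity {
            if let Some(evicted) = self.order.pop_front() {
                self.map.remove(&evicted);
            }
        }
        self.map.insert(key.clone(), Arc::new(meta));
        self.order.push_back(key);
    }

    pub fn len(&self) -> usize {
        self.map.len()
    }
}

fn main() {
    let mut cache = BlockMetaCache::with_capacity(2);
    cache.insert("blk-1".into(), BlockMeta { location: "blk-1".into(), row_count: 1000 });
    cache.insert("blk-2".into(), BlockMeta { location: "blk-2".into(), row_count: 2000 });
    // Touch blk-1 so blk-2 becomes the eviction candidate.
    assert!(cache.get("blk-1").is_some());
    cache.insert("blk-3".into(), BlockMeta { location: "blk-3".into(), row_count: 3000 });
    assert!(cache.get("blk-2").is_none()); // evicted as least recently used
    assert_eq!(cache.len(), 2);
    println!("ok");
}
```

The real `InMemoryLruCache` also accounts for memory via `CacheValue::mem_bytes`; this sketch bounds by item count only, matching the `*_count` knobs added in the config.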
1 parent 63cf08b · commit 8d449b1


47 files changed: +341 −187 lines

.cargo/audit.toml — 30 additions & 0 deletions

@@ -31,6 +31,8 @@ ignore = [
     # borsh
     # Parsing borsh messages with ZST which are not-copy/clone is unsound
     "RUSTSEC-2023-0033",
+    # atomic-polyfill: unmaintained
+    "RUSTSEC-2023-0089",
     # yaml-rust is unmaintained.
     "RUSTSEC-2024-0320",
     # Degradation of service in h2 servers with CONTINUATION Flood(Wait for tonic to fix)
@@ -47,12 +49,40 @@ ignore = [
     "RUSTSEC-2024-0351",
     # gix-path: improperly resolves configuration path reported by Git
     "RUSTSEC-2024-0371",
+    # `atty` is unmaintained
+    "RUSTSEC-2024-0375",
     # Remotely exploitable Denial of Service in Tonic, ignored temporarily
     "RUSTSEC-2024-0376",
+    # `instant` is unmaintained
+    "RUSTSEC-2024-0384",
+    # `derivative` is unmaintained; consider using an alternative
+    "RUSTSEC-2024-0388",
     #rustls network-reachable panic in `Acceptor::accept`
     "RUSTSEC-2024-0399",
     # `idna` accepts Punycode labels that do not produce any non-ASCII when decoded
     "RUSTSEC-2024-0421",
+    # protobuf: Crash due to uncontrolled recursion in protobuf crate
+    "RUSTSEC-2024-0437",
+    # wasmtime: Wasmtime doesn't fully sandbox all the Windows device filenames
+    "RUSTSEC-2024-0438",
+    # wasmtime: Race condition could lead to WebAssembly control-flow integrity and type safety violations
+    "RUSTSEC-2024-0439",
     # gix-worktree-state nonexclusive checkout sets executable files world-writable
     "RUSTSEC-2025-0001",
+    # `fast-float`: Segmentation fault due to lack of bound check
+    "RUSTSEC-2025-0003",
+    # ssl::select_next_proto use after free
+    "RUSTSEC-2025-0004",
+    # hickory-proto: Hickory DNS failure to verify self-signed RRSIG for DNSKEYs
+    "RUSTSEC-2025-0006",
+    # ring: Some AES functions may panic when overflow checking is enabled
+    "RUSTSEC-2025-0009",
+    # `backoff` is unmainted.
+    "RUSTSEC-2025-0012",
+    # pyo3: Risk of buffer overflow in `PyString::from_object`
+    "RUSTSEC-2025-0020",
+    # gix-features: SHA-1 collision attacks are not detected
+    "RUSTSEC-2025-0021",
+    # openssl: Use-After-Free in `Md::fetch` and `Cipher::fetch`
+    "RUSTSEC-2025-0022",
 ]

Cargo.lock — 2 additions & 2 deletions (generated file, diff not rendered)

src/query/config/src/config.rs — 11 additions & 0 deletions

@@ -2701,6 +2701,15 @@ pub struct CacheConfig {
     )]
     pub block_meta_count: u64,
 
+    /// Max number of **segment** which all of its block meta will be cached.
+    /// Note that a segment may contain multiple block metadata entries.
+    #[clap(
+        long = "cache-segment-block-metas-count",
+        value_name = "VALUE",
+        default_value = "0"
+    )]
+    pub segment_block_metas_count: u64,
+
     /// Max number of cached table statistic meta
     #[clap(
         long = "cache-table-meta-statistic-count",
@@ -2999,6 +3008,7 @@ mod cache_config_converters {
             table_meta_snapshot_count: value.table_meta_snapshot_count,
             table_meta_segment_bytes: value.table_meta_segment_bytes,
             block_meta_count: value.block_meta_count,
+            segment_block_metas_count: value.segment_block_metas_count,
             table_meta_statistic_count: value.table_meta_statistic_count,
             enable_table_index_bloom: value.enable_table_bloom_index_cache,
             table_bloom_index_meta_count: value.table_bloom_index_meta_count,
@@ -3043,6 +3053,7 @@ mod cache_config_converters {
             table_data_deserialized_data_bytes: value.table_data_deserialized_data_bytes,
             table_data_deserialized_memory_ratio: value.table_data_deserialized_memory_ratio,
             table_meta_segment_count: None,
+            segment_block_metas_count: value.segment_block_metas_count,
         }
     }
 }
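The converter hunks above copy each field from the CLI-facing config into the inner config. A hedged sketch of that pattern using made-up miniature structs (the two field names mirror the diff; everything else is illustrative, not the real `CacheConfig`):

```rust
// Hypothetical simplification of the outer (CLI) and inner cache configs.
#[derive(Debug, Clone)]
struct OuterCacheConfig {
    block_meta_count: u64,
    // New knob from this commit; 0 means the cache is disabled.
    segment_block_metas_count: u64,
}

impl Default for OuterCacheConfig {
    fn default() -> Self {
        Self { block_meta_count: 0, segment_block_metas_count: 0 }
    }
}

#[derive(Debug, Clone)]
struct InnerCacheConfig {
    block_meta_count: u64,
    segment_block_metas_count: u64,
}

impl From<OuterCacheConfig> for InnerCacheConfig {
    fn from(value: OuterCacheConfig) -> Self {
        Self {
            block_meta_count: value.block_meta_count,
            // Forgetting to copy a new field here is exactly the kind of bug
            // the converter hunks in the diff guard against.
            segment_block_metas_count: value.segment_block_metas_count,
        }
    }
}

fn main() {
    let outer = OuterCacheConfig { segment_block_metas_count: 512, ..Default::default() };
    let inner: InnerCacheConfig = outer.into();
    assert_eq!(inner.segment_block_metas_count, 512);
    println!("ok");
}
```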

src/query/config/src/inner.rs — 5 additions & 0 deletions

@@ -536,6 +536,10 @@ pub struct CacheConfig {
     /// Max number of cached table block meta
     pub block_meta_count: u64,
 
+    /// Max number of **segment** which all of its block meta will be cached.
+    /// Note that a segment may contain multiple block metadata entries.
+    pub segment_block_metas_count: u64,
+
     /// Max number of cached table segment
     pub table_meta_statistic_count: u64,
 
@@ -683,6 +687,7 @@ impl Default for CacheConfig {
             table_meta_snapshot_count: 256,
             table_meta_segment_bytes: 1073741824,
             block_meta_count: 0,
+            segment_block_metas_count: 0,
             table_meta_statistic_count: 256,
             enable_table_index_bloom: true,
             table_bloom_index_meta_count: 3000,

src/query/service/src/interpreters/interpreter.rs — 17 additions & 2 deletions

@@ -28,6 +28,7 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_exception::ResultExt;
 use databend_common_expression::SendableDataBlockStream;
+use databend_common_license::license_manager::get_license_manager;
 use databend_common_pipeline_core::always_callback;
 use databend_common_pipeline_core::processors::PlanProfile;
 use databend_common_pipeline_core::ExecutionInfo;
@@ -91,6 +92,20 @@ pub trait Interpreter: Sync + Send {
 
         ctx.set_status_info("building pipeline");
         ctx.check_aborting().with_context(make_error)?;
+
+        match get_license_manager()
+            .manager
+            .parse_license(ctx.get_license_key().as_str())
+        {
+            Ok(_) => (),
+            Err(e) => {
+                log::error!(
+                    "[Interpreter] CRITICAL ALERT: License validation FAILED - enterprise features DISABLED, System may operate in DEGRADED MODE with LIMITED CAPABILITIES and REDUCED PERFORMANCE. Please contact us at https://www.databend.com/contact-us/ or email [email protected] to restore full functionality: {}",
+                    e
+                );
+            }
+        };
+
         if self.is_ddl() {
             CommitInterpreter::try_create(ctx.clone())?
                 .execute2()
@@ -175,7 +190,7 @@ fn log_query_start(ctx: &QueryContext) {
     }
 
     if let Err(error) = InterpreterQueryLog::log_start(ctx, now, None) {
-        error!("interpreter.start.error: {:?}", error)
+        error!("[Interpreter] Query start logging failed: {:?}", error)
     }
 }
 
@@ -192,7 +207,7 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
     }
 
     if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) {
-        error!("interpreter.finish.error: {:?}", error)
+        error!("[Interpreter] Query finish logging failed: {:?}", error)
     }
 }
 
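The interpreter change above validates the license on every pipeline build but only logs on failure, so queries are never blocked. A minimal sketch of that "log loudly, never block" pattern, with a hypothetical `parse_license` stub standing in for the real license manager:

```rust
// Hypothetical stub for license parsing; the real check lives in
// databend_common_license::license_manager and verifies a JWT-style key.
fn parse_license(key: &str) -> Result<(), String> {
    if key.is_empty() {
        Err("license key is empty".to_string())
    } else {
        Ok(())
    }
}

/// Returns whether the license is valid; execution always continues.
fn check_license_non_blocking(key: &str) -> bool {
    match parse_license(key) {
        Ok(_) => true,
        Err(e) => {
            // ERROR level so expired licenses surface in logs, while the
            // query pipeline is still built and executed as usual.
            eprintln!("[Interpreter] License validation FAILED: {e}");
            false
        }
    }
}

fn main() {
    assert!(check_license_non_blocking("some-key"));
    assert!(!check_license_non_blocking(""));
    println!("query continues regardless of license state");
}
```

The design choice matters because several enterprise features degrade performance rather than correctness when disabled, so an invisible failure would show up only as unexplained slowness.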

src/query/service/tests/it/storages/testdata/configs_table_basic.txt — 1 addition & 0 deletions

@@ -15,6 +15,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
 | 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' |
 | 'cache' | 'inverted_index_filter_size' | '2147483648' | '' |
 | 'cache' | 'inverted_index_meta_count' | '3000' | '' |
+| 'cache' | 'segment_block_metas_count' | '0' | '' |
 | 'cache' | 'table_bloom_index_filter_count' | '0' | '' |
 | 'cache' | 'table_bloom_index_filter_size' | '2147483648' | '' |
 | 'cache' | 'table_bloom_index_meta_count' | '3000' | '' |

src/query/storages/common/cache/Cargo.toml — 1 addition & 1 deletion

@@ -26,7 +26,7 @@ async-backtrace = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 crc32fast = "1.3.2"
-crossbeam-channel = "0.5.6"
+crossbeam-channel = "0.5.15"
 hex = "0.4.3"
 log = { workspace = true }
 parking_lot = { workspace = true }

src/query/storages/common/cache/src/caches.rs — 20 additions & 3 deletions

@@ -35,7 +35,15 @@ use crate::InMemoryLruCache;
 /// In memory object cache of SegmentInfo
 pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;
 
-pub type BlockMetaCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
+/// In-memory cache for all the block metadata of individual segments.
+///
+/// Note that this cache may be memory-intensive, as each item of this cache
+/// contains ALL the BlockMeta of a segment, for well-compacted segment, the
+/// number of BlockMeta might be 1000 ~ 2000.
+pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
+
+/// In-memory cache of individual BlockMeta.
+pub type BlockMetaCache = InMemoryLruCache<BlockMeta>;
 
 /// In memory object cache of TableSnapshot
 pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
@@ -95,9 +103,9 @@ impl CachedObject<TableSnapshot> for TableSnapshot {
 }
 
 impl CachedObject<Vec<Arc<BlockMeta>>> for Vec<Arc<BlockMeta>> {
-    type Cache = BlockMetaCache;
+    type Cache = SegmentBlockMetasCache;
     fn cache() -> Option<Self::Cache> {
-        CacheManager::instance().get_block_meta_cache()
+        CacheManager::instance().get_segment_block_metas_cache()
     }
 }
 
@@ -187,6 +195,15 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
     }
 }
 
+impl From<BlockMeta> for CacheValue<BlockMeta> {
+    fn from(value: BlockMeta) -> Self {
+        CacheValue {
+            inner: Arc::new(value),
+            mem_bytes: 0,
+        }
+    }
+}
+
 impl From<TableSnapshot> for CacheValue<TableSnapshot> {
     fn from(value: TableSnapshot) -> Self {
         CacheValue {

src/query/storages/common/cache/src/lib.rs — 1 addition & 0 deletions

@@ -26,6 +26,7 @@ pub use cache::Unit;
 pub use caches::BlockMetaCache;
 pub use caches::CacheValue;
 pub use caches::CachedObject;
+pub use caches::SegmentBlockMetasCache;
 pub use caches::SizedColumnArray;
 pub use manager::CacheManager;
 pub use providers::DiskCacheError;

src/query/storages/common/cache/src/manager.rs — 20 additions & 1 deletion

@@ -34,6 +34,7 @@ use crate::caches::FileMetaDataCache;
 use crate::caches::InvertedIndexFileCache;
 use crate::caches::InvertedIndexMetaCache;
 use crate::caches::PrunePartitionsCache;
+use crate::caches::SegmentBlockMetasCache;
 use crate::caches::TableSnapshotCache;
 use crate::caches::TableSnapshotStatisticCache;
 use crate::InMemoryLruCache;
@@ -78,6 +79,7 @@ pub struct CacheManager {
     parquet_meta_data_cache: CacheSlot<FileMetaDataCache>,
     table_data_cache: CacheSlot<TableDataCache>,
     in_memory_table_data_cache: CacheSlot<ColumnArrayCache>,
+    segment_block_metas_cache: CacheSlot<SegmentBlockMetasCache>,
     block_meta_cache: CacheSlot<BlockMetaCache>,
 }
 
@@ -151,6 +153,7 @@ impl CacheManager {
                 table_statistic_cache: CacheSlot::new(None),
                 table_data_cache,
                 in_memory_table_data_cache,
+                segment_block_metas_cache: CacheSlot::new(None),
                 block_meta_cache: CacheSlot::new(None),
             }));
         } else {
@@ -201,8 +204,14 @@ impl CacheManager {
                 DEFAULT_PARQUET_META_DATA_CACHE_ITEMS,
             );
 
+            let segment_block_metas_cache = Self::new_items_cache_slot(
+                MEMORY_CACHE_SEGMENT_BLOCK_METAS,
+                config.block_meta_count as usize,
+            );
+
             let block_meta_cache = Self::new_items_cache_slot(
                 MEMORY_CACHE_BLOCK_META,
+                // TODO replace this config
                 config.block_meta_count as usize,
             );
 
@@ -217,8 +226,9 @@ impl CacheManager {
                 table_statistic_cache,
                 table_data_cache,
                 in_memory_table_data_cache,
-                block_meta_cache,
+                segment_block_metas_cache,
                 parquet_meta_data_cache,
+                block_meta_cache,
             }));
         }
 
@@ -270,6 +280,9 @@ impl CacheManager {
             MEMORY_CACHE_TABLE_SNAPSHOT => {
                 Self::set_items_capacity(&self.table_snapshot_cache, new_capacity, name);
             }
+            MEMORY_CACHE_SEGMENT_BLOCK_METAS => {
+                Self::set_items_capacity(&self.segment_block_metas_cache, new_capacity, name);
+            }
             MEMORY_CACHE_BLOCK_META => {
                 Self::set_items_capacity(&self.block_meta_cache, new_capacity, name);
             }
@@ -311,6 +324,10 @@ impl CacheManager {
         }
     }
 
+    pub fn get_segment_block_metas_cache(&self) -> Option<SegmentBlockMetasCache> {
+        self.segment_block_metas_cache.get()
+    }
+
     pub fn get_block_meta_cache(&self) -> Option<BlockMetaCache> {
         self.block_meta_cache.get()
     }
@@ -426,4 +443,6 @@ const MEMORY_CACHE_BLOOM_INDEX_FILTER: &str = "memory_cache_bloom_index_filter";
 const MEMORY_CACHE_COMPACT_SEGMENT_INFO: &str = "memory_cache_compact_segment_info";
 const MEMORY_CACHE_TABLE_STATISTICS: &str = "memory_cache_table_statistics";
 const MEMORY_CACHE_TABLE_SNAPSHOT: &str = "memory_cache_table_snapshot";
+const MEMORY_CACHE_SEGMENT_BLOCK_METAS: &str = "memory_cache_segment_block_metas";
+
 const MEMORY_CACHE_BLOCK_META: &str = "memory_cache_block_meta";
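The `CacheManager` change above resolves caches by their string names (e.g. `memory_cache_segment_block_metas`) when adjusting capacity at runtime. A miniature sketch of that name-based dispatch, assuming simplified `CacheSlot` and `CacheManager` types (the real slots hold typed LRU caches, not bare counters):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Cache names mirroring the constants in manager.rs.
const MEMORY_CACHE_SEGMENT_BLOCK_METAS: &str = "memory_cache_segment_block_metas";
const MEMORY_CACHE_BLOCK_META: &str = "memory_cache_block_meta";

// Hypothetical miniature of CacheSlot: just a resizable capacity.
#[derive(Default)]
struct CacheSlot {
    capacity: Mutex<usize>,
}

struct CacheManager {
    slots: HashMap<&'static str, Arc<CacheSlot>>,
}

impl CacheManager {
    fn new() -> Self {
        let mut slots = HashMap::new();
        slots.insert(MEMORY_CACHE_SEGMENT_BLOCK_METAS, Arc::new(CacheSlot::default()));
        slots.insert(MEMORY_CACHE_BLOCK_META, Arc::new(CacheSlot::default()));
        Self { slots }
    }

    /// Resizes the named cache; returns false for unknown names,
    /// mirroring the match-on-name dispatch added in the diff.
    fn set_cache_capacity(&self, name: &str, new_capacity: usize) -> bool {
        match self.slots.get(name) {
            Some(slot) => {
                *slot.capacity.lock().unwrap() = new_capacity;
                true
            }
            None => false,
        }
    }

    fn capacity(&self, name: &str) -> Option<usize> {
        self.slots.get(name).map(|s| *s.capacity.lock().unwrap())
    }
}

fn main() {
    let mgr = CacheManager::new();
    assert!(mgr.set_cache_capacity(MEMORY_CACHE_SEGMENT_BLOCK_METAS, 1000));
    assert_eq!(mgr.capacity(MEMORY_CACHE_SEGMENT_BLOCK_METAS), Some(1000));
    assert!(!mgr.set_cache_capacity("no_such_cache", 1));
    println!("ok");
}
```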

src/query/storages/fuse/src/operations/replace.rs — 2 additions & 2 deletions

@@ -29,7 +29,7 @@ use rand::prelude::SliceRandom;
 use crate::io::BlockBuilder;
 use crate::io::ReadSettings;
 use crate::operations::mutation::SegmentIndex;
-use crate::operations::replace_into::MergeIntoOperationAggregator;
+use crate::operations::replace_into::ReplaceIntoOperationAggregator;
 use crate::FuseTable;
 
 impl FuseTable {
@@ -102,7 +102,7 @@ impl FuseTable {
         let read_settings = ReadSettings::from_ctx(&ctx)?;
         let mut items = Vec::with_capacity(num_partition);
         for chunk_of_segment_locations in chunks {
-            let item = MergeIntoOperationAggregator::try_create(
+            let item = ReplaceIntoOperationAggregator::try_create(
                 ctx.clone(),
                 on_conflicts.clone(),
                 bloom_filter_column_indexes.clone(),

src/query/storages/fuse/src/operations/replace_into/meta/mod.rs — 2 additions & 2 deletions

@@ -12,6 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod merge_into_operation_meta;
+mod replace_into_operation_meta;
 
-pub use merge_into_operation_meta::*;
+pub use replace_into_operation_meta::*;

src/query/storages/fuse/src/operations/replace_into/meta/merge_into_operation_meta.rs renamed to src/query/storages/fuse/src/operations/replace_into/meta/replace_into_operation_meta.rs — 6 additions & 12 deletions

@@ -19,14 +19,8 @@ use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Scalar;
 
-// This mod need to be refactored, since it not longer aiming to be
-// used in the implementation of `MERGE INTO` statement in the future.
-//
-// unfortunately, distributed `replace-into` is being implemented in parallel,
-// to avoid the potential heavy merge conflicts, the refactoring is postponed.
-
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
-pub enum MergeIntoOperation {
+pub enum ReplaceIntoOperation {
     Delete(Vec<DeletionByColumn>),
     None,
 }
@@ -43,8 +37,8 @@ pub struct DeletionByColumn {
     pub bloom_hashes: Vec<RowBloomHashes>,
 }
 
-#[typetag::serde(name = "merge_into_operation_meta")]
-impl BlockMetaInfo for MergeIntoOperation {
+#[typetag::serde(name = "replace_into_operation_meta")]
+impl BlockMetaInfo for ReplaceIntoOperation {
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
         Self::downcast_ref_from(info).is_some_and(|other| self == other)
     }
@@ -54,16 +48,16 @@ impl BlockMetaInfo for MergeIntoOperation {
     }
 }
 
-impl TryFrom<DataBlock> for MergeIntoOperation {
+impl TryFrom<DataBlock> for ReplaceIntoOperation {
     type Error = ErrorCode;
 
     fn try_from(value: DataBlock) -> Result<Self, Self::Error> {
         let meta = value.get_owned_meta().ok_or_else(|| {
             ErrorCode::Internal(
-                "convert MergeIntoOperation from data block failed, no block meta found",
+                "convert ReplaceIntoOperation from data block failed, no block meta found",
             )
         })?;
-        MergeIntoOperation::downcast_from(meta).ok_or_else(|| {
+        ReplaceIntoOperation::downcast_from(meta).ok_or_else(|| {
             ErrorCode::Internal(
                 "downcast block meta to MutationIntoOperation failed, type mismatch",
             )

src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs — 4 additions & 4 deletions

@@ -14,11 +14,11 @@
 
 mod column_hash;
 mod deletion_accumulator;
-mod merge_into_mutator;
-mod mutator_replace_into;
+mod replace_into_mutator;
+mod replace_into_operation_agg;
 
 pub use column_hash::row_hash_of_columns;
 pub use deletion_accumulator::BlockDeletionKeys;
 pub use deletion_accumulator::DeletionAccumulator;
-pub use merge_into_mutator::MergeIntoOperationAggregator;
-pub use mutator_replace_into::ReplaceIntoMutator;
+pub use replace_into_mutator::ReplaceIntoMutator;
+pub use replace_into_operation_agg::ReplaceIntoOperationAggregator;
