Skip to content

Commit 602e159

Browse files
authored
refactor(query): Optimizations for Variant type column filter (#17646)
* Refactor(query): Optimizations for Variant type column filter * fix tests * fix * fix * fix tests
1 parent 980a16d commit 602e159

File tree

9 files changed

+146
-62
lines changed

9 files changed

+146
-62
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
635635
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
636636
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "c149502" }
637637
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
638-
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "37d07f0" }
638+
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "1a53512" }
639639
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.3" }
640640
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
641641
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.9" }

src/query/functions/src/scalars/variant.rs

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,13 @@ pub fn register(registry: &mut FunctionRegistry) {
374374
}
375375
match RawJsonb::new(val).get_by_name(name, false) {
376376
Ok(Some(v)) => {
377-
if v.as_raw().is_null().unwrap_or_default() {
377+
let raw_jsonb = v.as_raw();
378+
if let Ok(Some(s)) = raw_jsonb.as_str() {
379+
output.push(&s);
380+
} else if raw_jsonb.is_null().unwrap_or_default() {
378381
output.push_null();
379382
} else {
380-
let json_str = cast_to_string(v.as_ref());
383+
let json_str = raw_jsonb.to_string();
381384
output.push(&json_str);
382385
}
383386
}
@@ -405,10 +408,13 @@ pub fn register(registry: &mut FunctionRegistry) {
405408
} else {
406409
match RawJsonb::new(val).get_by_index(idx as usize) {
407410
Ok(Some(v)) => {
408-
if v.as_raw().is_null().unwrap_or_default() {
411+
let raw_jsonb = v.as_raw();
412+
if let Ok(Some(s)) = raw_jsonb.as_str() {
413+
output.push(&s);
414+
} else if raw_jsonb.is_null().unwrap_or_default() {
409415
output.push_null();
410416
} else {
411-
let json_str = cast_to_string(v.as_ref());
417+
let json_str = raw_jsonb.to_string();
412418
output.push(&json_str);
413419
}
414420
}
@@ -595,11 +601,14 @@ pub fn register(registry: &mut FunctionRegistry) {
595601
Ok(json_path) => {
596602
match RawJsonb::new(&buf).select_value_by_path(&json_path) {
597603
Ok(owned_jsonb_opt) => match owned_jsonb_opt {
598-
Some(owned_jsonb) => {
599-
if owned_jsonb.as_raw().is_null().unwrap_or_default() {
604+
Some(v) => {
605+
let raw_jsonb = v.as_raw();
606+
if let Ok(Some(s)) = raw_jsonb.as_str() {
607+
output.push(&s);
608+
} else if raw_jsonb.is_null().unwrap_or_default() {
600609
output.push_null();
601610
} else {
602-
let json_str = cast_to_string(owned_jsonb.as_ref());
611+
let json_str = raw_jsonb.to_string();
603612
output.push(&json_str);
604613
}
605614
}
@@ -1061,10 +1070,13 @@ pub fn register(registry: &mut FunctionRegistry) {
10611070
return;
10621071
}
10631072
}
1064-
if RawJsonb::new(val).is_null().unwrap_or_default() {
1073+
let raw_jsonb = RawJsonb::new(val);
1074+
if let Ok(Some(s)) = raw_jsonb.as_str() {
1075+
output.push(&s);
1076+
} else if raw_jsonb.is_null().unwrap_or_default() {
10651077
output.push_null();
10661078
} else {
1067-
let json_str = cast_to_string(val);
1079+
let json_str = raw_jsonb.to_string();
10681080
output.push(&json_str);
10691081
}
10701082
},
@@ -1082,10 +1094,13 @@ pub fn register(registry: &mut FunctionRegistry) {
10821094
return;
10831095
}
10841096
}
1085-
if RawJsonb::new(val).is_null().unwrap_or_default() {
1097+
let raw_jsonb = RawJsonb::new(val);
1098+
if let Ok(Some(s)) = raw_jsonb.as_str() {
1099+
output.push(&s);
1100+
} else if raw_jsonb.is_null().unwrap_or_default() {
10861101
output.push_null();
10871102
} else {
1088-
let json_str = cast_to_string(val);
1103+
let json_str = raw_jsonb.to_string();
10891104
output.push(&json_str);
10901105
}
10911106
},
@@ -1102,12 +1117,15 @@ pub fn register(registry: &mut FunctionRegistry) {
11021117
return;
11031118
}
11041119
}
1105-
if RawJsonb::new(val).is_null().unwrap_or_default() {
1120+
let raw_jsonb = RawJsonb::new(val);
1121+
if raw_jsonb.is_null().unwrap_or_default() {
11061122
output.push_null();
11071123
return;
11081124
}
1109-
match cast_to_str(val)
1125+
match raw_jsonb
1126+
.as_str()
11101127
.map_err(|e| format!("{e}"))
1128+
.and_then(|r| r.ok_or(format!("invalid json type")))
11111129
.and_then(|s| {
11121130
string_to_date(s.as_bytes(), &ctx.func_ctx.tz).map_err(|e| e.message())
11131131
})
@@ -1137,8 +1155,11 @@ pub fn register(registry: &mut FunctionRegistry) {
11371155
return;
11381156
}
11391157
}
1140-
match cast_to_str(val)
1158+
let raw_jsonb = RawJsonb::new(val);
1159+
match raw_jsonb
1160+
.as_str()
11411161
.map_err(|e| format!("{e}"))
1162+
.and_then(|r| r.ok_or(format!("invalid json type")))
11421163
.and_then(|s| {
11431164
string_to_date(s.as_bytes(), &ctx.func_ctx.tz).map_err(|e| e.message())
11441165
})
@@ -1165,13 +1186,18 @@ pub fn register(registry: &mut FunctionRegistry) {
11651186
return;
11661187
}
11671188
}
1168-
if RawJsonb::new(val).is_null().unwrap_or_default() {
1189+
let raw_jsonb = RawJsonb::new(val);
1190+
if raw_jsonb.is_null().unwrap_or_default() {
11691191
output.push_null();
11701192
return;
11711193
}
1172-
match cast_to_str(val).map_err(|e| format!("{e}")).and_then(|s| {
1173-
string_to_timestamp(s.as_bytes(), &ctx.func_ctx.tz).map_err(|e| e.message())
1174-
}) {
1194+
match raw_jsonb
1195+
.as_str()
1196+
.map_err(|e| format!("{e}"))
1197+
.and_then(|r| r.ok_or(format!("invalid json type")))
1198+
.and_then(|s| {
1199+
string_to_timestamp(s.as_bytes(), &ctx.func_ctx.tz).map_err(|e| e.message())
1200+
}) {
11751201
Ok(ts) => output.push(ts.timestamp().as_microsecond()),
11761202
Err(e) => {
11771203
ctx.set_error(
@@ -1196,9 +1222,15 @@ pub fn register(registry: &mut FunctionRegistry) {
11961222
return;
11971223
}
11981224
}
1199-
match cast_to_str(val).map_err(|e| format!("{e}")).and_then(|s| {
1200-
string_to_timestamp(s.as_bytes(), &ctx.func_ctx.tz).map_err(|e| e.message())
1201-
}) {
1225+
1226+
let raw_jsonb = RawJsonb::new(val);
1227+
match raw_jsonb
1228+
.as_str()
1229+
.map_err(|e| format!("{e}"))
1230+
.and_then(|r| r.ok_or(format!("invalid json type")))
1231+
.and_then(|s| {
1232+
string_to_timestamp(s.as_bytes(), &ctx.func_ctx.tz).map_err(|e| e.message())
1233+
}) {
12021234
Ok(ts) => output.push(ts.timestamp().as_microsecond()),
12031235
Err(_) => {
12041236
output.push_null();
@@ -2134,8 +2166,13 @@ fn get_by_keypath_fn(
21342166
Ok(Some(res)) => {
21352167
match &mut builder {
21362168
ColumnBuilder::String(builder) => {
2137-
let json_str = cast_to_string(res.as_ref());
2138-
builder.put_str(&json_str);
2169+
let raw_jsonb = res.as_raw();
2170+
if let Ok(Some(s)) = raw_jsonb.as_str() {
2171+
builder.put_str(&s);
2172+
} else {
2173+
let json_str = raw_jsonb.to_string();
2174+
builder.put_str(&json_str);
2175+
}
21392176
}
21402177
ColumnBuilder::Variant(builder) => {
21412178
builder.put_slice(res.as_ref());
@@ -2461,30 +2498,6 @@ fn json_object_pick_or_delete_fn(
24612498
}
24622499
}
24632500

2464-
// Extract string for string type, other types convert to JSON string.
2465-
fn cast_to_string(v: &[u8]) -> String {
2466-
let raw_jsonb = RawJsonb::new(v);
2467-
match raw_jsonb.to_str() {
2468-
Ok(v) => v,
2469-
Err(_) => raw_jsonb.to_string(),
2470-
}
2471-
}
2472-
2473-
fn cast_to_str(v: &[u8]) -> Result<String, jsonb::Error> {
2474-
match RawJsonb::new(v).to_str() {
2475-
Ok(val) => Ok(val),
2476-
Err(err) => {
2477-
if err.to_string() == "InvalidJsonb" {
2478-
let s = unsafe { std::str::from_utf8_unchecked(v) };
2479-
let owned_jsonb = s.parse::<OwnedJsonb>()?;
2480-
let raw_jsonb = owned_jsonb.as_raw();
2481-
return raw_jsonb.to_str();
2482-
}
2483-
Err(err)
2484-
}
2485-
}
2486-
}
2487-
24882501
fn cast_to_bool(v: &[u8]) -> Result<bool, jsonb::Error> {
24892502
match RawJsonb::new(v).to_bool() {
24902503
Ok(val) => Ok(val),

src/query/sql/src/planner/semantic/type_check.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2979,6 +2979,59 @@ impl<'a> TypeChecker<'a> {
29792979
let name = op.to_func_name();
29802980
self.resolve_function(span, name.as_str(), vec![], &[left, right])
29812981
}
2982+
BinaryOperator::Eq | BinaryOperator::NotEq => {
2983+
let name = op.to_func_name();
2984+
let box (res, ty) =
2985+
self.resolve_function(span, name.as_str(), vec![], &[left, right])?;
2986+
// When a variant type column is compared with a scalar string value,
2987+
// we try to cast the scalar string value to variant type,
2988+
// because casting variant column data is a time-consuming operation.
2989+
if let ScalarExpr::FunctionCall(ref func) = res {
2990+
if func.arguments.len() != 2 {
2991+
return Ok(Box::new((res, ty)));
2992+
}
2993+
let arg0 = &func.arguments[0];
2994+
let arg1 = &func.arguments[1];
2995+
let (constant_arg_index, constant_arg) = match (arg0, arg1) {
2996+
(ScalarExpr::ConstantExpr(_), _)
2997+
if arg1.data_type()?.remove_nullable() == DataType::Variant
2998+
&& !arg1.used_columns().is_empty()
2999+
&& arg0.data_type()? == DataType::String =>
3000+
{
3001+
(0, arg0)
3002+
}
3003+
(_, ScalarExpr::ConstantExpr(_))
3004+
if arg0.data_type()?.remove_nullable() == DataType::Variant
3005+
&& !arg0.used_columns().is_empty()
3006+
&& arg1.data_type()? == DataType::String =>
3007+
{
3008+
(1, arg1)
3009+
}
3010+
_ => {
3011+
return Ok(Box::new((res, ty)));
3012+
}
3013+
};
3014+
3015+
let wrap_new_arg = ScalarExpr::FunctionCall(FunctionCall {
3016+
span: func.span,
3017+
func_name: "to_variant".to_string(),
3018+
params: vec![],
3019+
arguments: vec![constant_arg.clone()],
3020+
});
3021+
let mut new_arguments = func.arguments.clone();
3022+
new_arguments[constant_arg_index] = wrap_new_arg;
3023+
3024+
let new_func = ScalarExpr::FunctionCall(FunctionCall {
3025+
span: func.span,
3026+
func_name: func.func_name.clone(),
3027+
params: func.params.clone(),
3028+
arguments: new_arguments,
3029+
});
3030+
3031+
return Ok(Box::new((new_func, ty)));
3032+
}
3033+
Ok(Box::new((res, ty)))
3034+
}
29823035
other => {
29833036
let name = other.to_func_name();
29843037
self.resolve_function(span, name.as_str(), vec![], &[left, right])

src/query/storages/common/index/src/bloom_index.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ impl BloomIndex {
246246
for val in column.iter() {
247247
if let ScalarRef::Variant(v) = val {
248248
let raw_jsonb = RawJsonb::new(v);
249-
if let Ok(str_val) = raw_jsonb.to_str() {
250-
builder.push(ScalarRef::String(str_val.as_str()));
249+
if let Ok(Some(str_val)) = raw_jsonb.as_str() {
250+
builder.push(ScalarRef::String(&str_val));
251251
continue;
252252
}
253253
}
@@ -773,6 +773,24 @@ trait EqVisitor {
773773
};
774774
// Only JSON value of string type have bloom index.
775775
if val_type.remove_nullable() == DataType::Variant {
776+
// If the scalar value is variant string, we can try extract the string
777+
// value to take advantage of bloom filtering.
778+
if scalar_type.remove_nullable() == DataType::Variant {
779+
if let Some(val) = scalar.as_variant() {
780+
let raw_jsonb = RawJsonb::new(val);
781+
if let Ok(Some(str_val)) = raw_jsonb.as_str() {
782+
let new_scalar = Scalar::String(str_val.to_string());
783+
let new_scalar_type = DataType::String;
784+
return self.enter_target(
785+
span,
786+
id,
787+
&new_scalar,
788+
&new_scalar_type,
789+
return_type,
790+
);
791+
}
792+
}
793+
}
776794
if scalar_type.remove_nullable() != DataType::String {
777795
return Ok(ControlFlow::Continue(None));
778796
}

tests/sqllogictests/suites/mode/standalone/explain/explain.test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,7 @@ EXPLAIN SELECT * FROM t3 WHERE c['k3'] = 'v';
11471147
----
11481148
Filter
11491149
├── output columns: [t3.a (#0), t3.b (#1), t3.c (#2)]
1150-
├── filters: [is_true(TRY_CAST(get(t3.c (#2), 'k3') AS String NULL) = 'v')]
1150+
├── filters: [is_true(get(t3.c (#2), 'k3') = '"v"')]
11511151
├── estimated rows: 1.20
11521152
└── TableScan
11531153
├── table: default.default.t3
@@ -1157,7 +1157,7 @@ Filter
11571157
├── partitions total: 2
11581158
├── partitions scanned: 1
11591159
├── pruning stats: [segments: <range pruning: 2 to 2>, blocks: <range pruning: 2 to 2, bloom pruning: 2 to 1>]
1160-
├── push downs: [filters: [is_true(TRY_CAST(get(t3.c (#2), 'k3') AS String NULL) = 'v')], limit: NONE]
1160+
├── push downs: [filters: [is_true(get(t3.c (#2), 'k3') = '"v"')], limit: NONE]
11611161
└── estimated rows: 6.00
11621162

11631163
query T

tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_project_set.test

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ EvalScalar
1717
├── estimated rows: 0.36
1818
└── Filter
1919
├── output columns: [products.name (#0), products.details (#1), json_path_query(products.details (#1), '$.features.*') (#2)]
20-
├── filters: [is_true(TRY_CAST(get(1)(json_path_query(products.details (#1), '$.features.*') (#2)) AS String NULL) = '512GB')]
20+
├── filters: [is_true(get(1)(json_path_query(products.details (#1), '$.features.*') (#2)) = '"512GB"')]
2121
├── estimated rows: 0.36
2222
└── ProjectSet
2323
├── output columns: [products.name (#0), products.details (#1), json_path_query(products.details (#1), '$.features.*') (#2)]
2424
├── estimated rows: 1.80
2525
├── set returning functions: json_path_query(products.details (#1), '$.features.*')
2626
└── Filter
2727
├── output columns: [products.name (#0), products.details (#1)]
28-
├── filters: [is_true(products.name (#0) = 'Laptop'), is_true(TRY_CAST(json_path_query_first(products.details (#1), '$.features.*') AS String NULL) = '16GB')]
28+
├── filters: [is_true(products.name (#0) = 'Laptop'), is_true(json_path_query_first(products.details (#1), '$.features.*') = '"16GB"')]
2929
├── estimated rows: 0.60
3030
└── TableScan
3131
├── table: default.default.products
@@ -35,7 +35,7 @@ EvalScalar
3535
├── partitions total: 1
3636
├── partitions scanned: 1
3737
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 1>]
38-
├── push downs: [filters: [and_filters(products.name (#0) = 'Laptop', TRY_CAST(json_path_query_first(products.details (#1), '$.features.*') AS String NULL) = '16GB')], limit: NONE]
38+
├── push downs: [filters: [and_filters(products.name (#0) = 'Laptop', json_path_query_first(products.details (#1), '$.features.*') = '"16GB"')], limit: NONE]
3939
└── estimated rows: 3.00
4040

4141
query T
@@ -56,7 +56,7 @@ EvalScalar
5656
├── set returning functions: json_path_query(products.details (#1), '$.features.*')
5757
└── Filter
5858
├── output columns: [products.name (#0), products.details (#1)]
59-
├── filters: [is_true(products.name (#0) = 'Laptop'), is_true(TRY_CAST(json_path_query_first(products.details (#1), '$.features.*') AS String NULL) = '16GB')]
59+
├── filters: [is_true(products.name (#0) = 'Laptop'), is_true(json_path_query_first(products.details (#1), '$.features.*') = '"16GB"')]
6060
├── estimated rows: 0.60
6161
└── TableScan
6262
├── table: default.default.products
@@ -66,7 +66,7 @@ EvalScalar
6666
├── partitions total: 1
6767
├── partitions scanned: 1
6868
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 1>]
69-
├── push downs: [filters: [and_filters(products.name (#0) = 'Laptop', TRY_CAST(json_path_query_first(products.details (#1), '$.features.*') AS String NULL) = '16GB')], limit: NONE]
69+
├── push downs: [filters: [and_filters(products.name (#0) = 'Laptop', json_path_query_first(products.details (#1), '$.features.*') = '"16GB"')], limit: NONE]
7070
└── estimated rows: 3.00
7171

7272
query T

0 commit comments

Comments
 (0)