Skip to content

fix case of f(scalar, array) invocation #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ paste = "1"
log = "0.4"

[dev-dependencies]
datafusion = { version = "44", default-features = false, features = ["nested_expressions"] }
codspeed-criterion-compat = "2.6"
criterion = "0.5.1"
clap = "4"
Expand Down
278 changes: 163 additions & 115 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,46 @@ impl From<i64> for JsonPath<'_> {
}
}

impl<'s> JsonPath<'s> {
pub fn extract_path(args: &'s [ColumnarValue]) -> Vec<Self> {
args[1..]
enum JsonPathArgs<'a> {
Array(&'a ArrayRef),
Scalars(Vec<JsonPath<'a>>),
}

impl<'s> JsonPathArgs<'s> {
fn extract_path(path_args: &'s [ColumnarValue]) -> DataFusionResult<Self> {
// If there is a single argument as an array, we know how to handle it
if let Some((ColumnarValue::Array(array), &[])) = path_args.split_first() {
return Ok(Self::Array(array));
}

path_args
.iter()
.map(|arg| match arg {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s))) => Self::Key(s),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => (*i).into(),
ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => (*i).into(),
_ => Self::None,
.enumerate()
.map(|(pos, arg)| match arg {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s))) => {
Ok(JsonPath::Key(s))
}
ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => Ok((*i).into()),
ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => Ok((*i).into()),
ColumnarValue::Scalar(
ScalarValue::Null
| ScalarValue::Utf8(None)
| ScalarValue::LargeUtf8(None)
| ScalarValue::UInt64(None)
| ScalarValue::Int64(None),
) => Ok(JsonPath::None),
ColumnarValue::Array(_) => {
// if there was a single arg, which is an array, handled above in the
// split_first case. So this is multiple args of which one is an array
exec_err!("More than 1 path element is not supported when querying JSON using an array.")
}
ColumnarValue::Scalar(arg) => exec_err!(
"Unexpected argument type at position {}, expected string or int, got {arg:?}.",
pos + 1
),
})
.collect()
.collect::<DataFusionResult<_>>()
.map(JsonPathArgs::Scalars)
}
}

Expand All @@ -116,154 +145,173 @@ pub fn invoke<C: FromIterator<Option<I>> + 'static, I>(
to_scalar: impl Fn(Option<I>) -> ScalarValue,
return_dict: bool,
) -> DataFusionResult<ColumnarValue> {
let Some(first_arg) = args.first() else {
// I think this can't happen, but I assumed the same about args[1] and I was wrong, so better to be safe
let Some((json_arg, path_args)) = args.split_first() else {
return exec_err!("expected at least one argument");
};
match first_arg {
ColumnarValue::Array(json_array) => {
let array = match args.get(1) {
Some(ColumnarValue::Array(a)) => {
if args.len() > 2 {
// TODO perhaps we could support this by zipping the arrays, but it's not trivial, #23
exec_err!("More than 1 path element is not supported when querying JSON using an array.")
} else {
invoke_array(json_array, a, to_array, jiter_find, return_dict)
}
}
Some(ColumnarValue::Scalar(_)) => scalar_apply(
json_array,
&JsonPath::extract_path(args),
to_array,
jiter_find,
return_dict,
),
None => scalar_apply(json_array, &[], to_array, jiter_find, return_dict),
};
array.map(ColumnarValue::from)

let path = JsonPathArgs::extract_path(path_args)?;
match (json_arg, path) {
(ColumnarValue::Array(json_array), JsonPathArgs::Array(path_array)) => {
invoke_array_array(json_array, path_array, to_array, jiter_find, return_dict).map(ColumnarValue::Array)
}
(ColumnarValue::Array(json_array), JsonPathArgs::Scalars(path)) => {
invoke_array_scalars(json_array, &path, to_array, jiter_find, return_dict).map(ColumnarValue::Array)
}
(ColumnarValue::Scalar(s), JsonPathArgs::Array(path_array)) => {
invoke_scalar_array(s, path_array, jiter_find, to_array)
}
(ColumnarValue::Scalar(s), JsonPathArgs::Scalars(path)) => {
invoke_scalar_scalars(s, &path, jiter_find, to_scalar)
}
ColumnarValue::Scalar(s) => invoke_scalar(s, args, jiter_find, to_scalar),
}
}

fn invoke_array<C: FromIterator<Option<I>> + 'static, I>(
fn invoke_array_array<C: FromIterator<Option<I>> + 'static, I>(
json_array: &ArrayRef,
needle_array: &ArrayRef,
path_array: &ArrayRef,
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
downcast_dictionary_array!(
needle_array => match needle_array.values().data_type() {
DataType::Utf8 => zip_apply(json_array, needle_array.downcast_dict::<StringArray>().unwrap(), to_array, jiter_find, true, return_dict),
DataType::LargeUtf8 => zip_apply(json_array, needle_array.downcast_dict::<LargeStringArray>().unwrap(), to_array, jiter_find, true, return_dict),
DataType::Utf8View => zip_apply(json_array, needle_array.downcast_dict::<StringViewArray>().unwrap(), to_array, jiter_find, true, return_dict),
DataType::Int64 => zip_apply(json_array, needle_array.downcast_dict::<Int64Array>().unwrap(), to_array, jiter_find, false, return_dict),
DataType::UInt64 => zip_apply(json_array, needle_array.downcast_dict::<UInt64Array>().unwrap(), to_array, jiter_find, false, return_dict),
other => exec_err!("unexpected second argument type, expected string or int array, got {:?}", other),
},
DataType::Utf8 => zip_apply(json_array, needle_array.as_string::<i32>(), to_array, jiter_find, true, return_dict),
DataType::LargeUtf8 => zip_apply(json_array, needle_array.as_string::<i64>(), to_array, jiter_find, true, return_dict),
DataType::Utf8View => zip_apply(json_array, needle_array.as_string_view(), to_array, jiter_find, true, return_dict),
DataType::Int64 => zip_apply(json_array, needle_array.as_primitive::<Int64Type>(), to_array, jiter_find, false, return_dict),
DataType::UInt64 => zip_apply(json_array, needle_array.as_primitive::<UInt64Type>(), to_array, jiter_find, false, return_dict),
other => exec_err!("unexpected second argument type, expected string or int array, got {:?}", other)
json_array => {
let values = invoke_array_array(json_array.values(), path_array, to_array, jiter_find, return_dict)?;
post_process_dict(json_array, values, return_dict)
}
DataType::Utf8 => zip_apply(json_array.as_string::<i32>().iter(), path_array, to_array, jiter_find),
DataType::LargeUtf8 => zip_apply(json_array.as_string::<i64>().iter(), path_array, to_array, jiter_find),
DataType::Utf8View => zip_apply(json_array.as_string_view().iter(), path_array, to_array, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup_array(path_array.data_type())) {
zip_apply(string_array.iter(), path_array, to_array, jiter_find)
} else {
exec_err!("unexpected json array type {:?}", other)
}
)
}

fn zip_apply<'a, P: Into<JsonPath<'a>>, C: FromIterator<Option<I>> + 'static, I>(
fn invoke_array_scalars<C: FromIterator<Option<I>>, I>(
json_array: &ArrayRef,
path_array: impl ArrayAccessor<Item = P>,
path: &[JsonPath],
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
object_lookup: bool,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
fn inner<'j, C: FromIterator<Option<I>>, I>(
json_iter: impl IntoIterator<Item = Option<&'j str>>,
path: &[JsonPath],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter
.into_iter()
.map(|opt_json| jiter_find(opt_json, path).ok())
.collect::<C>()
}

let c = downcast_dictionary_array!(
json_array => {
let values = zip_apply(json_array.values(), path_array, to_array, jiter_find, object_lookup, false)?;
let values = invoke_array_scalars(json_array.values(), path, to_array, jiter_find, false)?;
return post_process_dict(json_array, values, return_dict);
}
DataType::Utf8 => zip_apply_iter(json_array.as_string::<i32>().iter(), path_array, jiter_find),
DataType::LargeUtf8 => zip_apply_iter(json_array.as_string::<i64>().iter(), path_array, jiter_find),
DataType::Utf8View => zip_apply_iter(json_array.as_string_view().iter(), path_array, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, object_lookup) {
zip_apply_iter(string_array.iter(), path_array, jiter_find)
DataType::Utf8 => inner(json_array.as_string::<i32>(), path, jiter_find),
DataType::LargeUtf8 => inner(json_array.as_string::<i64>(), path, jiter_find),
DataType::Utf8View => inner(json_array.as_string_view(), path, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup(path)) {
inner(string_array, path, jiter_find)
} else {
return exec_err!("unexpected json array type {:?}", other);
}
);

to_array(c)
}

#[allow(clippy::needless_pass_by_value)] // ArrayAccessor is implemented on references
fn zip_apply_iter<'a, 'j, P: Into<JsonPath<'a>>, C: FromIterator<Option<I>> + 'static, I>(
json_iter: impl Iterator<Item = Option<&'j str>>,
path_array: impl ArrayAccessor<Item = P>,
fn invoke_scalar_array<C: FromIterator<Option<I>> + 'static, I>(
scalar: &ScalarValue,
path_array: &ArrayRef,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter
.enumerate()
.map(|(i, opt_json)| {
if path_array.is_null(i) {
None
} else {
let path = path_array.value(i).into();
jiter_find(opt_json, &[path]).ok()
}
})
.collect::<C>()
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
) -> DataFusionResult<ColumnarValue> {
let s = extract_json_scalar(scalar)?;
// TODO: possible optimization here if path_array is a dictionary; can apply against the
// dictionary values directly for less work
zip_apply(
std::iter::repeat(s).take(path_array.len()),
path_array,
to_array,
jiter_find,
)
.map(ColumnarValue::Array)
}

fn invoke_scalar<I>(
fn invoke_scalar_scalars<I>(
scalar: &ScalarValue,
args: &[ColumnarValue],
path: &[JsonPath],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
to_scalar: impl Fn(Option<I>) -> ScalarValue,
) -> DataFusionResult<ColumnarValue> {
match scalar {
ScalarValue::Dictionary(_, b) => invoke_scalar(b.as_ref(), args, jiter_find, to_scalar),
ScalarValue::Utf8(s) | ScalarValue::Utf8View(s) | ScalarValue::LargeUtf8(s) => {
let path = JsonPath::extract_path(args);
let v = jiter_find(s.as_ref().map(String::as_str), &path).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}
ScalarValue::Union(type_id_value, union_fields, _) => {
let opt_json = json_from_union_scalar(type_id_value.as_ref(), union_fields);
let v = jiter_find(opt_json, &JsonPath::extract_path(args)).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}
_ => {
exec_err!("unexpected first argument type, expected string or JSON union")
}
}
let s = extract_json_scalar(scalar)?;
let v = jiter_find(s, path).ok();
Ok(ColumnarValue::Scalar(to_scalar(v)))
}

fn scalar_apply<C: FromIterator<Option<I>>, I>(
json_array: &ArrayRef,
path: &[JsonPath],
fn zip_apply<'a, C: FromIterator<Option<I>> + 'static, I>(
json_array: impl IntoIterator<Item = Option<&'a str>>,
path_array: &ArrayRef,
to_array: impl Fn(C) -> DataFusionResult<ArrayRef>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
return_dict: bool,
) -> DataFusionResult<ArrayRef> {
#[allow(clippy::needless_pass_by_value)] // ArrayAccessor is implemented on references
fn inner<'a, 'j, P: Into<JsonPath<'a>>, C: FromIterator<Option<I>> + 'static, I>(
json_iter: impl IntoIterator<Item = Option<&'j str>>,
path_array: impl ArrayAccessor<Item = P>,
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter
.into_iter()
.enumerate()
.map(|(i, opt_json)| {
if path_array.is_null(i) {
None
} else {
let path = path_array.value(i).into();
jiter_find(opt_json, &[path]).ok()
}
})
.collect::<C>()
}

let c = downcast_dictionary_array!(
json_array => {
let values = scalar_apply(json_array.values(), path, to_array, jiter_find, false)?;
return post_process_dict(json_array, values, return_dict);
}
DataType::Utf8 => scalar_apply_iter(json_array.as_string::<i32>().iter(), path, jiter_find),
DataType::LargeUtf8 => scalar_apply_iter(json_array.as_string::<i64>().iter(), path, jiter_find),
DataType::Utf8View => scalar_apply_iter(json_array.as_string_view().iter(), path, jiter_find),
other => if let Some(string_array) = nested_json_array(json_array, is_object_lookup(path)) {
scalar_apply_iter(string_array.iter(), path, jiter_find)
} else {
return exec_err!("unexpected json array type {:?}", other);
}
path_array => match path_array.values().data_type() {
DataType::Utf8 => inner(json_array, path_array.downcast_dict::<StringArray>().unwrap(), jiter_find),
DataType::LargeUtf8 => inner(json_array, path_array.downcast_dict::<LargeStringArray>().unwrap(), jiter_find),
DataType::Utf8View => inner(json_array, path_array.downcast_dict::<StringViewArray>().unwrap(), jiter_find),
DataType::Int64 => inner(json_array, path_array.downcast_dict::<Int64Array>().unwrap(), jiter_find),
DataType::UInt64 => inner(json_array, path_array.downcast_dict::<UInt64Array>().unwrap(), jiter_find),
other => return exec_err!("unexpected second argument type, expected string or int array, got {:?}", other),
},
DataType::Utf8 => inner(json_array, path_array.as_string::<i32>(), jiter_find),
DataType::LargeUtf8 => inner(json_array, path_array.as_string::<i64>(), jiter_find),
DataType::Utf8View => inner(json_array, path_array.as_string_view(), jiter_find),
DataType::Int64 => inner(json_array, path_array.as_primitive::<Int64Type>(), jiter_find),
DataType::UInt64 => inner(json_array, path_array.as_primitive::<UInt64Type>(), jiter_find),
other => return exec_err!("unexpected second argument type, expected string or int array, got {:?}", other)
);

to_array(c)
}

fn extract_json_scalar(scalar: &ScalarValue) -> DataFusionResult<Option<&str>> {
match scalar {
ScalarValue::Dictionary(_, b) => extract_json_scalar(b.as_ref()),
ScalarValue::Utf8(s) | ScalarValue::Utf8View(s) | ScalarValue::LargeUtf8(s) => Ok(s.as_deref()),
ScalarValue::Union(type_id_value, union_fields, _) => {
Ok(json_from_union_scalar(type_id_value.as_ref(), union_fields))
}
_ => {
exec_err!("unexpected first argument type, expected string or JSON union")
}
}
}

/// Take a dictionary array of JSON data and an array of result values and combine them.
fn post_process_dict<T: ArrowDictionaryKeyType>(
dict_array: &DictionaryArray<T>,
Expand Down Expand Up @@ -295,12 +343,12 @@ fn is_object_lookup(path: &[JsonPath]) -> bool {
}
}

fn scalar_apply_iter<'j, C: FromIterator<Option<I>>, I>(
json_iter: impl Iterator<Item = Option<&'j str>>,
path: &[JsonPath],
jiter_find: impl Fn(Option<&str>, &[JsonPath]) -> Result<I, GetError>,
) -> C {
json_iter.map(|opt_json| jiter_find(opt_json, path).ok()).collect::<C>()
fn is_object_lookup_array(data_type: &DataType) -> bool {
match data_type {
DataType::Dictionary(_, value_type) => is_object_lookup_array(value_type),
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => true,
_ => false,
}
}

pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Option<(Jiter<'j>, Peek)> {
Expand Down
2 changes: 1 addition & 1 deletion src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use datafusion::logical_expr::sqlparser::ast::BinaryOperator;
pub(crate) struct JsonFunctionRewriter;

impl FunctionRewrite for JsonFunctionRewriter {
fn name(&self) -> &str {
fn name(&self) -> &'static str {
"JsonFunctionRewriter"
}

Expand Down
Loading
Loading