Skip to content

Commit

Permalink
Add nested struct reference capability to datafusion-sql
Browse files Browse the repository at this point in the history
  • Loading branch information
adragomir authored and ccciudatu committed Apr 26, 2024
1 parent bab39f7 commit 0ff6b0d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
3 changes: 3 additions & 0 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<GetFieldAccess> {
let field = match expr.clone() {
SQLExpr::Identifier(ident) => GetFieldAccess::NamedStructField {
name: ScalarValue::from(ident.value),
},
SQLExpr::Value(
Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
) => GetFieldAccess::NamedStructField {
Expand Down
98 changes: 98 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,104 @@ fn test_schema_support() {
);
}

#[derive(Default)]
struct AdrCtx {
options: ConfigOptions,
udfs: HashMap<String, Arc<ScalarUDF>>,
udafs: HashMap<String, Arc<AggregateUDF>>,
}

impl AdrCtx {
fn options_mut(&mut self) -> &mut ConfigOptions {
&mut self.options
}

fn with_udf(mut self, udf: ScalarUDF) -> Self {
self.udfs.insert(udf.name().to_string(), Arc::new(udf));
self
}
}

impl ContextProvider for AdrCtx {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let schema = match name.table() {
"tbl" => Ok(Schema::new(vec![
Field::new(
"struct_field",
DataType::Struct(
Fields::from(vec![
Field::new(
"subfield1",
DataType::List(
Arc::new(
Field::new(
"substruct1",
DataType::Struct(
Fields::from(vec![
Field::new("subsubfield1", DataType::Int32, true),
Field::new("subsubfield2", DataType::Binary, true),
])
),
true
)
)
),
true
)
])
),
true
),
])),
_ => plan_err!("No table named: {} found", name.table()),
};
match schema {
Ok(t) => Ok(Arc::new(EmptyTable::new(Arc::new(t)))),
Err(e) => Err(e),
}
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> { self.udfs.get(name).cloned() }
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> { self.udafs.get(name).cloned() }
fn get_variable_type(&self, _: &[String]) -> Option<DataType> {
unimplemented!()
}
fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}
fn options(&self) -> &ConfigOptions {
&self.options
}
fn create_cte_work_table(&self, _name: &str, schema: SchemaRef) -> Result<Arc<dyn TableSource>> { Ok(Arc::new(EmptyTable::new(schema))) }
fn udfs_names(&self) -> Vec<String> {
self.udfs.keys().cloned().collect()
}
fn udafs_names(&self) -> Vec<String> {
self.udafs.keys().cloned().collect()
}
fn udwfs_names(&self) -> Vec<String> {
Vec::new()
}
}

#[test]
fn test_adr_1() {
let dialect = &GenericDialect {};
let sql = r#"SELECT struct_field["subfield1"][0]["subsubfield1"] FROM tbl;"#;
println!("sql: {}", &sql);
let result = DFParser::parse_sql_with_dialect(sql, dialect);
println!("result: {:?}", result);
let mut ast = result.unwrap();
println!("ast: {:?}", ast);
let context = AdrCtx::default();
let planner = SqlToRel::new(&context);
let mut parse_result = DFParser::parse_sql_with_dialect(sql, dialect).unwrap();
let plan_result = planner.statement_to_plan(parse_result.pop_front().unwrap());
println!("result: {:?}", plan_result);

// planner.statement_to_plan(ast.pop_front().unwrap())
}

#[test]
fn parse_decimals() {
let test_data = [
Expand Down

0 comments on commit 0ff6b0d

Please sign in to comment.