@@ -22,6 +22,7 @@ use datafusion::prelude::{SessionConfig, SessionContext};
2222use datafusion_sql:: parser:: { DFParser , Statement } ;
2323use log:: info;
2424use normalize:: normalize_batch;
25+ use sqllogictest:: { ColumnType , DBOutput } ;
2526use sqlparser:: ast:: Statement as SQLStatement ;
2627use std:: path:: { Path , PathBuf } ;
2728use std:: time:: Duration ;
@@ -46,7 +47,7 @@ pub struct DataFusion {
4647impl sqllogictest:: AsyncDB for DataFusion {
4748 type Error = DFSqlLogicTestError ;
4849
49- async fn run ( & mut self , sql : & str ) -> Result < String > {
50+ async fn run ( & mut self , sql : & str ) -> Result < DBOutput > {
5051 println ! ( "[{}] Running query: \" {}\" " , self . file_name, sql) ;
5152 let result = run_query ( & self . ctx , sql) . await ?;
5253 Ok ( result)
@@ -172,19 +173,42 @@ async fn context_for_test_file(file_name: &str) -> SessionContext {
172173 }
173174}
174175
175- fn format_batches ( batches : Vec < RecordBatch > ) -> Result < String > {
176+ fn convert_batches ( batches : Vec < RecordBatch > ) -> Result < DBOutput > {
176177 let mut bytes = vec ! [ ] ;
178+ if batches. is_empty ( ) {
179+ return Ok ( DBOutput :: StatementComplete ( 0 ) ) ;
180+ }
181+ // TODO: use the actual types
182+ let types = vec ! [ ColumnType :: Any ; batches[ 0 ] . num_columns( ) ] ;
183+
177184 {
178- let builder = WriterBuilder :: new ( ) . has_headers ( false ) . with_delimiter ( b' ' ) ;
185+ let builder = WriterBuilder :: new ( )
186+ . has_headers ( false )
187+ . with_delimiter ( b'\t' ) ;
179188 let mut writer = builder. build ( & mut bytes) ;
180189 for batch in batches {
181190 writer. write ( & normalize_batch ( batch) ) . unwrap ( ) ;
182191 }
183192 }
184- Ok ( String :: from_utf8 ( bytes) . unwrap ( ) )
193+ let res = String :: from_utf8 ( bytes) . unwrap ( ) ;
194+ let rows = res
195+ . lines ( )
196+ . map ( |s| {
197+ s. split ( '\t' )
198+ . map ( |s| {
199+ if s. is_empty ( ) {
200+ "NULL" . to_string ( )
201+ } else {
202+ s. to_string ( )
203+ }
204+ } )
205+ . collect ( )
206+ } )
207+ . collect ( ) ;
208+ Ok ( DBOutput :: Rows { types, rows } )
185209}
186210
187- async fn run_query ( ctx : & SessionContext , sql : impl Into < String > ) -> Result < String > {
211+ async fn run_query ( ctx : & SessionContext , sql : impl Into < String > ) -> Result < DBOutput > {
188212 let sql = sql. into ( ) ;
189213 // Check if the sql is `insert`
190214 if let Ok ( mut statements) = DFParser :: parse_sql ( & sql) {
@@ -198,6 +222,6 @@ async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<Strin
198222 }
199223 let df = ctx. sql ( sql. as_str ( ) ) . await ?;
200224 let results: Vec < RecordBatch > = df. collect ( ) . await ?;
201- let formatted_batches = format_batches ( results) ?;
225+ let formatted_batches = convert_batches ( results) ?;
202226 Ok ( formatted_batches)
203227}
0 commit comments