Skip to content

Commit 915b1a6

Browse files
Use execute_query for introspection (#155)
<!-- The PR description should answer 2 (maybe 3) important questions: --> ### What The configuration introspection was using `select_first_row` which was not properly streaming all results and had a limits which caused introspections to fail on larger sets of tables. <!-- What is this PR trying to accomplish (and why, if it's not obvious)? --> ### How This removes `select_first_row` and uses `execute_query` to fetch all introspection data. <!-- How is it trying to accomplish it (what are the implementation steps)? --> --------- Co-authored-by: Daniel Harvey <[email protected]>
1 parent 025960a commit 915b1a6

File tree

6 files changed

+81
-43
lines changed

6 files changed

+81
-43
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/configuration/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ workspace = true
1010
ndc-models = { workspace = true }
1111
query-engine-metadata = { path = "../query-engine/metadata" }
1212
query-engine-metrics = { path = "../query-engine/metrics" }
13-
13+
query-engine-execution = { path = "../query-engine/execution" }
14+
query-engine-sql = { path = "../query-engine/sql" }
1415

1516
schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] }
1617
serde = "1.0.198"

crates/configuration/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ pub enum Error {
3434
#[error("Error creating connection pool while introspecting the database: {0}")]
3535
ConnectionPoolError(#[from] bb8_tiberius::Error),
3636

37+
#[error("Failed to get connection from pool: {0}")]
38+
GetConnectionFromPool(#[from] bb8::RunError<bb8_tiberius::Error>),
39+
40+
#[error("JSON deserialization error: {0}")]
41+
JsonDeserializationError(String),
42+
43+
#[error("Failed to execute introspection query: {0}")]
44+
IntrospectionQueryExecutionError(String),
45+
3746
// error while parsing stored procedure introspection
3847
#[error("Error parsing stored procedure introspection: {0}")]
3948
StoredProcedureIntrospectionError(serde_json::Error),

crates/configuration/src/version1.rs

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@ use crate::secret::Secret;
77
use crate::{uri, ConnectionUri};
88

99
use ndc_models::{AggregateFunctionName, CollectionName, ComparisonOperatorName, FieldName};
10+
use query_engine_execution::query::execute_query;
1011
use query_engine_metadata::metadata;
1112
use query_engine_metadata::metadata::stored_procedures::{
1213
StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures,
1314
};
1415
use query_engine_metadata::metadata::{database, Nullable};
15-
1616
use query_engine_metrics::metrics;
17+
use query_engine_sql::sql::{ast::RawSql, string::SQL};
1718
use schemars::JsonSchema;
1819
use serde::{Deserialize, Serialize};
1920
use std::collections::BTreeMap;
2021
use std::collections::BTreeSet;
2122

2223
use thiserror::Error;
23-
use tiberius::Query;
2424

2525
// TODO(KC): Move the `table_configuration.sql` to the `static` folder present
2626
// in the root of this repo.
@@ -156,22 +156,6 @@ async fn create_mssql_pool(
156156
bb8::Pool::builder().max_size(2).build(mgr).await
157157
}
158158

159-
async fn select_first_row(
160-
mssql_pool: &bb8::Pool<bb8_tiberius::ConnectionManager>,
161-
query: &str,
162-
) -> tiberius::Row {
163-
let mut connection = mssql_pool.get().await.unwrap();
164-
165-
// let's do a query to check everything is ok
166-
let select = Query::new(query);
167-
168-
// go!
169-
let stream = select.query(&mut connection).await.unwrap();
170-
171-
// Nothing is fetched, the first result set starts.
172-
stream.into_row().await.unwrap().unwrap()
173-
}
174-
175159
// get_stored_procedures fetches the stored procedures from the database and returns them as a
176160
// vector of introspection::IntrospectStoredProcedure.
177161
async fn configure_stored_procedures(
@@ -181,11 +165,26 @@ async fn configure_stored_procedures(
181165
) -> Result<StoredProcedures, Error> {
182166
match config_options {
183167
Some(config_options) => {
184-
let stored_procedures_row =
185-
select_first_row(mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;
168+
let mut connection = mssql_pool
169+
.get()
170+
.await
171+
.map_err(Error::GetConnectionFromPool)?;
172+
// Let's do some stored procedures introspection
173+
let mut stored_procs_query = SQL::new();
174+
RawSql::RawText(STORED_PROCS_CONFIGURATION_QUERY.to_string())
175+
.to_sql(&mut stored_procs_query);
176+
let mut stored_procs_rows = Vec::new();
177+
execute_query(
178+
&mut connection,
179+
&stored_procs_query,
180+
&BTreeMap::new(),
181+
&mut stored_procs_rows,
182+
)
183+
.await
184+
.map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?;
186185
let introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
187-
serde_json::from_str(stored_procedures_row.get(0).unwrap())
188-
.map_err(Error::StoredProcedureIntrospectionError)?;
186+
serde_json::from_slice(&stored_procs_rows)
187+
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;
189188
let new_stored_procedures = get_stored_procedures(introspected_stored_procedures);
190189

191190
// traverse the new stored procedures and add them to the existing stored procedures
@@ -233,26 +232,47 @@ pub async fn configure(
233232
.await
234233
.map_err(Error::ConnectionPoolError)?;
235234

236-
let mut metadata = query_engine_metadata::metadata::Metadata::default();
235+
let mut connection = mssql_pool
236+
.get()
237+
.await
238+
.map_err(Error::GetConnectionFromPool)?;
239+
240+
// Let's do some table introspection
241+
let mut table_query = SQL::new();
242+
RawSql::RawText(TABLE_CONFIGURATION_QUERY.to_string()).to_sql(&mut table_query);
243+
let mut table_rows = Vec::new();
244+
execute_query(
245+
&mut connection,
246+
&table_query,
247+
&BTreeMap::new(),
248+
&mut table_rows,
249+
)
250+
.await
251+
.map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?;
252+
let tables: Vec<introspection::IntrospectionTable> = serde_json::from_slice(&table_rows)
253+
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;
254+
255+
// Let's do some types introspection
256+
let mut types_query = SQL::new();
257+
RawSql::RawText(TYPES_QUERY.to_string()).to_sql(&mut types_query);
258+
let mut types_rows = Vec::new();
259+
execute_query(
260+
&mut connection,
261+
&types_query,
262+
&BTreeMap::new(),
263+
&mut types_rows,
264+
)
265+
.await
266+
.map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?;
267+
let type_names: Vec<TypeItem> = serde_json::from_slice(&types_rows)
268+
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;
237269

270+
let mut metadata = query_engine_metadata::metadata::Metadata::default();
238271
metadata.native_queries = configuration.metadata.native_queries.clone();
239272
metadata.native_mutations = configuration.metadata.native_mutations.clone();
240-
241-
let tables_row = select_first_row(&mssql_pool, TABLE_CONFIGURATION_QUERY).await;
242-
243-
let tables: Vec<introspection::IntrospectionTable> =
244-
serde_json::from_str(tables_row.get(0).unwrap()).unwrap();
245-
246273
metadata.tables = get_tables_info(tables);
247-
248-
let types_row = select_first_row(&mssql_pool, TYPES_QUERY).await;
249-
250-
let type_names: Vec<TypeItem> = serde_json::from_str(types_row.get(0).unwrap()).unwrap();
251-
252274
metadata.comparison_operators = get_comparison_operators(&type_names);
253-
254275
metadata.aggregate_functions = get_aggregate_functions(&type_names);
255-
256276
metadata.stored_procedures = configure_stored_procedures(
257277
&mssql_pool,
258278
configuration.metadata.stored_procedures.clone(),

crates/ndc-sqlserver/src/connector.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,19 @@ impl<Env: Environment + Send + Sync> connector::ConnectorSetup for SQLServerSetu
106106
std::io::Error::new(std::io::ErrorKind::Other, inner).into()
107107
}
108108
configuration::Error::ConnectionPoolError(inner) => {
109-
std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into()
109+
std::io::Error::new(std::io::ErrorKind::Other, inner).into()
110+
}
111+
configuration::Error::GetConnectionFromPool(inner) => {
112+
std::io::Error::new(std::io::ErrorKind::Other, inner).into()
113+
}
114+
configuration::Error::JsonDeserializationError(inner) => {
115+
connector::ParseError::from(std::io::Error::new(std::io::ErrorKind::Other, inner))
116+
}
117+
configuration::Error::IntrospectionQueryExecutionError(inner) => {
118+
connector::ParseError::from(std::io::Error::new(std::io::ErrorKind::Other, inner))
110119
}
111120
configuration::Error::StoredProcedureIntrospectionError(inner) => {
112-
connector::ParseError::from(std::io::Error::new(
113-
std::io::ErrorKind::Other,
114-
inner.to_string(),
115-
))
121+
connector::ParseError::from(std::io::Error::new(std::io::ErrorKind::Other, inner))
116122
}
117123
})?;
118124

crates/query-engine/execution/src/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async fn execute_queries(
7171
}
7272

7373
/// Execute the query on one set of variables.
74-
pub(crate) async fn execute_query(
74+
pub async fn execute_query(
7575
connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>,
7676
query: &sql::string::SQL,
7777
variables: &BTreeMap<ndc_models::VariableName, serde_json::Value>,

0 commit comments

Comments
 (0)