Skip to content

Commit 2ac64df

Browse files
committed
WIP
1 parent 0ef1434 commit 2ac64df

File tree

5 files changed

+81
-14
lines changed

5 files changed

+81
-14
lines changed

crates/configuration/src/connect.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ pub fn get_connect_options(
1818
ConnectionUri(Secret::FromEnvironment { variable }) => {
1919
Cow::Owned(environment.read(variable)?)
2020
}
21+
ConnectionUri(Secret::FromEnvironmentMap { variable, map_key }) => {
22+
let map_str = environment.read(variable)?;
23+
let parsed: serde_json::Value = serde_json::from_str(&map_str)?;
24+
let obj = parsed.as_object().unwrap();
25+
Cow::Owned(obj.get(map_key).unwrap().to_string())
26+
}
2127
};
2228

2329
let connect_options = PgConnectOptions::from_url(&uri.parse()?)?;

crates/configuration/src/values/secret.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,13 @@ use crate::environment;
99
#[serde(untagged, rename_all = "camelCase")]
1010
pub enum Secret {
1111
Plain(String),
12-
FromEnvironment { variable: environment::Variable },
12+
FromEnvironment {
13+
variable: environment::Variable,
14+
},
15+
FromEnvironmentMap {
16+
variable: environment::Variable,
17+
map_key: String,
18+
},
1319
}
1420

1521
// This conversion is useful for testing.

crates/connectors/ndc-postgres/src/query.rs

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
mod explain;
77
pub use explain::explain;
88

9+
use ndc_postgres_configuration::version5::DynamicConnectionSettings;
10+
use ndc_postgres_configuration::ConnectionSettings;
911
use tracing::{info_span, Instrument};
1012

1113
use ndc_sdk::connector;
@@ -38,6 +40,9 @@ pub async fn query(
3840
query_request = ?query_request
3941
);
4042

43+
let connection_details =
44+
extract_connection_details_from_query(configuration, &query_request).await?;
45+
4146
let plan = async {
4247
plan_query(configuration, state, query_request).map_err(|err| {
4348
record::translation_error(&err, &state.query_metrics);
@@ -48,10 +53,12 @@ pub async fn query(
4853
.await?;
4954

5055
let result = async {
51-
execute_query(state, plan).await.map_err(|err| {
52-
record::execution_error(&err, &state.query_metrics);
53-
convert::execution_error_to_response(err)
54-
})
56+
execute_query(state, plan, connection_details)
57+
.await
58+
.map_err(|err| {
59+
record::execution_error(&err, &state.query_metrics);
60+
convert::execution_error_to_response(err)
61+
})
5562
}
5663
.instrument(info_span!("Execute query"))
5764
.await?;
@@ -65,6 +72,44 @@ pub async fn query(
6572
timer.complete_with(result)
6673
}
6774

75+
// if this is a dynamic connection situation, which connection string should we be using?
76+
pub struct RequestConnectionDetails {
77+
pool: PgPool
78+
}
79+
80+
async fn extract_connection_details_from_query(
81+
configuration: &configuration::Configuration,
82+
query_request: &models::QueryRequest,
83+
) -> Result<Option<RequestConnectionDetails>, translation::error::Error> {
84+
// check in config if we need to look in request arguments
85+
match &configuration.connection {
86+
ConnectionSettings::Static { .. } => Ok(None),
87+
ConnectionSettings::Dynamic {
88+
dynamic_connection_settings: DynamicConnectionSettings::NamedFromList,
89+
} => {
90+
let connection_identifier = &query_request
91+
.request_arguments
92+
.as_ref()
93+
.and_then(|request_arguments| request_arguments.get("connection_identifier"))
94+
.ok_or_else(|| translation::error::Error::MissingConnectionIdentifier)?;
95+
96+
// lookup uri
97+
let connection_uri = "";
98+
99+
let pool = create_pool(connection_uri, environment, pool_settings)
100+
.instrument(info_span!(
101+
"Create connection pool",
102+
internal.visibility = "user",
103+
))
104+
.await?;
105+
106+
Ok(Some(RequestConnectionDetails {
107+
pool
108+
}))
109+
}
110+
}
111+
}
112+
68113
fn plan_query(
69114
configuration: &configuration::Configuration,
70115
state: &state::State,
@@ -79,15 +124,18 @@ fn plan_query(
79124
async fn execute_query(
80125
state: &state::State,
81126
plan: sql::execution_plan::ExecutionPlan<sql::execution_plan::Query>,
127+
connection_details: Option<RequestConnectionDetails>,
82128
) -> Result<JsonResponse<models::QueryResponse>, query_engine_execution::error::Error> {
83-
let state::Pool::Static {
84-
pool,
85-
database_info,
86-
} = &state.pool
87-
else {
88-
todo!("Dynamic connect for execute_query");
89-
};
90-
129+
match (&state.pool, connection_details) {
130+
(
131+
state::Pool::Static {
132+
pool,
133+
database_info,
134+
},
135+
_,
136+
) => (&pool,Some(database_info))
137+
(state::Pool::Dynamic(_), Some(RequestConnectionDetails { pool })) => (&pool,None),
138+
}
91139
query_engine_execution::query::execute(pool, database_info, &state.query_metrics, plan)
92140
.await
93141
.map(JsonResponse::Serialized)

crates/connectors/ndc-postgres/src/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ pub async fn create_state(
114114

115115
/// Create a connection pool with default settings.
116116
/// - <https://docs.rs/sqlx/latest/sqlx/pool/struct.PoolOptions.html>
117-
async fn create_pool(
117+
pub async fn create_pool(
118118
connection_url: &str,
119119
environment: impl Environment,
120120
pool_settings: &PoolSettings,

crates/query-engine/translation/src/translation/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ pub enum Error {
6868
tables_in_scope_names: Vec<String>,
6969
scope: usize,
7070
},
71+
MissingConnectionIdentifier,
7172
}
7273

7374
/// Capabilities we don't currently support.
@@ -262,6 +263,12 @@ impl std::fmt::Display for Error {
262263
}
263264
write!(f, "].")
264265
}
266+
Error::MissingConnectionIdentifier => {
267+
write!(
268+
f,
269+
"Expected to find connection_identifier in query_arguments but did not"
270+
)
271+
}
265272
}
266273
}
267274
}

0 commit comments

Comments
 (0)