Skip to content

Commit 8cf0036

Browse files
committed
feat(rust): optimize node creation:
- create tokio `Runtime` in a predictable way and reuse it - rework `Command` usage - add `no_logging` flag to the node - optimize node startup - replace readiness poll with a callback
1 parent a5eb87b commit 8cf0036

File tree

206 files changed

+1369
-2108
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

206 files changed

+1369
-2108
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs

+15-15
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use tokio::sync::broadcast::{channel, Receiver, Sender};
44

55
use ockam::SqlxDatabase;
66
use ockam_core::env::get_env_with_default;
7-
use ockam_node::database::{DatabaseConfiguration, DatabaseType, OCKAM_SQLITE_IN_MEMORY};
8-
use ockam_node::Executor;
7+
use ockam_node::database::{DatabaseConfiguration, DatabaseType};
98

109
use crate::cli_state::error::Result;
1110
use crate::cli_state::CliStateError;
@@ -51,11 +50,6 @@ pub struct CliState {
5150
}
5251

5352
impl CliState {
54-
/// Create a new CliState in a given directory
55-
pub fn new(mode: CliStateMode) -> Result<Self> {
56-
Executor::execute_future(Self::create(mode))?
57-
}
58-
5953
pub fn dir(&self) -> Result<PathBuf> {
6054
match &self.mode {
6155
CliStateMode::Persistent(dir) => Ok(dir.to_path_buf()),
@@ -127,14 +121,14 @@ impl CliState {
127121
impl CliState {
128122
/// Return a new CliState using a default directory to store its data or
129123
/// using an in-memory storage if the OCKAM_SQLITE_IN_MEMORY environment variable is set to true
130-
pub fn from_env() -> Result<Self> {
131-
let in_memory = get_env_with_default::<bool>(OCKAM_SQLITE_IN_MEMORY, false)?;
124+
pub async fn new(in_memory: bool) -> Result<Self> {
132125
let mode = if in_memory {
133126
CliStateMode::InMemory
134127
} else {
135128
CliStateMode::with_default_dir()?
136129
};
137-
Self::new(mode)
130+
131+
Self::create(mode).await
138132
}
139133

140134
/// Stop nodes and remove all the directories storing state
@@ -182,7 +176,7 @@ impl CliState {
182176
/// Backup and reset is used to save aside
183177
/// some corrupted local state for later inspection and then reset the state.
184178
/// The database is backed-up only if it is a SQLite database.
185-
pub fn backup_and_reset() -> Result<()> {
179+
pub async fn backup_and_reset() -> Result<()> {
186180
let dir = Self::default_dir()?;
187181

188182
// Reset backup directory
@@ -202,7 +196,7 @@ impl CliState {
202196

203197
// Reset state
204198
Self::delete_at(&dir)?;
205-
Self::new(CliStateMode::Persistent(dir.clone()))?;
199+
Self::create(CliStateMode::Persistent(dir.clone())).await?;
206200

207201
let backup_dir = CliState::backup_default_dir()?;
208202
eprintln!("The {dir:?} directory has been reset and has been backed up to {backup_dir:?}");
@@ -234,15 +228,20 @@ impl CliState {
234228
std::fs::create_dir_all(dir.as_path())?;
235229
}
236230
let database = SqlxDatabase::create(&Self::make_database_configuration(&mode)?).await?;
237-
let configuration = Self::make_application_database_configuration(&mode)?;
238-
let application_database =
239-
SqlxDatabase::create_application_database(&configuration).await?;
240231
debug!("Opened the main database with options {:?}", database);
232+
233+
// TODO: This should not be called unless we're running the App
234+
let application_database = SqlxDatabase::create_application_database(
235+
&Self::make_application_database_configuration(&mode)?,
236+
)
237+
.await?;
241238
debug!(
242239
"Opened the application database with options {:?}",
243240
application_database
244241
);
242+
245243
let (notifications, _) = channel::<Notification>(NOTIFICATIONS_CHANNEL_CAPACITY);
244+
246245
let state = Self {
247246
mode,
248247
database,
@@ -254,6 +253,7 @@ impl CliState {
254253
exporting_enabled: ExportingEnabled::Off,
255254
notifications,
256255
};
256+
257257
Ok(state)
258258
}
259259

implementations/rust/ockam/ockam_api/src/logs/env_variables.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub(crate) const OCKAM_LOG_MAX_FILES: &str = "OCKAM_LOG_MAX_FILES";
2121
/// Log format. Accepted values, see LogFormat. For example: pretty, json, default
2222
pub(crate) const OCKAM_LOG_FORMAT: &str = "OCKAM_LOG_FORMAT";
2323

24-
/// Filter for log messages based on crate names. Accepted values: 'all', 'default', 'comma-separated strings'. For example: ockam_core,ockam_api
24+
/// Filter for log messages based on crate names. Accepted values: 'all' or 'comma-separated strings'. For example: ockam_core,ockam_api
2525
pub(crate) const OCKAM_LOG_CRATES_FILTER: &str = "OCKAM_LOG_CRATES_FILTER";
2626

2727
///

implementations/rust/ockam/ockam_api/src/logs/exporting_configuration.rs

+53-47
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crate::logs::ExportingEnabled;
77
use crate::CliState;
88
use ockam_core::env::{get_env_with_default, FromString};
99
use ockam_core::errcode::{Kind, Origin};
10-
use ockam_node::Executor;
1110
use std::env::current_exe;
1211
use std::fmt::{Display, Formatter};
1312
use std::net::{SocketAddr, ToSocketAddrs};
@@ -116,14 +115,15 @@ impl ExportingConfiguration {
116115

117116
/// Create a tracing configuration for a user command running in the foreground.
118117
/// (meaning that the process will shut down once the command has been executed)
119-
pub fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
120-
match opentelemetry_endpoint(state)? {
118+
pub async fn foreground(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
119+
match opentelemetry_endpoint(state).await? {
121120
None => ExportingConfiguration::off(),
122121
Some(endpoint) => Ok(ExportingConfiguration {
123122
enabled: exporting_enabled(
124123
&endpoint,
125124
opentelemetry_endpoint_foreground_connection_timeout()?,
126-
)?,
125+
)
126+
.await?,
127127
span_export_timeout: span_export_timeout()?,
128128
log_export_timeout: log_export_timeout()?,
129129
span_export_scheduled_delay: foreground_span_export_scheduled_delay()?,
@@ -139,14 +139,15 @@ impl ExportingConfiguration {
139139
}
140140

141141
/// Create a tracing configuration for a background node
142-
pub fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
143-
match opentelemetry_endpoint(state)? {
142+
pub async fn background(state: &CliState) -> ockam_core::Result<ExportingConfiguration> {
143+
match opentelemetry_endpoint(state).await? {
144144
None => ExportingConfiguration::off(),
145145
Some(endpoint) => Ok(ExportingConfiguration {
146146
enabled: exporting_enabled(
147147
&endpoint,
148148
opentelemetry_endpoint_background_connection_timeout()?,
149-
)?,
149+
)
150+
.await?,
150151
span_export_timeout: span_export_timeout()?,
151152
log_export_timeout: log_export_timeout()?,
152153
span_export_scheduled_delay: background_span_export_scheduled_delay()?,
@@ -254,11 +255,11 @@ fn print_debug(message: impl Into<String>) {
254255
/// - Exporting has not been deactivated by the user
255256
/// - The opentelemetry endpoint is accessible
256257
///
257-
fn exporting_enabled(
258+
async fn exporting_enabled(
258259
endpoint: &OpenTelemetryEndpoint,
259260
connection_check_timeout: Duration,
260261
) -> ockam_core::Result<ExportingEnabled> {
261-
if is_endpoint_accessible(&endpoint.url(), connection_check_timeout) {
262+
if is_endpoint_accessible(&endpoint.url(), connection_check_timeout).await {
262263
print_debug("Exporting is enabled");
263264
Ok(ExportingEnabled::On)
264265
} else {
@@ -275,23 +276,37 @@ fn exporting_enabled(
275276
}
276277

277278
/// Return true if the endpoint can be accessed with a TCP connection
278-
fn is_endpoint_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
279+
async fn is_endpoint_accessible(url: &Url, connection_check_timeout: Duration) -> bool {
279280
match to_socket_addr(url) {
280281
Some(address) => {
281282
let retries = FibonacciBackoff::from_millis(100);
282283
let now = Instant::now();
283284

285+
// TODO: Not sure we need to retry really, also maybe it could happen in the background
286+
// to not slow things down
284287
for timeout_duration in retries {
285288
print_debug(format!(
286289
"trying to connect to {address} in {timeout_duration:?}"
287290
));
288-
if std::net::TcpStream::connect_timeout(&address, timeout_duration).is_ok() {
289-
return true;
290-
} else {
291-
if now.elapsed() >= connection_check_timeout {
292-
return false;
293-
};
294-
std::thread::sleep(timeout_duration);
291+
292+
let res = tokio::time::timeout(
293+
timeout_duration,
294+
tokio::net::TcpStream::connect(&address),
295+
)
296+
.await;
297+
298+
match res {
299+
Ok(res) => {
300+
if res.is_ok() {
301+
return true;
302+
}
303+
}
304+
Err(_) => {
305+
if now.elapsed() >= connection_check_timeout {
306+
return false;
307+
};
308+
tokio::time::sleep(timeout_duration).await;
309+
}
295310
}
296311
}
297312
false
@@ -324,36 +339,27 @@ fn to_socket_addr(url: &Url) -> Option<SocketAddr> {
324339
/// Return the tracing endpoint, defined by an environment variable
325340
/// If the endpoint can be established with an Ockam portal to the opentelemetry-relay created in the project
326341
/// use that URL, otherwise use the HTTPS endpoint
327-
fn opentelemetry_endpoint(state: &CliState) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
328-
if !is_exporting_set()? {
329-
print_debug("Exporting is turned off");
330-
Ok(None)
331-
} else {
332-
let state = state.clone();
333-
match Executor::execute_future(async move {
334-
// if a project is defined try to use the OpenTelemetry portal
335-
// and if we allow traces to be exported via a portal
336-
if state.projects().get_default_project().await.is_ok()
337-
&& is_exporting_via_portal_set()?
338-
{
339-
print_debug("A default project exists. Getting the project export endpoint");
340-
get_project_endpoint_url(&state).await
341-
} else {
342-
print_debug("A default project does not exist. Getting the default HTTPs endpoint");
343-
get_https_endpoint()
344-
}
345-
}) {
346-
Ok(Ok(url)) => Ok(Some(url)),
347-
Ok(Err(e)) => {
348-
print_debug(format!(
349-
"There was an issue when setting up the exporting of traces: {e:?}"
350-
));
351-
Ok(None)
352-
}
353-
Err(e) => {
354-
print_debug(format!("There was an issue when running the code setting up the exporting of traces: {e:?}"));
355-
Ok(None)
356-
}
342+
async fn opentelemetry_endpoint(
343+
state: &CliState,
344+
) -> ockam_core::Result<Option<OpenTelemetryEndpoint>> {
345+
let res = {
346+
// if a project is defined try to use the OpenTelemetry portal
347+
// and if we allow traces to be exported via a portal
348+
if state.projects().get_default_project().await.is_ok() && is_exporting_via_portal_set()? {
349+
print_debug("A default project exists. Getting the project export endpoint");
350+
get_project_endpoint_url(state).await
351+
} else {
352+
print_debug("A default project does not exist. Getting the default HTTPs endpoint");
353+
get_https_endpoint()
354+
}
355+
};
356+
match res {
357+
Ok(url) => Ok(Some(url)),
358+
Err(e) => {
359+
print_debug(format!(
360+
"There was an issue when setting up the exporting of traces: {e:?}"
361+
));
362+
Ok(None)
357363
}
358364
}
359365
}

0 commit comments

Comments
 (0)