Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(launcher): db connect options #1182

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libs/blockscout-service-launcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "blockscout-service-launcher"
version = "0.15.0"
version = "0.16.0"
description = "Allows to launch blazingly fast blockscout rust services"
license = "MIT"
repository = "https://github.com/blockscout/blockscout-rs"
Expand All @@ -26,6 +26,7 @@ prometheus = { version = "0.13", optional = true }
reqwest = { version = "0.11", features = ["json"], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = {version = "1", optional = true }
serde_with = {version = "3", optional = true }
tokio = { version = "1", optional = true }
tonic = { version = "0.8", optional = true }
tracing = { version = "0.1", optional = true }
Expand Down Expand Up @@ -90,6 +91,7 @@ database = [
"dep:anyhow",
"dep:cfg-if",
"dep:serde",
"dep:serde_with",
"dep:tracing",
"dep:url",
]
Expand Down
154 changes: 117 additions & 37 deletions libs/blockscout-service-launcher/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Context;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use std::{str::FromStr, time::Duration};
use tracing::log::LevelFilter;

cfg_if::cfg_if! {
if #[cfg(feature = "database-1_0")] {
Expand All @@ -17,7 +19,7 @@ cfg_if::cfg_if! {
pub use sea_orm_migration_0_10::MigratorTrait;
} else {
compile_error!(
"one of the features ['database-0_12', 'database-0_11', 'database-0_10'] \
"one of the features ['database-1_0', 'database-0_12', 'database-0_11', 'database-0_10'] \
must be enabled"
);
}
Expand All @@ -26,17 +28,14 @@ cfg_if::cfg_if! {
const DEFAULT_DB: &str = "postgres";

pub async fn initialize_postgres<Migrator: MigratorTrait>(
connect_options: impl Into<ConnectOptions>,
create_database: bool,
run_migrations: bool,
settings: &DatabaseSettings,
) -> anyhow::Result<DatabaseConnection> {
let connect_options = connect_options.into();
let db_url = settings.connect.clone().url();

// Create database if not exists
if create_database {
let db_url = connect_options.get_url();
if settings.create_database {
let (db_base_url, db_name) = {
let mut db_url = url::Url::from_str(db_url).context("invalid database url")?;
let mut db_url: url::Url = db_url.parse().context("invalid database url")?;
let db_name = db_url
.path_segments()
.and_then(|mut segments| segments.next())
Expand All @@ -51,7 +50,7 @@ pub async fn initialize_postgres<Migrator: MigratorTrait>(
tracing::info!("creating database '{db_name}'");
let db_base_url = format!("{db_base_url}/{DEFAULT_DB}");

let create_database_options = with_connect_options(db_base_url, &connect_options);
let create_database_options = settings.connect_options.apply_to(db_base_url.into());
let db = Database::connect(create_database_options).await?;

let result = db
Expand All @@ -74,44 +73,22 @@ pub async fn initialize_postgres<Migrator: MigratorTrait>(
};
}

let connect_options = settings.connect_options.apply_to(db_url.into());
let db = Database::connect(connect_options).await?;
if run_migrations {
if settings.run_migrations {
Migrator::up(&db, None).await?;
}

Ok(db)
}

fn with_connect_options(url: impl Into<String>, source_options: &ConnectOptions) -> ConnectOptions {
let mut options = ConnectOptions::new(url.into());
if let Some(value) = source_options.get_max_connections() {
options.max_connections(value);
}
if let Some(value) = source_options.get_min_connections() {
options.min_connections(value);
}
if let Some(value) = source_options.get_connect_timeout() {
options.connect_timeout(value);
}
if let Some(value) = source_options.get_idle_timeout() {
options.idle_timeout(value);
}
if let Some(value) = source_options.get_acquire_timeout() {
options.acquire_timeout(value);
}
if let Some(value) = source_options.get_max_lifetime() {
options.max_lifetime(value);
}
options.sqlx_logging(source_options.get_sqlx_logging());
options.sqlx_logging_level(source_options.get_sqlx_logging_level());
options
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct DatabaseSettings {
pub connect: DatabaseConnectSettings,
#[serde(default)]
pub connect_options: DatabaseConnectOptionsSettings,
#[serde(default)]
pub create_database: bool,
#[serde(default)]
pub run_migrations: bool,
Expand Down Expand Up @@ -162,3 +139,106 @@ impl DatabaseKvConnection {
)
}
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct DatabaseConnectOptionsSettings {
/// Maximum number of connections for a pool
pub max_connections: Option<u32>,
/// Minimum number of connections for a pool
pub min_connections: Option<u32>,
/// The connection timeout for a packet connection
pub connect_timeout: Option<Duration>,
/// Maximum idle time for a particular connection to prevent
/// network resource exhaustion
pub idle_timeout: Option<Duration>,
/// Set the maximum amount of time to spend waiting for acquiring a connection
pub acquire_timeout: Option<Duration>,
/// Set the maximum lifetime of individual connections
pub max_lifetime: Option<Duration>,
/// Enable SQLx statement logging
pub sqlx_logging: bool,
#[serde(
deserialize_with = "string_to_level_filter",
serialize_with = "level_filter_to_string"
)]
/// SQLx statement logging level (ignored if `sqlx_logging` is false)
pub sqlx_logging_level: LevelFilter,
#[cfg(feature = "database-1_0")]
#[serde(
deserialize_with = "string_to_level_filter",
serialize_with = "level_filter_to_string"
)]
/// SQLx slow statements logging level (ignored if `sqlx_logging` is false)
pub sqlx_slow_statements_logging_level: LevelFilter,
#[cfg(feature = "database-1_0")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
/// SQLx slow statements duration threshold (ignored if `sqlx_logging` is false)
pub sqlx_slow_statements_logging_threshold: Duration,
}

impl Default for DatabaseConnectOptionsSettings {
fn default() -> Self {
Self {
max_connections: None,
min_connections: None,
connect_timeout: None,
idle_timeout: None,
acquire_timeout: None,
max_lifetime: None,
sqlx_logging: true,
sqlx_logging_level: LevelFilter::Debug,
#[cfg(feature = "database-1_0")]
sqlx_slow_statements_logging_level: LevelFilter::Off,
#[cfg(feature = "database-1_0")]
sqlx_slow_statements_logging_threshold: Duration::from_secs(1),
}
}
}

impl DatabaseConnectOptionsSettings {
fn apply_to(&self, mut options: ConnectOptions) -> ConnectOptions {
if let Some(value) = self.max_connections {
options.max_connections(value);
}
if let Some(value) = self.min_connections {
options.min_connections(value);
}
if let Some(value) = self.connect_timeout {
options.connect_timeout(value);
}
if let Some(value) = self.idle_timeout {
options.idle_timeout(value);
}
if let Some(value) = self.acquire_timeout {
options.acquire_timeout(value);
}
if let Some(value) = self.max_lifetime {
options.max_lifetime(value);
}
options.sqlx_logging(self.sqlx_logging);
options.sqlx_logging_level(self.sqlx_logging_level);
#[cfg(feature = "database-1_0")]
options.sqlx_slow_statements_logging_settings(
self.sqlx_slow_statements_logging_level,
self.sqlx_slow_statements_logging_threshold,
);
options
}
}

fn string_to_level_filter<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
LevelFilter::from_str(&string).map_err(<D::Error as serde::de::Error>::custom)
}

pub fn level_filter_to_string<S>(x: &LevelFilter, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
s.serialize_str(x.as_str().to_lowercase().as_str())
}
2 changes: 1 addition & 1 deletion libs/env-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"

[dev-dependencies]
blockscout-service-launcher = { path = "../blockscout-service-launcher", features = ["database-0_12"] }
blockscout-service-launcher = { path = "../blockscout-service-launcher", features = ["database-1_0"] }
pretty_assertions = "1.4.0"
tempfile = "3.10.1"
102 changes: 91 additions & 11 deletions libs/env-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,66 @@ mod tests {
true,
"e.g. `test-url`",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MAX_CONNECTIONS",
Some("`null`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MIN_CONNECTIONS",
Some("`null`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__CONNECT_TIMEOUT",
Some("`null`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__IDLE_TIMEOUT",
Some("`null`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__ACQUIRE_TIMEOUT",
Some("`null`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MAX_LIFETIME",
Some("`null`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_LOGGING",
Some("`true`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_LOGGING_LEVEL",
Some("`debug`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_SLOW_STATEMENTS_LOGGING_LEVEL",
Some("`off`"),
false,
"",
),
var(
"TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_SLOW_STATEMENTS_LOGGING_THRESHOLD",
Some("`1`"),
false,
"",
),
]))
}

Expand All @@ -662,17 +722,27 @@ mod tests {

[anchor]: <> (anchors.envs.start.cool_postfix)

| Variable | Required | Description | Default Value |
|-------------------------------------------|-------------|------------------|---------------|
| `TEST_SERVICE__TEST` | true | e.g. `value` | |
| `TEST_SERVICE__DATABASE__CREATE_DATABASE` | false | | `false` |
| `TEST_SERVICE__DATABASE__RUN_MIGRATIONS` | false | | `false` |
| `TEST_SERVICE__TEST2` | false | e.g. `123` | `1000` |
| `TEST_SERVICE__TEST3_SET` | false | e.g. `false` | `null` |
| `TEST_SERVICE__TEST4_NOT_SET` | false | | `null` |
| `TEST_SERVICE__TEST5_WITH_UNICODE` | false | | `false` |
| `TEST_SERVICE__STRING_WITH_DEFAULT` | false | | `kekek` |
| `TEST_SERVICE__DATABASE__CONNECT__URL` | true | e.g. `test-url` | |
| Variable | Required | Description | Default Value |
|---------------------------------------------------------------|-------------|------------------|---------------|
| `TEST_SERVICE__TEST` | true | e.g. `value` | |
| `TEST_SERVICE__DATABASE__CREATE_DATABASE` | false | | `false` |
| `TEST_SERVICE__DATABASE__RUN_MIGRATIONS` | false | | `false` |
| `TEST_SERVICE__TEST2` | false | e.g. `123` | `1000` |
| `TEST_SERVICE__TEST3_SET` | false | e.g. `false` | `null` |
| `TEST_SERVICE__TEST4_NOT_SET` | false | | `null` |
| `TEST_SERVICE__TEST5_WITH_UNICODE` | false | | `false` |
| `TEST_SERVICE__STRING_WITH_DEFAULT` | false | | `kekek` |
| `TEST_SERVICE__DATABASE__CONNECT__URL` | true | e.g. `test-url` | |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__ACQUIRE_TIMEOUT` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__CONNECT_TIMEOUT` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__IDLE_TIMEOUT` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MAX_CONNECTIONS` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MAX_LIFETIME` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MIN_CONNECTIONS` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_LOGGING` | | | `true` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_LOGGING_LEVEL` | | | `debug` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_SLOW_STATEMENTS_LOGGING_LEVEL` | | | `off` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_SLOW_STATEMENTS_LOGGING_THRESHOLD` | | | `1` |
[anchor]: <> (anchors.envs.end.cool_postfix)
"#
}
Expand Down Expand Up @@ -769,6 +839,16 @@ mod tests {
| `SOME_EXTRA_VARS2` | true | | `example_value2` |
| `TEST_SERVICE__DATABASE__CONNECT__URL` | true | e.g. `test-url` | |
| `TEST_SERVICE__TEST` | true | e.g. `value` | |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__ACQUIRE_TIMEOUT` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__CONNECT_TIMEOUT` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__IDLE_TIMEOUT` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MAX_CONNECTIONS` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MAX_LIFETIME` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__MIN_CONNECTIONS` | | | `null` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_LOGGING` | | | `true` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_LOGGING_LEVEL` | | | `debug` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_SLOW_STATEMENTS_LOGGING_LEVEL` | | | `off` |
| `TEST_SERVICE__DATABASE__CONNECT_OPTIONS__SQLX_SLOW_STATEMENTS_LOGGING_THRESHOLD` | | | `1` |
| `TEST_SERVICE__DATABASE__CREATE_DATABASE` | | | `false` |
| `TEST_SERVICE__DATABASE__RUN_MIGRATIONS` | | | `false` |
| `TEST_SERVICE__STRING_WITH_DEFAULT` | | | `kekek` |
Expand Down
7 changes: 1 addition & 6 deletions service-template/{{project-name}}-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ pub async fn run(settings: Settings) -> Result<(), anyhow::Error> {
let health = Arc::new(HealthService::default());

{% if database and migrations %}
let _db_connection = database::initialize_postgres::<Migrator>(
&settings.database.connect.url(),
settings.database.create_database,
settings.database.run_migrations,
)
.await?;
let _db_connection = database::initialize_postgres::<Migrator>(&settings.database).await?;
{% endif %}

// TODO: init services here
Expand Down
Loading