From 905e9e7690ad81aabcb630e8ee77434cac99d9a2 Mon Sep 17 00:00:00 2001 From: emilpriver Date: Sat, 10 Feb 2024 12:06:12 +0100 Subject: [PATCH] Added transactions --- src/lib/database_drivers/libsql.rs | 1 + src/lib/database_drivers/maria.rs | 23 ++++++++++++++--------- src/lib/database_drivers/mod.rs | 1 + src/lib/database_drivers/mysql.rs | 23 ++++++++++++++--------- src/lib/database_drivers/postgres.rs | 23 ++++++++++++++--------- src/lib/database_drivers/sqlite.rs | 21 +++++++++++++++++++-- src/lib/migrate.rs | 18 +++++++++++++++--- 7 files changed, 78 insertions(+), 32 deletions(-) diff --git a/src/lib/database_drivers/libsql.rs b/src/lib/database_drivers/libsql.rs index 104c31b..2e59176 100644 --- a/src/lib/database_drivers/libsql.rs +++ b/src/lib/database_drivers/libsql.rs @@ -44,6 +44,7 @@ impl DatabaseDriver for LibSQLDriver { fn execute<'a>( &'a mut self, query: &'a str, + _run_in_transaction: bool, ) -> Pin> + '_>> { let fut = async move { let queries = query diff --git a/src/lib/database_drivers/maria.rs b/src/lib/database_drivers/maria.rs index 79a768b..5100cb5 100644 --- a/src/lib/database_drivers/maria.rs +++ b/src/lib/database_drivers/maria.rs @@ -79,18 +79,23 @@ impl DatabaseDriver for MariaDBDriver { fn execute<'a>( &'a mut self, query: &'a str, + run_in_transaction: bool, ) -> Pin> + '_>> { let fut = async move { - let mut tx = self.db.begin().await?; - - match tx.execute(query).await { - Ok(_) => { - tx.commit().await?; - } - Err(e) => { - error!("Error executing query: {}", e); - tx.rollback().await?; + if run_in_transaction { + let mut tx = self.db.begin().await?; + match tx.execute(query).await { + Ok(_) => { + tx.commit().await?; + } + Err(e) => { + error!("Error executing query: {}", e); + tx.rollback().await?; + } } + return Ok(()); + } else { + self.db.execute(query).await?; } Ok(()) diff --git a/src/lib/database_drivers/mod.rs b/src/lib/database_drivers/mod.rs index 8a2d711..e788db0 100644 --- a/src/lib/database_drivers/mod.rs +++ b/src/lib/database_drivers/mod.rs @@ -23,6 +23,7 @@ pub trait DatabaseDriver { fn execute<'a>( &'a mut self, query: &'a str, + run_in_transaction: bool, ) -> Pin> + '_>>; // create database with the specific driver diff --git a/src/lib/database_drivers/mysql.rs b/src/lib/database_drivers/mysql.rs index 50772f8..4150341 100644 --- a/src/lib/database_drivers/mysql.rs +++ b/src/lib/database_drivers/mysql.rs @@ -78,18 +78,23 @@ impl DatabaseDriver for MySQLDriver { fn execute<'a>( &'a mut self, query: &'a str, + run_in_transaction: bool, ) -> Pin> + '_>> { let fut = async move { - let mut tx = self.db.begin().await?; - - match tx.execute(query).await { - Ok(_) => { - tx.commit().await?; - } - Err(e) => { - error!("Error executing query: {}", e); - tx.rollback().await?; + if run_in_transaction { + let mut tx = self.db.begin().await?; + match tx.execute(query).await { + Ok(_) => { + tx.commit().await?; + } + Err(e) => { + error!("Error executing query: {}", e); + tx.rollback().await?; + } } + return Ok(()); + } else { + self.db.execute(query).await?; } Ok(()) diff --git a/src/lib/database_drivers/postgres.rs b/src/lib/database_drivers/postgres.rs index 70cea4b..c26ca07 100644 --- a/src/lib/database_drivers/postgres.rs +++ b/src/lib/database_drivers/postgres.rs @@ -71,18 +71,23 @@ impl DatabaseDriver for PostgresDriver { fn execute<'a>( &'a mut self, query: &'a str, + run_in_transaction: bool, ) -> Pin> + '_>> { let fut = async move { - let mut tx = self.db.begin().await?; - - match tx.execute(query).await { - Ok(_) => { - tx.commit().await?; - } - Err(e) => { - error!("Error executing query: {}", e); - tx.rollback().await?; + if run_in_transaction { + let mut tx = self.db.begin().await?; + match tx.execute(query).await { + Ok(_) => { + tx.commit().await?; + } + Err(e) => { + error!("Error executing query: {}", e); + tx.rollback().await?; + } } + return Ok(()); + } else { + self.db.execute(query).await?; } Ok(()) diff --git a/src/lib/database_drivers/sqlite.rs b/src/lib/database_drivers/sqlite.rs index 8886098..c452eb6 100644 --- a/src/lib/database_drivers/sqlite.rs +++ b/src/lib/database_drivers/sqlite.rs @@ -1,5 +1,5 @@ use crate::database_drivers::DatabaseDriver; -use anyhow::Result; +use anyhow::{bail, Result}; use libsql_client::{de, local::Client}; use std::fs::{self, File}; use std::future::Future; @@ -48,15 +48,32 @@ impl DatabaseDriver for SqliteDriver { fn execute<'a>( &'a mut self, query: &'a str, + run_in_transaction: bool, ) -> Pin> + '_>> { let fut = async move { + if run_in_transaction { + self.db.execute("BEGIN;")?; + } + let queries = query .split(';') .map(|x| x.trim()) .filter(|x| !x.is_empty()) .collect::>(); for query in queries { - self.db.execute(query)?; + match self.db.execute(query) { + Ok(_) => {} + Err(e) => { + if run_in_transaction { + self.db.execute("ROLLBACK;")?; + } + bail!("{:?}", e); + } + } + } + + if run_in_transaction { + self.db.execute("COMMIT;")?; } Ok(()) }; diff --git a/src/lib/migrate.rs b/src/lib/migrate.rs index 2ce461e..731dc13 100644 --- a/src/lib/migrate.rs +++ b/src/lib/migrate.rs @@ -55,8 +55,14 @@ pub async fn up( if !migrations.contains(&id) { info!("Running migration {}", id); let query = read_file_content(&f.1); + let run_in_transaction = query + .split_once('\n') + // by default do we want to run in a transaction + .unwrap_or(("transaction: yes", "")) + .0 + .contains("transaction: yes"); - database.execute(&query).await?; + database.execute(&query, run_in_transaction).await?; database.insert_schema_migration(&id).await?; } @@ -124,8 +130,14 @@ pub async fn down( Some(f) => { info!("Running rollback for {}", migration); let query = read_file_content(&f.1); - - database.execute(&query).await?; + let run_in_transaction = query + .split_once('\n') + // by default do we want to run in a transaction + .unwrap_or(("transaction: yes", "")) + .0 + .contains("transaction: yes"); + + database.execute(&query, run_in_transaction).await?; database .remove_schema_migration(migration.to_string().as_str())