Skip to content

Commit

Permalink
Added transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
emilpriver committed Feb 10, 2024
1 parent e63144e commit 905e9e7
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/lib/database_drivers/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl DatabaseDriver for LibSQLDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
_run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let queries = query
Expand Down
23 changes: 14 additions & 9 deletions src/lib/database_drivers/maria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,23 @@ impl DatabaseDriver for MariaDBDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let mut tx = self.db.begin().await?;

match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
if run_in_transaction {
let mut tx = self.db.begin().await?;
match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
}
}
return Ok(());
} else {
self.db.execute(query).await?;
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions src/lib/database_drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub trait DatabaseDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>>;

// create database with the specific driver
Expand Down
23 changes: 14 additions & 9 deletions src/lib/database_drivers/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,23 @@ impl DatabaseDriver for MySQLDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let mut tx = self.db.begin().await?;

match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
if run_in_transaction {
let mut tx = self.db.begin().await?;
match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
}
}
return Ok(());
} else {
self.db.execute(query).await?;
}

Ok(())
Expand Down
23 changes: 14 additions & 9 deletions src/lib/database_drivers/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,23 @@ impl DatabaseDriver for PostgresDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
let mut tx = self.db.begin().await?;

match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
if run_in_transaction {
let mut tx = self.db.begin().await?;
match tx.execute(query).await {
Ok(_) => {
tx.commit().await?;
}
Err(e) => {
error!("Error executing query: {}", e);
tx.rollback().await?;
}
}
return Ok(());
} else {
self.db.execute(query).await?;
}

Ok(())
Expand Down
21 changes: 19 additions & 2 deletions src/lib/database_drivers/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::database_drivers::DatabaseDriver;
use anyhow::Result;
use anyhow::{bail, Result};
use libsql_client::{de, local::Client};
use std::fs::{self, File};
use std::future::Future;
Expand Down Expand Up @@ -48,15 +48,32 @@ impl DatabaseDriver for SqliteDriver {
fn execute<'a>(
&'a mut self,
query: &'a str,
run_in_transaction: bool,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + '_>> {
let fut = async move {
if run_in_transaction {
self.db.execute("BEGIN;")?;
}

let queries = query
.split(';')
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<&str>>();
for query in queries {
self.db.execute(query)?;
match self.db.execute(query) {
Ok(_) => {}
Err(e) => {
if run_in_transaction {
self.db.execute("ROLLBACK;")?;
}
bail!("{:?}", e);
}
}
}

if run_in_transaction {
self.db.execute("COMMIT;")?;
}
Ok(())
};
Expand Down
18 changes: 15 additions & 3 deletions src/lib/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ pub async fn up(
if !migrations.contains(&id) {
info!("Running migration {}", id);
let query = read_file_content(&f.1);
let run_in_transaction = query
.split_once('\n')
// by default do we want to run in a transaction
.unwrap_or(("transaction: yes", ""))
.0
.contains("transaction: yes");

database.execute(&query).await?;
database.execute(&query, run_in_transaction).await?;

database.insert_schema_migration(&id).await?;
}
Expand Down Expand Up @@ -124,8 +130,14 @@ pub async fn down(
Some(f) => {
info!("Running rollback for {}", migration);
let query = read_file_content(&f.1);

database.execute(&query).await?;
let run_in_transaction = query
.split_once('\n')
// by default do we want to run in a transaction
.unwrap_or(("transaction: yes", ""))
.0
.contains("transaction: yes");

database.execute(&query, run_in_transaction).await?;

database
.remove_schema_migration(migration.to_string().as_str())
Expand Down

0 comments on commit 905e9e7

Please sign in to comment.