Skip to content

Commit 921757c

Browse files
committed
chore: refactoring code for user workflow observability
1 parent 63f26c0 commit 921757c

11 files changed

+209
-77
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
-- Add migration script here
2+
ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL;
3+
ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL;
4+
ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Add migration script here
2+
CREATE TABLE newsletter_issues (
3+
newsletter_issue_id uuid NOT NULL,
4+
title TEXT NOT NULL,
5+
text_content TEXT NOT NULL,
6+
html_content TEXT NOT NULL,
7+
published_at TEXT NOT NULL,
8+
PRIMARY KEY(newsletter_issue_id)
9+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Add migration script here
2+
CREATE TABLE issue_delivery_queue (
3+
newsletter_issue_id uuid NOT NULL REFERENCES newsletter_issues (newsletter_issue_id),
4+
subscriber_email TEXT NOT NULL,
5+
PRIMARY KEY(newsletter_issue_id, subscriber_email)
6+
);

src/configuration.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,7 @@ impl EmailClientSettings {
3636
pub fn client(self) -> EmailClient {
3737
let sender = self.sender().expect("Invalid sender email address");
3838
let timeout = std::time::Duration::from_millis(self.timeout_ms);
39-
EmailClient::new(
40-
self.base_url,
41-
sender,
42-
self.authorization_token,
43-
timeout,
44-
)
39+
EmailClient::new(self.base_url, sender, self.authorization_token, timeout)
4540
}
4641
pub fn sender(&self) -> Result<SubscriberEmail, String> {
4742
SubscriberEmail::parse(self.sender_email.clone())

src/idempotency/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ mod persistence;
33
pub use key::IdempotencyKey;
44
pub use persistence::get_saved_response;
55
pub use persistence::save_response;
6+
pub use persistence::{try_processing, NextAction};

src/idempotency/persistence.rs

+55-15
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use super::IdempotencyKey;
22
use actix_web::body::to_bytes;
3-
use actix_web::http::{self, StatusCode};
3+
use actix_web::http::StatusCode;
44
use actix_web::HttpResponse;
55
use sqlx::postgres::PgHasArrayType;
66
use sqlx::PgPool;
7+
use sqlx::{Postgres, Transaction};
78
use uuid::Uuid;
89

910
#[derive(Debug, sqlx::Type)]
@@ -27,9 +28,9 @@ pub async fn get_saved_response(
2728
let saved_response = sqlx::query!(
2829
r#"
2930
SELECT
30-
response_status_code,
31-
response_headers as "response_headers: Vec<HeaderPairRecord>",
32-
response_body
31+
response_status_code as "response_status_code!",
32+
response_headers as "response_headers!: Vec<HeaderPairRecord>",
33+
response_body as "response_body!"
3334
FROM idempotency
3435
WHERE
3536
user_id = $1 AND
@@ -40,6 +41,7 @@ pub async fn get_saved_response(
4041
)
4142
.fetch_optional(pool)
4243
.await?;
44+
4345
if let Some(r) = saved_response {
4446
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
4547
let mut response = HttpResponse::build(status_code);
@@ -53,7 +55,7 @@ pub async fn get_saved_response(
5355
}
5456

5557
pub async fn save_response(
56-
pool: &PgPool,
58+
mut transaction: Transaction<'static, Postgres>,
5759
idempotency_key: &IdempotencyKey,
5860
user_id: Uuid,
5961
http_response: HttpResponse,
@@ -72,25 +74,63 @@ pub async fn save_response(
7274
};
7375
sqlx::query_unchecked!(
7476
r#"
75-
INSERT INTO idempotency (
76-
user_id,
77-
idempotency_key,
78-
response_status_code,
79-
response_headers,
80-
response_body,
81-
created_at
82-
)
83-
VALUES ($1, $2, $3, $4, $5, now())
77+
UPDATE idempotency
78+
SET
79+
response_status_code = $3,
80+
response_headers = $4,
81+
response_body = $5
82+
WHERE
83+
user_id = $1 AND
84+
idempotency_key = $2
8485
"#,
8586
user_id,
8687
idempotency_key.as_ref(),
8788
status_code,
8889
headers,
8990
body.as_ref(),
9091
)
91-
.execute(pool)
92+
.execute(&mut transaction)
9293
.await?;
94+
transaction.commit().await?;
9395

9496
let http_response = response_head.set_body(body).map_into_boxed_body();
9597
Ok(http_response)
9698
}
99+
100+
pub enum NextAction {
101+
StartProcessing(Transaction<'static, Postgres>),
102+
ReturnSavedResponse(HttpResponse),
103+
}
104+
105+
pub async fn try_processing(
106+
pool: &PgPool,
107+
idempotency_key: &IdempotencyKey,
108+
user_id: Uuid,
109+
) -> Result<NextAction, anyhow::Error> {
110+
let mut transaction = pool.begin().await?;
111+
let n_inserted_rows = sqlx::query!(
112+
r#"
113+
INSERT INTO idempotency (
114+
user_id,
115+
idempotency_key,
116+
created_at
117+
)
118+
VALUES ($1, $2, now())
119+
ON CONFLICT DO NOTHING
120+
"#,
121+
user_id,
122+
idempotency_key.as_ref()
123+
)
124+
.execute(&mut transaction)
125+
.await?
126+
.rows_affected();
127+
128+
if n_inserted_rows > 0 {
129+
Ok(NextAction::StartProcessing(transaction))
130+
} else {
131+
let saved_response = get_saved_response(pool, idempotency_key, user_id)
132+
.await?
133+
.ok_or_else(|| anyhow::anyhow!("We expected a saved response, we didn't find it"))?;
134+
Ok(NextAction::ReturnSavedResponse(saved_response))
135+
}
136+
}

src/routes/admin/newsletters/get.rs

-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ use actix_web::HttpResponse;
33
use actix_web_flash_messages::IncomingFlashMessages;
44
use std::fmt::Write;
55

6-
use crate::idempotency;
7-
86
pub async fn publish_newsletter_form(
97
flash_messages: IncomingFlashMessages,
108
) -> Result<HttpResponse, actix_web::Error> {

src/routes/admin/newsletters/post.rs

+74-49
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use crate::authentication::UserId;
2-
use crate::domain::SubscriberEmail;
3-
use crate::email_client::EmailClient;
4-
use crate::idempotency::{get_saved_response, save_response};
52
use crate::idempotency::IdempotencyKey;
3+
use crate::idempotency::{get_saved_response, save_response, try_processing, NextAction};
64
use crate::utils::{e400, e500, see_other};
75
use actix_web::web::ReqData;
86
use actix_web::{web, HttpResponse};
97
use actix_web_flash_messages::FlashMessage;
108
use anyhow::Context;
11-
use sqlx::PgPool;
9+
use sqlx::{PgPool, Postgres, Transaction};
10+
use uuid::Uuid;
1211

1312
#[derive(serde::Deserialize)]
1413
pub struct FormData {
@@ -20,13 +19,12 @@ pub struct FormData {
2019

2120
#[tracing::instrument(
2221
name = "Publish a newsletter issue",
23-
skip(form, pool, email_client, user_id),
22+
skip_all,
2423
fields(user_id = %*user_id)
2524
)]
2625
pub async fn publish_newsletter(
2726
form: web::Form<FormData>,
2827
pool: web::Data<PgPool>,
29-
email_client: web::Data<EmailClient>,
3028
user_id: ReqData<UserId>,
3129
) -> Result<HttpResponse, actix_web::Error> {
3230
let user_id = user_id.into_inner();
@@ -37,65 +35,92 @@ pub async fn publish_newsletter(
3735
idempotency_key,
3836
} = form.0;
3937
let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
38+
let mut transaction = match try_processing(&pool, &idempotency_key, *user_id)
39+
.await
40+
.map_err(e500)?
41+
{
42+
NextAction::StartProcessing(t) => t,
43+
NextAction::ReturnSavedResponse(saved_response) => {
44+
success_message().send();
45+
return Ok(saved_response);
46+
}
47+
};
48+
4049
if let Some(save_response) = get_saved_response(&pool, &idempotency_key, *user_id)
4150
.await
4251
.map_err(e500)?
4352
{
4453
return Ok(save_response);
4554
}
46-
let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
47-
for subscriber in subscribers {
48-
match subscriber {
49-
Ok(subscriber) => {
50-
email_client
51-
.send_email(&subscriber.email, &title, &html_content, &text_content)
52-
.await
53-
.with_context(|| {
54-
format!("Failed to send newsletter issue to {}", subscriber.email)
55-
})
56-
.map_err(e500)?;
57-
}
55+
let issue_id = insert_newsletter_issue(&mut transaction, &title, &text_content, &html_content)
56+
.await
57+
.context("Failed to store newsletter issue details")
58+
.map_err(e500)?;
59+
enqueue_delivery_tasks(&mut transaction, issue_id)
60+
.await
61+
.context("Failed to enqueue delivery task")
62+
.map_err(e500)?;
5863

59-
Err(error) => {
60-
tracing::warn!(
61-
error.cause_chain = ?error,
62-
error.message = %error,
63-
"Skipping a confirmed subscriber. \
64-
Their stored contact details are invalid",
65-
);
66-
}
67-
}
68-
}
69-
FlashMessage::info("The newsletter issue has been published!").send();
7064
let response = see_other("/admin/newsletters");
71-
let response = save_response(&pool, &idempotency_key, *user_id, response)
65+
let response = save_response(transaction, &idempotency_key, *user_id, response)
7266
.await
7367
.map_err(e500)?;
68+
success_message().send();
7469
Ok(response)
7570
}
7671

77-
struct ConfirmedSubscriber {
78-
email: SubscriberEmail,
72+
fn success_message() -> FlashMessage {
73+
FlashMessage::info("The newsletter issue has been accepted - \
74+
emails will go out shortly.",)
75+
}
76+
77+
#[tracing::instrument(skip_all)]
78+
async fn insert_newsletter_issue(
79+
transaction: &mut Transaction<'_, Postgres>,
80+
title: &str,
81+
text_content: &str,
82+
html_content: &str,
83+
) -> Result<Uuid, sqlx::Error> {
84+
let newsletter_issue_id = Uuid::new_v4();
85+
sqlx::query!(
86+
r#"
87+
INSERT INTO newsletter_issues (
88+
newsletter_issue_id,
89+
title,
90+
text_content,
91+
html_content,
92+
published_at
93+
)
94+
VALUES ($1, $2, $3, $4, now())
95+
"#,
96+
newsletter_issue_id,
97+
title,
98+
text_content,
99+
html_content
100+
)
101+
.execute(transaction)
102+
.await?;
103+
Ok(newsletter_issue_id)
79104
}
80105

81-
#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
82-
async fn get_confirmed_subscribers(
83-
pool: &PgPool,
84-
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
85-
let confirmed_subscribers = sqlx::query!(
106+
#[tracing::instrument(skip_all)]
107+
async fn enqueue_delivery_tasks(
108+
transaction: &mut Transaction<'_, Postgres>,
109+
newsletter_issue_id: Uuid,
110+
) -> Result<(), sqlx::Error> {
111+
sqlx::query!(
86112
r#"
87-
SELECT email
88-
FROM subscriptions
89-
WHERE status = 'confirmed'
113+
INSERT INTO issue_delivery_queue (
114+
newsletter_issue_id,
115+
subscriber_email
116+
)
117+
SELECT $1, email
118+
FROM subscriptions
119+
WHERE status = 'confirmed'
90120
"#,
121+
newsletter_issue_id,
91122
)
92-
.fetch_all(pool)
93-
.await?
94-
.into_iter()
95-
.map(|r| match SubscriberEmail::parse(r.email) {
96-
Ok(email) => Ok(ConfirmedSubscriber { email }),
97-
Err(error) => Err(anyhow::anyhow!(error)),
98-
})
99-
.collect();
100-
Ok(confirmed_subscribers)
123+
.execute(transaction)
124+
.await?;
125+
Ok(())
101126
}

src/utils.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use actix_web::http::header::LOCATION;
2-
use actix_web::http::StatusCode;
32
use actix_web::HttpResponse;
43

54
// Return a 400 with the user-representation of the validation error as body.

tests/api/helpers.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use argon2::{Argon2, PasswordHasher};
33
use once_cell::sync::Lazy;
44
use robust_rust::{
55
configuration::{get_configuration, DatabaseSettings},
6+
email_client::EmailClient,
67
startup::{get_connection_pool, Application},
78
telemetry::{get_subscriber, init_subscriber},
8-
email_client::EmailClient,
99
};
1010
use sqlx::{Connection, Executor, PgConnection, PgPool};
1111
use uuid::Uuid;

0 commit comments

Comments
 (0)