Skip to content

Commit 63f26c0

Browse files
committed
chore: Tweaked send_newsletters endpoint for idempotency
1 parent 00d24f8 commit 63f26c0

14 files changed

+344
-83
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ actix-web-lab = "0.15"
2020
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
2121
serde = {version = "1.0.163", features = ["derive"]}
2222
serde-aux = "4.2.0"
23-
serde_json = "1"
23+
serde_json = "1"
24+
serde_urlencoded = "0.7.0"
2425
config = {version = "0.13", default-features = false, features = ["yaml"] }
2526
uuid = { version = "1.3.3", features = ["v4", "serde"] }
2627
chrono = "0.4.15"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Add migration script here
2+
CREATE TYPE header_pair AS (
3+
name TEXT,
4+
value BYTEA
5+
);
6+
CREATE TABLE idempotency (
7+
user_id uuid NOT NULL REFERENCES users(user_id),
8+
idempotency_key TEXT NOT NULL,
9+
response_status_code SMALLINT NOT NULL,
10+
response_headers header_pair[] NOT NULL,
11+
response_body BYTEA NOT NULL,
12+
created_at timestamptz NOT NULL,
13+
PRIMARY KEY(user_id, idempotency_key)
14+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Add migration script here

src/configuration.rs

+11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::domain::SubscriberEmail;
2+
use crate::email_client::EmailClient;
23
use secrecy::{ExposeSecret, Secret};
34
use serde_aux::field_attributes::deserialize_number_from_string;
45
use sqlx::postgres::{PgConnectOptions, PgSslMode};
@@ -32,6 +33,16 @@ pub struct EmailClientSettings {
3233
}
3334

3435
impl EmailClientSettings {
36+
pub fn client(self) -> EmailClient {
37+
let sender = self.sender().expect("Invalid sender email address");
38+
let timeout = std::time::Duration::from_millis(self.timeout_ms);
39+
EmailClient::new(
40+
self.base_url,
41+
sender,
42+
self.authorization_token,
43+
timeout,
44+
)
45+
}
3546
pub fn sender(&self) -> Result<SubscriberEmail, String> {
3647
SubscriberEmail::parse(self.sender_email.clone())
3748
}

src/idempotency/key.rs

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#[derive(Debug)]
2+
pub struct IdempotencyKey(String);
3+
4+
impl TryFrom<String> for IdempotencyKey {
5+
type Error = anyhow::Error;
6+
fn try_from(s: String) -> Result<Self, Self::Error> {
7+
if s.is_empty() {
8+
anyhow::bail!("The idempotency key cannot be empty");
9+
}
10+
let max_length = 50;
11+
if s.len() >= max_length {
12+
anyhow::bail!(
13+
"The idempotency key must be shorter
14+
than {max_length} characters"
15+
);
16+
}
17+
Ok(Self(s))
18+
}
19+
}
20+
21+
impl From<IdempotencyKey> for String {
22+
fn from(k: IdempotencyKey) -> Self {
23+
k.0
24+
}
25+
}
26+
27+
impl AsRef<str> for IdempotencyKey {
28+
fn as_ref(&self) -> &str {
29+
&self.0
30+
}
31+
}

src/idempotency/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod key;
2+
mod persistence;
3+
pub use key::IdempotencyKey;
4+
pub use persistence::get_saved_response;
5+
pub use persistence::save_response;

src/idempotency/persistence.rs

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use super::IdempotencyKey;
2+
use actix_web::body::to_bytes;
3+
use actix_web::http::{self, StatusCode};
4+
use actix_web::HttpResponse;
5+
use sqlx::postgres::PgHasArrayType;
6+
use sqlx::PgPool;
7+
use uuid::Uuid;
8+
9+
#[derive(Debug, sqlx::Type)]
10+
#[sqlx(type_name = "header_pair")]
11+
struct HeaderPairRecord {
12+
name: String,
13+
value: Vec<u8>,
14+
}
15+
16+
impl PgHasArrayType for HeaderPairRecord {
17+
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
18+
sqlx::postgres::PgTypeInfo::with_name("_header_pair")
19+
}
20+
}
21+
22+
pub async fn get_saved_response(
23+
pool: &PgPool,
24+
idempotency_key: &IdempotencyKey,
25+
user_id: Uuid,
26+
) -> Result<Option<HttpResponse>, anyhow::Error> {
27+
let saved_response = sqlx::query!(
28+
r#"
29+
SELECT
30+
response_status_code,
31+
response_headers as "response_headers: Vec<HeaderPairRecord>",
32+
response_body
33+
FROM idempotency
34+
WHERE
35+
user_id = $1 AND
36+
idempotency_key = $2
37+
"#,
38+
user_id,
39+
idempotency_key.as_ref()
40+
)
41+
.fetch_optional(pool)
42+
.await?;
43+
if let Some(r) = saved_response {
44+
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
45+
let mut response = HttpResponse::build(status_code);
46+
for HeaderPairRecord { name, value } in r.response_headers {
47+
response.append_header((name, value));
48+
}
49+
Ok(Some(response.body(r.response_body)))
50+
} else {
51+
Ok(None)
52+
}
53+
}
54+
55+
pub async fn save_response(
56+
pool: &PgPool,
57+
idempotency_key: &IdempotencyKey,
58+
user_id: Uuid,
59+
http_response: HttpResponse,
60+
) -> Result<HttpResponse, anyhow::Error> {
61+
let (response_head, body) = http_response.into_parts();
62+
let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
63+
let status_code = response_head.status().as_u16() as i16;
64+
let headers = {
65+
let mut h = Vec::with_capacity(response_head.headers().len());
66+
for (name, value) in response_head.headers().iter() {
67+
let name = name.as_str().to_owned();
68+
let value = value.as_bytes().to_owned();
69+
h.push(HeaderPairRecord { name, value });
70+
}
71+
h
72+
};
73+
sqlx::query_unchecked!(
74+
r#"
75+
INSERT INTO idempotency (
76+
user_id,
77+
idempotency_key,
78+
response_status_code,
79+
response_headers,
80+
response_body,
81+
created_at
82+
)
83+
VALUES ($1, $2, $3, $4, $5, now())
84+
"#,
85+
user_id,
86+
idempotency_key.as_ref(),
87+
status_code,
88+
headers,
89+
body.as_ref(),
90+
)
91+
.execute(pool)
92+
.await?;
93+
94+
let http_response = response_head.set_body(body).map_into_boxed_body();
95+
Ok(http_response)
96+
}

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod authentication;
22
pub mod configuration;
33
pub mod domain;
44
pub mod email_client;
5+
pub mod idempotency;
56
pub mod routes;
67
pub mod session_state;
78
pub mod startup;

src/routes/admin/newsletters/get.rs

+4
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ use actix_web::HttpResponse;
33
use actix_web_flash_messages::IncomingFlashMessages;
44
use std::fmt::Write;
55

6+
use crate::idempotency;
7+
68
pub async fn publish_newsletter_form(
79
flash_messages: IncomingFlashMessages,
810
) -> Result<HttpResponse, actix_web::Error> {
911
let mut incoming_flash = String::new();
1012
for message in flash_messages.iter() {
1113
writeln!(incoming_flash, "<p><i>{}</i></p>", message.content()).unwrap();
1214
}
15+
let idempotency_key = uuid::Uuid::new_v4().to_string();
1316
Ok(HttpResponse::Ok()
1417
.content_type(ContentType::html())
1518
.body(format!(
@@ -48,6 +51,7 @@ pub async fn publish_newsletter_form(
4851
></textarea>
4952
</label>
5053
<br>
54+
<input hidden type="text" name="idempotency_key" value="{idempotency_key}">
5155
<button type="submit">Publish</button>
5256
</form>
5357
<p><a href="/admin/dashboard">&lt;- Back</a></p>

src/routes/admin/newsletters/post.rs

+27-11
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,54 @@
11
use crate::authentication::UserId;
22
use crate::domain::SubscriberEmail;
33
use crate::email_client::EmailClient;
4-
use crate::utils::{e500, see_other};
4+
use crate::idempotency::{get_saved_response, save_response};
5+
use crate::idempotency::IdempotencyKey;
6+
use crate::utils::{e400, e500, see_other};
57
use actix_web::web::ReqData;
68
use actix_web::{web, HttpResponse};
79
use actix_web_flash_messages::FlashMessage;
810
use anyhow::Context;
911
use sqlx::PgPool;
1012

1113
#[derive(serde::Deserialize)]
12-
pub struct BodyData {
14+
pub struct FormData {
1315
title: String,
1416
text_content: String,
1517
html_content: String,
18+
idempotency_key: String,
1619
}
1720

1821
#[tracing::instrument(
1922
name = "Publish a newsletter issue",
20-
skip(body, pool, email_client, user_id),
23+
skip(form, pool, email_client, user_id),
2124
fields(user_id = %*user_id)
2225
)]
2326
pub async fn publish_newsletter(
24-
body: web::Form<BodyData>,
27+
form: web::Form<FormData>,
2528
pool: web::Data<PgPool>,
2629
email_client: web::Data<EmailClient>,
2730
user_id: ReqData<UserId>,
2831
) -> Result<HttpResponse, actix_web::Error> {
32+
let user_id = user_id.into_inner();
33+
let FormData {
34+
title,
35+
text_content,
36+
html_content,
37+
idempotency_key,
38+
} = form.0;
39+
let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
40+
if let Some(save_response) = get_saved_response(&pool, &idempotency_key, *user_id)
41+
.await
42+
.map_err(e500)?
43+
{
44+
return Ok(save_response);
45+
}
2946
let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
3047
for subscriber in subscribers {
3148
match subscriber {
3249
Ok(subscriber) => {
3350
email_client
34-
.send_email(
35-
&subscriber.email,
36-
&body.title,
37-
&body.html_content,
38-
&body.text_content,
39-
)
51+
.send_email(&subscriber.email, &title, &html_content, &text_content)
4052
.await
4153
.with_context(|| {
4254
format!("Failed to send newsletter issue to {}", subscriber.email)
@@ -55,7 +67,11 @@ pub async fn publish_newsletter(
5567
}
5668
}
5769
FlashMessage::info("The newsletter issue has been published!").send();
58-
Ok(see_other("/admin/newsletters"))
70+
let response = see_other("/admin/newsletters");
71+
let response = save_response(&pool, &idempotency_key, *user_id, response)
72+
.await
73+
.map_err(e500)?;
74+
Ok(response)
5975
}
6076

6177
struct ConfirmedSubscriber {

src/utils.rs

+10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
use actix_web::http::header::LOCATION;
2+
use actix_web::http::StatusCode;
23
use actix_web::HttpResponse;
34

5+
// Return a 400 with the user-representation of the validation error as body.
6+
// The error root cause is preserved for logging purposes.
7+
pub fn e400<T: std::fmt::Debug + std::fmt::Display>(e: T) -> actix_web::Error
8+
where
9+
T: std::fmt::Debug + std::fmt::Display + 'static,
10+
{
11+
actix_web::error::ErrorBadRequest(e)
12+
}
13+
414
// Return an opaque 500 while preserving the error root's cause for logging.
515
pub fn e500<T>(e: T) -> actix_web::Error
616
where

0 commit comments

Comments
 (0)