@@ -6,9 +6,9 @@ use crate::{
6
6
use actix_web:: { web, HttpResponse } ;
7
7
use chrono:: Utc ;
8
8
use rand:: { distributions:: Alphanumeric , thread_rng, Rng } ;
9
-
10
- use sqlx:: types:: Uuid ;
11
9
use sqlx:: { PgPool , Postgres , Transaction } ;
10
+ use std:: convert:: { TryInto , TryFrom } ;
11
+ use uuid:: Uuid ;
12
12
13
13
#[ allow( dead_code) ]
14
14
#[ derive( serde:: Deserialize ) ]
@@ -27,12 +27,13 @@ impl TryFrom<FormData> for NewSubscriber {
27
27
}
28
28
}
29
29
30
+ #[ allow( clippy:: async_yields_async) ]
30
31
#[ tracing:: instrument(
31
32
name = "Adding a new subscriber" ,
32
33
skip( form, pool, email_client, base_url) ,
33
34
fields(
34
- email = %form. email,
35
- name = %form. name
35
+ subscriber_email = %form. email,
36
+ subscriber_name = %form. name
36
37
)
37
38
) ]
38
39
pub async fn subscribe (
@@ -43,22 +44,30 @@ pub async fn subscribe(
43
44
) -> HttpResponse {
44
45
let new_subscriber = match form. 0 . try_into ( ) {
45
46
Ok ( subscriber) => subscriber,
46
- Err ( e ) => return HttpResponse :: BadRequest ( ) . body ( e ) ,
47
+ Err ( _ ) => return HttpResponse :: BadRequest ( ) . finish ( ) ,
47
48
} ;
48
49
49
50
let mut transaction = match pool. begin ( ) . await {
50
51
Ok ( t) => t,
51
52
Err ( _) => return HttpResponse :: InternalServerError ( ) . finish ( ) ,
52
53
} ;
53
54
55
+ if search_for_existing_subscription ( & new_subscriber, & mut transaction) . await . is_err ( ) {
56
+ return HttpResponse :: InternalServerError ( ) . finish ( ) ;
57
+ }
58
+
54
59
let subscriber_id = match insert_subscriber ( & new_subscriber, & mut transaction) . await {
55
60
Ok ( id) => id,
56
61
Err ( _) => return HttpResponse :: InternalServerError ( ) . finish ( ) ,
57
62
} ;
58
63
59
64
let subscription_token = generate_subscription_token ( ) ;
60
65
61
- if store_token ( subscriber_id, & subscription_token, & pool) . await . is_err ( ) {
66
+ if store_token ( subscriber_id, & subscription_token, & mut transaction) . await . is_err ( ) {
67
+ return HttpResponse :: InternalServerError ( ) . finish ( ) ;
68
+ }
69
+
70
+ if transaction. commit ( ) . await . is_err ( ) {
62
71
return HttpResponse :: InternalServerError ( ) . finish ( ) ;
63
72
}
64
73
@@ -69,10 +78,6 @@ pub async fn subscribe(
69
78
return HttpResponse :: InternalServerError ( ) . finish ( ) ;
70
79
}
71
80
72
- if transaction. commit ( ) . await . is_err ( ) {
73
- return HttpResponse :: InternalServerError ( ) . finish ( ) ;
74
- }
75
-
76
81
HttpResponse :: Ok ( ) . finish ( )
77
82
}
78
83
@@ -142,12 +147,12 @@ async fn insert_subscriber(
142
147
/// Store subscription token in the database
143
148
#[ tracing:: instrument(
144
149
name = "Saving subscription token in the database" ,
145
- skip( subscriber_id, subscription_token, pool )
150
+ skip( subscriber_id, subscription_token, transaction )
146
151
) ]
147
152
async fn store_token (
148
153
subscriber_id : Uuid ,
149
154
subscription_token : & str ,
150
- pool : & PgPool ,
155
+ transaction : & mut Transaction < ' _ , Postgres >
151
156
) -> Result < ( ) , sqlx:: Error > {
152
157
sqlx:: query!(
153
158
r#"
@@ -157,7 +162,7 @@ async fn store_token(
157
162
subscription_token,
158
163
subscriber_id,
159
164
)
160
- . execute ( pool )
165
+ . execute ( transaction )
161
166
. await
162
167
. map_err ( |e| {
163
168
tracing:: error!( "Failed to execute query: {:?}" , e) ;
@@ -173,4 +178,26 @@ fn generate_subscription_token() -> String {
173
178
. map ( char:: from)
174
179
. take ( 25 )
175
180
. collect ( )
181
+ }
182
+
183
+ async fn search_for_existing_subscription (
184
+ new_subscriber : & NewSubscriber ,
185
+ transaction : & mut Transaction < ' _ , Postgres >
186
+ ) -> Result < ( ) , sqlx:: Error > {
187
+ sqlx:: query!(
188
+ r#"
189
+ SELECT email, name
190
+ FROM subscriptions
191
+ WHERE email = $1 AND name = $2
192
+ "# ,
193
+ new_subscriber. email. as_ref( ) ,
194
+ new_subscriber. name. as_ref( ) ,
195
+ )
196
+ . fetch_optional ( transaction)
197
+ . await
198
+ . map_err ( |e| {
199
+ tracing:: error!( "Failed to execute query: {:?}" , e) ;
200
+ e
201
+ } ) ?;
202
+ Ok ( ( ) )
176
203
}
0 commit comments