@@ -7,10 +7,7 @@ use std::{
7
7
fmt:: { Debug , Display } ,
8
8
} ;
9
9
10
- use crate :: sqlite:: {
11
- HydrationError , Projection , Provider , SourceAlias , SqLiteEntity , SqliteConnection ,
12
- WhereCondition ,
13
- } ;
10
+ use crate :: sqlite:: { HydrationError , Projection , Query , SourceAlias , SqLiteEntity , WhereCondition } ;
14
11
15
12
use super :: DbVersion ;
16
13
@@ -106,65 +103,27 @@ impl PartialOrd for DatabaseVersion {
106
103
}
107
104
}
108
105
109
- /// Provider for the [DatabaseVersion] entities using the `DatabaseVersionProjection` .
110
- pub struct DatabaseVersionProvider < ' conn > {
111
- connection : & ' conn SqliteConnection ,
106
+ /// Query to get [DatabaseVersion] entities.
107
+ pub struct GetDatabaseVersionQuery {
108
+ condition : WhereCondition ,
112
109
}
113
110
114
- impl < ' conn > DatabaseVersionProvider < ' conn > {
115
- /// [DatabaseVersionProvider] constructor.
116
- pub fn new ( connection : & ' conn SqliteConnection ) -> Self {
117
- Self { connection }
118
- }
119
-
120
- /// Method to create the table at the beginning of the migration procedure.
121
- /// This code is temporary and should not last.
122
- pub fn create_table_if_not_exists (
123
- & self ,
124
- application_type : & ApplicationNodeType ,
125
- ) -> StdResult < ( ) > {
126
- let connection = self . get_connection ( ) ;
127
- let sql = "select exists(select name from sqlite_master where type='table' and name='db_version') as table_exists" ;
128
- let table_exists = connection
129
- . prepare ( sql) ?
130
- . iter ( )
131
- . next ( )
132
- . unwrap ( )
133
- . unwrap ( )
134
- . read :: < i64 , _ > ( 0 )
135
- == 1 ;
136
-
137
- if !table_exists {
138
- let sql = format ! ( "
139
- create table db_version (application_type text not null primary key, version integer not null, updated_at text not null);
140
- insert into db_version (application_type, version, updated_at) values ('{application_type}', 0, '{}');
141
- " , Utc :: now( ) . to_rfc3339( ) ) ;
142
- connection. execute ( sql) ?;
143
- }
144
-
145
- Ok ( ( ) )
146
- }
147
-
148
- /// Read the application version from the database.
149
- pub fn get_application_version (
150
- & self ,
151
- application_type : & ApplicationNodeType ,
152
- ) -> StdResult < Option < DatabaseVersion > > {
111
+ impl GetDatabaseVersionQuery {
112
+ /// Query to read the application version from the database.
113
+ pub fn get_application_version ( application_type : & ApplicationNodeType ) -> Self {
153
114
let filters = WhereCondition :: new (
154
115
"application_type = ?*" ,
155
116
vec ! [ Value :: String ( format!( "{application_type}" ) ) ] ,
156
117
) ;
157
- let result = self . find ( filters) ?. next ( ) ;
158
-
159
- Ok ( result)
118
+ Self { condition : filters }
160
119
}
161
120
}
162
121
163
- impl < ' conn > Provider < ' conn > for DatabaseVersionProvider < ' conn > {
122
+ impl Query for GetDatabaseVersionQuery {
164
123
type Entity = DatabaseVersion ;
165
124
166
- fn get_connection ( & ' conn self ) -> & SqliteConnection {
167
- self . connection
125
+ fn filters ( & self ) -> WhereCondition {
126
+ self . condition . clone ( )
168
127
}
169
128
170
129
fn get_definition ( & self , condition : & str ) -> String {
@@ -181,20 +140,14 @@ where {condition}
181
140
}
182
141
}
183
142
184
- /// Write [Provider] for the [DatabaseVersion] entities.
185
- /// This will perform an UPSERT and return the updated entity.
186
- pub struct DatabaseVersionUpdater < ' conn > {
187
- connection : & ' conn SqliteConnection ,
143
+ /// Query to UPSERT [DatabaseVersion] entities.
144
+ pub struct UpdateDatabaseVersionQuery {
145
+ condition : WhereCondition ,
188
146
}
189
147
190
- impl < ' conn > DatabaseVersionUpdater < ' conn > {
191
- /// [DatabaseVersionUpdater] constructor.
192
- pub fn new ( connection : & ' conn SqliteConnection ) -> Self {
193
- Self { connection }
194
- }
195
-
196
- /// Persist the given entity and return the projection of the saved entity.
197
- pub fn save ( & self , version : DatabaseVersion ) -> StdResult < DatabaseVersion > {
148
+ impl UpdateDatabaseVersionQuery {
149
+ /// Define a query that will UPSERT the given version.
150
+ pub fn one ( version : DatabaseVersion ) -> Self {
198
151
let filters = WhereCondition :: new (
199
152
"" ,
200
153
vec ! [
@@ -203,20 +156,16 @@ impl<'conn> DatabaseVersionUpdater<'conn> {
203
156
Value :: String ( version. updated_at. to_rfc3339( ) ) ,
204
157
] ,
205
158
) ;
206
- let entity = self
207
- . find ( filters) ?
208
- . next ( )
209
- . ok_or ( anyhow ! ( "No data returned after insertion" ) ) ?;
210
159
211
- Ok ( entity )
160
+ Self { condition : filters }
212
161
}
213
162
}
214
163
215
- impl < ' conn > Provider < ' conn > for DatabaseVersionUpdater < ' conn > {
164
+ impl Query for UpdateDatabaseVersionQuery {
216
165
type Entity = DatabaseVersion ;
217
166
218
- fn get_connection ( & ' conn self ) -> & SqliteConnection {
219
- self . connection
167
+ fn filters ( & self ) -> WhereCondition {
168
+ self . condition . clone ( )
220
169
}
221
170
222
171
fn get_definition ( & self , _condition : & str ) -> String {
@@ -235,8 +184,6 @@ returning {projection}
235
184
236
185
#[ cfg( test) ]
237
186
mod tests {
238
- use sqlite:: Connection ;
239
-
240
187
use super :: * ;
241
188
242
189
#[ test]
@@ -253,31 +200,34 @@ mod tests {
253
200
254
201
#[ test]
255
202
fn test_definition ( ) {
256
- let connection = Connection :: open_thread_safe ( ":memory:" ) . unwrap ( ) ;
257
- let provider = DatabaseVersionProvider :: new ( & connection ) ;
203
+ let query =
204
+ GetDatabaseVersionQuery :: get_application_version ( & ApplicationNodeType :: Aggregator ) ;
258
205
259
206
assert_eq ! (
260
207
r#"
261
208
select db_version.version as version, db_version.application_type as application_type, db_version.updated_at as updated_at
262
209
from db_version
263
210
where true
264
211
"# ,
265
- provider . get_definition( "true" )
212
+ query . get_definition( "true" )
266
213
)
267
214
}
268
215
269
216
#[ test]
270
217
fn test_updated_entity ( ) {
271
- let connection = Connection :: open_thread_safe ( ":memory:" ) . unwrap ( ) ;
272
- let provider = DatabaseVersionUpdater :: new ( & connection) ;
218
+ let query = UpdateDatabaseVersionQuery :: one ( DatabaseVersion {
219
+ version : 0 ,
220
+ application_type : ApplicationNodeType :: Aggregator ,
221
+ updated_at : Default :: default ( ) ,
222
+ } ) ;
273
223
274
224
assert_eq ! (
275
225
r#"
276
226
insert into db_version (application_type, version, updated_at) values (?, ?, ?)
277
227
on conflict (application_type) do update set version = excluded.version, updated_at = excluded.updated_at
278
228
returning db_version.version as version, db_version.application_type as application_type, db_version.updated_at as updated_at
279
229
"# ,
280
- provider . get_definition( "true" )
230
+ query . get_definition( "true" )
281
231
)
282
232
}
283
233
}
0 commit comments