17
17
18
18
use futures:: { Future , Stream } ;
19
19
use reqwest:: r#async:: { Client , Decoder } ;
20
+ use reqwest:: { StatusCode , Url } ;
20
21
21
22
use std:: mem;
22
23
@@ -25,15 +26,45 @@ use crate::query::read_query::InfluxDbReadQuery;
25
26
use crate :: query:: write_query:: InfluxDbWriteQuery ;
26
27
use crate :: query:: InfluxDbQuery ;
27
28
28
- use url:: form_urlencoded;
29
-
30
29
use std:: any:: Any ;
31
30
31
+ #[ derive( Clone , Debug ) ]
32
+ /// Internal Authentication representation
33
+ pub ( crate ) struct InfluxDbAuthentication {
34
+ pub username : String ,
35
+ pub password : String ,
36
+ }
37
+
38
+ #[ derive( Clone , Debug ) ]
32
39
/// Internal Representation of a Client
33
40
pub struct InfluxDbClient {
34
41
url : String ,
35
42
database : String ,
36
- // auth: Option<InfluxDbAuthentication>
43
+ auth : Option < InfluxDbAuthentication > ,
44
+ }
45
+
46
+ impl Into < Vec < ( String , String ) > > for InfluxDbClient {
47
+ fn into ( self ) -> Vec < ( String , String ) > {
48
+ let mut vec: Vec < ( String , String ) > = Vec :: new ( ) ;
49
+ vec. push ( ( "db" . to_string ( ) , self . database ) ) ;
50
+ if let Some ( auth) = self . auth {
51
+ vec. push ( ( "u" . to_string ( ) , auth. username ) ) ;
52
+ vec. push ( ( "p" . to_string ( ) , auth. password ) ) ;
53
+ }
54
+ vec
55
+ }
56
+ }
57
+
58
+ impl < ' a > Into < Vec < ( String , String ) > > for & ' a InfluxDbClient {
59
+ fn into ( self ) -> Vec < ( String , String ) > {
60
+ let mut vec: Vec < ( String , String ) > = Vec :: new ( ) ;
61
+ vec. push ( ( "db" . to_string ( ) , self . database . to_owned ( ) ) ) ;
62
+ if let Some ( auth) = & self . auth {
63
+ vec. push ( ( "u" . to_string ( ) , auth. username . to_owned ( ) ) ) ;
64
+ vec. push ( ( "p" . to_string ( ) , auth. password . to_owned ( ) ) ) ;
65
+ }
66
+ vec
67
+ }
37
68
}
38
69
39
70
impl InfluxDbClient {
@@ -59,9 +90,36 @@ impl InfluxDbClient {
59
90
InfluxDbClient {
60
91
url : url. to_string ( ) ,
61
92
database : database. to_string ( ) ,
93
+ auth : None ,
62
94
}
63
95
}
64
96
97
+ /// Add authentication/authorization information to [`InfluxDbClient`](crate::client::InfluxDbClient)
98
+ ///
99
+ /// # Arguments
100
+ ///
101
+ /// * username: The Username for InfluxDB.
102
+ /// * password: THe Password for the user.
103
+ ///
104
+ /// # Examples
105
+ ///
106
+ /// ```rust
107
+ /// use influxdb::client::InfluxDbClient;
108
+ ///
109
+ /// let _client = InfluxDbClient::new("http://localhost:9086", "test").with_auth("admin", "password");
110
+ /// ```
111
+ pub fn with_auth < ' a , S1 , S2 > ( mut self , username : S1 , password : S2 ) -> Self
112
+ where
113
+ S1 : ToString ,
114
+ S2 : ToString ,
115
+ {
116
+ self . auth = Some ( InfluxDbAuthentication {
117
+ username : username. to_string ( ) ,
118
+ password : password. to_string ( ) ,
119
+ } ) ;
120
+ self
121
+ }
122
+
65
123
/// Returns the name of the database the client is using
66
124
pub fn database_name ( & self ) -> & str {
67
125
& self . database
@@ -100,7 +158,7 @@ impl InfluxDbClient {
100
158
} )
101
159
}
102
160
103
- /// Sends a [`InfluxDbReadQuery`](crate::query::read_query::InfluxDbReadQuery) or [`InfluxDbWriteQuery`](crate::query::write_query::InfluxDbWriteQuery) to the InfluxDB Server.InfluxDbError
161
+ /// Sends a [`InfluxDbReadQuery`](crate::query::read_query::InfluxDbReadQuery) or [`InfluxDbWriteQuery`](crate::query::write_query::InfluxDbWriteQuery) to the InfluxDB Server.
104
162
///
105
163
/// A version capable of parsing the returned string is available under the [serde_integration](crate::integrations::serde_integration)
106
164
///
@@ -120,6 +178,12 @@ impl InfluxDbClient {
120
178
/// .add_field("temperature", 82)
121
179
/// );
122
180
/// ```
181
+ /// # Errors
182
+ ///
183
+ /// If the function can not finish the query,
184
+ /// a [`InfluxDbError`] variant will be returned.
185
+ ///
186
+ /// [`InfluxDbError`]: enum.InfluxDbError.html
123
187
pub fn query < Q > ( & self , q : & Q ) -> Box < dyn Future < Item = String , Error = InfluxDbError > >
124
188
where
125
189
Q : Any + InfluxDbQuery ,
@@ -137,48 +201,71 @@ impl InfluxDbClient {
137
201
} ;
138
202
139
203
let any_value = q as & dyn Any ;
204
+ let basic_parameters: Vec < ( String , String ) > = self . into ( ) ;
140
205
141
206
let client = if let Some ( _) = any_value. downcast_ref :: < InfluxDbReadQuery > ( ) {
142
207
let read_query = query. get ( ) ;
143
- let encoded: String = form_urlencoded:: Serializer :: new ( String :: new ( ) )
144
- . append_pair ( "db" , self . database_name ( ) )
145
- . append_pair ( "q" , & read_query)
146
- . finish ( ) ;
147
- let http_query_string = format ! (
148
- "{url}/query?{encoded}" ,
149
- url = self . database_url( ) ,
150
- encoded = encoded
151
- ) ;
208
+
209
+ let mut url = match Url :: parse_with_params (
210
+ format ! ( "{url}/query" , url = self . database_url( ) ) . as_str ( ) ,
211
+ basic_parameters,
212
+ ) {
213
+ Ok ( url) => url,
214
+ Err ( err) => {
215
+ let error = InfluxDbError :: UrlConstructionError {
216
+ error : format ! ( "{}" , err) ,
217
+ } ;
218
+ return Box :: new ( future:: err :: < String , InfluxDbError > ( error) ) ;
219
+ }
220
+ } ;
221
+ url. query_pairs_mut ( ) . append_pair ( "q" , & read_query. clone ( ) ) ;
222
+
152
223
if read_query. contains ( "SELECT" ) || read_query. contains ( "SHOW" ) {
153
- Client :: new ( ) . get ( http_query_string . as_str ( ) )
224
+ Client :: new ( ) . get ( url )
154
225
} else {
155
- Client :: new ( ) . post ( http_query_string . as_str ( ) )
226
+ Client :: new ( ) . post ( url )
156
227
}
157
228
} else if let Some ( write_query) = any_value. downcast_ref :: < InfluxDbWriteQuery > ( ) {
158
- Client :: new ( )
159
- . post (
160
- format ! (
161
- "{url}/write?db={db}{precision_str}" ,
162
- url = self . database_url( ) ,
163
- db = self . database_name( ) ,
164
- precision_str = write_query. get_precision_modifier( )
165
- )
166
- . as_str ( ) ,
167
- )
168
- . body ( query. get ( ) )
229
+ let mut url = match Url :: parse_with_params (
230
+ format ! ( "{url}/write" , url = self . database_url( ) ) . as_str ( ) ,
231
+ basic_parameters,
232
+ ) {
233
+ Ok ( url) => url,
234
+ Err ( err) => {
235
+ let error = InfluxDbError :: InvalidQueryError {
236
+ error : format ! ( "{}" , err) ,
237
+ } ;
238
+ return Box :: new ( future:: err :: < String , InfluxDbError > ( error) ) ;
239
+ }
240
+ } ;
241
+ url. query_pairs_mut ( )
242
+ . append_pair ( "precision" , & write_query. get_precision ( ) ) ;
243
+ Client :: new ( ) . post ( url) . body ( query. get ( ) )
169
244
} else {
170
245
unreachable ! ( )
171
246
} ;
172
-
173
247
Box :: new (
174
248
client
175
249
. send ( )
250
+ . map_err ( |err| InfluxDbError :: ConnectionError { error : err } )
251
+ . and_then (
252
+ |res| -> future:: FutureResult < reqwest:: r#async:: Response , InfluxDbError > {
253
+ match res. status ( ) {
254
+ StatusCode :: UNAUTHORIZED => {
255
+ futures:: future:: err ( InfluxDbError :: AuthorizationError )
256
+ }
257
+ StatusCode :: FORBIDDEN => {
258
+ futures:: future:: err ( InfluxDbError :: AuthenticationError )
259
+ }
260
+ _ => futures:: future:: ok ( res) ,
261
+ }
262
+ } ,
263
+ )
176
264
. and_then ( |mut res| {
177
265
let body = mem:: replace ( res. body_mut ( ) , Decoder :: empty ( ) ) ;
178
- body. concat2 ( )
179
- } )
180
- . map_err ( |err| InfluxDbError :: ProtocolError {
181
- error : format ! ( "{}" , err) ,
266
+ body. concat2 ( ) . map_err ( |err| InfluxDbError :: ProtocolError {
267
+ error : format ! ( "{}" , err) ,
268
+ } )
182
269
} )
183
270
. and_then ( |body| {
184
271
if let Ok ( utf8) = std:: str:: from_utf8 ( & body) {
@@ -201,3 +288,70 @@ impl InfluxDbClient {
201
288
)
202
289
}
203
290
}
291
+
292
+ #[ cfg( test) ]
293
+ mod tests {
294
+ use crate :: client:: InfluxDbClient ;
295
+
296
+ #[ test]
297
+ fn test_fn_database ( ) {
298
+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
299
+ assert_eq ! ( "database" , client. database_name( ) ) ;
300
+ }
301
+
302
+ #[ test]
303
+ fn test_with_auth ( ) {
304
+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
305
+ assert_eq ! ( client. url, "http://localhost:8068" ) ;
306
+ assert_eq ! ( client. database, "database" ) ;
307
+ assert ! ( client. auth. is_none( ) ) ;
308
+ let with_auth = client. with_auth ( "username" , "password" ) ;
309
+ assert ! ( with_auth. auth. is_some( ) ) ;
310
+ let auth = with_auth. auth . unwrap ( ) ;
311
+ assert_eq ! ( & auth. username, "username" ) ;
312
+ assert_eq ! ( & auth. password, "password" ) ;
313
+ }
314
+
315
+ #[ test]
316
+ fn test_into_impl ( ) {
317
+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
318
+ assert ! ( client. auth. is_none( ) ) ;
319
+ let basic_parameters: Vec < ( String , String ) > = client. into ( ) ;
320
+ assert_eq ! (
321
+ vec![ ( "db" . to_string( ) , "database" . to_string( ) ) ] ,
322
+ basic_parameters
323
+ ) ;
324
+
325
+ let with_auth = InfluxDbClient :: new ( "http://localhost:8068" , "database" )
326
+ . with_auth ( "username" , "password" ) ;
327
+ let basic_parameters_with_auth: Vec < ( String , String ) > = with_auth. into ( ) ;
328
+ assert_eq ! (
329
+ vec![
330
+ ( "db" . to_string( ) , "database" . to_string( ) ) ,
331
+ ( "u" . to_string( ) , "username" . to_string( ) ) ,
332
+ ( "p" . to_string( ) , "password" . to_string( ) )
333
+ ] ,
334
+ basic_parameters_with_auth
335
+ ) ;
336
+
337
+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
338
+ assert ! ( client. auth. is_none( ) ) ;
339
+ let basic_parameters: Vec < ( String , String ) > = ( & client) . into ( ) ;
340
+ assert_eq ! (
341
+ vec![ ( "db" . to_string( ) , "database" . to_string( ) ) ] ,
342
+ basic_parameters
343
+ ) ;
344
+
345
+ let with_auth = InfluxDbClient :: new ( "http://localhost:8068" , "database" )
346
+ . with_auth ( "username" , "password" ) ;
347
+ let basic_parameters_with_auth: Vec < ( String , String ) > = ( & with_auth) . into ( ) ;
348
+ assert_eq ! (
349
+ vec![
350
+ ( "db" . to_string( ) , "database" . to_string( ) ) ,
351
+ ( "u" . to_string( ) , "username" . to_string( ) ) ,
352
+ ( "p" . to_string( ) , "password" . to_string( ) )
353
+ ] ,
354
+ basic_parameters_with_auth
355
+ ) ;
356
+ }
357
+ }
0 commit comments