1
1
use std:: sync:: Arc ;
2
2
3
3
use lambda_runtime:: { run, service_fn, Error , LambdaEvent } ;
4
- use mongodb:: { bson:: doc, event:: { cmap:: CmapEvent , command:: CommandEvent , sdam:: SdamEvent , EventHandler } , options:: ClientOptions , Client } ;
4
+ use mongodb:: {
5
+ bson:: doc,
6
+ event:: { cmap:: CmapEvent , command:: CommandEvent , sdam:: SdamEvent , EventHandler } ,
7
+ options:: ClientOptions ,
8
+ Client ,
9
+ } ;
5
10
use serde:: { Deserialize , Serialize } ;
6
11
use tokio:: sync:: OnceCell ;
7
12
@@ -40,7 +45,8 @@ impl Stats {
40
45
}
41
46
SdamEvent :: ServerHeartbeatFailed ( ev) => {
42
47
assert ! ( !ev. awaited) ;
43
- self . failed_heartbeat_durations_millis . push ( ev. duration . as_millis ( ) ) ;
48
+ self . failed_heartbeat_durations_millis
49
+ . push ( ev. duration . as_millis ( ) ) ;
44
50
}
45
51
_ => ( ) ,
46
52
}
@@ -49,10 +55,12 @@ impl Stats {
49
55
fn handle_command ( & mut self , event : & CommandEvent ) {
50
56
match event {
51
57
CommandEvent :: Succeeded ( ev) => {
52
- self . command_succeeded_durations_millis . push ( ev. duration . as_millis ( ) ) ;
58
+ self . command_succeeded_durations_millis
59
+ . push ( ev. duration . as_millis ( ) ) ;
53
60
}
54
61
CommandEvent :: Failed ( ev) => {
55
- self . command_failed_durations_millis . push ( ev. duration . as_millis ( ) ) ;
62
+ self . command_failed_durations_millis
63
+ . push ( ev. duration . as_millis ( ) ) ;
56
64
}
57
65
_ => ( ) ,
58
66
}
@@ -62,7 +70,8 @@ impl Stats {
62
70
match event {
63
71
CmapEvent :: ConnectionCreated ( _) => {
64
72
self . connections_open += 1 ;
65
- self . max_connections_open = std:: cmp:: max ( self . connections_open , self . max_connections_open ) ;
73
+ self . max_connections_open =
74
+ std:: cmp:: max ( self . connections_open , self . max_connections_open ) ;
66
75
}
67
76
CmapEvent :: ConnectionClosed ( _) => {
68
77
self . connections_open -= 1 ;
@@ -76,23 +85,31 @@ impl State {
76
85
async fn new ( ) -> Self {
77
86
let uri = std:: env:: var ( "MONGODB_URI" )
78
87
. expect ( "MONGODB_URI must be set to the URI of the MongoDB deployment" ) ;
79
- let mut options = ClientOptions :: parse ( uri) . await . expect ( "Failed to parse URI" ) ;
88
+ let mut options = ClientOptions :: parse ( uri)
89
+ . await
90
+ . expect ( "Failed to parse URI" ) ;
80
91
let stats = Arc :: new ( std:: sync:: Mutex :: new ( Stats :: new ( ) ) ) ;
81
92
{
82
93
let stats = Arc :: clone ( & stats) ;
83
- options. sdam_event_handler = Some ( EventHandler :: callback ( move |ev| stats. lock ( ) . unwrap ( ) . handle_sdam ( & ev) ) ) ;
94
+ options. sdam_event_handler = Some ( EventHandler :: callback ( move |ev| {
95
+ stats. lock ( ) . unwrap ( ) . handle_sdam ( & ev)
96
+ } ) ) ;
84
97
}
85
98
{
86
99
let stats = Arc :: clone ( & stats) ;
87
- options. command_event_handler = Some ( EventHandler :: callback ( move |ev| stats. lock ( ) . unwrap ( ) . handle_command ( & ev) ) ) ;
100
+ options. command_event_handler = Some ( EventHandler :: callback ( move |ev| {
101
+ stats. lock ( ) . unwrap ( ) . handle_command ( & ev)
102
+ } ) ) ;
88
103
}
89
104
{
90
105
let stats = Arc :: clone ( & stats) ;
91
- options. cmap_event_handler = Some ( EventHandler :: callback ( move |ev| stats. lock ( ) . unwrap ( ) . handle_cmap ( & ev) ) ) ;
106
+ options. cmap_event_handler = Some ( EventHandler :: callback ( move |ev| {
107
+ stats. lock ( ) . unwrap ( ) . handle_cmap ( & ev)
108
+ } ) ) ;
92
109
}
93
110
94
111
let client = Client :: with_options ( options) . expect ( "Failed to create MongoDB Client" ) ;
95
-
112
+
96
113
Self { client, stats }
97
114
}
98
115
}
@@ -104,14 +121,13 @@ async fn get_state() -> &'static State {
104
121
}
105
122
106
123
#[ derive( Deserialize ) ]
107
- struct Request {
108
- }
124
+ struct Request { }
109
125
110
126
async fn function_handler ( _event : LambdaEvent < Request > ) -> Result < Stats , Error > {
111
127
let state = get_state ( ) . await ;
112
128
let coll = state. client . database ( "faas_test" ) . collection ( "faas_test" ) ;
113
- let id = coll. insert_one ( doc ! { } , None ) . await ?. inserted_id ;
114
- coll. delete_one ( doc ! { "id" : id } , None ) . await ?;
129
+ let id = coll. insert_one ( doc ! { } ) . await ?. inserted_id ;
130
+ coll. delete_one ( doc ! { "id" : id } ) . await ?;
115
131
116
132
let stats = {
117
133
let mut guard = state. stats . lock ( ) . unwrap ( ) ;
@@ -125,4 +141,4 @@ async fn function_handler(_event: LambdaEvent<Request>) -> Result<Stats, Error>
125
141
#[ tokio:: main]
126
142
async fn main ( ) -> Result < ( ) , Error > {
127
143
run ( service_fn ( function_handler) ) . await
128
- }
144
+ }
0 commit comments