@@ -99,21 +99,22 @@ where
99
99
} ;
100
100
let mut responses = start ( client, buf) . await ?;
101
101
102
+ let mut rows = 0 ;
102
103
loop {
103
104
match responses. next ( ) . await ? {
104
105
Message :: DataRow ( _) => { }
105
106
Message :: CommandComplete ( body) => {
106
- let rows = body
107
+ rows = body
107
108
. tag ( )
108
109
. map_err ( Error :: parse) ?
109
110
. rsplit ( ' ' )
110
111
. next ( )
111
112
. unwrap ( )
112
113
. parse ( )
113
114
. unwrap_or ( 0 ) ;
114
- return Ok ( rows) ;
115
115
}
116
- Message :: EmptyQueryResponse => return Ok ( 0 ) ,
116
+ Message :: EmptyQueryResponse => rows = 0 ,
117
+ Message :: ReadyForQuery ( _) => return Ok ( rows) ,
117
118
_ => return Err ( Error :: unexpected_message ( ) ) ,
118
119
}
119
120
}
@@ -203,15 +204,17 @@ impl Stream for RowStream {
203
204
204
205
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
205
206
let this = self . project ( ) ;
206
- match ready ! ( this. responses. poll_next( cx) ?) {
207
- Message :: DataRow ( body) => {
208
- Poll :: Ready ( Some ( Ok ( Row :: new ( this. statement . clone ( ) , body) ?) ) )
207
+ loop {
208
+ match ready ! ( this. responses. poll_next( cx) ?) {
209
+ Message :: DataRow ( body) => {
210
+ return Poll :: Ready ( Some ( Ok ( Row :: new ( this. statement . clone ( ) , body) ?) ) )
211
+ }
212
+ Message :: EmptyQueryResponse
213
+ | Message :: CommandComplete ( _)
214
+ | Message :: PortalSuspended => { }
215
+ Message :: ReadyForQuery ( _) => return Poll :: Ready ( None ) ,
216
+ _ => return Poll :: Ready ( Some ( Err ( Error :: unexpected_message ( ) ) ) ) ,
209
217
}
210
- Message :: EmptyQueryResponse
211
- | Message :: CommandComplete ( _)
212
- | Message :: PortalSuspended => Poll :: Ready ( None ) ,
213
- Message :: ErrorResponse ( body) => Poll :: Ready ( Some ( Err ( Error :: db ( body) ) ) ) ,
214
- _ => Poll :: Ready ( Some ( Err ( Error :: unexpected_message ( ) ) ) ) ,
215
218
}
216
219
}
217
220
}
0 commit comments