@@ -9,47 +9,158 @@ import S2A from "./stream-to-async-iterator";
9
9
describe ( "StreamToAsyncIterator" , function ( ) {
10
10
const filePath = path . join ( __dirname , "test/lorem-ipsum.txt" ) ;
11
11
12
+ function assertClosed ( stream : Readable , iter : S2A ) {
13
+ expect ( stream ) . to . have . property ( "destroyed" , true ) ;
14
+ expect ( stream . listenerCount ( "readable" ) ) . to . equal ( 0 ) ;
15
+ expect ( stream . listenerCount ( "end" ) ) . to . equal ( 0 ) ;
16
+ expect ( stream . listenerCount ( "error" ) ) . to . equal ( 0 ) ;
17
+ expect ( iter ) . to . have . property ( "closed" , true ) ;
18
+ }
19
+
12
20
it ( "should iterate on an object mode stream" , async function ( ) {
13
21
type Obj = { id : number } ;
14
22
const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
15
23
const objStream = Readable . from ( data ) ;
24
+ const iter = new S2A < Obj > ( objStream ) ;
16
25
const buff : Obj [ ] = [ ] ;
17
26
18
- for await ( const value of new S2A < Obj > ( objStream ) ) {
27
+ for await ( const value of iter ) {
19
28
buff . push ( value ) ;
20
29
}
21
30
31
+ assertClosed ( objStream , iter ) ;
22
32
expect ( buff ) . to . have . lengthOf ( 3 ) ;
23
33
} ) ;
24
34
35
+ it ( "should iterate on an empty stream" , async function ( ) {
36
+ type Obj = { id : number } ;
37
+ const data : Obj [ ] = [ ] ;
38
+ const objStream = Readable . from ( data ) ;
39
+ const iter = new S2A < Obj > ( objStream ) ;
40
+ const buff : Obj [ ] = [ ] ;
41
+
42
+ for await ( const value of iter ) {
43
+ buff . push ( value ) ;
44
+ }
45
+
46
+ assertClosed ( objStream , iter ) ;
47
+ expect ( buff ) . to . have . lengthOf ( 0 ) ;
48
+ } ) ;
49
+
50
+ it ( "should handle unstable streams" , async function ( ) {
51
+ type Obj = { id : number } ;
52
+ const data : Obj [ ] = [ ] ;
53
+ const objStream = Readable . from ( data ) ;
54
+ const iter = new S2A < Obj > ( objStream ) ;
55
+ const buff : Obj [ ] = [ ] ;
56
+
57
+ objStream . read ( ) ;
58
+ for await ( const value of iter ) {
59
+ buff . push ( value ) ;
60
+ }
61
+
62
+ assertClosed ( objStream , iter ) ;
63
+ expect ( buff ) . to . have . lengthOf ( 0 ) ;
64
+ } ) ;
65
+
66
+ it ( "should handle premature loop break" , async function ( ) {
67
+ type Obj = { id : number } ;
68
+ const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
69
+ const objStream = Readable . from ( data ) ;
70
+ const iter = new S2A < Obj > ( objStream ) ;
71
+ const buff : Obj [ ] = [ ] ;
72
+
73
+ let count = 0 ;
74
+ for await ( const value of iter ) {
75
+ if ( count >= 1 ) {
76
+ break ;
77
+ }
78
+ count += 1 ;
79
+ buff . push ( value ) ;
80
+ }
81
+
82
+ assertClosed ( objStream , iter ) ;
83
+ expect ( buff ) . to . have . lengthOf ( 1 ) ;
84
+ } ) ;
85
+
86
+ it ( "should handle stream errors" , async function ( ) {
87
+ type Obj = { id : number } ;
88
+ const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
89
+ const objStream = Readable . from ( data ) ;
90
+ const iter = new S2A < Obj > ( objStream ) ;
91
+ const buff : Obj [ ] = [ ] ;
92
+
93
+ const errMessage = "test throw" ;
94
+ await expect (
95
+ ( async ( ) => {
96
+ let count = 0 ;
97
+ for await ( const value of iter ) {
98
+ count += 1 ;
99
+ buff . push ( value ) ;
100
+ if ( count >= 1 ) {
101
+ objStream . emit ( "error" , new Error ( errMessage ) ) ;
102
+ }
103
+ }
104
+ } ) ( )
105
+ ) . to . eventually . be . rejectedWith ( errMessage ) ;
106
+
107
+ assertClosed ( objStream , iter ) ;
108
+ expect ( buff ) . to . have . lengthOf ( 1 ) ;
109
+ } ) ;
110
+
111
+ it ( "should handle manual throws" , async function ( ) {
112
+ type Obj = { id : number } ;
113
+ const data : Obj [ ] = [ { id : 1 } , { id : 2 } , { id : 3 } ] ;
114
+ const objStream = Readable . from ( data ) ;
115
+ const iter = new S2A < Obj > ( objStream ) ;
116
+ const buff : Obj [ ] = [ ] ;
117
+
118
+ const errMessage = "test throw" ;
119
+ await expect (
120
+ ( async ( ) => {
121
+ let count = 0 ;
122
+ for await ( const value of iter ) {
123
+ if ( count >= 1 ) {
124
+ await iter . throw ( new Error ( errMessage ) ) ;
125
+ }
126
+ count += 1 ;
127
+ buff . push ( value ) ;
128
+ }
129
+ } ) ( )
130
+ ) . to . eventually . be . rejectedWith ( errMessage ) ;
131
+
132
+ assertClosed ( objStream , iter ) ;
133
+ expect ( buff ) . to . have . lengthOf ( 1 ) ;
134
+ } ) ;
135
+
25
136
it ( "should iterate on a node stream with string encoding" , async function ( ) {
26
137
const fileStream = fs . createReadStream ( filePath , { encoding : "utf8" } ) ;
138
+ const iter = new S2A < string > ( fileStream ) ;
27
139
const buff : string [ ] = [ ] ;
28
140
29
- for await ( const value of new S2A < string > ( fileStream ) ) {
141
+ for await ( const value of iter ) {
30
142
buff . push ( value ) ;
31
143
}
32
144
33
145
const content = buff . join ( "" ) ;
146
+
147
+ assertClosed ( fileStream , iter ) ;
34
148
expect ( content ) . to . have . lengthOf ( 1502 ) ;
35
149
} ) ;
36
150
37
151
it ( "should iterate on a node stream with a size with string encoding" , async function ( ) {
38
152
const fileStream = fs . createReadStream ( filePath , { encoding : "utf8" } ) ;
153
+ const iter = new S2A < string > ( fileStream , { size : 16 } ) ;
39
154
const buff : string [ ] = [ ] ;
40
155
41
- for await ( const value of new S2A < string > ( fileStream , { size : 16 } ) ) {
156
+ for await ( const value of iter ) {
42
157
buff . push ( value ) ;
43
158
}
44
159
45
160
const content = buff . join ( "" ) ;
161
+
162
+ assertClosed ( fileStream , iter ) ;
46
163
expect ( buff ) . to . have . lengthOf ( 94 ) ;
47
164
expect ( content ) . to . have . lengthOf ( 1502 ) ;
48
165
} ) ;
49
-
50
- it ( "should clean up all stream events when stream ends" ) ;
51
-
52
- it ( "should clean up all stream events when stream errors" ) ;
53
-
54
- it ( "should handle a stream error in middle of iteration with a rejection" ) ;
55
166
} ) ;
0 commit comments