Commit 81dd483

Add more tests for 'DataStream.pipe()'
1 parent a687ba3 commit 81dd483

2 files changed: +186 -23 lines


src/utils.ts

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ function isAsyncTransformHandler(func: TransformHandler<any, any>): boolean {
 }
 
 function getId(prefix: string): string {
-    return `${ prefix }-${ Date.now() }`;
+    return `${ prefix }-${ Date.now() }${ (Math.random() * 100).toPrecision(2) }`;
 }
 
 export {
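
For context: `Date.now()` returns the same value for calls landing in the same millisecond, so IDs generated in quick succession could collide; the random suffix added above makes that very unlikely. A minimal standalone sketch of the updated behavior (the prefix value and surrounding module are illustrative, not part of this diff):

// Sketch of the updated getId() from src/utils.ts.
function getId(prefix: string): string {
    // Date.now() can repeat within the same millisecond; the two-significant-digit
    // random suffix produced by toPrecision(2) makes collisions very unlikely.
    return `${ prefix }-${ Date.now() }${ (Math.random() * 100).toPrecision(2) }`;
}

// Example with a hypothetical prefix; the trailing random digits vary per call
// (and may contain a decimal point for values below 10).
console.log(getId("stream"));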

test/unit/streams/data/pipe.spec.ts

Lines changed: 185 additions & 22 deletions
@@ -1,6 +1,7 @@
 import test from "ava";
 import { DataStream } from "../../../../src/streams/data-stream";
 import { StringStream } from "../../../../src/streams/string-stream";
+import { deferReturn } from "../../../_helpers/utils";
 
 // Run tests for different sets of "maxParallel" values for each stream.
 const maxParallels = [
@@ -21,31 +22,61 @@ const maxParallels = [
 ];
 
 for (const maxParallel of maxParallels) {
-    test(`DataStream can be piped to another DataStream ${ maxParallel.slice(0, 2) }`, async (t) => {
-        const dsNumber = DataStream.from([1, 2, 3, 4, 5, 6, 7], { maxParallel: maxParallel[0] });
-        const dest = new DataStream<number, number>({ maxParallel: maxParallel[1] });
+    test(`DataStream can be piped to another DataStream, ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7], { maxParallel: maxParallel[0] });
+        const destStream = new DataStream<number, number>({ maxParallel: maxParallel[1] });
 
-        dsNumber.pipe(dest);
+        sourceStream.pipe(destStream);
 
-        t.deepEqual(await dest.toArray(), [1, 2, 3, 4, 5, 6, 7]);
+        t.deepEqual(await destStream.toArray(), [1, 2, 3, 4, 5, 6, 7]);
     });
 
-    test(`DataStream with transforms can be piped to another DataStream ${ maxParallel.slice(0, 2) }`, async (t) => {
-        const dsNumber = DataStream.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] });
-        const dest = new DataStream<number, number>({ maxParallel: maxParallel[1] });
+    test(`DataStream with transforms can be piped to another DataStream, ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = DataStream.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] });
+        const destStream = new DataStream<number, number>({ maxParallel: maxParallel[1] });
 
-        dsNumber.map((x) => x * 2).pipe(dest);
+        sourceStream.map((x) => x * 2).pipe(destStream);
 
-        t.deepEqual(await dest.toArray(), [2, 4, 6, 8, 10]);
+        t.deepEqual(await destStream.toArray(), [2, 4, 6, 8, 10]);
     });
 
-    test(`DataStream can be piped to another DataStream with transforms ${ maxParallel.slice(0, 2) }`, async (t) => {
-        const dsNumber = DataStream.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] });
-        const dest = new DataStream<number, number>({ maxParallel: maxParallel[1] }).map((x) => x * 2);
+    test(`DataStream can be piped to another DataStream with transforms, ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = DataStream.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] });
+        const destStream = sourceStream.pipe(
+            new DataStream<number, number>({ maxParallel: maxParallel[1] }).map((x) => x * 2));
 
-        dsNumber.pipe(dest);
+        t.deepEqual(await destStream.toArray(), [2, 4, 6, 8, 10]);
+    });
+
+    test(`DataStream with IFCA breaking transforms can be piped to another DataStream, ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = DataStream
+            .from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] })
+            .flatMap(x => [x, x + 10, x + 100]);
+        const destStream = sourceStream.pipe(
+            new DataStream<number, number>({ maxParallel: maxParallel[1] }));
 
-        t.deepEqual(await dest.toArray(), [2, 4, 6, 8, 10]);
+        t.deepEqual(await destStream.toArray(), [1, 11, 101, 2, 12, 102, 3, 13, 103, 4, 14, 104, 5, 15, 105]);
+    });
+
+    test(`DataStream using write can be piped to another DataStream (toArray), ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = new DataStream<number, number>({ maxParallel: maxParallel[0] })
+            .filter(x => x % 2 === 0)
+            .map(x => x * 2);
+        const destStream = new DataStream<number, string>({ maxParallel: maxParallel[1] }).map((x) => `${ x }`);
+
+        sourceStream.pipe(destStream);
+
+        // We need to use a sinking method here without awaiting it, so it can consume sourceStream
+        // chunks as they come; otherwise the sourceStream will fill up maxParallel and block.
+        const result = destStream.toArray();
+
+        for (const i of [1, 2, 3, 4, 5, 6, 7, 8]) {
+            await sourceStream.write(i);
+        }
+
+        sourceStream.end();
+
+        t.deepEqual(await result, ["4", "8", "12", "16"]);
     });
 
     test(`DataStream can be piped to multiple streams ${ maxParallel }`, async (t) => {
@@ -77,12 +108,144 @@ for (const maxParallel of maxParallels) {
         t.deepEqual(result4, ["foo-4-", "foo-8-", "foo-12-", "foo-16-", "foo-20-"]);
         t.deepEqual(resultString, ["foo", "4", "foo", "8", "foo", "12", "foo", "16", "foo", "20", ""]);
     });
+
+    test(`DataStream using write can be piped to another DataStream (read) #1, ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = new DataStream<number, number>({ maxParallel: maxParallel[0] })
+            .map(x => x * 2);
+        const destStream = new DataStream<number, string>({ maxParallel: maxParallel[1] }).map((x) => `${ x }`);
+
+        sourceStream.pipe(destStream);
+
+        const result = [];
+
+        for (const i of [1, 2, 3, 4, 5, 6, 7, 8]) {
+            const [, out] = await Promise.all([sourceStream.write(i), destStream.read()]);
+
+            result.push(out);
+        }
+
+        t.deepEqual(result, ["2", "4", "6", "8", "10", "12", "14", "16"]);
+    });
+
+    test(`DataStream using write can be piped to another DataStream (read) #2, ${ maxParallel.slice(0, 2) }`, async (t) => {
+        const sourceStream = new DataStream<number, number>({ maxParallel: maxParallel[0] })
+            .filter(x => x % 2 === 0)
+            .map(x => x * 2);
+        const destStream = new DataStream<number, string>({ maxParallel: maxParallel[1] }).map((x) => `${ x }`);
+
+        sourceStream.pipe(destStream);
+
+        const result = [];
+
+        for (const i of [1, 2, 3, 4, 5, 6, 7, 8]) {
+            await sourceStream.write(i);
+
+            // Since we filter out odd chunks, we only need to read half as many chunks as we write.
+            if (i % 2 === 0) {
+                result.push(await destStream.read());
+            }
+        }
+
+        t.deepEqual(result, ["4", "8", "12", "16"]);
+    });
 }
 
-// TODO Test cases:
-// ends piped stream
-// doesn't end piped stream
-// pipe with write, not from
-// pipe with streams transformed through flatMap/batch
-// how fast chunks are piped (keeping backpressure)
-// reading from piped stream (should be allowed, but may yield unexpected results)
+test("DataStream pipe ends destination stream", async (t) => {
+    const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
+    const destStream = new DataStream<number, number>();
+
+    sourceStream.pipe(destStream);
+
+    for (let i = 0; i < 7; i++) {
+        await destStream.read();
+    }
+
+    t.throws(() => destStream.write(8), { message: "Write after end" }, "Throws if stream is ended.");
+});
+
+test("DataStream pipe does not end destination stream if end:false passed", async (t) => {
+    const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
+    const destStream = new DataStream<number, number>();
+
+    sourceStream.pipe(destStream, { end: false });
+
+    for (let i = 0; i < 7; i++) {
+        await destStream.read();
+    }
+
+    t.notThrows(() => destStream.write(8), "Should not throw if stream is not ended.");
+});
+
+test("Pipe source can be read from", async (t) => {
+    const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
+    const destStream = new DataStream<number, number>();
+
+    sourceStream.pipe(destStream);
+
+    const read = await sourceStream.read();
+
+    // The first chunk (which is 1) will be sent to the piped stream,
+    // so the result of this read is the second chunk.
+    t.deepEqual(read, 2);
+});
+
+test("Pipe source can be piped from again", async (t) => {
+    const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
+    const destStream1 = new DataStream<number, number>();
+    const destStream2 = new DataStream<number, number>();
+
+    sourceStream.pipe(destStream1);
+
+    t.notThrows(() => sourceStream.pipe(destStream2), "Should not throw.");
+});
+
+test("Pipe source cannot be transformed further", async (t) => {
+    const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
+    const destStream = new DataStream<number, number>();
+
+    sourceStream.pipe(destStream);
+
+    t.throws(() => sourceStream.map(x => x * 2), { message: "Stream is not transformable." }, "Should throw.");
+});
+
+test("Pipe keeps correct backpressure (1 destination)", async (t) => {
+    const assert = (stream: any) => {
+        t.true(stream.ifca.state.all <= stream.ifca.state.maxParallel,
+            `Backpressure is not exceeded (${ stream.ifca.state }).`);
+    };
+    const stream1 = DataStream.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], { maxParallel: 5 });
+    const stream2 = new DataStream<number>({ maxParallel: 3 })
+        .map(x => { assert(stream2); return deferReturn(10, x); });
+
+    stream1.pipe(stream2);
+
+    await stream2.toArray();
+});
+
+test("Pipe keeps correct backpressure (2 destinations)", async (t) => {
+    const state = {
+        stream1: [0],
+        stream2: [0],
+        stream3: [0]
+    };
+    const assert = (name: string, stream: any) => {
+        t.true(stream.ifca.state.all <= stream.ifca.state.maxParallel,
+            `Backpressure is not exceeded (${ name }, ${ stream.ifca.state }).`);
+
+        if (name === "stream3") {
+            t.true(state.stream3.length === state.stream2.length, "Stream3 has same number of chunks done or in progress.");
+        }
+    };
+    const stream1 = DataStream
+        .from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20], { maxParallel: 7 })
+        .map(x => { state.stream1.push(x); assert("stream1", stream1); return x; });
+    const stream2 = new DataStream<number>({ maxParallel: 3 })
+        .map(x => { state.stream2.push(x); assert("stream2", stream2); return deferReturn(10, x); });
+    const stream3 = new DataStream<number>({ maxParallel: 5 })
+        .map(x => { state.stream3.push(x); assert("stream3", stream3); return deferReturn(5, x); });
+
+    stream1.pipe(stream2);
+    stream1.pipe(stream3);
+
+    await Promise.all([stream2.toArray(), stream3.toArray()]);
+});
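
For readers skimming the diff, here is a condensed sketch of the write/pipe pattern the new "(toArray)" and "(read)" tests exercise. It is illustrative only (the function name and values are made up), and it reuses only APIs that appear in the diff above: DataStream, pipe(), map(), write(), end(), toArray() and the maxParallel option.

import { DataStream } from "../../../../src/streams/data-stream";

// Pipe a manually fed source into a transformed destination, then collect the output.
async function pipeWriteSketch(): Promise<string[]> {
    const source = new DataStream<number, number>({ maxParallel: 2 }).map(x => x * 2);
    const dest = new DataStream<number, string>({ maxParallel: 2 }).map(x => `${ x }`);

    source.pipe(dest);

    // Start sinking before writing (without awaiting yet) so the destination drains chunks
    // as they arrive; otherwise the source would fill up maxParallel and block, as the
    // "(toArray)" test's comment explains.
    const result = dest.toArray();

    for (const i of [1, 2, 3]) {
        await source.write(i);
    }
    source.end();

    return result; // resolves to ["2", "4", "6"]
}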
